cache_service.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. """缓存服务 - 统一管理Redis缓存和内存缓存"""
  2. import redis
  3. import pickle
  4. import hashlib
  5. import urllib.parse
  6. import os
  7. import pandas as pd
  8. from flask import jsonify
  9. from typing import Dict, Any, Optional
  10. # Redis配置
  11. REDIS_HOST = os.getenv('REDIS_HOST', 'localhost')
  12. REDIS_PORT = int(os.getenv('REDIS_PORT', 6379))
  13. REDIS_DB = int(os.getenv('REDIS_DB', 0))
  14. REDIS_PASSWORD = os.getenv('REDIS_PASSWORD', None)
  15. CACHE_PREFIX = os.getenv('CACHE_PREFIX', 'sku_analysis')
  16. CACHE_EXPIRE_SECONDS = 86400 # 24小时
  17. CACHE_VERSION = '2.0'
  18. # 全局内存缓存
  19. _global_cache: Dict[str, Any] = {
  20. 'latest_df': None,
  21. 'latest_analysis_results': None,
  22. 'latest_spu_analysis_results': None,
  23. 'latest_hotproduct_results': None,
  24. 'latest_spu_hotproduct_results': None,
  25. 'latest_filename': None
  26. }
  27. def get_redis_connection():
  28. """创建Redis连接"""
  29. try:
  30. redis_client = redis.Redis(
  31. host=REDIS_HOST,
  32. port=REDIS_PORT,
  33. db=REDIS_DB,
  34. password=REDIS_PASSWORD,
  35. decode_responses=False,
  36. socket_connect_timeout=5,
  37. socket_timeout=5
  38. )
  39. # 测试连接
  40. redis_client.ping()
  41. return redis_client
  42. except Exception as e:
  43. print(f"无法连接到Redis服务器: {e}")
  44. return None
  45. def _ensure_list_type(keys_result):
  46. """确保keys_result是列表类型"""
  47. # 处理可能的异步结果或其他非列表类型
  48. if keys_result is None:
  49. return []
  50. # 如果是字节字符串,转换为列表
  51. if isinstance(keys_result, bytes):
  52. return [keys_result]
  53. # 如果已经是列表或元组,直接转换为列表
  54. if isinstance(keys_result, (list, tuple)):
  55. return list(keys_result)
  56. # 尝试转换为列表
  57. try:
  58. return list(keys_result)
  59. except (TypeError, ValueError):
  60. # 如果无法转换,返回空列表
  61. return []
  62. def _ensure_int_type(value):
  63. """确保值是整数类型"""
  64. if value is None:
  65. return 0
  66. if isinstance(value, int):
  67. return value
  68. try:
  69. return int(value)
  70. except (TypeError, ValueError):
  71. return 0
  72. def get_cache_info():
  73. """获取缓存信息"""
  74. try:
  75. # 初始化Redis连接
  76. redis_client = get_redis_connection()
  77. if not redis_client:
  78. return jsonify({'message': 'Redis连接失败'}), 500
  79. # 获取所有以sku_analysis开头的键(包括旧版本和新版本的缓存)
  80. keys_result = redis_client.keys(f"{CACHE_PREFIX}:*")
  81. # 确保keys_result是列表类型
  82. keys = _ensure_list_type(keys_result)
  83. cache_details = []
  84. if keys:
  85. for key in keys:
  86. try:
  87. key_str = key.decode('utf-8') if isinstance(key, bytes) else key
  88. # 获取缓存数据大小
  89. size = redis_client.memory_usage(key)
  90. # 获取过期时间
  91. ttl = redis_client.ttl(key)
  92. # 确保ttl是整数类型
  93. ttl_int = _ensure_int_type(ttl)
  94. # 从键中提取文件名信息
  95. filename = "未知文件"
  96. if ":" in key_str:
  97. parts = key_str.split(":")
  98. if len(parts) >= 3:
  99. filename = parts[2]
  100. cache_info = {
  101. 'key': key_str,
  102. 'filename': filename,
  103. 'size': f"{size} bytes" if size else "未知",
  104. 'ttl': f"{ttl_int} 秒" if ttl_int > 0 else "永久"
  105. }
  106. cache_details.append(cache_info)
  107. except Exception as e:
  108. print(f"获取缓存键详细信息失败: {e}")
  109. return jsonify({
  110. 'message': '缓存信息获取成功',
  111. 'keyCount': len(cache_details),
  112. 'cacheDetails': cache_details
  113. })
  114. except Exception as e:
  115. return jsonify({'error': f'获取缓存信息失败: {str(e)}'}), 500
  116. def clear_cache():
  117. """清空缓存"""
  118. try:
  119. # 初始化Redis连接
  120. redis_client = get_redis_connection()
  121. if not redis_client:
  122. return jsonify({'message': 'Redis连接失败'}), 500
  123. # 清空所有以sku_analysis开头的键(包括所有版本的缓存)
  124. keys_result = redis_client.keys(f"{CACHE_PREFIX}:*")
  125. # 确保 keys_result 是列表类型
  126. keys = _ensure_list_type(keys_result)
  127. deleted_count = 0
  128. if keys:
  129. deleted_count = redis_client.delete(*keys)
  130. return jsonify({'message': '缓存清空成功', 'deletedCount': deleted_count})
  131. except Exception as e:
  132. return jsonify({'error': f'清空缓存失败: {str(e)}'}), 500
  133. # ==================== Redis缓存管理函数 ====================
  134. def generate_cache_key(df_hash: str, filename: Optional[str] = None) -> str:
  135. """生成缓存键(包含版本号)"""
  136. return generate_cache_key_with_prefix(df_hash, filename, CACHE_PREFIX)
  137. def generate_cache_key_with_prefix(
  138. df_hash: str,
  139. filename: Optional[str] = None,
  140. prefix: Optional[str] = None
  141. ) -> str:
  142. """生成缓存键,允许自定义前缀"""
  143. actual_prefix = prefix or CACHE_PREFIX
  144. if filename:
  145. encoded_filename = urllib.parse.quote(filename, safe='')
  146. key_str = f"{actual_prefix}:v{CACHE_VERSION}:{df_hash}:{encoded_filename}"
  147. else:
  148. key_str = f"{actual_prefix}:v{CACHE_VERSION}:{df_hash}"
  149. return key_str
  150. def save_to_cache(redis_client, cache_key: str, data: Any) -> bool:
  151. """将数据保存到Redis缓存"""
  152. if redis_client is None:
  153. return False
  154. try:
  155. serialized_data = pickle.dumps(data)
  156. redis_client.setex(cache_key, CACHE_EXPIRE_SECONDS, serialized_data)
  157. return True
  158. except Exception as e:
  159. print(f"保存到缓存失败: {e}")
  160. return False
  161. def load_from_cache(redis_client, cache_key: str) -> Optional[Any]:
  162. """仏Redis缓存加载数据"""
  163. if redis_client is None:
  164. return None
  165. try:
  166. cached_data = redis_client.get(cache_key)
  167. if cached_data:
  168. return pickle.loads(cached_data)
  169. return None
  170. except Exception as e:
  171. print(f"从缓存加载失败: {e}")
  172. return None
  173. def validate_analysis_results(results: Any) -> bool:
  174. """验证分析结果的数据完整性"""
  175. if not results or not isinstance(results, dict):
  176. return False
  177. sku_keys = [k for k in results.keys() if k != '_analysis_summary_']
  178. if not sku_keys:
  179. return False
  180. first_sku = sku_keys[0]
  181. sku_data = results.get(first_sku, {})
  182. required_fields = [
  183. 'revenue_peak_idx', 'quantity_peak_idx',
  184. 'peak_revenue_date', 'peak_quantity_date',
  185. 'stages_map', 'stage_boundaries'
  186. ]
  187. for field in required_fields:
  188. if field not in sku_data:
  189. print(f"⚠️ 缓存数据缺少字段: {field},将重新计算")
  190. return False
  191. boundaries = sku_data.get('stage_boundaries', [])
  192. if boundaries:
  193. for boundary in boundaries:
  194. if 'index' not in boundary or 'date' not in boundary:
  195. print(f"⚠️ 缓存数据的 stage_boundaries 格式不完整,将重新计算")
  196. return False
  197. return True
  198. # ==================== 全局内存缓存管理 ====================
  199. def set_global_cache(key: str, value: Any) -> None:
  200. """设置全局内存缓存"""
  201. print(f"💾 [CACHE] 设置缓存 key={key}, value_type={type(value).__name__}")
  202. _global_cache[key] = value
  203. print(f"💾 [CACHE] 缓存设置完成")
  204. def get_global_cache(key: str) -> Any:
  205. """获取全局内存缓存"""
  206. value = _global_cache.get(key)
  207. print(f"🔍 [CACHE] 获取缓存 key={key}, value_type={type(value).__name__}, is_none={value is None}")
  208. return value
  209. def clear_global_cache() -> None:
  210. """清空全局内存缓存"""
  211. _global_cache['latest_df'] = None
  212. _global_cache['latest_analysis_results'] = None
  213. _global_cache['latest_spu_analysis_results'] = None
  214. _global_cache['latest_hotproduct_results'] = None
  215. _global_cache['latest_spu_hotproduct_results'] = None
  216. _global_cache['latest_filename'] = None