"""缓存服务 - 统一管理Redis缓存和内存缓存""" import redis import pickle import hashlib import urllib.parse import os import pandas as pd from flask import jsonify from typing import Dict, Any, Optional # Redis配置 REDIS_HOST = os.getenv('REDIS_HOST', 'localhost') REDIS_PORT = int(os.getenv('REDIS_PORT', 6379)) REDIS_DB = int(os.getenv('REDIS_DB', 0)) REDIS_PASSWORD = os.getenv('REDIS_PASSWORD', None) CACHE_PREFIX = os.getenv('CACHE_PREFIX', 'sku_analysis') CACHE_EXPIRE_SECONDS = 86400 # 24小时 CACHE_VERSION = '2.0' # 全局内存缓存 _global_cache: Dict[str, Any] = { 'latest_df': None, 'latest_analysis_results': None, 'latest_spu_analysis_results': None, 'latest_hotproduct_results': None, 'latest_spu_hotproduct_results': None, 'latest_filename': None } def get_redis_connection(): """创建Redis连接""" try: redis_client = redis.Redis( host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB, password=REDIS_PASSWORD, decode_responses=False, socket_connect_timeout=5, socket_timeout=5 ) # 测试连接 redis_client.ping() return redis_client except Exception as e: print(f"无法连接到Redis服务器: {e}") return None def _ensure_list_type(keys_result): """确保keys_result是列表类型""" # 处理可能的异步结果或其他非列表类型 if keys_result is None: return [] # 如果是字节字符串,转换为列表 if isinstance(keys_result, bytes): return [keys_result] # 如果已经是列表或元组,直接转换为列表 if isinstance(keys_result, (list, tuple)): return list(keys_result) # 尝试转换为列表 try: return list(keys_result) except (TypeError, ValueError): # 如果无法转换,返回空列表 return [] def _ensure_int_type(value): """确保值是整数类型""" if value is None: return 0 if isinstance(value, int): return value try: return int(value) except (TypeError, ValueError): return 0 def get_cache_info(): """获取缓存信息""" try: # 初始化Redis连接 redis_client = get_redis_connection() if not redis_client: return jsonify({'message': 'Redis连接失败'}), 500 # 获取所有以sku_analysis开头的键(包括旧版本和新版本的缓存) keys_result = redis_client.keys(f"{CACHE_PREFIX}:*") # 确保keys_result是列表类型 keys = _ensure_list_type(keys_result) cache_details = [] if keys: for key in keys: try: key_str = key.decode('utf-8') if isinstance(key, bytes) else key # 获取缓存数据大小 size = redis_client.memory_usage(key) # 获取过期时间 ttl = redis_client.ttl(key) # 确保ttl是整数类型 ttl_int = _ensure_int_type(ttl) # 从键中提取文件名信息 filename = "未知文件" if ":" in key_str: parts = key_str.split(":") if len(parts) >= 3: filename = parts[2] cache_info = { 'key': key_str, 'filename': filename, 'size': f"{size} bytes" if size else "未知", 'ttl': f"{ttl_int} 秒" if ttl_int > 0 else "永久" } cache_details.append(cache_info) except Exception as e: print(f"获取缓存键详细信息失败: {e}") return jsonify({ 'message': '缓存信息获取成功', 'keyCount': len(cache_details), 'cacheDetails': cache_details }) except Exception as e: return jsonify({'error': f'获取缓存信息失败: {str(e)}'}), 500 def clear_cache(): """清空缓存""" try: # 初始化Redis连接 redis_client = get_redis_connection() if not redis_client: return jsonify({'message': 'Redis连接失败'}), 500 # 清空所有以sku_analysis开头的键(包括所有版本的缓存) keys_result = redis_client.keys(f"{CACHE_PREFIX}:*") # 确保 keys_result 是列表类型 keys = _ensure_list_type(keys_result) deleted_count = 0 if keys: deleted_count = redis_client.delete(*keys) return jsonify({'message': '缓存清空成功', 'deletedCount': deleted_count}) except Exception as e: return jsonify({'error': f'清空缓存失败: {str(e)}'}), 500 # ==================== Redis缓存管理函数 ==================== def generate_cache_key(df_hash: str, filename: Optional[str] = None) -> str: """生成缓存键(包含版本号)""" return generate_cache_key_with_prefix(df_hash, filename, CACHE_PREFIX) def generate_cache_key_with_prefix( df_hash: str, filename: Optional[str] = None, prefix: Optional[str] = None ) -> str: """生成缓存键,允许自定义前缀""" actual_prefix = prefix or CACHE_PREFIX if filename: encoded_filename = urllib.parse.quote(filename, safe='') key_str = f"{actual_prefix}:v{CACHE_VERSION}:{df_hash}:{encoded_filename}" else: key_str = f"{actual_prefix}:v{CACHE_VERSION}:{df_hash}" return key_str def save_to_cache(redis_client, cache_key: str, data: Any) -> bool: """将数据保存到Redis缓存""" if redis_client is None: return False try: serialized_data = pickle.dumps(data) redis_client.setex(cache_key, CACHE_EXPIRE_SECONDS, serialized_data) return True except Exception as e: print(f"保存到缓存失败: {e}") return False def load_from_cache(redis_client, cache_key: str) -> Optional[Any]: """仏Redis缓存加载数据""" if redis_client is None: return None try: cached_data = redis_client.get(cache_key) if cached_data: return pickle.loads(cached_data) return None except Exception as e: print(f"从缓存加载失败: {e}") return None def validate_analysis_results(results: Any) -> bool: """验证分析结果的数据完整性""" if not results or not isinstance(results, dict): return False sku_keys = [k for k in results.keys() if k != '_analysis_summary_'] if not sku_keys: return False first_sku = sku_keys[0] sku_data = results.get(first_sku, {}) required_fields = [ 'revenue_peak_idx', 'quantity_peak_idx', 'peak_revenue_date', 'peak_quantity_date', 'stages_map', 'stage_boundaries' ] for field in required_fields: if field not in sku_data: print(f"⚠️ 缓存数据缺少字段: {field},将重新计算") return False boundaries = sku_data.get('stage_boundaries', []) if boundaries: for boundary in boundaries: if 'index' not in boundary or 'date' not in boundary: print(f"⚠️ 缓存数据的 stage_boundaries 格式不完整,将重新计算") return False return True # ==================== 全局内存缓存管理 ==================== def set_global_cache(key: str, value: Any) -> None: """设置全局内存缓存""" print(f"💾 [CACHE] 设置缓存 key={key}, value_type={type(value).__name__}") _global_cache[key] = value print(f"💾 [CACHE] 缓存设置完成") def get_global_cache(key: str) -> Any: """获取全局内存缓存""" value = _global_cache.get(key) print(f"🔍 [CACHE] 获取缓存 key={key}, value_type={type(value).__name__}, is_none={value is None}") return value def clear_global_cache() -> None: """清空全局内存缓存""" _global_cache['latest_df'] = None _global_cache['latest_analysis_results'] = None _global_cache['latest_spu_analysis_results'] = None _global_cache['latest_hotproduct_results'] = None _global_cache['latest_spu_hotproduct_results'] = None _global_cache['latest_filename'] = None