| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263 |
- """缓存服务 - 统一管理Redis缓存和内存缓存"""
- import redis
- import pickle
- import hashlib
- import urllib.parse
- import os
- import pandas as pd
- from flask import jsonify
- from typing import Dict, Any, Optional
# Redis connection settings; each can be overridden through an environment variable.
REDIS_HOST = os.getenv('REDIS_HOST', 'localhost')
REDIS_PORT = int(os.getenv('REDIS_PORT', 6379))
REDIS_DB = int(os.getenv('REDIS_DB', 0))
REDIS_PASSWORD = os.getenv('REDIS_PASSWORD', None)
# Namespace for every cache key written/listed/deleted by this module.
CACHE_PREFIX = os.getenv('CACHE_PREFIX', 'sku_analysis')
# TTL in seconds passed to SETEX by save_to_cache; default is 24 hours.
# Env-overridable like the other settings above.
CACHE_EXPIRE_SECONDS = int(os.getenv('CACHE_EXPIRE_SECONDS', 86400))
# Embedded as "v<version>" in every generated cache key; changing it makes
# keys written under the previous version unreachable via generate_cache_key.
CACHE_VERSION = '2.0'
- # 全局内存缓存
- _global_cache: Dict[str, Any] = {
- 'latest_df': None,
- 'latest_analysis_results': None,
- 'latest_spu_analysis_results': None,
- 'latest_hotproduct_results': None,
- 'latest_spu_hotproduct_results': None,
- 'latest_filename': None
- }
def get_redis_connection():
    """Create a Redis client from the module settings and verify it with PING.

    Returns:
        The ``redis.Redis`` client when PING succeeds, otherwise ``None``.
        Any failure is printed rather than raised.
    """
    try:
        options = dict(
            host=REDIS_HOST,
            port=REDIS_PORT,
            db=REDIS_DB,
            password=REDIS_PASSWORD,
            decode_responses=False,  # keys/values come back as raw bytes
            socket_connect_timeout=5,
            socket_timeout=5,
        )
        client = redis.Redis(**options)
        # Force a round-trip so an unreachable server is reported here.
        client.ping()
    except Exception as exc:
        print(f"无法连接到Redis服务器: {exc}")
        return None
    return client
- def _ensure_list_type(keys_result):
- """确保keys_result是列表类型"""
- # 处理可能的异步结果或其他非列表类型
- if keys_result is None:
- return []
-
- # 如果是字节字符串,转换为列表
- if isinstance(keys_result, bytes):
- return [keys_result]
-
- # 如果已经是列表或元组,直接转换为列表
- if isinstance(keys_result, (list, tuple)):
- return list(keys_result)
-
- # 尝试转换为列表
- try:
- return list(keys_result)
- except (TypeError, ValueError):
- # 如果无法转换,返回空列表
- return []
- def _ensure_int_type(value):
- """确保值是整数类型"""
- if value is None:
- return 0
- if isinstance(value, int):
- return value
- try:
- return int(value)
- except (TypeError, ValueError):
- return 0
- def get_cache_info():
- """获取缓存信息"""
- try:
- # 初始化Redis连接
- redis_client = get_redis_connection()
- if not redis_client:
- return jsonify({'message': 'Redis连接失败'}), 500
-
- # 获取所有以sku_analysis开头的键(包括旧版本和新版本的缓存)
- keys_result = redis_client.keys(f"{CACHE_PREFIX}:*")
-
- # 确保keys_result是列表类型
- keys = _ensure_list_type(keys_result)
-
- cache_details = []
-
- if keys:
- for key in keys:
- try:
- key_str = key.decode('utf-8') if isinstance(key, bytes) else key
- # 获取缓存数据大小
- size = redis_client.memory_usage(key)
-
- # 获取过期时间
- ttl = redis_client.ttl(key)
- # 确保ttl是整数类型
- ttl_int = _ensure_int_type(ttl)
-
- # 从键中提取文件名信息
- filename = "未知文件"
- if ":" in key_str:
- parts = key_str.split(":")
- if len(parts) >= 3:
- filename = parts[2]
-
- cache_info = {
- 'key': key_str,
- 'filename': filename,
- 'size': f"{size} bytes" if size else "未知",
- 'ttl': f"{ttl_int} 秒" if ttl_int > 0 else "永久"
- }
-
- cache_details.append(cache_info)
- except Exception as e:
- print(f"获取缓存键详细信息失败: {e}")
-
- return jsonify({
- 'message': '缓存信息获取成功',
- 'keyCount': len(cache_details),
- 'cacheDetails': cache_details
- })
- except Exception as e:
- return jsonify({'error': f'获取缓存信息失败: {str(e)}'}), 500
def clear_cache():
    """Delete every Redis key under ``CACHE_PREFIX`` (all cache versions).

    Returns:
        A Flask JSON response ``{'message', 'deletedCount'}`` on success, or
        ``(response, 500)`` when Redis is unreachable or an error occurs.
    """
    redis_client = None
    try:
        redis_client = get_redis_connection()
        if not redis_client:
            return jsonify({'message': 'Redis连接失败'}), 500

        # SCAN walks the keyspace incrementally; KEYS would block the server
        # for the whole scan on a large database.
        keys = _ensure_list_type(
            redis_client.scan_iter(match=f"{CACHE_PREFIX}:*", count=500)
        )

        # Delete in fixed-size batches so a large key set is not sent as one
        # enormous DEL command.
        batch_size = 500
        deleted_count = 0
        for start in range(0, len(keys), batch_size):
            batch = keys[start:start + batch_size]
            deleted_count += _ensure_int_type(redis_client.delete(*batch))

        return jsonify({'message': '缓存清空成功', 'deletedCount': deleted_count})
    except Exception as e:
        return jsonify({'error': f'清空缓存失败: {str(e)}'}), 500
    finally:
        # get_redis_connection builds a fresh client per call; release it.
        if redis_client is not None:
            try:
                redis_client.close()
            except Exception:
                # Best-effort cleanup; never mask the response built above.
                pass
- # ==================== Redis缓存管理函数 ====================
def generate_cache_key(df_hash: str, filename: Optional[str] = None) -> str:
    """Build a versioned cache key under the default ``CACHE_PREFIX`` namespace."""
    return generate_cache_key_with_prefix(
        df_hash,
        filename=filename,
        prefix=CACHE_PREFIX,
    )
def generate_cache_key_with_prefix(
    df_hash: str,
    filename: Optional[str] = None,
    prefix: Optional[str] = None
) -> str:
    """Build ``{prefix}:v{CACHE_VERSION}:{df_hash}[:{quoted filename}]``.

    Args:
        df_hash: identifier of the data the cached value was computed from.
        filename: optional source filename. It is percent-encoded with
            ``safe=''``, so characters such as ':' cannot create extra key
            segments. An empty value is omitted.
        prefix: key namespace. A falsy value falls back to ``CACHE_PREFIX``.
    """
    base_key = f"{prefix or CACHE_PREFIX}:v{CACHE_VERSION}:{df_hash}"
    if not filename:
        return base_key
    return f"{base_key}:{urllib.parse.quote(filename, safe='')}"
def save_to_cache(
    redis_client,
    cache_key: str,
    data: Any,
    expire_seconds: Optional[int] = None
) -> bool:
    """Pickle ``data`` and store it in Redis under ``cache_key`` with a TTL.

    Args:
        redis_client: Redis client, or ``None`` (then nothing is stored).
        cache_key: key to write.
        data: any picklable object.
        expire_seconds: TTL in seconds; defaults to ``CACHE_EXPIRE_SECONDS``.

    Returns:
        True if the value was written. False if ``redis_client`` is None, or
        if pickling or the SETEX call failed (the error is printed).
    """
    if redis_client is None:
        return False

    ttl = CACHE_EXPIRE_SECONDS if expire_seconds is None else expire_seconds
    try:
        payload = pickle.dumps(data)
        redis_client.setex(cache_key, ttl, payload)
        return True
    except Exception as e:
        print(f"保存到缓存失败: {e}")
        return False
def load_from_cache(redis_client, cache_key: str) -> Optional[Any]:
    """Load and unpickle the value stored in Redis under ``cache_key``.

    Returns:
        The unpickled object, or ``None`` when there is no client, the key is
        missing/empty, or reading/unpickling fails (the error is printed).

    NOTE(review): ``pickle.loads`` executes arbitrary code from the payload.
    This assumes only trusted writers (save_to_cache) can write to this Redis.
    """
    if redis_client is None:
        return None
    try:
        raw = redis_client.get(cache_key)
        if not raw:
            return None
        return pickle.loads(raw)
    except Exception as e:
        print(f"从缓存加载失败: {e}")
        return None
def validate_analysis_results(results: Any) -> bool:
    """Check that cached analysis results have the fields current code expects.

    This is a spot check. Only the first entry other than
    ``'_analysis_summary_'`` is inspected, for:
      * the required peak/stage fields, and
      * ``stage_boundaries`` items that are dicts with ``'index'`` and ``'date'``.

    Returns:
        True if the cached results look usable. False otherwise, including for
        malformed shapes such as a non-dict entry or non-dict boundary items.
        The reason is printed so the caller's recomputation is explainable.
    """
    if not results or not isinstance(results, dict):
        return False

    sku_keys = [k for k in results if k != '_analysis_summary_']
    if not sku_keys:
        return False

    sku_data = results.get(sku_keys[0], {})
    if not isinstance(sku_data, dict):
        print("⚠️ 缓存数据格式不正确,将重新计算")
        return False

    required_fields = (
        'revenue_peak_idx', 'quantity_peak_idx',
        'peak_revenue_date', 'peak_quantity_date',
        'stages_map', 'stage_boundaries'
    )
    for field in required_fields:
        if field not in sku_data:
            print(f"⚠️ 缓存数据缺少字段: {field},将重新计算")
            return False

    boundaries = sku_data.get('stage_boundaries', [])
    if boundaries:
        # Iterating a dict/str would visit keys/characters, and `in` on a str
        # is a substring test, so those shapes can never be valid here.
        if isinstance(boundaries, (dict, str, bytes)):
            print("⚠️ 缓存数据的 stage_boundaries 格式不完整,将重新计算")
            return False
        for boundary in boundaries:
            if (not isinstance(boundary, dict)
                    or 'index' not in boundary
                    or 'date' not in boundary):
                print("⚠️ 缓存数据的 stage_boundaries 格式不完整,将重新计算")
                return False

    return True
- # ==================== 全局内存缓存管理 ====================
def set_global_cache(key: str, value: Any) -> None:
    """Store ``value`` under ``key`` in the module-level in-memory cache.

    Prints a debug line before and after the assignment.
    """
    type_name = type(value).__name__
    print(f"💾 [CACHE] 设置缓存 key={key}, value_type={type_name}")
    _global_cache[key] = value
    print("💾 [CACHE] 缓存设置完成")
def get_global_cache(key: str) -> Any:
    """Return the in-memory cache entry for ``key`` (``None`` if absent).

    Prints a debug line describing the returned value's type.
    """
    cached = _global_cache.get(key)
    type_name = type(cached).__name__
    print(f"🔍 [CACHE] 获取缓存 key={key}, value_type={type_name}, is_none={cached is None}")
    return cached
def clear_global_cache() -> None:
    """Reset every entry of the module-level in-memory cache to ``None``.

    Keys are kept, only their values are cleared. This covers the six
    well-known slots and any extra key stored through set_global_cache,
    which previously survived a "clear".
    """
    well_known_keys = (
        'latest_df',
        'latest_analysis_results',
        'latest_spu_analysis_results',
        'latest_hotproduct_results',
        'latest_spu_hotproduct_results',
        'latest_filename',
    )
    # Materialize the key list first so the dict is not mutated while iterated.
    for key in [*well_known_keys, *_global_cache]:
        _global_cache[key] = None
|