# spu_lifecycle_service.py
"""
Core SPU lifecycle analysis service.

Runs the core logic of the lifecycle analysis.
"""
from typing import Any, Dict, Optional

import pandas as pd

from .lifecycle_analyzer_service import (
    get_dataframe_hash,
    enhanced_data_preprocessing,
    calculate_lifecycle_indicators,
    assess_lifecycle_completeness,
    calculate_hot_product_coefficient,
    MIN_DATA_DAYS,
)
from .stage_divider_service import (
    intelligent_stage_division,
    analyze_incomplete_lifecycle_stages,
    determine_current_stage_intelligent,
    calculate_stage_statistics,
    LIFECYCLE_STAGES,
    MIN_STAGE_DURATION,
)
from .cache_service import (
    get_redis_connection,
    generate_cache_key_with_prefix,
    save_to_cache,
    load_from_cache,
    validate_analysis_results,
)
from .utils import convert_to_native_types, ensure_native_type
# Analysis parameters
HOT_PRODUCT_THRESHOLD = 0.6  # an SPU whose hot-product coefficient exceeds this is flagged as a hot product
  33. def analyze_spu_lifecycle(df: pd.DataFrame, filename: str = None) -> Dict[str, Any]:
  34. """
  35. 执行SPU生命周期分析
  36. Args:
  37. df: 清洗后的数据DataFrame
  38. filename: 文件名(可选,用于缓存)
  39. Returns:
  40. 分析结果字典
  41. """
  42. # 初始化Redis并尝试从缓存加载
  43. redis_client = get_redis_connection()
  44. df_hash = get_dataframe_hash(df)
  45. cache_key = generate_cache_key_with_prefix(df_hash, filename, 'spu_analysis')
  46. if redis_client:
  47. cached_results = load_from_cache(redis_client, cache_key)
  48. if cached_results and validate_analysis_results(cached_results):
  49. print("🎯 从缓存中加载分析结果")
  50. return cached_results
  51. results = {}
  52. print(f"📊 开始分析,共 {len(df):,} 条记录")
  53. # 按SPU和日期重采样
  54. daily_revenue = df.set_index('订单付款时间').groupby('SPU')['单笔销售额'].resample('D').sum()
  55. daily_quantity = df.set_index('订单付款时间').groupby('SPU')['购买数量'].resample('D').sum()
  56. spus = df['SPU'].unique()
  57. total_spus = len(spus)
  58. print(f"📊 识别到 {total_spus} 个SPU")
  59. hot_product_count = 0
  60. for idx, spu in enumerate(spus):
  61. if (idx + 1) % 10 == 0 or (idx + 1) == total_spus:
  62. print(f"📊 分析进度: {idx + 1}/{total_spus}")
  63. revenue_series = daily_revenue.get(spu, None)
  64. quantity_series = daily_quantity.get(spu, None)
  65. # 数据验证
  66. if revenue_series is None or quantity_series is None or len(revenue_series) < MIN_DATA_DAYS:
  67. results[spu] = _create_insufficient_data_result()
  68. continue
  69. # 去除无效数据
  70. valid_days = (revenue_series > 0) | (quantity_series > 0)
  71. revenue_series = revenue_series[valid_days]
  72. quantity_series = quantity_series[valid_days]
  73. if len(revenue_series) < MIN_DATA_DAYS:
  74. results[spu] = _create_insufficient_data_result()
  75. continue
  76. # 执行分析
  77. result_data = _analyze_single_spu(
  78. spu, revenue_series, quantity_series
  79. )
  80. if result_data['is_hot_product']:
  81. hot_product_count += 1
  82. results[spu] = result_data
  83. # 添加分析概要
  84. results['_analysis_summary_'] = _create_analysis_summary(
  85. results, total_spus, hot_product_count
  86. )
  87. # 转换数据类型
  88. results = convert_to_native_types(results)
  89. # 保存到缓存
  90. if redis_client:
  91. save_to_cache(redis_client, cache_key, results)
  92. return results
  93. def _create_insufficient_data_result() -> Dict[str, Any]:
  94. """创建数据不足的结果"""
  95. return {
  96. 'stage': '数据不足',
  97. 'current_stage': '数据不足',
  98. 'details': f'有效销售天数少于{MIN_DATA_DAYS}天',
  99. 'is_complete': False,
  100. 'hot_product_coefficient': 0,
  101. 'is_hot_product': False
  102. }
  103. def _analyze_single_spu(spu: str, revenue_series, quantity_series) -> Dict[str, Any]:
  104. """分析单个SPU"""
  105. # 数据预处理
  106. smoothed_revenue, smoothed_quantity = enhanced_data_preprocessing(
  107. revenue_series, quantity_series
  108. )
  109. # 计算指标
  110. indicators = calculate_lifecycle_indicators(smoothed_revenue, smoothed_quantity)
  111. is_complete, completeness_score, completion_details = assess_lifecycle_completeness(indicators)
  112. # 阶段划分
  113. stages, stage_boundaries, current_stage, details, next_stage_prediction = _divide_stages(
  114. is_complete, spu, revenue_series, quantity_series,
  115. smoothed_revenue, smoothed_quantity, indicators, completeness_score
  116. )
  117. # 确保边界包含日期
  118. for boundary in stage_boundaries:
  119. if 'date' not in boundary and 'index' in boundary:
  120. idx = boundary['index']
  121. if 0 <= idx < len(revenue_series):
  122. boundary['date'] = revenue_series.index[idx].isoformat()
  123. # 构建结果
  124. result_data = {
  125. 'stage': '完整生命周期' if is_complete else '不完整生命周期',
  126. 'details': details,
  127. 'revenue_series': [ensure_native_type(v, decimal_places=2) for v in revenue_series.tolist()],
  128. 'quantity_series': [ensure_native_type(v, decimal_places=2) for v in quantity_series.tolist()],
  129. 'smoothed_revenue': [ensure_native_type(v, decimal_places=2) for v in smoothed_revenue.tolist()],
  130. 'smoothed_quantity': [ensure_native_type(v, decimal_places=2) for v in smoothed_quantity.tolist()],
  131. 'is_complete': bool(is_complete),
  132. 'peak_revenue': ensure_native_type(indicators['revenue_peak'], decimal_places=2),
  133. 'peak_quantity': ensure_native_type(indicators['quantity_peak'], decimal_places=2),
  134. 'peak_revenue_date': revenue_series.index[indicators['revenue_peak_idx']].isoformat(),
  135. 'peak_quantity_date': quantity_series.index[indicators['quantity_peak_idx']].isoformat(),
  136. 'revenue_peak_idx': indicators['revenue_peak_idx'],
  137. 'quantity_peak_idx': indicators['quantity_peak_idx'],
  138. 'stages_map': stages,
  139. 'stage_boundaries': stage_boundaries,
  140. 'completeness_score': ensure_native_type(completeness_score, decimal_places=2),
  141. 'completion_details': completion_details,
  142. 'next_stage_prediction': next_stage_prediction if not is_complete else "",
  143. 'date_series': [d.isoformat() for d in revenue_series.index],
  144. 'current_stage': stages[-1] if is_complete and stages else current_stage,
  145. 'total_revenue': ensure_native_type(sum(revenue_series.tolist()), decimal_places=2),
  146. 'total_quantity': ensure_native_type(sum(quantity_series.tolist()), decimal_places=2)
  147. }
  148. # 计算阶段统计
  149. if stages:
  150. stage_statistics = calculate_stage_statistics(
  151. revenue_series.tolist(), quantity_series.tolist(),
  152. [d.isoformat() for d in revenue_series.index], stage_boundaries, stages
  153. )
  154. result_data['stage_statistics'] = stage_statistics
  155. else:
  156. result_data['stage_statistics'] = {}
  157. # 计算爆款系数
  158. hot_product_coefficient = ensure_native_type(
  159. calculate_hot_product_coefficient(result_data, indicators, len(revenue_series)), decimal_places=2
  160. )
  161. result_data['hot_product_coefficient'] = hot_product_coefficient
  162. result_data['is_hot_product'] = bool(hot_product_coefficient > HOT_PRODUCT_THRESHOLD)
  163. return result_data
  164. def _divide_stages(is_complete, spu, revenue_series, quantity_series,
  165. smoothed_revenue, smoothed_quantity, indicators, completeness_score):
  166. """执行阶段划分"""
  167. current_stage = '引入期'
  168. stage_boundaries = []
  169. stages = []
  170. details = ""
  171. next_stage_prediction = ""
  172. if is_complete:
  173. peak_time_revenue = revenue_series.index[indicators['revenue_peak_idx']]
  174. peak_time_quantity = quantity_series.index[indicators['quantity_peak_idx']]
  175. details = f"该SPU在 {peak_time_revenue.strftime('%Y-%m-%d')} 达到销售额峰值。完整性得分:{completeness_score}/100"
  176. stage_boundaries = intelligent_stage_division(
  177. smoothed_revenue, smoothed_quantity, indicators, revenue_series, MIN_STAGE_DURATION
  178. )
  179. if len(stage_boundaries) < 3:
  180. is_complete = False
  181. details += f" 但阶段划分不完整"
  182. stages = [LIFECYCLE_STAGES[0]] * len(revenue_series)
  183. else:
  184. stages = _build_stages_array(revenue_series, stage_boundaries)
  185. # 验证阶段持续时间
  186. stage_durations = {s: sum(1 for st in stages if st == s) for s in LIFECYCLE_STAGES}
  187. insufficient_stages = [s for s, d in stage_durations.items()
  188. if d > 0 and d < MIN_STAGE_DURATION]
  189. if insufficient_stages:
  190. is_complete = False
  191. details += f" 某些阶段持续时间不足{MIN_STAGE_DURATION}天"
  192. if not is_complete:
  193. stage_boundaries = analyze_incomplete_lifecycle_stages(
  194. smoothed_revenue, smoothed_quantity, indicators, revenue_series
  195. )
  196. current_stage, stage_description, next_stage_prediction = determine_current_stage_intelligent(
  197. smoothed_revenue, smoothed_quantity, indicators, revenue_series
  198. )
  199. details = stage_description
  200. stages = _build_stages_array(revenue_series, stage_boundaries)
  201. # 只保留当前阶段及之前的阶段
  202. if current_stage in LIFECYCLE_STAGES:
  203. current_stage_index = LIFECYCLE_STAGES.index(current_stage)
  204. valid_stages = LIFECYCLE_STAGES[:current_stage_index+1]
  205. for i in range(len(stages)):
  206. if stages[i] not in valid_stages:
  207. stages[i] = current_stage
  208. stage_boundaries = [b for b in stage_boundaries
  209. if b.get('type', '').split('→')[-1] in valid_stages]
  210. return stages, stage_boundaries, current_stage, details, next_stage_prediction
  211. def _build_stages_array(revenue_series, stage_boundaries):
  212. """根据边界构建阶段数组"""
  213. stages = [LIFECYCLE_STAGES[0]] * len(revenue_series)
  214. for boundary in stage_boundaries:
  215. boundary_type = boundary.get('type', '')
  216. boundary_index = boundary.get('index', 0)
  217. if boundary_index < len(stages):
  218. if '引入期→成长期' in boundary_type:
  219. for i in range(boundary_index, len(stages)):
  220. if stages[i] == '引入期':
  221. stages[i] = '成长期'
  222. elif '成长期→成熟期' in boundary_type:
  223. for i in range(boundary_index, len(stages)):
  224. if stages[i] == '成长期':
  225. stages[i] = '成熟期'
  226. elif '成熟期→衰退期' in boundary_type:
  227. for i in range(boundary_index, len(stages)):
  228. if stages[i] == '成熟期':
  229. stages[i] = '衰退期'
  230. return stages
  231. def _create_analysis_summary(results, total_spus, hot_product_count):
  232. """创建分析概要"""
  233. complete_spus = sum(1 for r in results.values() if r.get('is_complete', False))
  234. incomplete_spus = sum(1 for r in results.values()
  235. if not r.get('is_complete', True) and r.get('stage') != '数据不足')
  236. insufficient_spus = sum(1 for r in results.values() if r.get('stage') == '数据不足')
  237. return {
  238. 'total_spus': total_spus,
  239. 'complete_spus': complete_spus,
  240. 'incomplete_spus': incomplete_spus,
  241. 'insufficient_spus': insufficient_spus,
  242. 'hot_product_count': hot_product_count,
  243. 'hot_product_rate': hot_product_count / total_spus if total_spus > 0 else 0
  244. }