| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294 |
- """
- SKU生命周期核心分析服务
- 负责执行生命周期分析的核心逻辑
- """
- import pandas as pd
- from typing import Dict, Any
- from .lifecycle_analyzer_service import (
- get_dataframe_hash,
- enhanced_data_preprocessing,
- calculate_lifecycle_indicators,
- assess_lifecycle_completeness,
- calculate_hot_product_coefficient,
- MIN_DATA_DAYS
- )
- from .stage_divider_service import (
- intelligent_stage_division,
- analyze_incomplete_lifecycle_stages,
- determine_current_stage_intelligent,
- calculate_stage_statistics,
- LIFECYCLE_STAGES,
- MIN_STAGE_DURATION
- )
- from .cache_service import (
- get_redis_connection,
- generate_cache_key,
- save_to_cache,
- load_from_cache,
- validate_analysis_results
- )
- from .utils import convert_to_native_types, ensure_native_type
# Analysis parameters
# Hot-product threshold: _analyze_single_sku flags a SKU as a hot product when
# its hot-product coefficient is strictly greater than this value.
HOT_PRODUCT_THRESHOLD = 0.6  # hot-product threshold
def analyze_sku_lifecycle(df: pd.DataFrame, filename: str = None) -> Dict[str, Any]:
    """Run the lifecycle analysis for every SKU found in ``df``.

    The frame is resampled to daily totals per SKU, each SKU with enough
    valid sales days is analysed individually, and a summary entry is added
    under the ``'_analysis_summary_'`` key. When a Redis connection is
    available, a previously cached (and still valid) result is returned
    directly, and freshly computed results are written back to the cache.

    Args:
        df: Cleaned order data. The columns ``'订单付款时间'``, ``'SKU'``,
            ``'单笔销售额'`` and ``'购买数量'`` are read; ``'订单付款时间'`` is
            used as the resampling index, so it has to be datetime-like.
        filename: Optional file name, only passed on to the cache-key builder.

    Returns:
        A dict mapping each SKU to its result dict, plus the
        ``'_analysis_summary_'`` entry, after conversion by
        ``convert_to_native_types``.
    """
    redis_client = get_redis_connection()
    cache_key = None

    if redis_client:
        # The key is only ever used together with a live client, so the
        # DataFrame is not hashed at all when no cache is reachable.
        cache_key = generate_cache_key(get_dataframe_hash(df), filename)
        cached_results = load_from_cache(redis_client, cache_key)
        if cached_results and validate_analysis_results(cached_results):
            print("🎯 从缓存中加载分析结果")
            return cached_results

    results = {}
    print(f"📊 开始分析,共 {len(df):,} 条记录")

    # Build the time-indexed per-SKU grouping once and reuse it for both
    # daily aggregates instead of re-indexing and re-grouping the frame twice.
    grouped_by_sku = df.set_index('订单付款时间').groupby('SKU')
    daily_revenue = grouped_by_sku['单笔销售额'].resample('D').sum()
    daily_quantity = grouped_by_sku['购买数量'].resample('D').sum()

    skus = df['SKU'].unique()
    total_skus = len(skus)
    print(f"📊 识别到 {total_skus} 个SKU")

    hot_product_count = 0

    for position, sku in enumerate(skus, start=1):
        # Report progress every 10 SKUs and once more on the last one.
        if position % 10 == 0 or position == total_skus:
            print(f"📊 分析进度: {position}/{total_skus}")

        revenue_series = daily_revenue.get(sku, None)
        quantity_series = daily_quantity.get(sku, None)

        # Not enough calendar days to be worth analysing.
        if revenue_series is None or quantity_series is None or len(revenue_series) < MIN_DATA_DAYS:
            results[sku] = _create_insufficient_data_result()
            continue

        # Keep only days that had any revenue or any units sold.
        valid_days = (revenue_series > 0) | (quantity_series > 0)
        revenue_series = revenue_series[valid_days]
        quantity_series = quantity_series[valid_days]

        # Re-check after filtering: the threshold applies to valid days too.
        if len(revenue_series) < MIN_DATA_DAYS:
            results[sku] = _create_insufficient_data_result()
            continue

        result_data = _analyze_single_sku(sku, revenue_series, quantity_series)

        if result_data['is_hot_product']:
            hot_product_count += 1

        results[sku] = result_data

    # The summary is computed before it is stored, so it only counts SKU entries.
    results['_analysis_summary_'] = _create_analysis_summary(
        results, total_skus, hot_product_count
    )

    results = convert_to_native_types(results)

    if redis_client:
        save_to_cache(redis_client, cache_key, results)

    return results
def _create_insufficient_data_result() -> Dict[str, Any]:
    """Build the placeholder result used for a SKU with too few valid sales days.

    Returns:
        A new dict whose ``stage`` and ``current_stage`` are both ``'数据不足'``,
        with a ``details`` message naming ``MIN_DATA_DAYS``, and with the
        completeness and hot-product fields set to their negative values.
    """
    insufficient_label = '数据不足'

    placeholder: Dict[str, Any] = {}
    placeholder['stage'] = insufficient_label
    placeholder['current_stage'] = insufficient_label
    placeholder['details'] = f'有效销售天数少于{MIN_DATA_DAYS}天'
    placeholder.update(
        is_complete=False,
        hot_product_coefficient=0,
        is_hot_product=False,
    )
    return placeholder
def _analyze_single_sku(sku: str, revenue_series, quantity_series) -> Dict[str, Any]:
    """Analyse one SKU and assemble its result dict.

    Args:
        sku: SKU identifier; only forwarded to the stage-division helper.
        revenue_series: Daily revenue for the SKU, indexed by date (the index
            entries must support ``isoformat()``).
        quantity_series: Daily units sold, on the same index as
            ``revenue_series``.

    Returns:
        A dict with the raw and smoothed series, peak information, the
        per-day stage labels (``stages_map``), stage boundaries and
        statistics, completeness information and the hot-product fields.
    """
    # Pre-processing and indicator calculation are delegated to the analyzer service.
    smoothed_revenue, smoothed_quantity = enhanced_data_preprocessing(
        revenue_series, quantity_series
    )
    indicators = calculate_lifecycle_indicators(smoothed_revenue, smoothed_quantity)
    is_complete, completeness_score, completion_details = assess_lifecycle_completeness(indicators)

    # Stage division may downgrade a lifecycle that was assessed as complete.
    # Take the flag back from it so that 'stage', 'is_complete',
    # 'current_stage' and 'next_stage_prediction' all describe the same outcome.
    (is_complete, stages, stage_boundaries, current_stage, details,
     next_stage_prediction) = _divide_stages_with_completeness(
        is_complete, sku, revenue_series, quantity_series,
        smoothed_revenue, smoothed_quantity, indicators, completeness_score
    )

    # Make sure every boundary that has a usable index also carries a date.
    for boundary in stage_boundaries:
        if 'date' not in boundary and 'index' in boundary:
            idx = boundary['index']
            if 0 <= idx < len(revenue_series):
                boundary['date'] = revenue_series.index[idx].isoformat()

    def rounded(values):
        # Convert every value with the shared helper, at two decimal places.
        return [ensure_native_type(v, decimal_places=2) for v in values]

    revenue_values = revenue_series.tolist()
    quantity_values = quantity_series.tolist()
    date_series = [d.isoformat() for d in revenue_series.index]
    revenue_peak_idx = indicators['revenue_peak_idx']
    quantity_peak_idx = indicators['quantity_peak_idx']

    result_data = {
        'stage': '完整生命周期' if is_complete else '不完整生命周期',
        'details': details,
        'revenue_series': rounded(revenue_values),
        'quantity_series': rounded(quantity_values),
        'smoothed_revenue': rounded(smoothed_revenue.tolist()),
        'smoothed_quantity': rounded(smoothed_quantity.tolist()),
        'is_complete': bool(is_complete),
        'peak_revenue': ensure_native_type(indicators['revenue_peak'], decimal_places=2),
        'peak_quantity': ensure_native_type(indicators['quantity_peak'], decimal_places=2),
        'peak_revenue_date': revenue_series.index[revenue_peak_idx].isoformat(),
        'peak_quantity_date': quantity_series.index[quantity_peak_idx].isoformat(),
        'revenue_peak_idx': revenue_peak_idx,
        'quantity_peak_idx': quantity_peak_idx,
        'stages_map': stages,
        'stage_boundaries': stage_boundaries,
        'completeness_score': ensure_native_type(completeness_score, decimal_places=2),
        'completion_details': completion_details,
        # A prediction is only reported while the lifecycle is still incomplete.
        'next_stage_prediction': next_stage_prediction if not is_complete else "",
        'date_series': date_series,
        # A complete lifecycle ends in its last labelled stage; otherwise use
        # the stage reported by the stage-division step.
        'current_stage': stages[-1] if is_complete and stages else current_stage,
        'total_revenue': ensure_native_type(sum(revenue_values), decimal_places=2),
        'total_quantity': ensure_native_type(sum(quantity_values), decimal_places=2)
    }

    # Per-stage statistics only make sense when there are stage labels.
    if stages:
        result_data['stage_statistics'] = calculate_stage_statistics(
            revenue_values, quantity_values,
            list(date_series), stage_boundaries, stages
        )
    else:
        result_data['stage_statistics'] = {}

    # The coefficient helper receives the result built so far.
    hot_product_coefficient = ensure_native_type(
        calculate_hot_product_coefficient(result_data, indicators, len(revenue_series)),
        decimal_places=2
    )
    result_data['hot_product_coefficient'] = hot_product_coefficient
    result_data['is_hot_product'] = bool(hot_product_coefficient > HOT_PRODUCT_THRESHOLD)

    return result_data


def _divide_stages(is_complete, sku, revenue_series, quantity_series,
                   smoothed_revenue, smoothed_quantity, indicators, completeness_score):
    """Divide the series into lifecycle stages (5-tuple form).

    Thin wrapper around ``_divide_stages_with_completeness`` that drops the
    possibly-downgraded completeness flag.

    Returns:
        ``(stages, stage_boundaries, current_stage, details,
        next_stage_prediction)``.
    """
    return _divide_stages_with_completeness(
        is_complete, sku, revenue_series, quantity_series,
        smoothed_revenue, smoothed_quantity, indicators, completeness_score
    )[1:]


def _divide_stages_with_completeness(is_complete, sku, revenue_series, quantity_series,
                                     smoothed_revenue, smoothed_quantity, indicators,
                                     completeness_score):
    """Divide the series into stages and report the final completeness flag.

    A lifecycle passed in as complete is downgraded to incomplete when the
    division yields fewer than 3 boundaries, or when any stage that occurs
    lasts fewer than ``MIN_STAGE_DURATION`` days. Incomplete lifecycles
    (downgraded or not) are then re-divided with the incomplete-lifecycle
    helpers and trimmed to the stages up to the detected current stage.

    Args:
        is_complete: Completeness verdict from the indicator assessment.
        sku: Unused; kept so the signature mirrors ``_divide_stages``.
        revenue_series: Daily revenue indexed by date.
        quantity_series: Unused here; kept for the same reason as ``sku``.
        smoothed_revenue: Smoothed revenue passed to the division helpers.
        smoothed_quantity: Smoothed quantity passed to the division helpers.
        indicators: Indicator dict; ``'revenue_peak_idx'`` is read here.
        completeness_score: Score embedded in the details message.

    Returns:
        ``(is_complete, stages, stage_boundaries, current_stage, details,
        next_stage_prediction)`` where ``is_complete`` is the final verdict.
    """
    current_stage = '引入期'
    stage_boundaries = []
    stages = []
    details = ""
    next_stage_prediction = ""

    if is_complete:
        peak_time_revenue = revenue_series.index[indicators['revenue_peak_idx']]
        details = f"该SKU在 {peak_time_revenue.strftime('%Y-%m-%d')} 达到销售额峰值。完整性得分:{completeness_score}/100"

        stage_boundaries = intelligent_stage_division(
            smoothed_revenue, smoothed_quantity, indicators, revenue_series, MIN_STAGE_DURATION
        )

        if len(stage_boundaries) < 3:
            # Too few boundaries: treat as incomplete. The branch below
            # rebuilds stages, boundaries and details from scratch.
            is_complete = False
        else:
            stages = _build_stages_array(revenue_series, stage_boundaries)
            # A stage that occurs but is shorter than the minimum duration
            # also makes the lifecycle incomplete.
            if any(0 < stages.count(stage) < MIN_STAGE_DURATION for stage in LIFECYCLE_STAGES):
                is_complete = False

    if not is_complete:
        stage_boundaries = analyze_incomplete_lifecycle_stages(
            smoothed_revenue, smoothed_quantity, indicators, revenue_series
        )
        current_stage, stage_description, next_stage_prediction = determine_current_stage_intelligent(
            smoothed_revenue, smoothed_quantity, indicators, revenue_series
        )
        details = stage_description

        stages = _build_stages_array(revenue_series, stage_boundaries)

        # Keep only the current stage and the stages before it: later labels
        # are replaced by the current stage and their boundaries are dropped.
        if current_stage in LIFECYCLE_STAGES:
            valid_stages = LIFECYCLE_STAGES[:LIFECYCLE_STAGES.index(current_stage) + 1]
            stages = [label if label in valid_stages else current_stage for label in stages]
            stage_boundaries = [b for b in stage_boundaries
                                if b.get('type', '').split('→')[-1] in valid_stages]

    return is_complete, stages, stage_boundaries, current_stage, details, next_stage_prediction
def _build_stages_array(revenue_series, stage_boundaries):
    """Expand stage boundaries into one stage label per entry of the series.

    Every position starts as ``LIFECYCLE_STAGES[0]``. Boundaries are applied
    in the given order: from the boundary's index onward, positions still
    labelled with the transition's source stage are relabelled with its
    target stage. Boundaries whose index is not below the series length, or
    whose type contains none of the known transition markers, are ignored.

    Args:
        revenue_series: Series whose length fixes the number of labels.
        stage_boundaries: Dicts read through their ``'type'`` and ``'index'`` keys.

    Returns:
        A list of stage labels with the same length as ``revenue_series``.
    """
    # (marker looked up in the boundary type, source stage, target stage);
    # checked in this order, first match wins.
    transitions = (
        ('引入期→成长期', '引入期', '成长期'),
        ('成长期→成熟期', '成长期', '成熟期'),
        ('成熟期→衰退期', '成熟期', '衰退期'),
    )

    total_days = len(revenue_series)
    labels = [LIFECYCLE_STAGES[0]] * total_days

    for boundary in stage_boundaries:
        kind = boundary.get('type', '')
        start = boundary.get('index', 0)
        if start >= total_days:
            continue

        matched = next(
            ((source, target) for marker, source, target in transitions if marker in kind),
            None,
        )
        if matched is None:
            continue

        source, target = matched
        for position in range(start, total_days):
            if labels[position] == source:
                labels[position] = target

    return labels
- def _create_analysis_summary(results, total_skus, hot_product_count):
- """创建分析概要"""
- complete_skus = sum(1 for r in results.values() if r.get('is_complete', False))
- incomplete_skus = sum(1 for r in results.values()
- if not r.get('is_complete', True) and r.get('stage') != '数据不足')
- insufficient_skus = sum(1 for r in results.values() if r.get('stage') == '数据不足')
-
- return {
- 'total_skus': total_skus,
- 'complete_skus': complete_skus,
- 'incomplete_skus': incomplete_skus,
- 'insufficient_skus': insufficient_skus,
- 'hot_product_count': hot_product_count,
- 'hot_product_rate': hot_product_count / total_skus if total_skus > 0 else 0
- }
|