"""
SKU lifecycle core analysis service.

Runs the core lifecycle analysis for every SKU in a cleaned order DataFrame:
daily resampling, per-SKU indicator computation, stage division, stage
statistics and hot-product scoring, with optional Redis result caching.
"""
from collections import Counter
from typing import Any, Dict, List, Optional, Tuple

import pandas as pd

from .lifecycle_analyzer_service import (
    get_dataframe_hash,
    enhanced_data_preprocessing,
    calculate_lifecycle_indicators,
    assess_lifecycle_completeness,
    calculate_hot_product_coefficient,
    MIN_DATA_DAYS,
)
from .stage_divider_service import (
    intelligent_stage_division,
    analyze_incomplete_lifecycle_stages,
    determine_current_stage_intelligent,
    calculate_stage_statistics,
    LIFECYCLE_STAGES,
    MIN_STAGE_DURATION,
)
from .cache_service import (
    get_redis_connection,
    generate_cache_key,
    save_to_cache,
    load_from_cache,
    validate_analysis_results,
)
from .utils import convert_to_native_types, ensure_native_type

# Analysis parameters
HOT_PRODUCT_THRESHOLD = 0.6  # a SKU is flagged as a hot product when its coefficient exceeds this

# Stage transitions recognised in boundary 'type' strings, checked in this
# order; the first marker contained in the type wins.
# Each entry: (marker substring, stage being left, stage being entered).
_STAGE_TRANSITIONS = (
    ('引入期→成长期', '引入期', '成长期'),
    ('成长期→成熟期', '成长期', '成熟期'),
    ('成熟期→衰退期', '成熟期', '衰退期'),
)


def analyze_sku_lifecycle(df: pd.DataFrame, filename: Optional[str] = None) -> Dict[str, Any]:
    """
    Run the SKU lifecycle analysis.

    Args:
        df: Cleaned order data. Must contain the columns '订单付款时间'
            (datetime-like; used as the daily resampling index), 'SKU',
            '单笔销售额' and '购买数量'.
        filename: Optional file name, folded into the cache key.

    Returns:
        Dict mapping each SKU to its analysis result, plus an
        '_analysis_summary_' entry with run-level counts. When Redis is
        available and holds a result that passes validation, that cached
        result is returned as-is.
    """
    # Try the cache first. Hashing the DataFrame is only needed when a Redis
    # connection exists, so skip that work otherwise.
    redis_client = get_redis_connection()
    cache_key = None
    if redis_client:
        cache_key = generate_cache_key(get_dataframe_hash(df), filename)
        cached_results = load_from_cache(redis_client, cache_key)
        if cached_results and validate_analysis_results(cached_results):
            print("🎯 从缓存中加载分析结果")
            return cached_results

    results: Dict[str, Any] = {}
    print(f"📊 开始分析,共 {len(df):,} 条记录")

    # Per-SKU daily totals (index levels: SKU, day). Index and group once and
    # reuse the groupby for both measures.
    grouped = df.set_index('订单付款时间').groupby('SKU')
    daily_revenue = grouped['单笔销售额'].resample('D').sum()
    daily_quantity = grouped['购买数量'].resample('D').sum()

    skus = df['SKU'].unique()
    total_skus = len(skus)
    print(f"📊 识别到 {total_skus} 个SKU")

    hot_product_count = 0
    for position, sku in enumerate(skus, start=1):
        if position % 10 == 0 or position == total_skus:
            print(f"📊 分析进度: {position}/{total_skus}")

        revenue_series = daily_revenue.get(sku, None)
        quantity_series = daily_quantity.get(sku, None)

        # Data validation
        if revenue_series is None or quantity_series is None or len(revenue_series) < MIN_DATA_DAYS:
            results[sku] = _create_insufficient_data_result()
            continue

        # Daily resampling fills calendar gaps with zero sums; keep only days
        # with any revenue or quantity.
        valid_days = (revenue_series > 0) | (quantity_series > 0)
        revenue_series = revenue_series[valid_days]
        quantity_series = quantity_series[valid_days]

        if len(revenue_series) < MIN_DATA_DAYS:
            results[sku] = _create_insufficient_data_result()
            continue

        result_data = _analyze_single_sku(sku, revenue_series, quantity_series)
        if result_data['is_hot_product']:
            hot_product_count += 1
        results[sku] = result_data

    # Summary is computed over the per-SKU entries only (added last).
    results['_analysis_summary_'] = _create_analysis_summary(results, total_skus, hot_product_count)

    # Convert numpy/pandas scalars into native Python types before caching/returning.
    results = convert_to_native_types(results)

    if redis_client:
        save_to_cache(redis_client, cache_key, results)

    return results


def _create_insufficient_data_result() -> Dict[str, Any]:
    """Build the placeholder result for a SKU with too few active sales days."""
    return {
        'stage': '数据不足',
        'current_stage': '数据不足',
        'details': f'有效销售天数少于{MIN_DATA_DAYS}天',
        'is_complete': False,
        'hot_product_coefficient': 0,
        'is_hot_product': False,
    }


def _analyze_single_sku(sku: str, revenue_series, quantity_series) -> Dict[str, Any]:
    """
    Analyze a single SKU.

    Args:
        sku: SKU identifier (kept for interface compatibility; not used in
            the computation).
        revenue_series: Daily revenue for this SKU, date-indexed, with
            inactive days already removed.
        quantity_series: Daily quantity aligned with ``revenue_series``.

    Returns:
        Result dict with raw and smoothed series, peaks, stage map and
        boundaries, completeness info, stage statistics and the
        hot-product coefficient/flag.
    """
    # Preprocess, then compute indicators on the smoothed series.
    smoothed_revenue, smoothed_quantity = enhanced_data_preprocessing(revenue_series, quantity_series)
    indicators = calculate_lifecycle_indicators(smoothed_revenue, smoothed_quantity)
    is_complete, completeness_score, completion_details = assess_lifecycle_completeness(indicators)

    # Stage division may downgrade a lifecycle judged complete above (too few
    # boundaries, or a stage shorter than MIN_STAGE_DURATION). Use the
    # effective flag it returns so 'stage', 'is_complete', 'current_stage'
    # and 'next_stage_prediction' agree with the stages actually produced.
    (is_complete, stages, stage_boundaries, current_stage, details,
     next_stage_prediction) = _divide_stages_with_status(
        is_complete, revenue_series, smoothed_revenue, smoothed_quantity,
        indicators, completeness_score,
    )

    dates = revenue_series.index

    # Boundaries may carry only a positional index; attach its ISO date.
    for boundary in stage_boundaries:
        if 'date' not in boundary and 'index' in boundary:
            boundary_idx = boundary['index']
            if 0 <= boundary_idx < len(revenue_series):
                boundary['date'] = dates[boundary_idx].isoformat()

    revenue_values = revenue_series.tolist()
    quantity_values = quantity_series.tolist()
    date_series = [d.isoformat() for d in dates]

    result_data = {
        'stage': '完整生命周期' if is_complete else '不完整生命周期',
        'details': details,
        'revenue_series': [ensure_native_type(v, decimal_places=2) for v in revenue_values],
        'quantity_series': [ensure_native_type(v, decimal_places=2) for v in quantity_values],
        'smoothed_revenue': [ensure_native_type(v, decimal_places=2) for v in smoothed_revenue.tolist()],
        'smoothed_quantity': [ensure_native_type(v, decimal_places=2) for v in smoothed_quantity.tolist()],
        'is_complete': bool(is_complete),
        'peak_revenue': ensure_native_type(indicators['revenue_peak'], decimal_places=2),
        'peak_quantity': ensure_native_type(indicators['quantity_peak'], decimal_places=2),
        'peak_revenue_date': dates[indicators['revenue_peak_idx']].isoformat(),
        'peak_quantity_date': quantity_series.index[indicators['quantity_peak_idx']].isoformat(),
        'revenue_peak_idx': indicators['revenue_peak_idx'],
        'quantity_peak_idx': indicators['quantity_peak_idx'],
        'stages_map': stages,
        'stage_boundaries': stage_boundaries,
        'completeness_score': ensure_native_type(completeness_score, decimal_places=2),
        'completion_details': completion_details,
        'next_stage_prediction': next_stage_prediction if not is_complete else "",
        'date_series': date_series,
        'current_stage': stages[-1] if is_complete and stages else current_stage,
        'total_revenue': ensure_native_type(sum(revenue_values), decimal_places=2),
        'total_quantity': ensure_native_type(sum(quantity_values), decimal_places=2),
    }

    # Per-stage statistics; pass a fresh date list so the helper cannot alias
    # the 'date_series' stored in the result.
    if stages:
        result_data['stage_statistics'] = calculate_stage_statistics(
            revenue_values, quantity_values, list(date_series), stage_boundaries, stages
        )
    else:
        result_data['stage_statistics'] = {}

    # Hot-product coefficient is computed from the (almost) finished result.
    hot_product_coefficient = ensure_native_type(
        calculate_hot_product_coefficient(result_data, indicators, len(revenue_series)),
        decimal_places=2,
    )
    result_data['hot_product_coefficient'] = hot_product_coefficient
    result_data['is_hot_product'] = bool(hot_product_coefficient > HOT_PRODUCT_THRESHOLD)

    return result_data


def _divide_stages_with_status(
    is_complete, revenue_series, smoothed_revenue, smoothed_quantity, indicators, completeness_score,
) -> Tuple[bool, List[str], list, str, str, str]:
    """
    Divide a SKU's active days into lifecycle stages.

    A lifecycle judged complete is divided with ``intelligent_stage_division``
    and downgraded to incomplete when that yields fewer than 3 boundaries or
    any stage lasts fewer than MIN_STAGE_DURATION days. Incomplete
    lifecycles are divided with ``analyze_incomplete_lifecycle_stages`` and
    truncated at the current stage reported by
    ``determine_current_stage_intelligent``.

    Returns:
        Tuple of (effective is_complete, per-day stage labels, stage
        boundaries, current stage, details text, next-stage prediction).
    """
    current_stage = '引入期'
    stage_boundaries = []
    stages = []
    details = ""
    next_stage_prediction = ""

    if is_complete:
        peak_time_revenue = revenue_series.index[indicators['revenue_peak_idx']]
        details = f"该SKU在 {peak_time_revenue.strftime('%Y-%m-%d')} 达到销售额峰值。完整性得分:{completeness_score}/100"

        stage_boundaries = intelligent_stage_division(
            smoothed_revenue, smoothed_quantity, indicators, revenue_series, MIN_STAGE_DURATION
        )

        if len(stage_boundaries) < 3:
            is_complete = False
            details += " 但阶段划分不完整"
            stages = [LIFECYCLE_STAGES[0]] * len(revenue_series)
        else:
            stages = _build_stages_array(revenue_series, stage_boundaries)

            # Every stage that appears must last at least MIN_STAGE_DURATION days.
            stage_durations = Counter(stages)
            insufficient_stages = [
                s for s in LIFECYCLE_STAGES if 0 < stage_durations[s] < MIN_STAGE_DURATION
            ]
            if insufficient_stages:
                is_complete = False
                details += f" 某些阶段持续时间不足{MIN_STAGE_DURATION}天"

    if not is_complete:
        # NOTE(review): this overwrites any downgrade message appended above.
        stage_boundaries = analyze_incomplete_lifecycle_stages(
            smoothed_revenue, smoothed_quantity, indicators, revenue_series
        )
        current_stage, stage_description, next_stage_prediction = determine_current_stage_intelligent(
            smoothed_revenue, smoothed_quantity, indicators, revenue_series
        )
        details = stage_description
        stages = _build_stages_array(revenue_series, stage_boundaries)

        # Keep only the current stage and the stages before it; later labels
        # are clamped to the current stage, later boundaries dropped.
        if current_stage in LIFECYCLE_STAGES:
            valid_stages = LIFECYCLE_STAGES[:LIFECYCLE_STAGES.index(current_stage) + 1]
            stages = [s if s in valid_stages else current_stage for s in stages]
            stage_boundaries = [
                b for b in stage_boundaries if b.get('type', '').split('→')[-1] in valid_stages
            ]

    return is_complete, stages, stage_boundaries, current_stage, details, next_stage_prediction


def _divide_stages(is_complete, sku, revenue_series, quantity_series,
                   smoothed_revenue, smoothed_quantity, indicators, completeness_score):
    """
    Divide stages (backward-compatible 5-tuple form).

    Same as ``_divide_stages_with_status`` but without the effective
    completeness flag. ``sku`` and ``quantity_series`` are accepted for
    compatibility and are not used.

    Returns:
        (stages, stage_boundaries, current_stage, details, next_stage_prediction)
    """
    _, stages, stage_boundaries, current_stage, details, next_stage_prediction = _divide_stages_with_status(
        is_complete, revenue_series, smoothed_revenue, smoothed_quantity, indicators, completeness_score
    )
    return stages, stage_boundaries, current_stage, details, next_stage_prediction


def _build_stages_array(revenue_series, stage_boundaries) -> List[str]:
    """
    Build the per-day stage labels from transition boundaries.

    Every day starts as LIFECYCLE_STAGES[0]. For each boundary whose 'type'
    contains a known transition marker, days from the boundary's 'index'
    onward that are still in the "from" stage move to the "to" stage.
    Boundaries at or past the end are ignored; negative indices are treated
    as 0.
    """
    stages = [LIFECYCLE_STAGES[0]] * len(revenue_series)
    for boundary in stage_boundaries:
        boundary_type = boundary.get('type', '')
        start = max(boundary.get('index', 0), 0)
        if start >= len(stages):
            continue
        for marker, from_stage, to_stage in _STAGE_TRANSITIONS:
            if marker in boundary_type:
                for i in range(start, len(stages)):
                    if stages[i] == from_stage:
                        stages[i] = to_stage
                break
    return stages


def _create_analysis_summary(results, total_skus, hot_product_count):
    """
    Build the run-level summary.

    Args:
        results: Per-SKU result dicts (without the summary entry).
        total_skus: Number of distinct SKUs in the input.
        hot_product_count: Number of SKUs flagged as hot products.

    Returns:
        Dict with counts of complete, incomplete and insufficient-data SKUs,
        the hot-product count and the hot-product rate (0 when there are no
        SKUs).
    """
    complete_skus = sum(1 for r in results.values() if r.get('is_complete', False))
    incomplete_skus = sum(
        1 for r in results.values()
        if not r.get('is_complete', True) and r.get('stage') != '数据不足'
    )
    insufficient_skus = sum(1 for r in results.values() if r.get('stage') == '数据不足')

    return {
        'total_skus': total_skus,
        'complete_skus': complete_skus,
        'incomplete_skus': incomplete_skus,
        'insufficient_skus': insufficient_skus,
        'hot_product_count': hot_product_count,
        'hot_product_rate': hot_product_count / total_skus if total_skus > 0 else 0,
    }