"""Core module for product lifecycle analysis.

Provides helpers to smooth daily revenue/quantity series, locate their main
peak, derive growth/decline indicators, score how "complete" a lifecycle
looks, and compute a hot-product coefficient.
"""

import hashlib

import numpy as np
import pandas as pd
from scipy.signal import find_peaks, savgol_filter

from .utils import ensure_native_type

# Analysis parameters
MIN_PROMINENCE = 0.1  # minimum peak prominence, as a fraction of the data range
HOT_PRODUCT_THRESHOLD = 0.6  # hot-product threshold
COMPLETENESS_SCORE_THRESHOLD = 80  # minimum score for a lifecycle to count as complete
MIN_DATA_DAYS = 30  # minimum number of days of data

# Small constant added to denominators to avoid division by zero.
_EPS = 1e-8

# Polynomial order used by the Savitzky-Golay filter; the filter window must
# be strictly larger than this, so at least ``_SAVGOL_POLYORDER + 1`` points
# are needed before smoothing can be applied.
_SAVGOL_POLYORDER = 2


def get_dataframe_hash(df):
    """Return an MD5 hex digest of *df* suitable for use as a cache key.

    The digest is built from the first 100 rows of four order columns plus
    the overall shape of the frame.

    Args:
        df: DataFrame containing the columns '订单状态', '订单付款时间',
            '买家实际支付金额' and 'SKU'.

    Returns:
        A 32-character hexadecimal string.

    Raises:
        KeyError: If any of the four columns is missing from *df*.
    """
    sample_data = df[['订单状态', '订单付款时间', '买家实际支付金额', 'SKU']].head(100)
    # NOTE: the ``bytes`` repr (``b'...'``) is what ends up inside the hashed
    # string. It is kept exactly as-is so that previously computed keys
    # remain valid.
    hash_str = f"{sample_data.to_string().encode()}_{df.shape}"
    return hashlib.md5(hash_str.encode()).hexdigest()


def _clip_outliers_iqr(series):
    """Clip *series* to ``[max(0, Q1 - 1.5*IQR), Q3 + 1.5*IQR]``."""
    if series.empty:
        # Quantiles of an empty series are NaN; there is nothing to clip.
        return series.copy()
    q1 = series.quantile(0.25)
    q3 = series.quantile(0.75)
    iqr = q3 - q1
    lower_bound = q1 - 1.5 * iqr
    upper_bound = q3 + 1.5 * iqr
    return series.clip(lower=max(0, lower_bound), upper=upper_bound)


def _select_savgol_window(data_length):
    """Pick an odd Savitzky-Golay window that grows with *data_length*.

    The window is capped at the largest odd number not exceeding
    *data_length* and is never smaller than 3.
    """
    if data_length <= 30:
        base_window = 5
    elif data_length <= 90:
        base_window = 7
    elif data_length <= 180:
        base_window = 11
    else:
        base_window = 15
    largest_odd = data_length if data_length % 2 else data_length - 1
    return max(3, min(base_window, largest_odd))


def enhanced_data_preprocessing(revenue_series, quantity_series):
    """Clean and smooth the revenue and quantity series.

    Steps, applied to each series:
      1. Clip outliers using the 1.5 * IQR rule (lower bound floored at 0).
      2. Smooth with a Savitzky-Golay filter (polynomial order 2).
      3. Smooth again with a centred moving average; the NaN edges produced
         by the rolling window are back-/forward-filled.

    The smoothing window is derived from the length of *revenue_series* and
    used for both series.

    Args:
        revenue_series: pandas Series of revenue values.
        quantity_series: pandas Series of quantity values.

    Returns:
        Tuple ``(revenue_smoothed, quantity_smoothed)`` of pandas Series
        sharing the index of the respective input. When *revenue_series* has
        fewer than 3 points the filter cannot be applied, so the
        outlier-clipped series are returned (as float) without smoothing.
    """
    revenue_clean = _clip_outliers_iqr(revenue_series)
    quantity_clean = _clip_outliers_iqr(quantity_series)

    data_length = len(revenue_series)
    if data_length < _SAVGOL_POLYORDER + 1:
        # savgol_filter requires polyorder < window_length <= len(x); with
        # fewer than 3 points no valid window exists, so skip smoothing
        # instead of raising.
        return revenue_clean.astype(float), quantity_clean.astype(float)

    window_length = _select_savgol_window(data_length)

    smoothed_revenue = savgol_filter(
        revenue_clean.values, window_length, _SAVGOL_POLYORDER
    )
    smoothed_quantity = savgol_filter(
        quantity_clean.values, window_length, _SAVGOL_POLYORDER
    )

    # Second pass: centred moving average, with the NaN edges filled.
    ma_window = max(3, window_length // 3)
    revenue_ma = (
        pd.Series(smoothed_revenue, index=revenue_series.index)
        .rolling(window=ma_window, center=True)
        .mean()
        .bfill()
        .ffill()
    )
    quantity_ma = (
        pd.Series(smoothed_quantity, index=quantity_series.index)
        .rolling(window=ma_window, center=True)
        .mean()
        .bfill()
        .ffill()
    )
    return revenue_ma, quantity_ma


def detect_significant_peaks(data, min_prominence=MIN_PROMINENCE, min_distance=10):
    """Return the position of the single most significant peak in *data*.

    Peaks are searched with ``scipy.signal.find_peaks`` using a prominence
    threshold of ``min_prominence * (max(data) - min(data))``. When several
    peaks qualify, only the most prominent one is kept. When none qualifies,
    the position of the global maximum is returned instead.

    Args:
        data: 1-D sequence of values (array or pandas Series).
        min_prominence: Required prominence as a fraction of the data range.
        min_distance: Minimum number of samples between neighbouring peaks.

    Returns:
        A list holding one positional index as a Python ``int``, or an empty
        list when *data* is empty.
    """
    if len(data) == 0:
        return []

    data_range = np.max(data) - np.min(data)
    prominence_threshold = data_range * min_prominence
    peaks, properties = find_peaks(
        data, prominence=prominence_threshold, distance=min_distance
    )

    if len(peaks) == 0:
        return [int(np.argmax(data))]
    if len(peaks) == 1:
        return peaks.tolist()

    main_peak_idx = int(np.argmax(properties['prominences']))
    # Cast so every branch yields plain Python ints rather than numpy scalars.
    return [int(peaks[main_peak_idx])]


def _series_indicators(series):
    """Compute peak, growth, decline, mean and CV for one pandas Series."""
    peaks = detect_significant_peaks(series)
    peak_idx = peaks[0] if peaks else len(series) // 2
    peak = series.iloc[peak_idx]
    first = series.iloc[0]
    last = series.iloc[-1]
    mean = np.mean(series)
    return {
        'peak_idx': int(peak_idx),
        'peak': float(peak),
        # Relative change from the first value up to the peak.
        'growth_rate': float((peak - first) / (first + _EPS)),
        # Relative change from the peak to the last value (negative = decline).
        'decline_rate': float((last - peak) / (peak + _EPS)),
        # Coefficient of variation: standard deviation divided by the mean.
        'cv': float(np.std(series) / (mean + _EPS)),
        'mean': float(mean),
    }


def calculate_lifecycle_indicators(revenue_data, quantity_data):
    """Compute the key lifecycle indicators for both series.

    Args:
        revenue_data: pandas Series of (smoothed) revenue values.
        quantity_data: pandas Series of (smoothed) quantity values.

    Returns:
        Dict with, for each of ``revenue`` and ``quantity``: ``*_peak_idx``,
        ``*_peak``, ``*_growth_rate``, ``*_decline_rate``, ``*_cv`` and
        ``*_mean``; plus ``total_length`` (length of *revenue_data*). All
        values are native Python ``int``/``float``.

    Raises:
        ValueError: If either series is empty.
    """
    if len(revenue_data) == 0 or len(quantity_data) == 0:
        raise ValueError("revenue_data and quantity_data must not be empty")

    revenue = _series_indicators(revenue_data)
    quantity = _series_indicators(quantity_data)

    return {
        'revenue_peak_idx': revenue['peak_idx'],
        'quantity_peak_idx': quantity['peak_idx'],
        'revenue_peak': revenue['peak'],
        'quantity_peak': quantity['peak'],
        'revenue_growth_rate': revenue['growth_rate'],
        'quantity_growth_rate': quantity['growth_rate'],
        'revenue_decline_rate': revenue['decline_rate'],
        'quantity_decline_rate': quantity['decline_rate'],
        'revenue_cv': revenue['cv'],
        'quantity_cv': quantity['cv'],
        'total_length': len(revenue_data),
        'revenue_mean': revenue['mean'],
        'quantity_mean': quantity['mean'],
    }


def assess_lifecycle_completeness(indicators):
    """Score how complete a product lifecycle is.

    Nine weighted criteria (weights sum to 100) are evaluated against
    *indicators*; the lifecycle is complete when the accumulated score
    reaches ``COMPLETENESS_SCORE_THRESHOLD``.

    Args:
        indicators: Dict as returned by ``calculate_lifecycle_indicators``.

    Returns:
        Tuple ``(is_complete, score, breakdown)`` where *is_complete* is a
        ``bool``, *score* is the summed weight of the criteria that were met,
        and *breakdown* maps each criterion name to
        ``{'hit': bool, 'weight': int}``.
    """
    total_length = indicators['total_length']
    revenue_growth = indicators['revenue_growth_rate']
    quantity_growth = indicators['quantity_growth_rate']
    revenue_decline = indicators['revenue_decline_rate']
    quantity_decline = indicators['quantity_decline_rate']
    revenue_peak = indicators['revenue_peak']
    quantity_peak = indicators['quantity_peak']

    # 1. Enough history.
    sufficient_time = total_length >= 120

    # 2. Both peaks fall in the middle half of the timeline. A zero-length
    #    timeline cannot satisfy this (and would otherwise divide by zero).
    if total_length > 0:
        reasonable_peak_position = (
            0.25 <= indicators['revenue_peak_idx'] / total_length <= 0.75
            and 0.25 <= indicators['quantity_peak_idx'] / total_length <= 0.75
        )
    else:
        reasonable_peak_position = False

    # 3. Significant growth from the start up to the peak.
    significant_growth = revenue_growth >= 1.0 or quantity_growth >= 0.8

    # 4. Noticeable decline from the peak to the end.
    noticeable_decline = revenue_decline <= -0.35 or quantity_decline <= -0.30

    # 5. Enough variation for a rise-and-fall shape.
    has_lifecycle_shape = (
        indicators['revenue_cv'] >= 0.3 and indicators['quantity_cv'] >= 0.25
    )

    # 6. Peaks stand well above the mean.
    peak_significance = (
        revenue_peak >= indicators['revenue_mean'] * 1.8
        and quantity_peak >= indicators['quantity_mean'] * 1.8
    )

    # 7. Basic data-quality sanity check.
    data_quality_check = (
        total_length >= 120
        and revenue_peak > 0
        and quantity_peak > 0
        and (revenue_growth > 0 or quantity_growth > 0)
        and (revenue_decline < 0 or quantity_decline < 0)
    )

    # 8. All structural criteria hold at once.
    cycle_completeness = (
        sufficient_time
        and reasonable_peak_position
        and significant_growth
        and noticeable_decline
        and has_lifecycle_shape
    )

    # 9. Revenue and quantity trends agree with each other.
    trend_consistency = (
        abs(revenue_growth - quantity_growth) <= 0.8
        or abs(revenue_decline - quantity_decline) <= 0.4
        or (
            revenue_growth * quantity_growth > 0
            and revenue_decline * quantity_decline > 0
        )
    )

    criteria = (
        ('sufficient_time', sufficient_time, 15),
        ('reasonable_peak_position', reasonable_peak_position, 15),
        ('significant_growth', significant_growth, 15),
        ('noticeable_decline', noticeable_decline, 15),
        ('has_lifecycle_shape', has_lifecycle_shape, 10),
        ('peak_significance', peak_significance, 10),
        ('data_quality_check', data_quality_check, 10),
        ('cycle_completeness', cycle_completeness, 5),
        ('trend_consistency', trend_consistency, 5),
    )

    score = 0
    breakdown = {}
    for name, hit, weight in criteria:
        # Comparisons on numpy scalars yield numpy.bool_; cast so the
        # breakdown holds plain bools, consistent with ``is_complete``.
        hit = bool(hit)
        if hit:
            score += weight
        breakdown[name] = {'hit': hit, 'weight': weight}

    is_complete = score >= COMPLETENESS_SCORE_THRESHOLD
    return is_complete, score, breakdown


def calculate_hot_product_coefficient(result, indicators, data_length):
    """Compute the hot-product coefficient in the range [0.0, 1.0].

    The coefficient sums five components and divides the total by 100:
      * peak score: revenue peak / revenue mean * 25, capped to [0, 100];
      * growth score: (revenue growth + quantity growth) * 10, capped to
        [0, 100];
      * stability score: (1 - revenue CV - quantity CV) * 15, capped to
        [0, 100];
      * 20 points when ``result['is_complete']`` is truthy;
      * 20 points when '成长期' is present in ``result['stages_map']``.

    Args:
        result: Dict that may contain 'is_complete' and 'stages_map'.
        indicators: Dict as returned by ``calculate_lifecycle_indicators``.
        data_length: Currently unused; kept for interface compatibility.

    Returns:
        The coefficient as a Python ``float`` clamped to [0.0, 1.0].
    """
    peak_ratio = indicators['revenue_peak'] / (indicators['revenue_mean'] + _EPS)
    peak_score = max(0, min(100, peak_ratio * 25))

    combined_growth = (
        indicators['revenue_growth_rate'] + indicators['quantity_growth_rate']
    )
    growth_score = max(0, min(100, combined_growth * 10))

    combined_cv = indicators['revenue_cv'] + indicators['quantity_cv']
    stability_score = max(0, min(100, (1 - combined_cv) * 15))

    completeness_factor = 20 if result.get('is_complete', False) else 0
    growth_stage_score = 20 if '成长期' in result.get('stages_map', []) else 0

    total_score = (
        peak_score
        + growth_score
        + stability_score
        + completeness_factor
        + growth_stage_score
    ) / 100
    return float(min(1.0, max(0.0, total_score)))