| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222 |
- """
- 生命周期分析核心模块
- """
- import pandas as pd
- import numpy as np
- from scipy.signal import savgol_filter, find_peaks
- import hashlib
- from .utils import ensure_native_type
- # Analysis parameters
- MIN_PROMINENCE = 0.1 # Minimum peak prominence for peak detection (applied as a fraction of the data range)
- HOT_PRODUCT_THRESHOLD = 0.6 # Hot-product (best-seller) threshold
- COMPLETENESS_SCORE_THRESHOLD = 80 # Completeness score threshold: scores at or above it count as a complete lifecycle
- MIN_DATA_DAYS = 30 # Minimum number of days of data
def get_dataframe_hash(df):
    """Build an MD5 hex digest of a DataFrame for use as a cache key.

    Only a sample of the frame is fingerprinted: the first 100 rows of the
    four order columns selected below, combined with the frame's shape.
    Rows beyond the first 100 and any other columns influence the result
    only through the shape.

    Args:
        df: DataFrame that contains the columns '订单状态', '订单付款时间',
            '买家实际支付金额' and 'SKU'.

    Returns:
        The hexadecimal MD5 digest string.

    Raises:
        KeyError: If any of the four columns is missing from ``df``.
    """
    key_columns = ['订单状态', '订单付款时间', '买家实际支付金额', 'SKU']
    preview_bytes = df[key_columns].head(100).to_string().encode()

    # The bytes object is interpolated through its repr (b'...'); the digest
    # depends on that exact text, so it is kept as-is.
    fingerprint = f"{preview_bytes}_{df.shape}"

    hasher = hashlib.md5()
    hasher.update(fingerprint.encode())
    return hasher.hexdigest()
def enhanced_data_preprocessing(revenue_series, quantity_series):
    """Clean and smooth a pair of revenue / quantity series.

    Each series goes through the same pipeline:

    1. Outliers are clipped to the Tukey fences
       (Q1 - 1.5 * IQR, Q3 + 1.5 * IQR), with the lower fence floored at 0.
    2. A Savitzky-Golay filter (polynomial order 2) is applied. The window
       grows with the length of ``revenue_series`` (5 / 7 / 11 / 15 points
       for up to 30 / 90 / 180 / more samples) and is capped at the largest
       odd number that fits in the data.
    3. A centered moving average is applied; the NaN values it leaves at
       the edges are back-filled, then forward-filled.

    Series with fewer than 3 points cannot be filtered (the filter needs an
    odd window larger than the polynomial order and no longer than the
    data), so they are returned after outlier clipping only instead of
    raising inside ``savgol_filter``.

    Args:
        revenue_series: pandas Series of revenue values.
        quantity_series: pandas Series of quantity values. The smoothing
            window is derived from ``revenue_series`` alone, so this is
            presumably expected to have the same length -- TODO confirm.

    Returns:
        Tuple ``(revenue_ma, quantity_ma)`` of float Series that carry the
        indexes of the corresponding inputs.
    """

    def remove_outliers(series):
        """Clip values to the Tukey fences; the lower fence never drops below 0."""
        q1 = series.quantile(0.25)
        q3 = series.quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr
        # NOTE(review): when Q1 == Q3 both fences coincide and the whole
        # series is flattened to one value -- confirm this is intended for
        # sparse data.
        return series.clip(lower=max(0, lower_bound), upper=upper_bound)

    def smooth(clean, index, window_length, ma_window):
        """Apply the Savitzky-Golay filter followed by the moving average."""
        filtered = savgol_filter(clean.values, window_length, 2)
        averaged = (
            pd.Series(filtered, index=index)
            .rolling(window=ma_window, center=True)
            .mean()
        )
        return averaged.bfill().ffill()

    revenue_clean = remove_outliers(revenue_series)
    quantity_clean = remove_outliers(quantity_series)

    data_length = len(revenue_series)
    if data_length < 3:
        # Too short for any valid filter window: skip smoothing.
        return revenue_clean.astype(float), quantity_clean.astype(float)

    # Adaptive smoothing window: longer histories get a wider window.
    if data_length <= 30:
        window_length = 5
    elif data_length <= 90:
        window_length = 7
    elif data_length <= 180:
        window_length = 11
    else:
        window_length = 15

    # The window must be odd and must not exceed the data length. Every
    # candidate above is odd, so capping at the largest odd number that
    # fits keeps it odd (and >= 3 because data_length >= 3 here).
    largest_odd_fit = data_length if data_length % 2 else data_length - 1
    window_length = min(window_length, largest_odd_fit)

    # Second, lighter smoothing pass.
    ma_window = max(3, window_length // 3)

    revenue_ma = smooth(revenue_clean, revenue_series.index, window_length, ma_window)
    quantity_ma = smooth(quantity_clean, quantity_series.index, window_length, ma_window)

    return revenue_ma, quantity_ma
def detect_significant_peaks(data, min_prominence=MIN_PROMINENCE, distance=10):
    """Locate the single most significant peak of a 1-D series.

    Candidate peaks are found with ``scipy.signal.find_peaks``; a candidate
    must have a prominence of at least ``min_prominence`` times the data
    range (max - min). When several candidates qualify, only the one with
    the largest prominence is reported. When none qualifies (for example a
    monotonic series), the position of the global maximum is used instead.

    Args:
        data: 1-D sequence of values (list, NumPy array or pandas Series).
        min_prominence: Required prominence as a fraction of the data range.
        distance: Minimum horizontal distance, in samples, between
            neighbouring candidate peaks (passed to ``find_peaks``).

    Returns:
        A list holding the position of the chosen peak as a plain Python
        ``int``, or an empty list when ``data`` is empty.
    """
    if len(data) == 0:
        # Nothing to search; callers treat an empty result as "no peak".
        return []

    data_range = np.max(data) - np.min(data)
    prominence_threshold = data_range * min_prominence

    peaks, properties = find_peaks(
        data, prominence=prominence_threshold, distance=distance
    )

    if len(peaks) == 0:
        return [int(np.argmax(data))]

    # With a single candidate this picks that candidate; with several it
    # picks the most prominent one.
    main_peak_idx = int(np.argmax(properties['prominences']))
    return [int(peaks[main_peak_idx])]
def calculate_lifecycle_indicators(revenue_data, quantity_data):
    """Derive the key lifecycle metrics from revenue and quantity series.

    For each series the peak position is the first entry reported by
    ``detect_significant_peaks`` (the midpoint is used if it reports
    nothing). From that peak the function computes:

    * growth rate:  (peak - first value) / (first value + 1e-8)
    * decline rate: (last value - peak) / (peak + 1e-8)
    * coefficient of variation: np.std / (np.mean + 1e-8)

    Args:
        revenue_data: Revenue series; accessed positionally through
            ``.iloc``.
        quantity_data: Quantity series; accessed positionally through
            ``.iloc``.

    Returns:
        Dict with the peak positions and values, growth and decline rates,
        coefficients of variation and means for both series, plus
        ``total_length`` (the length of ``revenue_data``).
    """
    eps = 1e-8  # guards the ratios against a zero denominator

    rev_candidates = detect_significant_peaks(revenue_data)
    qty_candidates = detect_significant_peaks(quantity_data)

    if rev_candidates:
        rev_idx = rev_candidates[0]
    else:
        rev_idx = len(revenue_data) // 2

    if qty_candidates:
        qty_idx = qty_candidates[0]
    else:
        qty_idx = len(quantity_data) // 2

    rev_peak = revenue_data.iloc[rev_idx]
    qty_peak = quantity_data.iloc[qty_idx]

    rev_first, rev_last = revenue_data.iloc[0], revenue_data.iloc[-1]
    qty_first, qty_last = quantity_data.iloc[0], quantity_data.iloc[-1]

    rev_mean = np.mean(revenue_data)
    qty_mean = np.mean(quantity_data)

    return {
        'revenue_peak_idx': rev_idx,
        'quantity_peak_idx': qty_idx,
        'revenue_peak': rev_peak,
        'quantity_peak': qty_peak,
        # Rise from the first observation up to the peak.
        'revenue_growth_rate': (rev_peak - rev_first) / (rev_first + eps),
        'quantity_growth_rate': (qty_peak - qty_first) / (qty_first + eps),
        # Change from the peak to the last observation (negative = decline).
        'revenue_decline_rate': (rev_last - rev_peak) / (rev_peak + eps),
        'quantity_decline_rate': (qty_last - qty_peak) / (qty_peak + eps),
        'revenue_cv': np.std(revenue_data) / (rev_mean + eps),
        'quantity_cv': np.std(quantity_data) / (qty_mean + eps),
        'total_length': len(revenue_data),
        'revenue_mean': rev_mean,
        'quantity_mean': qty_mean,
    }
def assess_lifecycle_completeness(indicators):
    """Score how complete a product lifecycle looks, out of 100 points.

    Nine criteria are evaluated; each one that is met adds its weight to
    the score:

    ==========================  ======  =====================================
    criterion                   weight  met when
    ==========================  ======  =====================================
    sufficient_time             15      at least 120 data points
    reasonable_peak_position    15      both peaks lie in the 25%-75% span
    significant_growth          15      revenue growth >= 1.0 or quantity
                                        growth >= 0.8
    noticeable_decline          15      revenue decline <= -0.35 or quantity
                                        decline <= -0.30
    has_lifecycle_shape         10      revenue CV >= 0.3 and quantity
                                        CV >= 0.25
    peak_significance           10      both peaks >= 1.8 x their mean
    data_quality_check          10      enough data, positive peaks, some
                                        growth and some decline
    cycle_completeness          5       the first five criteria all hold
    trend_consistency           5       revenue and quantity move together
    ==========================  ======  =====================================

    Args:
        indicators: Dict providing the keys ``total_length``,
            ``revenue_peak_idx``, ``quantity_peak_idx``, ``revenue_peak``,
            ``quantity_peak``, ``revenue_mean``, ``quantity_mean``,
            ``revenue_growth_rate``, ``quantity_growth_rate``,
            ``revenue_decline_rate``, ``quantity_decline_rate``,
            ``revenue_cv`` and ``quantity_cv``.

    Returns:
        Tuple ``(is_complete, score, breakdown)``: ``is_complete`` is True
        when the score reaches ``COMPLETENESS_SCORE_THRESHOLD``; ``score``
        is the integer total; ``breakdown`` maps each criterion name to
        ``{'hit': bool, 'weight': int}``. All ``hit`` flags are native
        Python bools, even when the indicators are NumPy scalars.
    """
    total_length = indicators['total_length']
    revenue_growth = indicators['revenue_growth_rate']
    quantity_growth = indicators['quantity_growth_rate']
    revenue_decline = indicators['revenue_decline_rate']
    quantity_decline = indicators['quantity_decline_rate']

    # 1. Length of the observed period.
    sufficient_time = total_length >= 120

    # 2. Peak position. A zero length has no meaningful position, so it is
    #    treated as a miss instead of dividing by zero.
    reasonable_peak_position = total_length > 0 and (
        0.25 <= indicators['revenue_peak_idx'] / total_length <= 0.75
        and 0.25 <= indicators['quantity_peak_idx'] / total_length <= 0.75
    )

    # 3. Significant growth up to the peak.
    significant_growth = revenue_growth >= 1.0 or quantity_growth >= 0.8

    # 4. Noticeable decline after the peak.
    noticeable_decline = revenue_decline <= -0.35 or quantity_decline <= -0.30

    # 5. Enough variation for a rise-and-fall shape.
    has_lifecycle_shape = (
        indicators['revenue_cv'] >= 0.3 and indicators['quantity_cv'] >= 0.25
    )

    # 6. Peaks stand out clearly from the average level.
    peak_significance = (
        indicators['revenue_peak'] >= indicators['revenue_mean'] * 1.8
        and indicators['quantity_peak'] >= indicators['quantity_mean'] * 1.8
    )

    # 7. Basic data quality.
    data_quality_check = (
        total_length >= 120
        and indicators['revenue_peak'] > 0
        and indicators['quantity_peak'] > 0
        and (revenue_growth > 0 or quantity_growth > 0)
        and (revenue_decline < 0 or quantity_decline < 0)
    )

    # 8. The full cycle is present: criteria 1-5 all hold.
    cycle_completeness = (
        sufficient_time
        and reasonable_peak_position
        and significant_growth
        and noticeable_decline
        and has_lifecycle_shape
    )

    # 9. Revenue and quantity trends agree.
    trend_consistency = (
        abs(revenue_growth - quantity_growth) <= 0.8
        or abs(revenue_decline - quantity_decline) <= 0.4
        or (
            revenue_growth * quantity_growth > 0
            and revenue_decline * quantity_decline > 0
        )
    )

    criteria = [
        ('sufficient_time', sufficient_time, 15),
        ('reasonable_peak_position', reasonable_peak_position, 15),
        ('significant_growth', significant_growth, 15),
        ('noticeable_decline', noticeable_decline, 15),
        ('has_lifecycle_shape', has_lifecycle_shape, 10),
        ('peak_significance', peak_significance, 10),
        ('data_quality_check', data_quality_check, 10),
        ('cycle_completeness', cycle_completeness, 5),
        ('trend_consistency', trend_consistency, 5),
    ]

    score = 0
    breakdown = {}
    for name, hit, weight in criteria:
        # Comparisons on NumPy scalars yield np.bool_; coerce so the
        # breakdown holds native bools like the other return values.
        hit = bool(hit)
        if hit:
            score += weight
        breakdown[name] = {'hit': hit, 'weight': weight}

    is_complete = bool(score >= COMPLETENESS_SCORE_THRESHOLD)
    return is_complete, score, breakdown
def calculate_hot_product_coefficient(result, indicators, data_length):
    """Compute the hot-product (best-seller) coefficient in the range [0, 1].

    Five components are added up and divided by 100, then capped at 1.0:

    * peak score: revenue peak relative to the revenue mean, times 25,
      clamped to [0, 100];
    * growth score: sum of both growth rates, times 10, clamped to [0, 100];
    * stability score: (1 - revenue CV - quantity CV) times 15, clamped to
      [0, 100];
    * 20 points when ``result['is_complete']`` is truthy;
    * 20 points when '成长期' is present in ``result['stages_map']``.

    Args:
        result: Dict of analysis results; ``is_complete`` and ``stages_map``
            are read if present (a missing or ``None`` ``stages_map`` counts
            as having no stages).
        indicators: Dict providing ``revenue_peak``, ``revenue_mean``,
            ``revenue_growth_rate``, ``quantity_growth_rate``,
            ``revenue_cv`` and ``quantity_cv``.
        data_length: Not used by the calculation; kept so existing callers
            keep working.

    Returns:
        The coefficient as a native Python ``float`` between 0.0 and 1.0.
    """

    def clamp_score(value):
        """Limit a component score to the 0-100 range."""
        return max(0, min(100, value))

    # The peak score is clamped at 0 like the other components, so a
    # negative peak/mean ratio cannot drive the coefficient below zero.
    peak_score = clamp_score(
        (indicators['revenue_peak'] / (indicators['revenue_mean'] + 1e-8)) * 25
    )
    growth_score = clamp_score(
        (indicators['revenue_growth_rate'] + indicators['quantity_growth_rate']) * 10
    )
    stability_score = clamp_score(
        (1 - indicators['revenue_cv'] - indicators['quantity_cv']) * 15
    )
    completeness_factor = 20 if result.get('is_complete', False) else 0

    stages = result.get('stages_map') or ()
    growth_stage_score = 20 if '成长期' in stages else 0

    total_score = (
        peak_score
        + growth_score
        + stability_score
        + completeness_factor
        + growth_stage_score
    ) / 100

    return float(min(1.0, total_score))
|