| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211 |
- """
- SPU爆款系数分析服务
- 根据《2025-11-23:SPU 爆款系数算法逻辑总结(V2.0)》实现
- """
- import pandas as pd
- import numpy as np
- from sklearn.preprocessing import MinMaxScaler
- from datetime import datetime
def _min_max_normalize(frame, columns):
    """Return the given columns of ``frame`` rescaled to the [0, 1] range.

    Each column is mapped with ``(x - min) / (max - min)``. A column whose
    values are all equal has zero spread; it is divided by 1 instead, so every
    entry becomes 0 rather than NaN.
    """
    values = frame[columns].astype(float)
    lowest = values.min()
    spread = values.max() - lowest
    # Zero (or undefined) spread would divide by zero; use 1 so the column maps to 0.
    spread = spread.where(spread > 0, 1.0)
    return (values - lowest) / spread


def _classify_hot_level(score):
    """Map a hot-product score to its level label using fixed thresholds."""
    if score >= 0.80:
        return '超级爆款'
    if score >= 0.60:
        return '潜力爆款'
    if score >= 0.40:
        return '常规款'
    return '清货款'


def analyze_spu_hotproduct(df, filename):
    """Compute a weighted "hot product" score for every SPU in an order table.

    The input is copied, system column names are aliased to the names used by
    the algorithm, successful orders are aggregated per SPU, five metrics are
    min-max normalised across SPUs and combined with fixed weights, and each
    SPU is assigned a level from its score.

    Args:
        df: Order-level DataFrame. After aliasing it must provide the columns
            '下单时间', '商品ID', '数量', '价格', '商家实收' and '订单状态'
            (or their system equivalents '订单付款时间', 'SPU', '购买数量',
            '买家实际支付金额'). Optional columns: '买家会员名' / '订单编号'
            (buyer identity), '退款状态' (refund status) and
            '商品名称' / '商品标题' (display title).
        filename: Name of the source file; only used in progress output.

    Returns:
        dict: ``{'success': bool, 'message': str, 'data': dict}``. On success
        ``data`` holds ``spu_results`` (per-SPU score, level, normalised
        metrics and raw aggregates), ``summary`` and the ``weights`` used.
        On failure (missing columns or no successful orders) ``data`` is empty.
    """
    print(f"🔥 开始爆款系数分析: {filename}")

    # 1. Work on a copy so the caller's frame is never modified.
    df_work = df.copy()

    # System column name -> column name used by the algorithm.
    field_mapping = {
        '订单付款时间': '下单时间',
        'SPU': '商品ID',
        '购买数量': '数量',
        '价格': '价格',
        '买家实际支付金额': '商家实收',
        '订单状态': '订单状态',
    }
    for sys_field, doc_field in field_mapping.items():
        if sys_field in df_work.columns:
            df_work[doc_field] = df_work[sys_field]

    # Report missing inputs through the normal failure payload instead of
    # letting a KeyError escape from deep inside the aggregation.
    missing_fields = [
        field for field in field_mapping.values()
        if field not in df_work.columns
    ]
    if missing_fields:
        return {
            'success': False,
            'message': f"缺少必要字段: {', '.join(missing_fields)}",
            'data': {},
        }

    # Parse the order time once and keep the parsed column: it is needed both
    # for the hour of day and for the first/last order span per SPU.
    df_work['下单时间'] = pd.to_datetime(df_work['下单时间'])
    df_work['小时'] = df_work['下单时间'].dt.hour

    # Buyer identity: member name, else order number, else one id per row.
    if '买家会员名' in df_work.columns:
        df_work['用户ID'] = df_work['买家会员名']
    elif '订单编号' in df_work.columns:
        df_work['用户ID'] = df_work['订单编号']
    else:
        df_work['用户ID'] = range(len(df_work))

    # 2. Only successfully completed orders feed the sales metrics.
    valid = df_work[df_work['订单状态'] == '交易成功'].copy()
    print(f"✅ 有效订单数: {len(valid)}")

    if len(valid) == 0:
        return {
            'success': False,
            'message': '没有交易成功的订单',
            'data': {}
        }

    # 3. Aggregate to SPU level.
    spu_agg = valid.groupby('商品ID').agg(
        总销量=('数量', 'sum'),
        总实收=('商家实收', 'sum'),
        标价=('价格', 'mean'),
        首单=('下单时间', 'min'),
        末单=('下单时间', 'max'),
        总UID=('用户ID', 'nunique')
    ).reset_index()

    # Inclusive day span between first and last order. Groups without any
    # usable timestamp fall back to a single day; the span is never below 1.
    elapsed_days = (spu_agg['末单'] - spu_agg['首单']).dt.days
    spu_agg['天数跨度'] = (elapsed_days.fillna(0) + 1).clip(lower=1).astype(int)

    spu_agg['单位时间销量'] = spu_agg['总销量'] / spu_agg['天数跨度']

    # Price acceptance: revenue received relative to list price * quantity.
    # A non-positive denominator yields 0 instead of NaN/inf, and the ratio is
    # capped to [0, 1].
    list_value = spu_agg['标价'] * spu_agg['总销量']
    acceptance = spu_agg['总实收'] / list_value.where(list_value > 0)
    spu_agg['实收率'] = acceptance.fillna(0).clip(0, 1)

    # 4. Refund rate over all orders of the SPU (not only successful ones).
    if '退款状态' in df_work.columns:
        refunded = df_work['退款状态'].eq('退款成功')
        refund_rate = refunded.groupby(df_work['商品ID']).mean()
        spu_agg['退款率'] = spu_agg['商品ID'].map(refund_rate).fillna(0)
    else:
        # Without a refund status column the refund rate is taken as 0.
        spu_agg['退款率'] = 0.0
    spu_agg['稳定性得分'] = 1 - spu_agg['退款率']

    # 5. Repurchase rate: share of buyers with more than one successful order.
    uid_buy = valid.groupby(['商品ID', '用户ID']).size().reset_index(name='购买次数')
    rep_uid = uid_buy[uid_buy['购买次数'] > 1].groupby('商品ID').size()
    spu_agg['复购UID数'] = spu_agg['商品ID'].map(rep_uid).fillna(0)
    spu_agg['复购率'] = (spu_agg['复购UID数'] / spu_agg['总UID']).fillna(0)

    # 6. Share of successful orders placed in hours 0 through 6 (inclusive).
    night_ratio = valid['小时'].between(0, 6).groupby(valid['商品ID']).mean()
    spu_agg['夜间占比'] = spu_agg['商品ID'].map(night_ratio).fillna(0)

    # 7. Min-max normalise the metrics across SPUs. This overwrites the metric
    # columns in place, so raw daily sales are recomputed for the output below.
    metrics_cols = ['单位时间销量', '实收率', '稳定性得分', '复购率', '夜间占比']
    spu_agg[metrics_cols] = _min_max_normalize(spu_agg, metrics_cols)

    # 8. Weighted sum of the normalised metrics.
    weights = {
        '单位时间销量': 0.4,  # sales heat 40%
        '实收率': 0.3,        # price acceptance 30%
        '稳定性得分': 0.1,    # refund stability 10%
        '复购率': 0.1,        # repurchase heat 10%
        '夜间占比': 0.1       # night-time burst 10%
    }
    spu_agg['爆款系数'] = sum(
        weight * spu_agg[column] for column, weight in weights.items()
    )

    # 9. Level label from the score.
    spu_agg['爆款等级'] = spu_agg['爆款系数'].apply(_classify_hot_level)

    # 10. Display title: first available of product name, SPU, product title.
    title_column = next(
        (name for name in ('商品名称', 'SPU', '商品标题') if name in valid.columns),
        None,
    )
    if title_column is None:
        spu_agg['商品标题'] = '未知商品'
    else:
        title_map = valid.groupby('商品ID')[title_column].first()
        spu_agg['商品标题'] = spu_agg['商品ID'].map(title_map).fillna('未知商品')

    # 11. Per-SPU result payload, keyed by SPU id.
    results = {}
    for row in spu_agg.to_dict('records'):
        spu_id = row['商品ID']
        results[spu_id] = {
            'spu_id': spu_id,
            'product_title': row['商品标题'],
            'hotproduct_score': round(float(row['爆款系数']), 4),
            'hotproduct_level': row['爆款等级'],
            'metrics': {
                'sales_heat': round(float(row['单位时间销量']), 4),
                'price_acceptance': round(float(row['实收率']), 4),
                'refund_stability': round(float(row['稳定性得分']), 4),
                'repurchase_rate': round(float(row['复购率']), 4),
                'night_burst': round(float(row['夜间占比']), 4)
            },
            'raw_data': {
                'total_quantity': int(row['总销量']),
                'total_revenue': round(float(row['总实收']), 2),
                'avg_price': round(float(row['标价']), 2),
                'days_span': int(row['天数跨度']),
                'daily_sales': round(float(row['总销量'] / row['天数跨度']), 2),
                'unique_buyers': int(row['总UID']),
                'repurchase_buyers': int(row['复购UID数']),
                'refund_rate': round(float(row['退款率']), 4)
            }
        }

    # 12. Overview statistics.
    level_dist = spu_agg['爆款等级'].value_counts().to_dict()

    summary = {
        'total_spu_count': len(spu_agg),
        'level_distribution': level_dist,
        'avg_score': round(float(spu_agg['爆款系数'].mean()), 4),
        'max_score': round(float(spu_agg['爆款系数'].max()), 4),
        'min_score': round(float(spu_agg['爆款系数'].min()), 4),
        'top_5_spus': spu_agg.nlargest(5, '爆款系数')['商品ID'].tolist()
    }

    print(f"✅ 爆款分析完成: {len(results)} 个SPU")
    print(f"📊 等级分布: {level_dist}")

    return {
        'success': True,
        'message': '爆款系数分析完成',
        'data': {
            'spu_results': results,
            'summary': summary,
            'weights': weights
        }
    }
|