"""SKU hot-product coefficient analysis service.

Implements the scoring described in the design note dated 2025-11-23,
"SKU hot-product coefficient algorithm logic summary (V2.0)".
"""
from datetime import datetime

import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler


def _classify_level(score):
    """Map a hot-product score to its tier label.

    Any score that fails every threshold comparison (including NaN) falls
    through to the lowest tier.
    """
    if score >= 0.80:
        return '超级爆款'
    if score >= 0.60:
        return '潜力爆款'
    if score >= 0.40:
        return '常规款'
    return '清货款'


def _format_sku_row(row):
    """Convert one aggregated SKU row into its plain-Python result record."""
    sku_id = row['商品ID']
    return {
        'sku_id': sku_id,
        'product_title': row['商品标题'],
        'hotproduct_score': round(float(row['爆款系数']), 4),
        'hotproduct_level': row['爆款等级'],
        # These five metric columns hold the min-max normalised values.
        'metrics': {
            'sales_heat': round(float(row['单位时间销量']), 4),
            'price_acceptance': round(float(row['实收率']), 4),
            'refund_stability': round(float(row['稳定性得分']), 4),
            'repurchase_rate': round(float(row['复购率']), 4),
            'night_burst': round(float(row['夜间占比']), 4),
        },
        # Un-normalised aggregates, for display and debugging.
        'raw_data': {
            'total_quantity': int(row['总销量']),
            'total_revenue': round(float(row['总实收']), 2),
            'avg_price': round(float(row['标价']), 2),
            'days_span': int(row['天数跨度']),
            'daily_sales': round(float(row['总销量'] / row['天数跨度']), 2),
            'unique_buyers': int(row['总UID']),
            'repurchase_buyers': int(row['复购UID数']),
            'refund_rate': round(float(row['退款率']), 4),
        },
    }


def analyze_hotproduct(df: pd.DataFrame, filename: str) -> dict:
    """Compute the hot-product coefficient and tier for every SKU.

    Only rows whose '订单状态' equals '交易成功' are aggregated per SKU. Five
    metrics (daily sales, paid ratio, refund stability, repurchase rate and
    night-order share) are min-max normalised across SKUs and combined with
    fixed weights into a single score, which is then bucketed into a tier.

    Args:
        df: Cleaned order-level DataFrame. Source columns listed in
            ``required_fields`` are copied to the names the algorithm uses
            when present. The input frame is not modified.
        filename: Name of the source file; used only in log output.

    Returns:
        dict with keys 'success', 'message' and 'data'. On success 'data'
        holds 'sku_results' (per-SKU records keyed by SKU id), 'summary' and
        the 'weights' used. When no order has status '交易成功', 'success' is
        False and 'data' is empty.

    Raises:
        KeyError: If a column the algorithm needs is absent after mapping.
    """
    print(f"🔥 开始爆款系数分析: {filename}")

    # 1. Data preparation and field mapping.
    df_work = df.copy()

    # System column name -> column name used by the algorithm document.
    required_fields = {
        '订单付款时间': '下单时间',
        'SKU': '商品ID',
        '购买数量': '数量',
        '价格': '价格',
        '买家实际支付金额': '商家实收',
        '订单状态': '订单状态',
    }
    for sys_field, doc_field in required_fields.items():
        if sys_field in df_work.columns:
            df_work[doc_field] = df_work[sys_field]

    # Parse the order time once and keep the parsed column: the first/last
    # order subtraction below needs real datetimes, not the raw strings a
    # CSV load produces.
    df_work['下单时间'] = pd.to_datetime(df_work['下单时间'])
    df_work['小时'] = df_work['下单时间'].dt.hour

    # Buyer identity: prefer the member name, fall back to the order number,
    # and as a last resort give every row its own synthetic id.
    if '买家会员名' in df_work.columns:
        df_work['用户ID'] = df_work['买家会员名']
    elif '订单编号' in df_work.columns:
        df_work['用户ID'] = df_work['订单编号']
    else:
        df_work['用户ID'] = range(len(df_work))

    # 2. Keep only successfully completed orders.
    valid = df_work[df_work['订单状态'] == '交易成功'].copy()
    print(f"✅ 有效订单数: {len(valid)}")

    if valid.empty:
        return {
            'success': False,
            'message': '没有交易成功的订单',
            'data': {}
        }

    # 3. Aggregate to SKU level.
    sku_agg = valid.groupby('商品ID').agg(
        总销量=('数量', 'sum'),
        总实收=('商家实收', 'sum'),
        标价=('价格', 'mean'),
        首单=('下单时间', 'min'),
        末单=('下单时间', 'max'),
        总UID=('用户ID', 'nunique')
    ).reset_index()

    # Inclusive day span between first and last order. An SKU whose orders
    # all lack a timestamp yields NaN here; treat it as a single day so the
    # later integer conversion cannot fail. The lower bound of 1 also guards
    # the division below.
    span_days = (sku_agg['末单'] - sku_agg['首单']).dt.days + 1
    sku_agg['天数跨度'] = span_days.fillna(1).clip(lower=1).astype(int)

    # Sales per day.
    sku_agg['单位时间销量'] = sku_agg['总销量'] / sku_agg['天数跨度']

    # Paid ratio (price acceptance): money received / (mean price * units).
    # Clamp to [0, 1]; discounts or coupons may push the raw ratio out of
    # range. A 0/0 ratio (zero list value and zero receipts) is NaN, which
    # clip() preserves, so map it to 0 to keep the final score finite.
    list_value = sku_agg['标价'] * sku_agg['总销量']
    sku_agg['实收率'] = (sku_agg['总实收'] / list_value).clip(0, 1).fillna(0)

    # 4. Refund rate, computed over ALL orders of the SKU (not only the
    # successful ones). Without a refund-status column it is taken as 0.
    if '退款状态' in df_work.columns:
        refunded = df_work[df_work['退款状态'] == '退款成功']
        refund_count = refunded.groupby('商品ID').size()
        total_count = df_work.groupby('商品ID').size()
        refund_rate = refund_count / total_count
    else:
        refund_rate = pd.Series(0, index=sku_agg['商品ID'])

    sku_agg['退款率'] = sku_agg['商品ID'].map(refund_rate).fillna(0)
    sku_agg['稳定性得分'] = 1 - sku_agg['退款率']

    # 5. Repurchase rate: share of buyers with more than one successful
    # order row for the SKU.
    uid_buy = (
        valid.groupby(['商品ID', '用户ID']).size().reset_index(name='购买次数')
    )
    rep_uid = uid_buy[uid_buy['购买次数'] > 1].groupby('商品ID').size()
    sku_agg['复购UID数'] = sku_agg['商品ID'].map(rep_uid).fillna(0)
    sku_agg['复购率'] = (sku_agg['复购UID数'] / sku_agg['总UID']).fillna(0)

    # 6. Night share: orders whose hour is 0..6 inclusive.
    night_orders = valid[valid['小时'].between(0, 6)]
    night_count = night_orders.groupby('商品ID').size()
    total_orders = valid.groupby('商品ID').size()
    night_ratio = night_count / total_orders
    sku_agg['夜间占比'] = sku_agg['商品ID'].map(night_ratio).fillna(0)

    # 7. Min-max normalise every metric across SKUs (overwrites in place).
    metrics_cols = ['单位时间销量', '实收率', '稳定性得分', '复购率', '夜间占比']
    scaler = MinMaxScaler()
    sku_agg[metrics_cols] = scaler.fit_transform(sku_agg[metrics_cols])

    # 8. Weighted score.
    weights = {
        '单位时间销量': 0.4,  # sales heat 40%
        '实收率': 0.3,        # price acceptance 30%
        '稳定性得分': 0.1,    # refund stability 10%
        '复购率': 0.1,        # repurchase heat 10%
        '夜间占比': 0.1       # night burst 10%
    }
    sku_agg['爆款系数'] = sum(
        weight * sku_agg[col] for col, weight in weights.items()
    )

    # 9. Tier label.
    sku_agg['爆款等级'] = sku_agg['爆款系数'].apply(_classify_level)

    # 10. Product title (first seen per SKU), if the column exists.
    if '商品标题' in valid.columns:
        title_map = valid.groupby('商品ID')['商品标题'].first()
    else:
        title_map = {}
    sku_agg['商品标题'] = sku_agg['商品ID'].map(title_map).fillna('未知商品')

    # 11. Per-SKU result records keyed by SKU id.
    results = {
        row['商品ID']: _format_sku_row(row) for _, row in sku_agg.iterrows()
    }

    # 12. Summary. Counts are cast to plain int so the dict serialises
    # without numpy scalar types.
    level_dist = {
        level: int(count)
        for level, count in sku_agg['爆款等级'].value_counts().items()
    }

    summary = {
        'total_sku_count': len(sku_agg),
        'level_distribution': level_dist,
        'avg_score': round(float(sku_agg['爆款系数'].mean()), 4),
        'max_score': round(float(sku_agg['爆款系数'].max()), 4),
        'min_score': round(float(sku_agg['爆款系数'].min()), 4),
        'top_5_skus': sku_agg.nlargest(5, '爆款系数')['商品ID'].tolist()
    }

    print(f"✅ 爆款分析完成: {len(results)} 个SKU")
    print(f"📊 等级分布: {level_dist}")

    return {
        'success': True,
        'message': '爆款系数分析完成',
        'data': {
            'sku_results': results,
            'summary': summary,
            'weights': weights
        }
    }