#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 销售趋势分析和预测服务 """ import pandas as pd import numpy as np from datetime import datetime, timedelta from collections import defaultdict import json def analyze_sale_trend(df, filename): """ 分析销售趋势数据 Args: df: 销售数据DataFrame filename: 文件名 Returns: dict: 分析结果 """ try: print(f"开始分析销售趋势数据: {filename}") print(f"数据形状: {df.shape}") # 1. 数据预处理 df = preprocess_data(df) # 2. 计算总体指标 summary = calculate_summary(df) # 3. 按品类分析趋势 categories = analyze_categories(df) # 4. 按SKU分析趋势 sku_data = analyze_skus(df) # 5. 整体趋势分析 trends = analyze_trends(df) # 6. 季节性分析 seasonality = analyze_seasonality(df) # 7. 计算特征重要性 feature_importance = calculate_feature_importance(df) # 8. 构建结果 results = { 'summary': summary, 'categories': categories['categories_data'], 'category_list': categories['category_list'], 'category_skus': categories['category_skus'], 'data': sku_data['sku_data'], 'sku_list': sku_data['sku_list'], 'trends': trends, 'seasonality': seasonality, 'feature_importance': feature_importance } # 转换所有 numpy 类型为 Python 原生类型 results = convert_numpy_types(results) print("销售趋势分析完成") return results except Exception as e: print(f"分析销售趋势时出错: {str(e)}") raise def predict_sales_trend(df, filename, predict_days=30): """ 预测销售趋势 Args: df: 销售数据DataFrame filename: 文件名 predict_days: 预测天数 Returns: dict: 预测结果 """ try: print(f"开始预测销售趋势: {filename}") print(f"预测天数: {predict_days}") # 1. 数据预处理 df = preprocess_data(df) # 2. 计算基础指标 summary = calculate_summary(df) # 3. 整体趋势预测 overall_prediction = predict_overall_trend(df, predict_days) # 4. 按品类预测 category_predictions = predict_category_trends(df, predict_days) # 5. 按SKU预测 sku_predictions = predict_sku_trends(df, predict_days) # 6. 计算特征重要性 feature_importance = calculate_feature_importance(df) # 7. 构建结果 results = { 'summary': summary, 'overall_prediction': overall_prediction, 'category_predictions': category_predictions, 'sku_predictions': sku_predictions, 'feature_importance': feature_importance, 'predict_days': predict_days } # 转换所有 numpy 类型为 Python 原生类型 results = convert_numpy_types(results) print("销售趋势预测完成") return results except Exception as e: print(f"预测销售趋势时出错: {str(e)}") raise def convert_numpy_types(obj): """ 递归转换所有 numpy 类型为 Python 原生类型 Args: obj: 要转换的对象 Returns: 转换后的对象 """ if isinstance(obj, dict): return {key: convert_numpy_types(value) for key, value in obj.items()} elif isinstance(obj, list): return [convert_numpy_types(item) for item in obj] elif isinstance(obj, np.integer): return int(obj) elif isinstance(obj, np.floating): return float(obj) elif isinstance(obj, np.ndarray): return [convert_numpy_types(item) for item in obj.tolist()] else: return obj def calculate_feature_importance(df): """ 计算特征重要性 Args: df: 销售数据DataFrame Returns: dict: 特征重要性数据 """ try: print("计算特征重要性") # 特征重要性数据 feature_importance = { 'features': [ '价格', '促销力度', '退款率', '季节性', '时间趋势', '品类影响', 'SKU影响' ], 'importance': [] } # 计算价格重要性 if '价格' in df.columns: price_std = df['价格'].std() price_importance = min(100, (price_std / df['价格'].mean() * 100) if df['价格'].mean() > 0 else 50) else: price_importance = 50 feature_importance['importance'].append(round(price_importance, 2)) # 计算促销力度重要性 if '促销力度' in df.columns: promotion_std = df['促销力度'].std() promotion_importance = min(100, (promotion_std * 100) if promotion_std > 0 else 40) else: # 计算促销力度 df['促销力度'] = 1 - (df['买家实际支付金额'] / df['买家应付货款']) df['促销力度'] = df['促销力度'].fillna(0) promotion_std = df['促销力度'].std() promotion_importance = min(100, (promotion_std * 100) if promotion_std > 0 else 40) feature_importance['importance'].append(round(promotion_importance, 2)) # 计算退款率重要性 refunded_orders = df[df['退款状态'] != '没有申请退款'] refund_rate = len(refunded_orders) / len(df) * 100 if len(df) > 0 else 0 refund_importance = min(100, refund_rate * 2) feature_importance['importance'].append(round(refund_importance, 2)) # 计算季节性重要性 # 基于销量的波动性 if '购买数量' in df.columns: quantity_std = df['购买数量'].std() quantity_mean = df['购买数量'].mean() seasonality_importance = min(100, (quantity_std / quantity_mean * 100) if quantity_mean > 0 else 30) else: seasonality_importance = 30 feature_importance['importance'].append(round(seasonality_importance, 2)) # 计算时间趋势重要性 # 基于销量的变化趋势 time_importance = 60 # 默认值 feature_importance['importance'].append(time_importance) # 计算品类影响重要性 if '品类' in df.columns: category_count = df['品类'].nunique() category_importance = min(100, category_count * 10) else: category_importance = 40 feature_importance['importance'].append(round(category_importance, 2)) # 计算SKU影响重要性 if 'SKU' in df.columns: sku_count = df['SKU'].nunique() sku_importance = min(100, sku_count * 5) else: sku_importance = 30 feature_importance['importance'].append(round(sku_importance, 2)) # 标准化重要性值,确保总和为100 total_importance = sum(feature_importance['importance']) if total_importance > 0: feature_importance['importance'] = [round((imp / total_importance) * 100, 2) for imp in feature_importance['importance']] print("特征重要性计算完成") return feature_importance except Exception as e: print(f"计算特征重要性时出错: {str(e)}") # 返回默认值 return { 'features': [ '价格', '促销力度', '退款率', '季节性', '时间趋势', '品类影响', 'SKU影响' ], 'importance': [15, 15, 10, 15, 20, 10, 15] } def preprocess_data(df): """ 预处理数据 """ # 复制数据以避免修改原始数据 df = df.copy() # 处理日期列 if '订单创建时间' in df.columns: df['订单创建时间'] = pd.to_datetime(df['订单创建时间'], errors='coerce') # 提取日期部分 df['日期'] = df['订单创建时间'].dt.date # 处理数值列 numeric_columns = ['价格', '购买数量', '买家应付货款', '买家实际支付金额', '退款金额'] for col in numeric_columns: if col in df.columns: df[col] = pd.to_numeric(df[col], errors='coerce') # 填充缺失值 df = df.fillna({ '价格': 0, '购买数量': 0, '买家应付货款': 0, '买家实际支付金额': 0, '退款金额': 0 }) # 确保SKU列存在 if '商家编码' in df.columns: df['SKU'] = df['商家编码'] elif '外部系统编号' in df.columns: df['SKU'] = df['外部系统编号'] else: # 如果没有SKU列,使用商品名称作为SKU df['SKU'] = df['商品名称'] # 确保品类列存在 if '品类' not in df.columns: # 简单品类划分:基于商品名称 df['品类'] = df['商品名称'].apply(lambda x: categorize_product(x)) return df def categorize_product(product_name): """ 根据商品名称简单分类 """ product_name = str(product_name).lower() if '腰垫' in product_name or '靠垫' in product_name: return '家居用品' elif '手机' in product_name or '电脑' in product_name: return '电子产品' elif '服装' in product_name or '鞋' in product_name: return '服装鞋包' else: return '其他' def calculate_summary(df): """ 计算总体指标 """ total_quantity = df['购买数量'].sum() total_revenue = df['买家实际支付金额'].sum() avg_price = total_revenue / total_quantity if total_quantity > 0 else 0 # 计算促销力度 df['促销力度'] = 1 - (df['买家实际支付金额'] / df['买家应付货款']) df['促销力度'] = df['促销力度'].fillna(0) avg_promotion = df['促销力度'].mean() * 100 # 转换为百分比 # 计算退款相关指标 refunded_orders = df[df['退款状态'] != '没有申请退款'] total_refund = refunded_orders['退款金额'].sum() refund_rate = len(refunded_orders) / len(df) * 100 if len(df) > 0 else 0 summary = { 'total_orders': len(df), 'total_quantity': int(total_quantity), 'total_revenue': round(total_revenue, 2), 'avg_price': round(avg_price, 2), 'avg_promotion': round(avg_promotion, 2), 'total_refund': round(total_refund, 2), 'refund_rate': round(refund_rate, 2) } # 转换所有 numpy 类型为 Python 原生类型 summary = convert_numpy_types(summary) return summary def analyze_categories(df): """ 按品类分析趋势 """ categories_data = {} category_list = [] category_skus = defaultdict(list) # 按品类分组 grouped = df.groupby('品类') for category, group in grouped: category_list.append(category) # 计算品类指标 total_quantity = group['购买数量'].sum() total_revenue = group['买家实际支付金额'].sum() avg_price = total_revenue / total_quantity if total_quantity > 0 else 0 # 计算促销力度 group['促销力度'] = 1 - (group['买家实际支付金额'] / group['买家应付货款']) group['促销力度'] = group['促销力度'].fillna(0) avg_promotion = group['促销力度'].mean() * 100 # 计算退款相关指标 refunded_orders = group[group['退款状态'] != '没有申请退款'] total_refund = refunded_orders['退款金额'].sum() refund_rate = len(refunded_orders) / len(group) * 100 if len(group) > 0 else 0 # 趋势数据 date_series = [] quantity_series = [] price_series = [] # 按日期排序 date_grouped = group.groupby('日期') for date in sorted(date_grouped.groups.keys()): date_series.append(str(date)) date_data = date_grouped.get_group(date) quantity_series.append(int(date_data['购买数量'].sum())) avg_date_price = date_data['买家实际支付金额'].sum() / date_data['购买数量'].sum() if date_data['购买数量'].sum() > 0 else 0 price_series.append(round(avg_date_price, 2)) categories_data[category] = { 'total_quantity': int(total_quantity), 'total_revenue': round(total_revenue, 2), 'avg_price': round(avg_price, 2), 'avg_promotion': round(avg_promotion, 2), 'total_refund': round(total_refund, 2), 'refund_rate': round(refund_rate, 2), 'date_series': date_series, 'quantity_series': quantity_series, 'price_series': price_series } # 收集该品类下的SKU skus = group['SKU'].unique().tolist() category_skus[category] = skus result = { 'categories_data': categories_data, 'category_list': category_list, 'category_skus': dict(category_skus) } # 转换所有 numpy 类型为 Python 原生类型 result = convert_numpy_types(result) return result def analyze_skus(df): """ 按SKU分析趋势 """ sku_data = {} sku_list = [] # 按SKU分组 grouped = df.groupby('SKU') for sku, group in grouped: sku_list.append(sku) # 计算SKU指标 total_quantity = group['购买数量'].sum() total_revenue = group['买家实际支付金额'].sum() avg_price = total_revenue / total_quantity if total_quantity > 0 else 0 # 计算促销力度 group['促销力度'] = 1 - (group['买家实际支付金额'] / group['买家应付货款']) group['促销力度'] = group['促销力度'].fillna(0) avg_promotion = group['促销力度'].mean() * 100 # 计算退款相关指标 refunded_orders = group[group['退款状态'] != '没有申请退款'] total_refund = refunded_orders['退款金额'].sum() refund_rate = len(refunded_orders) / len(group) * 100 if len(group) > 0 else 0 # 趋势数据 date_series = [] quantity_series = [] price_series = [] # 按日期排序 date_grouped = group.groupby('日期') for date in sorted(date_grouped.groups.keys()): date_series.append(str(date)) date_data = date_grouped.get_group(date) quantity_series.append(int(date_data['购买数量'].sum())) avg_date_price = date_data['买家实际支付金额'].sum() / date_data['购买数量'].sum() if date_data['购买数量'].sum() > 0 else 0 price_series.append(round(avg_date_price, 2)) sku_data[sku] = { 'total_quantity': int(total_quantity), 'total_revenue': round(total_revenue, 2), 'avg_price': round(avg_price, 2), 'avg_promotion': round(avg_promotion, 2), 'total_refund': round(total_refund, 2), 'refund_rate': round(refund_rate, 2), 'date_series': date_series, 'quantity_series': quantity_series, 'price_series': price_series } result = { 'sku_data': sku_data, 'sku_list': sku_list } # 转换所有 numpy 类型为 Python 原生类型 result = convert_numpy_types(result) return result def analyze_trends(df): """ 分析整体趋势数据 """ # 按日期分组 date_grouped = df.groupby('日期') date_series = [] quantity_series = [] revenue_series = [] avg_price_series = [] avg_promotion_series = [] for date in sorted(date_grouped.groups.keys()): date_series.append(str(date)) date_data = date_grouped.get_group(date) quantity = date_data['购买数量'].sum() revenue = date_data['买家实际支付金额'].sum() avg_price = revenue / quantity if quantity > 0 else 0 # 计算促销力度 date_data['促销力度'] = 1 - (date_data['买家实际支付金额'] / date_data['买家应付货款']) date_data['促销力度'] = date_data['促销力度'].fillna(0) avg_promotion = date_data['促销力度'].mean() * 100 quantity_series.append(int(quantity)) revenue_series.append(round(revenue, 2)) avg_price_series.append(round(avg_price, 2)) avg_promotion_series.append(round(avg_promotion, 2)) trends = { 'date_series': date_series, 'quantity_series': quantity_series, 'revenue_series': revenue_series, 'avg_price_series': avg_price_series, 'avg_promotion_series': avg_promotion_series } # 转换所有 numpy 类型为 Python 原生类型 trends = convert_numpy_types(trends) return trends def analyze_seasonality(df): """ 分析季节性 """ # 按日期分组 date_grouped = df.groupby('日期') # 构建日期到销量的映射 date_quantity_map = {} for date, group in date_grouped: date_quantity_map[date] = group['购买数量'].sum() # 计算7天移动平均 dates = sorted(date_quantity_map.keys()) quantities = [date_quantity_map[date] for date in dates] # 计算移动平均 window = 7 moving_avg = [] for i in range(len(quantities)): start = max(0, i - window + 1) window_data = quantities[start:i+1] moving_avg.append(sum(window_data) / len(window_data)) # 计算季节性因子 seasonality_factors = [] for i in range(len(quantities)): if moving_avg[i] > 0: seasonality_factors.append(quantities[i] / moving_avg[i]) else: seasonality_factors.append(1.0) seasonality = { 'date_series': [str(date) for date in dates], 'quantity_series': quantities, 'moving_avg_series': [round(avg, 2) for avg in moving_avg], 'seasonality_factors': [round(factor, 2) for factor in seasonality_factors] } # 转换所有 numpy 类型为 Python 原生类型 seasonality = convert_numpy_types(seasonality) return seasonality def predict_overall_trend(df, predict_days): """ 预测整体销售趋势 使用 a+x+y 模型 """ # 按日期分组 date_grouped = df.groupby('日期') # 构建历史数据 dates = sorted(date_grouped.groups.keys()) quantities = [] for date in dates: quantities.append(date_grouped.get_group(date)['购买数量'].sum()) # 计算基础销量 a(历史平均) a = np.mean(quantities) if quantities else 0 # 计算时间趋势因子 x(线性回归) x = 0 if len(quantities) > 1: # 简单线性回归 days = np.arange(len(quantities)) slope, intercept = np.polyfit(days, quantities, 1) x = slope # 计算季节性因子 y # 这里使用简单的7天周期性 y = 1.0 if len(quantities) >= 7: # 计算最近7天的平均季节性因子 recent_quantities = quantities[-7:] recent_avg = np.mean(recent_quantities) y = recent_quantities[-1] / recent_avg if recent_avg > 0 else 1.0 # 生成预测日期 last_date = dates[-1] if dates else datetime.now().date() predict_dates = [] for i in range(1, predict_days + 1): predict_dates.append(last_date + timedelta(days=i)) # 生成预测值 predict_quantities = [] predict_revenues = [] # 计算历史平均价格 avg_price = df['买家实际支付金额'].sum() / df['购买数量'].sum() if df['购买数量'].sum() > 0 else 0 for i, predict_date in enumerate(predict_dates): # 计算预测值: a + x * (days_since_start) * y days_since_start = len(quantities) + i predicted_quantity = max(0, a + x * days_since_start * y) predict_quantities.append(round(predicted_quantity, 2)) predict_revenues.append(round(predicted_quantity * avg_price, 2)) overall_prediction = { 'date_series': [str(date) for date in predict_dates], 'quantity_series': predict_quantities, 'revenue_series': predict_revenues, 'model_params': { 'a': round(a, 2), 'x': round(x, 4), 'y': round(y, 2) } } # 转换所有 numpy 类型为 Python 原生类型 overall_prediction = convert_numpy_types(overall_prediction) return overall_prediction def predict_category_trends(df, predict_days): """ 按品类预测销售趋势 """ category_predictions = {} # 按品类分组 grouped = df.groupby('品类') for category, group in grouped: # 按日期分组 date_grouped = group.groupby('日期') # 构建历史数据 dates = sorted(date_grouped.groups.keys()) quantities = [] for date in dates: quantities.append(date_grouped.get_group(date)['购买数量'].sum()) # 计算基础销量 a(历史平均) a = np.mean(quantities) if quantities else 0 # 计算时间趋势因子 x(线性回归) x = 0 if len(quantities) > 1: # 简单线性回归 days = np.arange(len(quantities)) slope, intercept = np.polyfit(days, quantities, 1) x = slope # 计算季节性因子 y y = 1.0 if len(quantities) >= 7: # 计算最近7天的平均季节性因子 recent_quantities = quantities[-7:] recent_avg = np.mean(recent_quantities) y = recent_quantities[-1] / recent_avg if recent_avg > 0 else 1.0 # 生成预测日期 last_date = dates[-1] if dates else datetime.now().date() predict_dates = [] for i in range(1, predict_days + 1): predict_dates.append(last_date + timedelta(days=i)) # 生成预测值 predict_quantities = [] predict_revenues = [] # 计算该品类的平均价格 category_avg_price = group['买家实际支付金额'].sum() / group['购买数量'].sum() if group['购买数量'].sum() > 0 else 0 for i, predict_date in enumerate(predict_dates): # 计算预测值: a + x * (days_since_start) * y days_since_start = len(quantities) + i predicted_quantity = max(0, a + x * days_since_start * y) predict_quantities.append(round(predicted_quantity, 2)) predict_revenues.append(round(predicted_quantity * category_avg_price, 2)) category_predictions[category] = { 'date_series': [str(date) for date in predict_dates], 'quantity_series': predict_quantities, 'revenue_series': predict_revenues, 'model_params': { 'a': round(a, 2), 'x': round(x, 4), 'y': round(y, 2) } } # 转换所有 numpy 类型为 Python 原生类型 category_predictions = convert_numpy_types(category_predictions) return category_predictions def predict_sku_trends(df, predict_days): """ 按SKU预测销售趋势 """ sku_predictions = {} # 按SKU分组 grouped = df.groupby('SKU') for sku, group in grouped: # 按日期分组 date_grouped = group.groupby('日期') # 构建历史数据 dates = sorted(date_grouped.groups.keys()) quantities = [] for date in dates: quantities.append(date_grouped.get_group(date)['购买数量'].sum()) # 计算基础销量 a(历史平均) a = np.mean(quantities) if quantities else 0 # 计算时间趋势因子 x(线性回归) x = 0 if len(quantities) > 1: # 简单线性回归 days = np.arange(len(quantities)) slope, intercept = np.polyfit(days, quantities, 1) x = slope # 计算季节性因子 y y = 1.0 if len(quantities) >= 7: # 计算最近7天的平均季节性因子 recent_quantities = quantities[-7:] recent_avg = np.mean(recent_quantities) y = recent_quantities[-1] / recent_avg if recent_avg > 0 else 1.0 # 生成预测日期 last_date = dates[-1] if dates else datetime.now().date() predict_dates = [] for i in range(1, predict_days + 1): predict_dates.append(last_date + timedelta(days=i)) # 生成预测值 predict_quantities = [] predict_revenues = [] # 计算该SKU的平均价格 sku_avg_price = group['买家实际支付金额'].sum() / group['购买数量'].sum() if group['购买数量'].sum() > 0 else 0 for i, predict_date in enumerate(predict_dates): # 计算预测值: a + x * (days_since_start) * y days_since_start = len(quantities) + i predicted_quantity = max(0, a + x * days_since_start * y) predict_quantities.append(round(predicted_quantity, 2)) predict_revenues.append(round(predicted_quantity * sku_avg_price, 2)) sku_predictions[sku] = { 'date_series': [str(date) for date in predict_dates], 'quantity_series': predict_quantities, 'revenue_series': predict_revenues, 'model_params': { 'a': round(a, 2), 'x': round(x, 4), 'y': round(y, 2) } } # 转换所有 numpy 类型为 Python 原生类型 sku_predictions = convert_numpy_types(sku_predictions) return sku_predictions