#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Promotion effect analysis service.

Takes a sales DataFrame, derives a per-row discount rate from the payable and
actually-paid amounts, and summarises how discounted ("promotional") rows
compare with non-discounted rows overall, per discount tier, per day, per
category and per SKU.
"""

import json
from collections import defaultdict
from datetime import datetime

import numpy as np
import pandas as pd

# Columns coerced to numeric (unparseable / missing values become 0).
_NUMERIC_COLUMNS = ('价格', '购买数量', '买家应付货款', '买家实际支付金额', '退款金额')

# (keywords, category) pairs checked in order; the first match wins.
_CATEGORY_KEYWORDS = (
    (('腰垫', '靠垫'), '家居用品'),
    (('手机', '电脑'), '电子产品'),
    (('服装', '鞋'), '服装鞋包'),
)


def analyze_promotion_effect(df, filename):
    """Run the full promotion-effect analysis on a sales DataFrame.

    Args:
        df: Sales data. It is not modified; preprocessing works on a copy.
        filename: Name of the source file, used only in progress messages.

    Returns:
        dict: Keys ``summary``, ``promotion_types``, ``time_analysis``,
        ``category_analysis``, ``sku_analysis`` and ``effect_evaluation``,
        with numpy scalars converted to native Python types.

    Raises:
        Exception: Any error raised by a sub-step is printed and re-raised
        unchanged.
    """
    try:
        print(f"开始分析促销效果: {filename}")
        print(f"数据形状: {df.shape}")

        df = preprocess_data(df)

        results = {
            'summary': calculate_summary(df),
            'promotion_types': analyze_promotion_types(df),
            'time_analysis': analyze_time_effect(df),
            'category_analysis': analyze_category_effect(df),
            'sku_analysis': analyze_sku_effect(df),
            'effect_evaluation': evaluate_promotion_effect(df),
        }

        # Make the whole structure safe for json serialisation.
        results = convert_numpy_types(results)

        print("促销效果分析完成")
        return results

    except Exception as e:
        print(f"分析促销效果时出错: {str(e)}")
        raise


def preprocess_data(df):
    """Return a cleaned copy of ``df`` with the derived analysis columns.

    Adds ``日期`` (only when ``订单创建时间`` exists), ``SKU``, ``品类`` (when
    missing), ``促销力度`` (discount rate) and ``是否促销`` (discount > 0).

    Args:
        df: Raw sales data. Must contain ``买家应付货款`` and
            ``买家实际支付金额``; ``商品名称`` is required when no SKU or
            category column is present.

    Returns:
        pandas.DataFrame: A new DataFrame; the input is left untouched.

    Raises:
        KeyError: If a required column is missing.
    """
    df = df.copy()

    if '订单创建时间' in df.columns:
        df['订单创建时间'] = pd.to_datetime(df['订单创建时间'], errors='coerce')
        df['日期'] = df['订单创建时间'].dt.date

    for col in _NUMERIC_COLUMNS:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)

    # SKU fallback chain: merchant code -> external id -> product name.
    if '商家编码' in df.columns:
        df['SKU'] = df['商家编码']
    elif '外部系统编号' in df.columns:
        df['SKU'] = df['外部系统编号']
    else:
        df['SKU'] = df['商品名称']

    if '品类' not in df.columns:
        df['品类'] = df['商品名称'].apply(categorize_product)

    payable = df['买家应付货款']
    paid = df['买家实际支付金额']
    # Mask zero payable amounts so the division yields NaN instead of +/-inf;
    # those rows count as "no discount".
    discount = 1 - paid / payable.where(payable != 0)
    # A negative discount (paid more than payable) is not a promotion, so it
    # is clipped to 0 to stay consistent with the `是否促销` flag below.
    df['促销力度'] = discount.fillna(0).clip(lower=0)
    df['是否促销'] = df['促销力度'] > 0

    return df


def categorize_product(product_name):
    """Map a product name to a coarse category by keyword matching.

    Args:
        product_name: Any value; it is converted with ``str()`` and
            lower-cased before matching.

    Returns:
        str: The category of the first matching keyword group, or ``'其他'``.
    """
    name = str(product_name).lower()
    for keywords, category in _CATEGORY_KEYWORDS:
        if any(keyword in name for keyword in keywords):
            return category
    return '其他'


def _pct_change(value, baseline):
    """Return the percentage change of ``value`` over ``baseline`` (0 if baseline <= 0)."""
    if baseline > 0:
        return (value - baseline) / baseline * 100
    return 0


def _promo_metrics(df):
    """Compute raw promo vs. non-promo totals for the rows of ``df``."""
    is_promo = df['是否促销']
    promo = df[is_promo]
    non_promo = df[~is_promo]

    promo_quantity = promo['购买数量'].sum()
    non_promo_quantity = non_promo['购买数量'].sum()
    promo_revenue = promo['买家实际支付金额'].sum()
    non_promo_revenue = non_promo['买家实际支付金额'].sum()

    return {
        'promo_quantity': promo_quantity,
        'non_promo_quantity': non_promo_quantity,
        'promo_revenue': promo_revenue,
        'non_promo_revenue': non_promo_revenue,
        'quantity_effect': _pct_change(promo_quantity, non_promo_quantity),
        'revenue_effect': _pct_change(promo_revenue, non_promo_revenue),
        # Guarded: the mean of an empty selection is NaN.
        'avg_promotion': promo['促销力度'].mean() * 100 if len(promo) > 0 else 0,
        'promo_order_count': len(promo),
        'non_promo_order_count': len(non_promo),
    }


def _format_effect(metrics):
    """Round/cast the output of ``_promo_metrics`` into the per-group report dict."""
    return {
        'promo_quantity': int(metrics['promo_quantity']),
        'non_promo_quantity': int(metrics['non_promo_quantity']),
        'promo_revenue': round(metrics['promo_revenue'], 2),
        'non_promo_revenue': round(metrics['non_promo_revenue'], 2),
        'quantity_effect': round(metrics['quantity_effect'], 2),
        'revenue_effect': round(metrics['revenue_effect'], 2),
        'avg_promotion': round(metrics['avg_promotion'], 2),
        'promo_order_count': metrics['promo_order_count'],
        'non_promo_order_count': metrics['non_promo_order_count'],
    }


def _group_effects(df, column):
    """Return ``(effects_by_key, keys)`` of promo effects grouped by ``column``."""
    effects = {}
    keys = []
    for key, group in df.groupby(column):
        keys.append(key)
        effects[key] = _format_effect(_promo_metrics(group))
    return effects, keys


def calculate_summary(df):
    """Compute overall promo vs. non-promo indicators.

    Args:
        df: Preprocessed data (needs ``是否促销``, ``促销力度``, ``购买数量``
            and ``买家实际支付金额``).

    Returns:
        dict: Row counts, quantities, revenues, average unit prices, average
        discount percentage of promotional rows (0 when there are none) and
        the percentage difference of promo over non-promo quantity/revenue.
    """
    metrics = _promo_metrics(df)
    total_orders = len(df)
    promo_quantity = metrics['promo_quantity']
    non_promo_quantity = metrics['non_promo_quantity']
    promo_revenue = metrics['promo_revenue']
    non_promo_revenue = metrics['non_promo_revenue']

    promo_avg_price = promo_revenue / promo_quantity if promo_quantity > 0 else 0
    non_promo_avg_price = (
        non_promo_revenue / non_promo_quantity if non_promo_quantity > 0 else 0
    )
    promotional_ratio = (
        metrics['promo_order_count'] / total_orders * 100 if total_orders > 0 else 0
    )

    summary = {
        'total_orders': total_orders,
        'promotional_orders': metrics['promo_order_count'],
        'non_promotional_orders': metrics['non_promo_order_count'],
        'promotional_ratio': promotional_ratio,
        'promo_quantity': int(promo_quantity),
        'non_promo_quantity': int(non_promo_quantity),
        'promo_revenue': round(promo_revenue, 2),
        'non_promo_revenue': round(non_promo_revenue, 2),
        'promo_avg_price': round(promo_avg_price, 2),
        'non_promo_avg_price': round(non_promo_avg_price, 2),
        'avg_promotion': round(metrics['avg_promotion'], 2),
        'quantity_effect': round(metrics['quantity_effect'], 2),
        'revenue_effect': round(metrics['revenue_effect'], 2),
    }

    return convert_numpy_types(summary)


def analyze_promotion_types(df):
    """Aggregate sales by discount tier.

    Tiers are derived from ``促销力度``: ``<= 0`` is ``无促销``, below 0.1 is
    ``小幅促销``, below 0.3 is ``中幅促销``, anything else is ``大幅促销``.

    Args:
        df: Preprocessed data. It is not modified.

    Returns:
        dict: ``types`` maps each tier present in the data to its quantity,
        revenue, average unit price, average discount percentage and row
        count; ``type_list`` lists those tiers in group order.
    """
    def get_promotion_type(promotion):
        # `<= 0` (not `== 0`) so a non-positive discount can never be
        # reported as a small promotion.
        if promotion <= 0:
            return '无促销'
        if promotion < 0.1:
            return '小幅促销'
        if promotion < 0.3:
            return '中幅促销'
        return '大幅促销'

    # Work on a copy so the caller's DataFrame does not gain a column.
    typed = df.assign(**{'促销类型': df['促销力度'].apply(get_promotion_type)})

    promotion_types = {}
    type_list = []
    for promotion_type, group in typed.groupby('促销类型'):
        type_list.append(promotion_type)

        quantity = group['购买数量'].sum()
        revenue = group['买家实际支付金额'].sum()
        avg_price = revenue / quantity if quantity > 0 else 0

        promotion_types[promotion_type] = {
            'quantity': int(quantity),
            'revenue': round(revenue, 2),
            'avg_price': round(avg_price, 2),
            'avg_promotion': round(group['促销力度'].mean() * 100, 2),
            'order_count': len(group),
        }

    return {
        'types': promotion_types,
        'type_list': type_list,
    }


def analyze_time_effect(df):
    """Build per-day promo vs. non-promo series.

    Args:
        df: Preprocessed data. When it has no ``日期`` column (the source had
            no ``订单创建时间``), every series is returned empty.

    Returns:
        dict: Parallel lists ``date_series`` (dates as strings, ascending),
        ``promo_quantity_series``, ``non_promo_quantity_series``,
        ``promo_revenue_series``, ``non_promo_revenue_series`` and
        ``avg_promotion_series``.
    """
    series = {
        'date_series': [],
        'promo_quantity_series': [],
        'non_promo_quantity_series': [],
        'promo_revenue_series': [],
        'non_promo_revenue_series': [],
        'avg_promotion_series': [],
    }

    if '日期' not in df.columns:
        return series

    # groupby sorts its keys by default, so dates come out ascending.
    for date, day_rows in df.groupby('日期'):
        metrics = _promo_metrics(day_rows)
        series['date_series'].append(str(date))
        series['promo_quantity_series'].append(int(metrics['promo_quantity']))
        series['non_promo_quantity_series'].append(int(metrics['non_promo_quantity']))
        series['promo_revenue_series'].append(round(metrics['promo_revenue'], 2))
        series['non_promo_revenue_series'].append(round(metrics['non_promo_revenue'], 2))
        series['avg_promotion_series'].append(round(metrics['avg_promotion'], 2))

    return series


def analyze_category_effect(df):
    """Compute promo vs. non-promo indicators per category (``品类``).

    Args:
        df: Preprocessed data.

    Returns:
        dict: ``category_effects`` maps each category to its indicator dict;
        ``category_list`` lists the categories in group order.
    """
    category_effects, category_list = _group_effects(df, '品类')
    return {
        'category_effects': category_effects,
        'category_list': category_list,
    }


def analyze_sku_effect(df):
    """Compute promo vs. non-promo indicators per SKU.

    Args:
        df: Preprocessed data.

    Returns:
        dict: ``sku_effects`` maps each SKU to its indicator dict;
        ``sku_list`` lists the SKUs in group order.
    """
    sku_effects, sku_list = _group_effects(df, 'SKU')
    return {
        'sku_effects': sku_effects,
        'sku_list': sku_list,
    }


def evaluate_promotion_effect(df):
    """Score the overall promotion effect and map the score to a level.

    The score is 0.4 x quantity lift (if positive) + 0.4 x revenue lift (if
    positive) + 0.2 x (100 - average discount percentage), clamped to
    [0, 100]. Levels: >= 80 ``优秀``, >= 60 ``良好``, >= 40 ``一般``,
    otherwise ``需改进``.

    Args:
        df: Preprocessed data.

    Returns:
        dict: ``quantity_lift``, ``revenue_lift``, ``promo_ratio``,
        ``avg_promotion``, ``score`` (all rounded to 2 decimals) and
        ``level``.
    """
    metrics = _promo_metrics(df)
    quantity_lift = metrics['quantity_effect']
    revenue_lift = metrics['revenue_effect']
    avg_promotion = metrics['avg_promotion']
    promo_ratio = metrics['promo_order_count'] / len(df) * 100 if len(df) > 0 else 0

    score = 0
    if quantity_lift > 0:
        score += quantity_lift * 0.4
    if revenue_lift > 0:
        score += revenue_lift * 0.4
    # A smaller average discount earns a higher score.
    score += (100 - avg_promotion) * 0.2
    score = min(100, max(0, score))

    if score >= 80:
        level = '优秀'
    elif score >= 60:
        level = '良好'
    elif score >= 40:
        level = '一般'
    else:
        level = '需改进'

    return {
        'quantity_lift': round(quantity_lift, 2),
        'revenue_lift': round(revenue_lift, 2),
        'promo_ratio': round(promo_ratio, 2),
        'avg_promotion': round(avg_promotion, 2),
        'score': round(score, 2),
        'level': level,
    }


def _to_native_scalar(value):
    """Convert a numpy bool/int/float scalar to its Python equivalent."""
    if isinstance(value, np.bool_):
        return bool(value)
    if isinstance(value, np.integer):
        return int(value)
    if isinstance(value, np.floating):
        return float(value)
    return value


def convert_numpy_types(obj):
    """Recursively convert numpy values to native Python types.

    Handles dicts (values, and numpy scalar keys), lists, tuples, numpy
    arrays (returned as lists) and numpy bool/integer/floating scalars.
    Anything else is returned unchanged.

    Args:
        obj: The object to convert.

    Returns:
        The converted object.
    """
    if isinstance(obj, dict):
        return {
            _to_native_scalar(key): convert_numpy_types(value)
            for key, value in obj.items()
        }
    if isinstance(obj, list):
        return [convert_numpy_types(item) for item in obj]
    if isinstance(obj, tuple):
        return tuple(convert_numpy_types(item) for item in obj)
    if isinstance(obj, np.ndarray):
        return [convert_numpy_types(item) for item in obj.tolist()]
    return _to_native_scalar(obj)