| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
- """
- 数据加载和清洗模块
- """
- import pandas as pd
- import numpy as np
- import io
- import re
- from dateutil import parser
- # 数据文件配置
- CSV_ENCODINGS = ['utf-8', 'gbk', 'gb2312', 'gb18030']
- EXCEL_EXTENSIONS = ('.xlsx', '.xls')
- CSV_EXTENSIONS = ('.csv',)
- SUPPORTED_EXTENSIONS = CSV_EXTENSIONS + EXCEL_EXTENSIONS
- # 必需的数据列
- REQUIRED_COLUMNS = [
- '订单状态', '订单付款时间', '买家实际支付金额',
- '标题', '商品名称', '购买数量', '外部系统编号', '商品属性'
- ]
- def validate_file_upload(file):
- """验证文件上传"""
- if file is None:
- return False, '没有选择文件'
-
- if file.filename == '':
- return False, '没有选择文件'
-
- if file.filename is None or not file.filename.endswith(SUPPORTED_EXTENSIONS):
- return False, f'只支持CSV、Excel文件,当前文件: {file.filename}'
-
- return True, None
- def parse_datetime(date_str):
- """解析日期时间字符串"""
- if pd.isna(date_str) or date_str == '':
- return None
-
- try:
- return parser.parse(str(date_str))
- except:
- return None
- def _read_file(file_data, filename):
- """读取文件(Excel或CSV)"""
- if filename.endswith(EXCEL_EXTENSIONS):
- df = pd.read_excel(file_data)
- print(f"📊 Excel文件读取成功")
- return df
- else:
- # 尝试多种编码读取CSV
- file_content = file_data.read()
- for encoding in CSV_ENCODINGS:
- try:
- # 使用encoding_errors='ignore'忽略解码错误,并自动处理BOM
- df = pd.read_csv(io.StringIO(file_content.decode(encoding, errors='ignore')),
- low_memory=False, encoding_errors='ignore')
- print(f"📊 CSV文件读取成功({encoding.upper()}编码)")
- return df
- except (UnicodeDecodeError, Exception) as e:
- continue
- raise Exception("无法以任何编码读取CSV文件")
- def _clean_title(title):
- """清理商品标题"""
- if pd.isna(title):
- return "未知商品"
- title_str = str(title).strip()
- title_str = re.sub(r'\s+', ' ', title_str)
- if len(title_str) > 100:
- title_str = title_str[:100] + "..."
- return title_str if title_str else "未知商品"
- def load_and_clean_data(file_data, filename):
- """
- 加载并清洗上传的数据文件
-
- Args:
- file_data: 文件数据对象
- filename: 文件名
-
- Returns:
- DataFrame: 清洗后的数据
- """
- try:
- df = _read_file(file_data, filename)
- except Exception as e:
- raise Exception(f"错误:无法读取文件。{str(e)}")
- print(f"✅ 数据文件加载成功:{filename}")
- print(f"📊 原始数据记录数:{len(df):,}")
- print(f"📄 文件列名:{list(df.columns)}")
-
- # 清理列名(移除BOM和空格)
- df.columns = df.columns.str.strip().str.replace('\ufeff', '', regex=False)
-
- # 检查必需列
- missing_cols = [col for col in REQUIRED_COLUMNS if col not in df.columns]
- if missing_cols:
- print(f"❌ 缺少必需列:{missing_cols}")
- print(f"📄 当前文件列名:{list(df.columns)}")
- raise Exception(f"错误:数据文件缺少必需的列:{', '.join(missing_cols)}")
- # 筛选有效订单
- print(f"🔍 筛选前订单状态分布:{df['订单状态'].value_counts().to_dict()}")
- valid_orders = df['订单状态'] == '交易成功'
- df = df[valid_orders].copy()
- print(f"✅ 筛选有效订单后剩余 {len(df):,} 条记录")
-
- if len(df) == 0:
- raise Exception("错误:没有有效的交易成功订单")
- # 转换数据类型
- df['订单付款时间'] = pd.Series(df['订单付款时间']).apply(parse_datetime)
-
- # 处理金额和数量字段
- df['买家实际支付金额'] = pd.to_numeric(df['买家实际支付金额'], errors='coerce').fillna(0)
- df['购买数量'] = pd.to_numeric(df['购买数量'], errors='coerce').fillna(1).astype(int)
- if '价格' in df.columns:
- df['价格'] = pd.to_numeric(df['价格'], errors='coerce').fillna(0)
- else:
- df['价格'] = 0
-
- # 处理标题字段,清理和标准化
- df['商品标题'] = pd.Series(df['标题']).apply(_clean_title)
- df['外部系统编号'] = pd.Series(df['外部系统编号']).fillna('未知编号')
- # 创建SPU标识(基于商品名称)
- df['SPU'] = pd.Series(df['商品名称']).apply(_clean_title)
-
- # 创建SKU标识
- df['SKU'] = df['外部系统编号'].astype(str)
-
- # 计算销售额
- df['单笔销售额'] = np.where(
- df['买家实际支付金额'] > 0,
- df['买家实际支付金额'],
- df['价格'] * df['购买数量']
- )
-
- # 移除无效数据
- valid_data = (
- pd.Series(df['订单付款时间']).notna() &
- (df['单笔销售额'] > 0) &
- (df['购买数量'] > 0)
- )
- df = df[valid_data].copy()
-
- print(f"✅ 数据清洗完成,最终有效记录:{len(df):,}")
-
- return df
|