""" 数据加载和清洗模块 """ import pandas as pd import numpy as np import io import re from dateutil import parser # 数据文件配置 CSV_ENCODINGS = ['utf-8', 'gbk', 'gb2312', 'gb18030'] EXCEL_EXTENSIONS = ('.xlsx', '.xls') CSV_EXTENSIONS = ('.csv',) SUPPORTED_EXTENSIONS = CSV_EXTENSIONS + EXCEL_EXTENSIONS # 必需的数据列 REQUIRED_COLUMNS = [ '订单状态', '订单付款时间', '买家实际支付金额', '标题', '商品名称', '购买数量', '外部系统编号', '商品属性' ] def validate_file_upload(file): """验证文件上传""" if file is None: return False, '没有选择文件' if file.filename == '': return False, '没有选择文件' if file.filename is None or not file.filename.endswith(SUPPORTED_EXTENSIONS): return False, f'只支持CSV、Excel文件,当前文件: {file.filename}' return True, None def parse_datetime(date_str): """解析日期时间字符串""" if pd.isna(date_str) or date_str == '': return None try: return parser.parse(str(date_str)) except: return None def _read_file(file_data, filename): """读取文件(Excel或CSV)""" if filename.endswith(EXCEL_EXTENSIONS): df = pd.read_excel(file_data) print(f"📊 Excel文件读取成功") return df else: # 尝试多种编码读取CSV file_content = file_data.read() for encoding in CSV_ENCODINGS: try: # 使用encoding_errors='ignore'忽略解码错误,并自动处理BOM df = pd.read_csv(io.StringIO(file_content.decode(encoding, errors='ignore')), low_memory=False, encoding_errors='ignore') print(f"📊 CSV文件读取成功({encoding.upper()}编码)") return df except (UnicodeDecodeError, Exception) as e: continue raise Exception("无法以任何编码读取CSV文件") def _clean_title(title): """清理商品标题""" if pd.isna(title): return "未知商品" title_str = str(title).strip() title_str = re.sub(r'\s+', ' ', title_str) if len(title_str) > 100: title_str = title_str[:100] + "..." return title_str if title_str else "未知商品" def load_and_clean_data(file_data, filename): """ 加载并清洗上传的数据文件 Args: file_data: 文件数据对象 filename: 文件名 Returns: DataFrame: 清洗后的数据 """ try: df = _read_file(file_data, filename) except Exception as e: raise Exception(f"错误:无法读取文件。{str(e)}") print(f"✅ 数据文件加载成功:{filename}") print(f"📊 原始数据记录数:{len(df):,}") print(f"📄 文件列名:{list(df.columns)}") # 清理列名(移除BOM和空格) df.columns = df.columns.str.strip().str.replace('\ufeff', '', regex=False) # 检查必需列 missing_cols = [col for col in REQUIRED_COLUMNS if col not in df.columns] if missing_cols: print(f"❌ 缺少必需列:{missing_cols}") print(f"📄 当前文件列名:{list(df.columns)}") raise Exception(f"错误:数据文件缺少必需的列:{', '.join(missing_cols)}") # 筛选有效订单 print(f"🔍 筛选前订单状态分布:{df['订单状态'].value_counts().to_dict()}") valid_orders = df['订单状态'] == '交易成功' df = df[valid_orders].copy() print(f"✅ 筛选有效订单后剩余 {len(df):,} 条记录") if len(df) == 0: raise Exception("错误:没有有效的交易成功订单") # 转换数据类型 df['订单付款时间'] = pd.Series(df['订单付款时间']).apply(parse_datetime) # 处理金额和数量字段 df['买家实际支付金额'] = pd.to_numeric(df['买家实际支付金额'], errors='coerce').fillna(0) df['购买数量'] = pd.to_numeric(df['购买数量'], errors='coerce').fillna(1).astype(int) if '价格' in df.columns: df['价格'] = pd.to_numeric(df['价格'], errors='coerce').fillna(0) else: df['价格'] = 0 # 处理标题字段,清理和标准化 df['商品标题'] = pd.Series(df['标题']).apply(_clean_title) df['外部系统编号'] = pd.Series(df['外部系统编号']).fillna('未知编号') # 创建SPU标识(基于商品名称) df['SPU'] = pd.Series(df['商品名称']).apply(_clean_title) # 创建SKU标识 df['SKU'] = df['外部系统编号'].astype(str) # 计算销售额 df['单笔销售额'] = np.where( df['买家实际支付金额'] > 0, df['买家实际支付金额'], df['价格'] * df['购买数量'] ) # 移除无效数据 valid_data = ( pd.Series(df['订单付款时间']).notna() & (df['单笔销售额'] > 0) & (df['购买数量'] > 0) ) df = df[valid_data].copy() print(f"✅ 数据清洗完成,最终有效记录:{len(df):,}") return df