data_loader_service.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. """
  2. 数据加载和清洗模块
  3. """
  4. import pandas as pd
  5. import numpy as np
  6. import io
  7. import re
  8. from dateutil import parser
  9. # 数据文件配置
  10. CSV_ENCODINGS = ['utf-8', 'gbk', 'gb2312', 'gb18030']
  11. EXCEL_EXTENSIONS = ('.xlsx', '.xls')
  12. CSV_EXTENSIONS = ('.csv',)
  13. SUPPORTED_EXTENSIONS = CSV_EXTENSIONS + EXCEL_EXTENSIONS
  14. # 必需的数据列
  15. REQUIRED_COLUMNS = [
  16. '订单状态', '订单付款时间', '买家实际支付金额',
  17. '标题', '商品名称', '购买数量', '外部系统编号', '商品属性'
  18. ]
  19. def validate_file_upload(file):
  20. """验证文件上传"""
  21. if file is None:
  22. return False, '没有选择文件'
  23. if file.filename == '':
  24. return False, '没有选择文件'
  25. if file.filename is None or not file.filename.endswith(SUPPORTED_EXTENSIONS):
  26. return False, f'只支持CSV、Excel文件,当前文件: {file.filename}'
  27. return True, None
  28. def parse_datetime(date_str):
  29. """解析日期时间字符串"""
  30. if pd.isna(date_str) or date_str == '':
  31. return None
  32. try:
  33. return parser.parse(str(date_str))
  34. except:
  35. return None
  36. def _read_file(file_data, filename):
  37. """读取文件(Excel或CSV)"""
  38. if filename.endswith(EXCEL_EXTENSIONS):
  39. df = pd.read_excel(file_data)
  40. print(f"📊 Excel文件读取成功")
  41. return df
  42. else:
  43. # 尝试多种编码读取CSV
  44. file_content = file_data.read()
  45. for encoding in CSV_ENCODINGS:
  46. try:
  47. # 使用encoding_errors='ignore'忽略解码错误,并自动处理BOM
  48. df = pd.read_csv(io.StringIO(file_content.decode(encoding, errors='ignore')),
  49. low_memory=False, encoding_errors='ignore')
  50. print(f"📊 CSV文件读取成功({encoding.upper()}编码)")
  51. return df
  52. except (UnicodeDecodeError, Exception) as e:
  53. continue
  54. raise Exception("无法以任何编码读取CSV文件")
  55. def _clean_title(title):
  56. """清理商品标题"""
  57. if pd.isna(title):
  58. return "未知商品"
  59. title_str = str(title).strip()
  60. title_str = re.sub(r'\s+', ' ', title_str)
  61. if len(title_str) > 100:
  62. title_str = title_str[:100] + "..."
  63. return title_str if title_str else "未知商品"
  64. def load_and_clean_data(file_data, filename):
  65. """
  66. 加载并清洗上传的数据文件
  67. Args:
  68. file_data: 文件数据对象
  69. filename: 文件名
  70. Returns:
  71. DataFrame: 清洗后的数据
  72. """
  73. try:
  74. df = _read_file(file_data, filename)
  75. except Exception as e:
  76. raise Exception(f"错误:无法读取文件。{str(e)}")
  77. print(f"✅ 数据文件加载成功:{filename}")
  78. print(f"📊 原始数据记录数:{len(df):,}")
  79. print(f"📄 文件列名:{list(df.columns)}")
  80. # 清理列名(移除BOM和空格)
  81. df.columns = df.columns.str.strip().str.replace('\ufeff', '', regex=False)
  82. # 检查必需列
  83. missing_cols = [col for col in REQUIRED_COLUMNS if col not in df.columns]
  84. if missing_cols:
  85. print(f"❌ 缺少必需列:{missing_cols}")
  86. print(f"📄 当前文件列名:{list(df.columns)}")
  87. raise Exception(f"错误:数据文件缺少必需的列:{', '.join(missing_cols)}")
  88. # 筛选有效订单
  89. print(f"🔍 筛选前订单状态分布:{df['订单状态'].value_counts().to_dict()}")
  90. valid_orders = df['订单状态'] == '交易成功'
  91. df = df[valid_orders].copy()
  92. print(f"✅ 筛选有效订单后剩余 {len(df):,} 条记录")
  93. if len(df) == 0:
  94. raise Exception("错误:没有有效的交易成功订单")
  95. # 转换数据类型
  96. df['订单付款时间'] = pd.Series(df['订单付款时间']).apply(parse_datetime)
  97. # 处理金额和数量字段
  98. df['买家实际支付金额'] = pd.to_numeric(df['买家实际支付金额'], errors='coerce').fillna(0)
  99. df['购买数量'] = pd.to_numeric(df['购买数量'], errors='coerce').fillna(1).astype(int)
  100. if '价格' in df.columns:
  101. df['价格'] = pd.to_numeric(df['价格'], errors='coerce').fillna(0)
  102. else:
  103. df['价格'] = 0
  104. # 处理标题字段,清理和标准化
  105. df['商品标题'] = pd.Series(df['标题']).apply(_clean_title)
  106. df['外部系统编号'] = pd.Series(df['外部系统编号']).fillna('未知编号')
  107. # 创建SPU标识(基于商品名称)
  108. df['SPU'] = pd.Series(df['商品名称']).apply(_clean_title)
  109. # 创建SKU标识
  110. df['SKU'] = df['外部系统编号'].astype(str)
  111. # 计算销售额
  112. df['单笔销售额'] = np.where(
  113. df['买家实际支付金额'] > 0,
  114. df['买家实际支付金额'],
  115. df['价格'] * df['购买数量']
  116. )
  117. # 移除无效数据
  118. valid_data = (
  119. pd.Series(df['订单付款时间']).notna() &
  120. (df['单笔销售额'] > 0) &
  121. (df['购买数量'] > 0)
  122. )
  123. df = df[valid_data].copy()
  124. print(f"✅ 数据清洗完成,最终有效记录:{len(df):,}")
  125. return df