supplier_resilence_service.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479
  1. import os
  2. import warnings
  3. from typing import Dict, Any, List
  4. import numpy as np
  5. import pandas as pd
  6. warnings.filterwarnings('ignore')
  7. # ==================== 路径与配置 ====================
  8. # 项目根目录:dtm_python_merged
  9. BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  10. DATA_DIR = os.path.join(BASE_DIR, 'data')
  11. # 数据文件路径(注意:文件需要已经复制到 dtm_python_merged/data 下)
  12. PAYMENT_FILE = os.path.join(DATA_DIR, '供应商账期.xlsx')
  13. DELIVERY_FILE = os.path.join(DATA_DIR, '采购数据_双键合并结果.xlsx')
  14. COST_FILE = os.path.join(DATA_DIR, '采购订单统计.xlsx')
  15. # ==================== 工具函数 ====================
  16. def sanitize(obj):
  17. """递归将numpy/pandas类型转为Python原生类型,确保JSON可序列化"""
  18. if isinstance(obj, dict):
  19. return {k: sanitize(v) for k, v in obj.items()}
  20. if isinstance(obj, list):
  21. return [sanitize(v) for v in obj]
  22. if isinstance(obj, np.integer):
  23. return int(obj)
  24. if isinstance(obj, np.floating):
  25. v = float(obj)
  26. return None if np.isnan(v) else v
  27. if isinstance(obj, np.bool_):
  28. return bool(obj)
  29. if isinstance(obj, np.ndarray):
  30. return obj.tolist()
  31. if isinstance(obj, (pd.Timestamp, pd.Timedelta)):
  32. return str(obj)
  33. if isinstance(obj, float) and np.isnan(obj):
  34. return None
  35. return obj
  36. # ==================== 供应商综合评估器 ====================
  37. class IntegratedSupplierEvaluator:
  38. """
  39. 供应商综合评估系统
  40. 整合账期(Payment)、交付(Delivery)、成本(Cost)三个维度进行综合评分
  41. """
  42. def __init__(self,
  43. payment_file: str,
  44. delivery_file: str,
  45. cost_file: str,
  46. payment_sheet: str = 'sheet1',
  47. cost_sheet: str = 0):
  48. """
  49. 初始化评估器
  50. 参数:
  51. payment_file: 账期数据Excel文件路径
  52. delivery_file: 交付数据Excel文件路径
  53. cost_file: 成本数据Excel文件路径
  54. payment_sheet: 账期数据工作表名称
  55. cost_sheet: 成本数据工作表名称或索引
  56. """
  57. self.payment_df = None
  58. self.delivery_df = None
  59. self.cost_df = None
  60. # 加载数据
  61. self._load_payment_data(payment_file, payment_sheet)
  62. self._load_delivery_data(delivery_file)
  63. self._load_cost_data(cost_file, cost_sheet)
  64. print("=" * 80)
  65. print("供应商评估数据加载完成!")
  66. print(f"账期文件: {payment_file}")
  67. print(f"交付文件: {delivery_file}")
  68. print(f"成本文件: {cost_file}")
  69. print("=" * 80)
  70. def _load_payment_data(self, file_path, sheet_name):
  71. """加载并处理账期数据"""
  72. try:
  73. df = pd.read_excel(file_path, sheet_name=sheet_name, engine='openpyxl')
  74. if '供应商名称' not in df.columns or '结算期限' not in df.columns:
  75. print(f"⚠️ 账期数据缺少必需列,该维度将被跳过")
  76. return
  77. # 提取账期天数
  78. df['账期天数'] = df['结算期限'].astype(str).str.extract(r'(\d+)').astype(float)
  79. # 计算账期分数
  80. conditions = [
  81. df['账期天数'] >= 90,
  82. df['账期天数'] >= 60,
  83. df['账期天数'] >= 45,
  84. df['账期天数'] >= 30,
  85. ]
  86. choices = [100, 90, 80, 60]
  87. df['账期分数'] = np.select(conditions, choices, default=0)
  88. # 标准化供应商名称
  89. df['供应商名称'] = df['供应商名称'].astype(str).str.strip()
  90. self.payment_df = df[['供应商名称', '账期分数']].copy()
  91. print(f"✓ 账期数据加载成功 ({len(self.payment_df)} 条记录)")
  92. except Exception as e:
  93. print(f"⚠️ 账期数据加载失败: {e}")
  94. def _load_delivery_data(self, file_path):
  95. """加载并处理交付数据"""
  96. try:
  97. df = pd.read_excel(file_path)
  98. # 转换日期格式
  99. date_columns = ['计划交货日期', '实际验收日期']
  100. for col in date_columns:
  101. if col in df.columns:
  102. df[col] = pd.to_datetime(df[col], errors='coerce')
  103. # 转换数量列
  104. quantity_columns = ['订单计划数量', '实际入库数量']
  105. for col in quantity_columns:
  106. if col in df.columns:
  107. df[col] = pd.to_numeric(df[col], errors='coerce')
  108. self.delivery_df = df
  109. print(f"✓ 交付数据加载成功 ({len(self.delivery_df)} 条记录)")
  110. except Exception as e:
  111. print(f"⚠️ 交付数据加载失败: {e}")
  112. def _load_cost_data(self, file_path, sheet_name):
  113. """加载并处理成本数据"""
  114. try:
  115. df = pd.read_excel(file_path, sheet_name=sheet_name)
  116. required_columns = ['供应商代码', '供应商名称', '产品代码', '产品名称', '参考价']
  117. if not all(col in df.columns for col in required_columns):
  118. print(f"⚠️ 成本数据缺少必需列,该维度将被跳过")
  119. return
  120. df['参考价'] = pd.to_numeric(df['参考价'], errors='coerce')
  121. df.dropna(subset=['参考价', '产品代码', '供应商代码', '供应商名称'], inplace=True)
  122. self.cost_df = df
  123. print(f"✓ 成本数据加载成功 ({len(self.cost_df)} 条记录)")
  124. except Exception as e:
  125. print(f"⚠️ 成本数据加载失败: {e}")
  126. def _calculate_delivery_score(self, product_data, supplier_code, supplier_name,
  127. acceptable_delay_days=7, max_delay_tolerance=60):
  128. """计算单个供应商的交付维度得分"""
  129. group = product_data[
  130. (product_data['供应商代码'] == supplier_code) &
  131. (product_data['供应商名称'] == supplier_name)
  132. ].copy()
  133. if len(group) == 0:
  134. return None
  135. # D1: 交货准时率
  136. group['交货偏差天数'] = (group['实际验收日期'] - group['计划交货日期']).dt.days
  137. group['是否准时'] = group['交货偏差天数'].apply(
  138. lambda x: 1 if x <= acceptable_delay_days else 0
  139. )
  140. on_time_qty = (group['是否准时'] * group['实际入库数量']).sum()
  141. total_qty = group['实际入库数量'].sum()
  142. d1_otd = (on_time_qty / total_qty * 100) if total_qty > 0 else 0
  143. # D2: 平均偏差
  144. weighted_deviation = (group['交货偏差天数'] * group['实际入库数量']).sum()
  145. d2_avg_dev = weighted_deviation / total_qty if total_qty > 0 else 0
  146. # D3: 最长延迟
  147. delays = group[group['交货偏差天数'] > 0]['交货偏差天数']
  148. d3_max_delay = delays.max() if len(delays) > 0 else 0
  149. # D4: 数量满足率
  150. doc_groups = group.groupby('联系单据').agg({
  151. '订单计划数量': 'first',
  152. '实际入库数量': 'sum'
  153. })
  154. total_ordered = doc_groups['订单计划数量'].sum()
  155. total_received = doc_groups['实际入库数量'].sum()
  156. d4_fill_rate = (total_received / total_ordered * 100) if total_ordered > 0 else 0
  157. # 计算各指标得分
  158. score_d1 = self._score_d1(d1_otd)
  159. score_d2 = self._score_d2(d2_avg_dev)
  160. score_d3 = self._score_d3(d3_max_delay, max_delay_tolerance)
  161. score_d4 = self._score_d4(d4_fill_rate)
  162. # 加权总分 (权重可调整)
  163. weights = {'D1': 0.5, 'D2': 0.3, 'D3': 0.1, 'D4': 0.1}
  164. delivery_score = (score_d1 * weights['D1'] +
  165. score_d2 * weights['D2'] +
  166. score_d3 * weights['D3'] +
  167. score_d4 * weights['D4'])
  168. return {
  169. '交付分数': delivery_score,
  170. 'D1_准时率': d1_otd,
  171. 'D2_平均偏差': d2_avg_dev,
  172. 'D3_最长延迟': d3_max_delay,
  173. 'D4_满足率': d4_fill_rate
  174. }
  175. def _score_d1(self, otd_rate):
  176. """D1评分"""
  177. if otd_rate >= 80:
  178. return 100
  179. elif otd_rate <= 50:
  180. return 0
  181. else:
  182. return (otd_rate - 50) / (80 - 50) * 100
  183. def _score_d2(self, avg_deviation):
  184. """D2评分"""
  185. if avg_deviation <= 45:
  186. return 100
  187. elif avg_deviation >= 90:
  188. return 0
  189. else:
  190. return (90 - avg_deviation) / (90 - 45) * 100
  191. def _score_d3(self, max_delay, tolerance=60):
  192. """D3评分"""
  193. if max_delay <= tolerance:
  194. return 100
  195. elif max_delay > 120:
  196. return 0
  197. else:
  198. return (120 - max_delay) / (120 - tolerance) * 100
  199. def _score_d4(self, fill_rate):
  200. """D4评分"""
  201. if 85 <= fill_rate <= 110:
  202. return 100
  203. elif fill_rate < 85:
  204. if fill_rate <= 50:
  205. return 0
  206. else:
  207. return (fill_rate - 50) / (85 - 50) * 100
  208. else:
  209. if fill_rate >= 150:
  210. return 0
  211. else:
  212. return (150 - fill_rate) / (150 - 110) * 100
  213. def _calculate_cost_score(self, product_code):
  214. """计算成本维度得分"""
  215. if self.cost_df is None:
  216. return pd.DataFrame()
  217. filtered_df = self.cost_df[self.cost_df['产品代码'] == product_code].copy()
  218. if filtered_df.empty:
  219. return pd.DataFrame()
  220. # 获取每个供应商的最低报价
  221. filtered_df = filtered_df.sort_values(by=['参考价'], ascending=True)
  222. lowest_price_df = filtered_df.drop_duplicates(
  223. subset=['供应商代码'],
  224. keep='first'
  225. ).copy()
  226. # 计算排名
  227. lowest_price_df['成本排名'] = lowest_price_df['参考价'].rank(
  228. method='min',
  229. ascending=True
  230. ).astype(int)
  231. # 计算成本分数
  232. rank_to_score = {1: 100, 2: 90, 3: 80, 4: 60, 5: 0}
  233. lowest_price_df['成本分数'] = lowest_price_df['成本排名'].apply(
  234. lambda rank: rank_to_score.get(rank, 0)
  235. )
  236. return lowest_price_df[['供应商代码', '供应商名称', '参考价', '成本排名', '成本分数']]
  237. def evaluate_product(self,
  238. product_code: str,
  239. weights: Dict[str, float] = None) -> pd.DataFrame:
  240. """
  241. 综合评估指定产品的所有供应商
  242. 参数:
  243. product_code: 产品代码
  244. weights: 各维度权重 {'成本': 0.4, '交付': 0.4, '账期': 0.2}
  245. 返回:
  246. 评估结果DataFrame
  247. """
  248. if weights is None:
  249. weights = {'成本': 0.4, '交付': 0.4, '账期': 0.2}
  250. # 验证权重和为1
  251. total_weight = sum(weights.values())
  252. if abs(total_weight - 1.0) > 0.001:
  253. print(f"⚠️ 权重之和为 {total_weight},已自动归一化")
  254. weights = {k: v / total_weight for k, v in weights.items()}
  255. results: List[Dict[str, Any]] = []
  256. # 1. 获取成本维度数据
  257. cost_scores = self._calculate_cost_score(product_code)
  258. if cost_scores.empty:
  259. print(f"❌ 未找到产品代码 '{product_code}' 的成本数据")
  260. return pd.DataFrame()
  261. product_name = self.cost_df[self.cost_df['产品代码'] == product_code]['产品名称'].iloc[0]
  262. # 2. 遍历每个供应商
  263. for _, row in cost_scores.iterrows():
  264. supplier_code = row['供应商代码']
  265. supplier_name = row['供应商名称']
  266. result = {
  267. '产品代码': product_code,
  268. '产品名称': product_name,
  269. '供应商代码': supplier_code,
  270. '供应商名称': supplier_name,
  271. '参考价': row['参考价'],
  272. '成本排名': row['成本排名'],
  273. '成本分数': row['成本分数']
  274. }
  275. # 3. 计算交付维度分数
  276. if self.delivery_df is not None:
  277. delivery_data = self.delivery_df[self.delivery_df['产品代码'] == product_code]
  278. if not delivery_data.empty:
  279. delivery_result = self._calculate_delivery_score(
  280. delivery_data, supplier_code, supplier_name
  281. )
  282. if delivery_result:
  283. result['交付分数'] = delivery_result['交付分数']
  284. result['交付_准时率(%)'] = delivery_result['D1_准时率']
  285. result['交付_平均偏差(天)'] = delivery_result['D2_平均偏差']
  286. else:
  287. result['交付分数'] = None
  288. else:
  289. result['交付分数'] = None
  290. else:
  291. result['交付分数'] = None
  292. # 4. 获取账期维度分数
  293. if self.payment_df is not None:
  294. payment_match = self.payment_df[
  295. self.payment_df['供应商名称'] == supplier_name
  296. ]
  297. if not payment_match.empty:
  298. result['账期分数'] = payment_match['账期分数'].iloc[0]
  299. else:
  300. result['账期分数'] = None
  301. else:
  302. result['账期分数'] = None
  303. results.append(result)
  304. # 5. 计算综合得分
  305. result_df = pd.DataFrame(results)
  306. # 处理缺失值:使用该维度所有供应商的平均分
  307. for dimension in ['成本分数', '交付分数', '账期分数']:
  308. if dimension in result_df.columns:
  309. mean_score = result_df[dimension].mean()
  310. if pd.isna(mean_score):
  311. mean_score = 0
  312. result_df[dimension].fillna(mean_score, inplace=True)
  313. # 计算加权总分
  314. result_df['综合得分'] = (
  315. result_df['成本分数'] * weights['成本'] +
  316. result_df['交付分数'] * weights['交付'] +
  317. result_df['账期分数'] * weights['账期']
  318. )
  319. # 排序并选择Top5
  320. result_df = result_df.sort_values('综合得分', ascending=False)
  321. top5_df = result_df.head(5).copy()
  322. # 添加综合排名
  323. top5_df.insert(0, '综合排名', range(1, len(top5_df) + 1))
  324. return top5_df
  325. def get_product_details(self, product_code: str) -> Dict[str, Any]:
  326. """
  327. 获取产品成本/交付/账期详细信息,供图表展示
  328. """
  329. # 1. 成本维度详情
  330. cost_scores = self._calculate_cost_score(product_code)
  331. cost_details = sanitize(cost_scores.to_dict(orient='records')) if not cost_scores.empty else []
  332. # 2. 交付维度详情 (需要为每个供应商计算)
  333. delivery_details: List[Dict[str, Any]] = []
  334. if self.delivery_df is not None:
  335. delivery_data = self.delivery_df[self.delivery_df['产品代码'] == product_code]
  336. if not delivery_data.empty:
  337. # 获取所有相关的供应商
  338. unique_suppliers = delivery_data[['供应商代码', '供应商名称']].drop_duplicates()
  339. for _, supplier_row in unique_suppliers.iterrows():
  340. supplier_code = supplier_row['供应商代码']
  341. supplier_name = supplier_row['供应商名称']
  342. delivery_result = self._calculate_delivery_score(
  343. delivery_data, supplier_code, supplier_name
  344. )
  345. if delivery_result:
  346. delivery_result['供应商代码'] = supplier_code
  347. delivery_result['供应商名称'] = supplier_name
  348. delivery_details.append(sanitize(delivery_result))
  349. # 3. 账期维度详情
  350. payment_details: List[Dict[str, Any]] = []
  351. if self.payment_df is not None and not cost_scores.empty:
  352. # 从成本数据中获取所有相关供应商名称
  353. cost_supplier_names = cost_scores['供应商名称'].unique()
  354. payment_matches = self.payment_df[
  355. self.payment_df['供应商名称'].isin(cost_supplier_names)
  356. ]
  357. payment_details = sanitize(payment_matches.to_dict(orient='records'))
  358. return {
  359. 'product_code': product_code,
  360. 'cost_details': cost_details,
  361. 'delivery_details': delivery_details,
  362. 'payment_details': payment_details
  363. }
  364. # ==================== 对外单例与权重管理 ====================
  365. # 初始化评估器实例
  366. evaluator = IntegratedSupplierEvaluator(
  367. payment_file=PAYMENT_FILE,
  368. delivery_file=DELIVERY_FILE,
  369. cost_file=COST_FILE,
  370. payment_sheet='sheet1',
  371. cost_sheet=0
  372. )
  373. # 默认权重配置
  374. default_weights: Dict[str, float] = {
  375. '成本': 0.4,
  376. '交付': 0.4,
  377. '账期': 0.2
  378. }
  379. # 当前权重配置(可被用户修改)
  380. current_weights: Dict[str, float] = default_weights.copy()
  381. def get_current_weights() -> Dict[str, float]:
  382. return current_weights
  383. def update_weights(new_weights: Dict[str, float]) -> Dict[str, float]:
  384. """
  385. 更新全局权重,调用前应已校验和为1
  386. """
  387. global current_weights
  388. # 简单拷贝,避免外部直接修改引用
  389. current_weights = {
  390. '成本': float(new_weights.get('成本', default_weights['成本'])),
  391. '交付': float(new_weights.get('交付', default_weights['交付'])),
  392. '账期': float(new_weights.get('账期', default_weights['账期']))
  393. }
  394. return current_weights