import os import warnings from typing import Dict, Any, List import numpy as np import pandas as pd warnings.filterwarnings('ignore') # ==================== 路径与配置 ==================== # 项目根目录:dtm_python_merged BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) DATA_DIR = os.path.join(BASE_DIR, 'data') # 数据文件路径(注意:文件需要已经复制到 dtm_python_merged/data 下) PAYMENT_FILE = os.path.join(DATA_DIR, '供应商账期.xlsx') DELIVERY_FILE = os.path.join(DATA_DIR, '采购数据_双键合并结果.xlsx') COST_FILE = os.path.join(DATA_DIR, '采购订单统计.xlsx') # ==================== 工具函数 ==================== def sanitize(obj): """递归将numpy/pandas类型转为Python原生类型,确保JSON可序列化""" if isinstance(obj, dict): return {k: sanitize(v) for k, v in obj.items()} if isinstance(obj, list): return [sanitize(v) for v in obj] if isinstance(obj, np.integer): return int(obj) if isinstance(obj, np.floating): v = float(obj) return None if np.isnan(v) else v if isinstance(obj, np.bool_): return bool(obj) if isinstance(obj, np.ndarray): return obj.tolist() if isinstance(obj, (pd.Timestamp, pd.Timedelta)): return str(obj) if isinstance(obj, float) and np.isnan(obj): return None return obj # ==================== 供应商综合评估器 ==================== class IntegratedSupplierEvaluator: """ 供应商综合评估系统 整合账期(Payment)、交付(Delivery)、成本(Cost)三个维度进行综合评分 """ def __init__(self, payment_file: str, delivery_file: str, cost_file: str, payment_sheet: str = 'sheet1', cost_sheet: str = 0): """ 初始化评估器 参数: payment_file: 账期数据Excel文件路径 delivery_file: 交付数据Excel文件路径 cost_file: 成本数据Excel文件路径 payment_sheet: 账期数据工作表名称 cost_sheet: 成本数据工作表名称或索引 """ self.payment_df = None self.delivery_df = None self.cost_df = None # 加载数据 self._load_payment_data(payment_file, payment_sheet) self._load_delivery_data(delivery_file) self._load_cost_data(cost_file, cost_sheet) print("=" * 80) print("供应商评估数据加载完成!") print(f"账期文件: {payment_file}") print(f"交付文件: {delivery_file}") print(f"成本文件: {cost_file}") print("=" * 80) def _load_payment_data(self, file_path, sheet_name): """加载并处理账期数据""" try: df = pd.read_excel(file_path, sheet_name=sheet_name, engine='openpyxl') if '供应商名称' not in df.columns or '结算期限' not in df.columns: print(f"⚠️ 账期数据缺少必需列,该维度将被跳过") return # 提取账期天数 df['账期天数'] = df['结算期限'].astype(str).str.extract(r'(\d+)').astype(float) # 计算账期分数 conditions = [ df['账期天数'] >= 90, df['账期天数'] >= 60, df['账期天数'] >= 45, df['账期天数'] >= 30, ] choices = [100, 90, 80, 60] df['账期分数'] = np.select(conditions, choices, default=0) # 标准化供应商名称 df['供应商名称'] = df['供应商名称'].astype(str).str.strip() self.payment_df = df[['供应商名称', '账期分数']].copy() print(f"✓ 账期数据加载成功 ({len(self.payment_df)} 条记录)") except Exception as e: print(f"⚠️ 账期数据加载失败: {e}") def _load_delivery_data(self, file_path): """加载并处理交付数据""" try: df = pd.read_excel(file_path) # 转换日期格式 date_columns = ['计划交货日期', '实际验收日期'] for col in date_columns: if col in df.columns: df[col] = pd.to_datetime(df[col], errors='coerce') # 转换数量列 quantity_columns = ['订单计划数量', '实际入库数量'] for col in quantity_columns: if col in df.columns: df[col] = pd.to_numeric(df[col], errors='coerce') self.delivery_df = df print(f"✓ 交付数据加载成功 ({len(self.delivery_df)} 条记录)") except Exception as e: print(f"⚠️ 交付数据加载失败: {e}") def _load_cost_data(self, file_path, sheet_name): """加载并处理成本数据""" try: df = pd.read_excel(file_path, sheet_name=sheet_name) required_columns = ['供应商代码', '供应商名称', '产品代码', '产品名称', '参考价'] if not all(col in df.columns for col in required_columns): print(f"⚠️ 成本数据缺少必需列,该维度将被跳过") return df['参考价'] = pd.to_numeric(df['参考价'], errors='coerce') df.dropna(subset=['参考价', '产品代码', '供应商代码', '供应商名称'], inplace=True) self.cost_df = df print(f"✓ 成本数据加载成功 ({len(self.cost_df)} 条记录)") except Exception as e: print(f"⚠️ 成本数据加载失败: {e}") def _calculate_delivery_score(self, product_data, supplier_code, supplier_name, acceptable_delay_days=7, max_delay_tolerance=60): """计算单个供应商的交付维度得分""" group = product_data[ (product_data['供应商代码'] == supplier_code) & (product_data['供应商名称'] == supplier_name) ].copy() if len(group) == 0: return None # D1: 交货准时率 group['交货偏差天数'] = (group['实际验收日期'] - group['计划交货日期']).dt.days group['是否准时'] = group['交货偏差天数'].apply( lambda x: 1 if x <= acceptable_delay_days else 0 ) on_time_qty = (group['是否准时'] * group['实际入库数量']).sum() total_qty = group['实际入库数量'].sum() d1_otd = (on_time_qty / total_qty * 100) if total_qty > 0 else 0 # D2: 平均偏差 weighted_deviation = (group['交货偏差天数'] * group['实际入库数量']).sum() d2_avg_dev = weighted_deviation / total_qty if total_qty > 0 else 0 # D3: 最长延迟 delays = group[group['交货偏差天数'] > 0]['交货偏差天数'] d3_max_delay = delays.max() if len(delays) > 0 else 0 # D4: 数量满足率 doc_groups = group.groupby('联系单据').agg({ '订单计划数量': 'first', '实际入库数量': 'sum' }) total_ordered = doc_groups['订单计划数量'].sum() total_received = doc_groups['实际入库数量'].sum() d4_fill_rate = (total_received / total_ordered * 100) if total_ordered > 0 else 0 # 计算各指标得分 score_d1 = self._score_d1(d1_otd) score_d2 = self._score_d2(d2_avg_dev) score_d3 = self._score_d3(d3_max_delay, max_delay_tolerance) score_d4 = self._score_d4(d4_fill_rate) # 加权总分 (权重可调整) weights = {'D1': 0.5, 'D2': 0.3, 'D3': 0.1, 'D4': 0.1} delivery_score = (score_d1 * weights['D1'] + score_d2 * weights['D2'] + score_d3 * weights['D3'] + score_d4 * weights['D4']) return { '交付分数': delivery_score, 'D1_准时率': d1_otd, 'D2_平均偏差': d2_avg_dev, 'D3_最长延迟': d3_max_delay, 'D4_满足率': d4_fill_rate } def _score_d1(self, otd_rate): """D1评分""" if otd_rate >= 80: return 100 elif otd_rate <= 50: return 0 else: return (otd_rate - 50) / (80 - 50) * 100 def _score_d2(self, avg_deviation): """D2评分""" if avg_deviation <= 45: return 100 elif avg_deviation >= 90: return 0 else: return (90 - avg_deviation) / (90 - 45) * 100 def _score_d3(self, max_delay, tolerance=60): """D3评分""" if max_delay <= tolerance: return 100 elif max_delay > 120: return 0 else: return (120 - max_delay) / (120 - tolerance) * 100 def _score_d4(self, fill_rate): """D4评分""" if 85 <= fill_rate <= 110: return 100 elif fill_rate < 85: if fill_rate <= 50: return 0 else: return (fill_rate - 50) / (85 - 50) * 100 else: if fill_rate >= 150: return 0 else: return (150 - fill_rate) / (150 - 110) * 100 def _calculate_cost_score(self, product_code): """计算成本维度得分""" if self.cost_df is None: return pd.DataFrame() filtered_df = self.cost_df[self.cost_df['产品代码'] == product_code].copy() if filtered_df.empty: return pd.DataFrame() # 获取每个供应商的最低报价 filtered_df = filtered_df.sort_values(by=['参考价'], ascending=True) lowest_price_df = filtered_df.drop_duplicates( subset=['供应商代码'], keep='first' ).copy() # 计算排名 lowest_price_df['成本排名'] = lowest_price_df['参考价'].rank( method='min', ascending=True ).astype(int) # 计算成本分数 rank_to_score = {1: 100, 2: 90, 3: 80, 4: 60, 5: 0} lowest_price_df['成本分数'] = lowest_price_df['成本排名'].apply( lambda rank: rank_to_score.get(rank, 0) ) return lowest_price_df[['供应商代码', '供应商名称', '参考价', '成本排名', '成本分数']] def evaluate_product(self, product_code: str, weights: Dict[str, float] = None) -> pd.DataFrame: """ 综合评估指定产品的所有供应商 参数: product_code: 产品代码 weights: 各维度权重 {'成本': 0.4, '交付': 0.4, '账期': 0.2} 返回: 评估结果DataFrame """ if weights is None: weights = {'成本': 0.4, '交付': 0.4, '账期': 0.2} # 验证权重和为1 total_weight = sum(weights.values()) if abs(total_weight - 1.0) > 0.001: print(f"⚠️ 权重之和为 {total_weight},已自动归一化") weights = {k: v / total_weight for k, v in weights.items()} results: List[Dict[str, Any]] = [] # 1. 获取成本维度数据 cost_scores = self._calculate_cost_score(product_code) if cost_scores.empty: print(f"❌ 未找到产品代码 '{product_code}' 的成本数据") return pd.DataFrame() product_name = self.cost_df[self.cost_df['产品代码'] == product_code]['产品名称'].iloc[0] # 2. 遍历每个供应商 for _, row in cost_scores.iterrows(): supplier_code = row['供应商代码'] supplier_name = row['供应商名称'] result = { '产品代码': product_code, '产品名称': product_name, '供应商代码': supplier_code, '供应商名称': supplier_name, '参考价': row['参考价'], '成本排名': row['成本排名'], '成本分数': row['成本分数'] } # 3. 计算交付维度分数 if self.delivery_df is not None: delivery_data = self.delivery_df[self.delivery_df['产品代码'] == product_code] if not delivery_data.empty: delivery_result = self._calculate_delivery_score( delivery_data, supplier_code, supplier_name ) if delivery_result: result['交付分数'] = delivery_result['交付分数'] result['交付_准时率(%)'] = delivery_result['D1_准时率'] result['交付_平均偏差(天)'] = delivery_result['D2_平均偏差'] else: result['交付分数'] = None else: result['交付分数'] = None else: result['交付分数'] = None # 4. 获取账期维度分数 if self.payment_df is not None: payment_match = self.payment_df[ self.payment_df['供应商名称'] == supplier_name ] if not payment_match.empty: result['账期分数'] = payment_match['账期分数'].iloc[0] else: result['账期分数'] = None else: result['账期分数'] = None results.append(result) # 5. 计算综合得分 result_df = pd.DataFrame(results) # 处理缺失值:使用该维度所有供应商的平均分 for dimension in ['成本分数', '交付分数', '账期分数']: if dimension in result_df.columns: mean_score = result_df[dimension].mean() if pd.isna(mean_score): mean_score = 0 result_df[dimension].fillna(mean_score, inplace=True) # 计算加权总分 result_df['综合得分'] = ( result_df['成本分数'] * weights['成本'] + result_df['交付分数'] * weights['交付'] + result_df['账期分数'] * weights['账期'] ) # 排序并选择Top5 result_df = result_df.sort_values('综合得分', ascending=False) top5_df = result_df.head(5).copy() # 添加综合排名 top5_df.insert(0, '综合排名', range(1, len(top5_df) + 1)) return top5_df def get_product_details(self, product_code: str) -> Dict[str, Any]: """ 获取产品成本/交付/账期详细信息,供图表展示 """ # 1. 成本维度详情 cost_scores = self._calculate_cost_score(product_code) cost_details = sanitize(cost_scores.to_dict(orient='records')) if not cost_scores.empty else [] # 2. 交付维度详情 (需要为每个供应商计算) delivery_details: List[Dict[str, Any]] = [] if self.delivery_df is not None: delivery_data = self.delivery_df[self.delivery_df['产品代码'] == product_code] if not delivery_data.empty: # 获取所有相关的供应商 unique_suppliers = delivery_data[['供应商代码', '供应商名称']].drop_duplicates() for _, supplier_row in unique_suppliers.iterrows(): supplier_code = supplier_row['供应商代码'] supplier_name = supplier_row['供应商名称'] delivery_result = self._calculate_delivery_score( delivery_data, supplier_code, supplier_name ) if delivery_result: delivery_result['供应商代码'] = supplier_code delivery_result['供应商名称'] = supplier_name delivery_details.append(sanitize(delivery_result)) # 3. 账期维度详情 payment_details: List[Dict[str, Any]] = [] if self.payment_df is not None and not cost_scores.empty: # 从成本数据中获取所有相关供应商名称 cost_supplier_names = cost_scores['供应商名称'].unique() payment_matches = self.payment_df[ self.payment_df['供应商名称'].isin(cost_supplier_names) ] payment_details = sanitize(payment_matches.to_dict(orient='records')) return { 'product_code': product_code, 'cost_details': cost_details, 'delivery_details': delivery_details, 'payment_details': payment_details } # ==================== 对外单例与权重管理 ==================== # 初始化评估器实例 evaluator = IntegratedSupplierEvaluator( payment_file=PAYMENT_FILE, delivery_file=DELIVERY_FILE, cost_file=COST_FILE, payment_sheet='sheet1', cost_sheet=0 ) # 默认权重配置 default_weights: Dict[str, float] = { '成本': 0.4, '交付': 0.4, '账期': 0.2 } # 当前权重配置(可被用户修改) current_weights: Dict[str, float] = default_weights.copy() def get_current_weights() -> Dict[str, float]: return current_weights def update_weights(new_weights: Dict[str, float]) -> Dict[str, float]: """ 更新全局权重,调用前应已校验和为1 """ global current_weights # 简单拷贝,避免外部直接修改引用 current_weights = { '成本': float(new_weights.get('成本', default_weights['成本'])), '交付': float(new_weights.get('交付', default_weights['交付'])), '账期': float(new_weights.get('账期', default_weights['账期'])) } return current_weights