| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479 |
- import os
- import warnings
- from typing import Dict, Any, List
- import numpy as np
- import pandas as pd
- warnings.filterwarnings('ignore')
- # ==================== 路径与配置 ====================
- # 项目根目录:dtm_python_merged
- BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- DATA_DIR = os.path.join(BASE_DIR, 'data')
- # 数据文件路径(注意:文件需要已经复制到 dtm_python_merged/data 下)
- PAYMENT_FILE = os.path.join(DATA_DIR, '供应商账期.xlsx')
- DELIVERY_FILE = os.path.join(DATA_DIR, '采购数据_双键合并结果.xlsx')
- COST_FILE = os.path.join(DATA_DIR, '采购订单统计.xlsx')
- # ==================== 工具函数 ====================
- def sanitize(obj):
- """递归将numpy/pandas类型转为Python原生类型,确保JSON可序列化"""
- if isinstance(obj, dict):
- return {k: sanitize(v) for k, v in obj.items()}
- if isinstance(obj, list):
- return [sanitize(v) for v in obj]
- if isinstance(obj, np.integer):
- return int(obj)
- if isinstance(obj, np.floating):
- v = float(obj)
- return None if np.isnan(v) else v
- if isinstance(obj, np.bool_):
- return bool(obj)
- if isinstance(obj, np.ndarray):
- return obj.tolist()
- if isinstance(obj, (pd.Timestamp, pd.Timedelta)):
- return str(obj)
- if isinstance(obj, float) and np.isnan(obj):
- return None
- return obj
- # ==================== 供应商综合评估器 ====================
- class IntegratedSupplierEvaluator:
- """
- 供应商综合评估系统
- 整合账期(Payment)、交付(Delivery)、成本(Cost)三个维度进行综合评分
- """
- def __init__(self,
- payment_file: str,
- delivery_file: str,
- cost_file: str,
- payment_sheet: str = 'sheet1',
- cost_sheet: str = 0):
- """
- 初始化评估器
- 参数:
- payment_file: 账期数据Excel文件路径
- delivery_file: 交付数据Excel文件路径
- cost_file: 成本数据Excel文件路径
- payment_sheet: 账期数据工作表名称
- cost_sheet: 成本数据工作表名称或索引
- """
- self.payment_df = None
- self.delivery_df = None
- self.cost_df = None
- # 加载数据
- self._load_payment_data(payment_file, payment_sheet)
- self._load_delivery_data(delivery_file)
- self._load_cost_data(cost_file, cost_sheet)
- print("=" * 80)
- print("供应商评估数据加载完成!")
- print(f"账期文件: {payment_file}")
- print(f"交付文件: {delivery_file}")
- print(f"成本文件: {cost_file}")
- print("=" * 80)
- def _load_payment_data(self, file_path, sheet_name):
- """加载并处理账期数据"""
- try:
- df = pd.read_excel(file_path, sheet_name=sheet_name, engine='openpyxl')
- if '供应商名称' not in df.columns or '结算期限' not in df.columns:
- print(f"⚠️ 账期数据缺少必需列,该维度将被跳过")
- return
- # 提取账期天数
- df['账期天数'] = df['结算期限'].astype(str).str.extract(r'(\d+)').astype(float)
- # 计算账期分数
- conditions = [
- df['账期天数'] >= 90,
- df['账期天数'] >= 60,
- df['账期天数'] >= 45,
- df['账期天数'] >= 30,
- ]
- choices = [100, 90, 80, 60]
- df['账期分数'] = np.select(conditions, choices, default=0)
- # 标准化供应商名称
- df['供应商名称'] = df['供应商名称'].astype(str).str.strip()
- self.payment_df = df[['供应商名称', '账期分数']].copy()
- print(f"✓ 账期数据加载成功 ({len(self.payment_df)} 条记录)")
- except Exception as e:
- print(f"⚠️ 账期数据加载失败: {e}")
- def _load_delivery_data(self, file_path):
- """加载并处理交付数据"""
- try:
- df = pd.read_excel(file_path)
- # 转换日期格式
- date_columns = ['计划交货日期', '实际验收日期']
- for col in date_columns:
- if col in df.columns:
- df[col] = pd.to_datetime(df[col], errors='coerce')
- # 转换数量列
- quantity_columns = ['订单计划数量', '实际入库数量']
- for col in quantity_columns:
- if col in df.columns:
- df[col] = pd.to_numeric(df[col], errors='coerce')
- self.delivery_df = df
- print(f"✓ 交付数据加载成功 ({len(self.delivery_df)} 条记录)")
- except Exception as e:
- print(f"⚠️ 交付数据加载失败: {e}")
- def _load_cost_data(self, file_path, sheet_name):
- """加载并处理成本数据"""
- try:
- df = pd.read_excel(file_path, sheet_name=sheet_name)
- required_columns = ['供应商代码', '供应商名称', '产品代码', '产品名称', '参考价']
- if not all(col in df.columns for col in required_columns):
- print(f"⚠️ 成本数据缺少必需列,该维度将被跳过")
- return
- df['参考价'] = pd.to_numeric(df['参考价'], errors='coerce')
- df.dropna(subset=['参考价', '产品代码', '供应商代码', '供应商名称'], inplace=True)
- self.cost_df = df
- print(f"✓ 成本数据加载成功 ({len(self.cost_df)} 条记录)")
- except Exception as e:
- print(f"⚠️ 成本数据加载失败: {e}")
- def _calculate_delivery_score(self, product_data, supplier_code, supplier_name,
- acceptable_delay_days=7, max_delay_tolerance=60):
- """计算单个供应商的交付维度得分"""
- group = product_data[
- (product_data['供应商代码'] == supplier_code) &
- (product_data['供应商名称'] == supplier_name)
- ].copy()
- if len(group) == 0:
- return None
- # D1: 交货准时率
- group['交货偏差天数'] = (group['实际验收日期'] - group['计划交货日期']).dt.days
- group['是否准时'] = group['交货偏差天数'].apply(
- lambda x: 1 if x <= acceptable_delay_days else 0
- )
- on_time_qty = (group['是否准时'] * group['实际入库数量']).sum()
- total_qty = group['实际入库数量'].sum()
- d1_otd = (on_time_qty / total_qty * 100) if total_qty > 0 else 0
- # D2: 平均偏差
- weighted_deviation = (group['交货偏差天数'] * group['实际入库数量']).sum()
- d2_avg_dev = weighted_deviation / total_qty if total_qty > 0 else 0
- # D3: 最长延迟
- delays = group[group['交货偏差天数'] > 0]['交货偏差天数']
- d3_max_delay = delays.max() if len(delays) > 0 else 0
- # D4: 数量满足率
- doc_groups = group.groupby('联系单据').agg({
- '订单计划数量': 'first',
- '实际入库数量': 'sum'
- })
- total_ordered = doc_groups['订单计划数量'].sum()
- total_received = doc_groups['实际入库数量'].sum()
- d4_fill_rate = (total_received / total_ordered * 100) if total_ordered > 0 else 0
- # 计算各指标得分
- score_d1 = self._score_d1(d1_otd)
- score_d2 = self._score_d2(d2_avg_dev)
- score_d3 = self._score_d3(d3_max_delay, max_delay_tolerance)
- score_d4 = self._score_d4(d4_fill_rate)
- # 加权总分 (权重可调整)
- weights = {'D1': 0.5, 'D2': 0.3, 'D3': 0.1, 'D4': 0.1}
- delivery_score = (score_d1 * weights['D1'] +
- score_d2 * weights['D2'] +
- score_d3 * weights['D3'] +
- score_d4 * weights['D4'])
- return {
- '交付分数': delivery_score,
- 'D1_准时率': d1_otd,
- 'D2_平均偏差': d2_avg_dev,
- 'D3_最长延迟': d3_max_delay,
- 'D4_满足率': d4_fill_rate
- }
- def _score_d1(self, otd_rate):
- """D1评分"""
- if otd_rate >= 80:
- return 100
- elif otd_rate <= 50:
- return 0
- else:
- return (otd_rate - 50) / (80 - 50) * 100
- def _score_d2(self, avg_deviation):
- """D2评分"""
- if avg_deviation <= 45:
- return 100
- elif avg_deviation >= 90:
- return 0
- else:
- return (90 - avg_deviation) / (90 - 45) * 100
- def _score_d3(self, max_delay, tolerance=60):
- """D3评分"""
- if max_delay <= tolerance:
- return 100
- elif max_delay > 120:
- return 0
- else:
- return (120 - max_delay) / (120 - tolerance) * 100
- def _score_d4(self, fill_rate):
- """D4评分"""
- if 85 <= fill_rate <= 110:
- return 100
- elif fill_rate < 85:
- if fill_rate <= 50:
- return 0
- else:
- return (fill_rate - 50) / (85 - 50) * 100
- else:
- if fill_rate >= 150:
- return 0
- else:
- return (150 - fill_rate) / (150 - 110) * 100
- def _calculate_cost_score(self, product_code):
- """计算成本维度得分"""
- if self.cost_df is None:
- return pd.DataFrame()
- filtered_df = self.cost_df[self.cost_df['产品代码'] == product_code].copy()
- if filtered_df.empty:
- return pd.DataFrame()
- # 获取每个供应商的最低报价
- filtered_df = filtered_df.sort_values(by=['参考价'], ascending=True)
- lowest_price_df = filtered_df.drop_duplicates(
- subset=['供应商代码'],
- keep='first'
- ).copy()
- # 计算排名
- lowest_price_df['成本排名'] = lowest_price_df['参考价'].rank(
- method='min',
- ascending=True
- ).astype(int)
- # 计算成本分数
- rank_to_score = {1: 100, 2: 90, 3: 80, 4: 60, 5: 0}
- lowest_price_df['成本分数'] = lowest_price_df['成本排名'].apply(
- lambda rank: rank_to_score.get(rank, 0)
- )
- return lowest_price_df[['供应商代码', '供应商名称', '参考价', '成本排名', '成本分数']]
- def evaluate_product(self,
- product_code: str,
- weights: Dict[str, float] = None) -> pd.DataFrame:
- """
- 综合评估指定产品的所有供应商
- 参数:
- product_code: 产品代码
- weights: 各维度权重 {'成本': 0.4, '交付': 0.4, '账期': 0.2}
- 返回:
- 评估结果DataFrame
- """
- if weights is None:
- weights = {'成本': 0.4, '交付': 0.4, '账期': 0.2}
- # 验证权重和为1
- total_weight = sum(weights.values())
- if abs(total_weight - 1.0) > 0.001:
- print(f"⚠️ 权重之和为 {total_weight},已自动归一化")
- weights = {k: v / total_weight for k, v in weights.items()}
- results: List[Dict[str, Any]] = []
- # 1. 获取成本维度数据
- cost_scores = self._calculate_cost_score(product_code)
- if cost_scores.empty:
- print(f"❌ 未找到产品代码 '{product_code}' 的成本数据")
- return pd.DataFrame()
- product_name = self.cost_df[self.cost_df['产品代码'] == product_code]['产品名称'].iloc[0]
- # 2. 遍历每个供应商
- for _, row in cost_scores.iterrows():
- supplier_code = row['供应商代码']
- supplier_name = row['供应商名称']
- result = {
- '产品代码': product_code,
- '产品名称': product_name,
- '供应商代码': supplier_code,
- '供应商名称': supplier_name,
- '参考价': row['参考价'],
- '成本排名': row['成本排名'],
- '成本分数': row['成本分数']
- }
- # 3. 计算交付维度分数
- if self.delivery_df is not None:
- delivery_data = self.delivery_df[self.delivery_df['产品代码'] == product_code]
- if not delivery_data.empty:
- delivery_result = self._calculate_delivery_score(
- delivery_data, supplier_code, supplier_name
- )
- if delivery_result:
- result['交付分数'] = delivery_result['交付分数']
- result['交付_准时率(%)'] = delivery_result['D1_准时率']
- result['交付_平均偏差(天)'] = delivery_result['D2_平均偏差']
- else:
- result['交付分数'] = None
- else:
- result['交付分数'] = None
- else:
- result['交付分数'] = None
- # 4. 获取账期维度分数
- if self.payment_df is not None:
- payment_match = self.payment_df[
- self.payment_df['供应商名称'] == supplier_name
- ]
- if not payment_match.empty:
- result['账期分数'] = payment_match['账期分数'].iloc[0]
- else:
- result['账期分数'] = None
- else:
- result['账期分数'] = None
- results.append(result)
- # 5. 计算综合得分
- result_df = pd.DataFrame(results)
- # 处理缺失值:使用该维度所有供应商的平均分
- for dimension in ['成本分数', '交付分数', '账期分数']:
- if dimension in result_df.columns:
- mean_score = result_df[dimension].mean()
- if pd.isna(mean_score):
- mean_score = 0
- result_df[dimension].fillna(mean_score, inplace=True)
- # 计算加权总分
- result_df['综合得分'] = (
- result_df['成本分数'] * weights['成本'] +
- result_df['交付分数'] * weights['交付'] +
- result_df['账期分数'] * weights['账期']
- )
- # 排序并选择Top5
- result_df = result_df.sort_values('综合得分', ascending=False)
- top5_df = result_df.head(5).copy()
- # 添加综合排名
- top5_df.insert(0, '综合排名', range(1, len(top5_df) + 1))
- return top5_df
- def get_product_details(self, product_code: str) -> Dict[str, Any]:
- """
- 获取产品成本/交付/账期详细信息,供图表展示
- """
- # 1. 成本维度详情
- cost_scores = self._calculate_cost_score(product_code)
- cost_details = sanitize(cost_scores.to_dict(orient='records')) if not cost_scores.empty else []
- # 2. 交付维度详情 (需要为每个供应商计算)
- delivery_details: List[Dict[str, Any]] = []
- if self.delivery_df is not None:
- delivery_data = self.delivery_df[self.delivery_df['产品代码'] == product_code]
- if not delivery_data.empty:
- # 获取所有相关的供应商
- unique_suppliers = delivery_data[['供应商代码', '供应商名称']].drop_duplicates()
- for _, supplier_row in unique_suppliers.iterrows():
- supplier_code = supplier_row['供应商代码']
- supplier_name = supplier_row['供应商名称']
- delivery_result = self._calculate_delivery_score(
- delivery_data, supplier_code, supplier_name
- )
- if delivery_result:
- delivery_result['供应商代码'] = supplier_code
- delivery_result['供应商名称'] = supplier_name
- delivery_details.append(sanitize(delivery_result))
- # 3. 账期维度详情
- payment_details: List[Dict[str, Any]] = []
- if self.payment_df is not None and not cost_scores.empty:
- # 从成本数据中获取所有相关供应商名称
- cost_supplier_names = cost_scores['供应商名称'].unique()
- payment_matches = self.payment_df[
- self.payment_df['供应商名称'].isin(cost_supplier_names)
- ]
- payment_details = sanitize(payment_matches.to_dict(orient='records'))
- return {
- 'product_code': product_code,
- 'cost_details': cost_details,
- 'delivery_details': delivery_details,
- 'payment_details': payment_details
- }
- # ==================== 对外单例与权重管理 ====================
- # 初始化评估器实例
- evaluator = IntegratedSupplierEvaluator(
- payment_file=PAYMENT_FILE,
- delivery_file=DELIVERY_FILE,
- cost_file=COST_FILE,
- payment_sheet='sheet1',
- cost_sheet=0
- )
- # 默认权重配置
- default_weights: Dict[str, float] = {
- '成本': 0.4,
- '交付': 0.4,
- '账期': 0.2
- }
- # 当前权重配置(可被用户修改)
- current_weights: Dict[str, float] = default_weights.copy()
- def get_current_weights() -> Dict[str, float]:
- return current_weights
- def update_weights(new_weights: Dict[str, float]) -> Dict[str, float]:
- """
- 更新全局权重,调用前应已校验和为1
- """
- global current_weights
- # 简单拷贝,避免外部直接修改引用
- current_weights = {
- '成本': float(new_weights.get('成本', default_weights['成本'])),
- '交付': float(new_weights.get('交付', default_weights['交付'])),
- '账期': float(new_weights.get('账期', default_weights['账期']))
- }
- return current_weights
|