#!/usr/bin/env python3
"""Convert benchmark CSV files into styled Excel workbooks, folder by folder.

For every ``*.csv`` directly inside each given folder the script:

* adds two derived throughput columns computed from ``TPOT(ms)`` and ``batch``,
* renames the known English column names to Chinese (see ``COL_MAP``),
* writes one styled ``.xlsx`` per CSV, and
* collects all of them as sheets of ``summary.xlsx``.

All output goes to ``<folder>/standardized_output``.

Usage example:
    python multi_folder.py folder1 folder2 folder3 --suffix MyTag
"""
import argparse
import glob
import os

import pandas as pd
from openpyxl import Workbook, load_workbook
from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
from openpyxl.utils import get_column_letter
from openpyxl.utils.dataframe import dataframe_to_rows

# ===== English -> Chinese column names =====
COL_MAP = {
    'tp': '张量并行度',
    'data_type': '数据类型',
    'batch': '并发数',
    'prompt_tokens': '输入的 token 数',
    'completion_tokens': '输出的 token 数',
    'TOTAL_THROUGHPUT(toks/s)': '总吞吐量(tokens/秒)',
    'generate_throughput(toks/s)': '生成吞吐量(tokens/秒)',
    'TTFT(ms)': '首个 token 的延迟(毫秒)',
    'TPOT(ms)': '每个输出 token 的延迟(毫秒)',
    'ITL(ms)': '每 token 的间隔延迟(毫秒)',
}

# ===== Shared styles =====
thin = Side(border_style='thin', color='BFBFBF')
border = Border(top=thin, bottom=thin, left=thin, right=thin)
center_wrap = Alignment(horizontal='center', vertical='center', wrap_text=True)

# Columns that must exist in a CSV for the derived metrics to be computable.
_REQUIRED_COLUMNS = ('TPOT(ms)', 'batch')

# Excel limits worksheet titles to 31 characters and forbids \ / * ? : [ ].
_MAX_SHEET_NAME_LEN = 31
_INVALID_TITLE_CHARS = str.maketrans({ch: '_' for ch in '\\/*?:[]'})


def _cell_text(value) -> str:
    """Return the text used to measure a cell's width ('' for empty cells)."""
    return '' if value is None else str(value)


def beautify(ws, title: str, df: pd.DataFrame) -> None:
    """Apply the common layout to a worksheet.

    The worksheet is expected to hold the column headers in row 2 and the
    data from row 3 on; row 1 is reserved for the title.

    Args:
        ws: openpyxl worksheet to style in place.
        title: Text written into the merged title row.
        df: DataFrame that was written to ``ws``; only its column count is
            used (title merge width and number of columns to size).
    """
    n_cols = len(df.columns)

    # 1. Title row, merged across all data columns.
    ws.merge_cells(start_row=1, start_column=1, end_row=1, end_column=n_cols)
    title_cell = ws['A1']
    title_cell.value = title
    title_cell.font = Font(bold=True, size=13)
    title_cell.alignment = center_wrap
    ws.row_dimensions[1].height = 28

    # 2. Header row.
    header_font = Font(bold=True, color='FFFFFF')
    header_fill = PatternFill('solid', fgColor='4F81BD')
    for cell in ws[2]:
        cell.font = header_font
        cell.fill = header_fill
        cell.alignment = center_wrap
        cell.border = border
    ws.row_dimensions[2].height = 25

    # 3. Data area: borders everywhere, light fill on even-numbered rows.
    stripe_fill = PatternFill('solid', fgColor='F8F9FA')  # built once, reused
    for row in ws.iter_rows(min_row=3, max_row=ws.max_row):
        striped = row[0].row % 2 == 0
        for cell in row:
            cell.alignment = center_wrap
            cell.border = border
            if striped:
                cell.fill = stripe_fill

    # 4. Column widths from the longest cell text. The title spans every
    #    column, so it must not inflate column A; it is only measured when
    #    there is a single column for it to live in.
    first_measured_row = 2 if n_cols > 1 else 1
    for col_idx in range(1, n_cols + 1):
        longest = max(
            (
                len(_cell_text(ws.cell(row=r, column=col_idx).value))
                for r in range(first_measured_row, ws.max_row + 1)
            ),
            default=0,
        )
        width = max(longest + 2, 10) * 1.1
        ws.column_dimensions[get_column_letter(col_idx)].width = width

    # 5. Freeze the title and header rows (everything above row 3).
    ws.freeze_panes = 'A3'


def _unique_sheet_name(raw_name: str, used: set) -> str:
    """Return a valid worksheet title derived from ``raw_name``.

    Forbidden characters are replaced by ``_`` and the result is cut to 31
    characters. If that title was already handed out (compared
    case-insensitively), a ``~2``, ``~3``, ... tag replaces its tail so that
    two CSVs never overwrite each other. ``used`` is updated in place.
    """
    name = raw_name.translate(_INVALID_TITLE_CHARS)[:_MAX_SHEET_NAME_LEN]
    candidate = name
    counter = 1
    while candidate.lower() in used:
        counter += 1
        tag = f'~{counter}'
        candidate = name[:_MAX_SHEET_NAME_LEN - len(tag)] + tag
    used.add(candidate.lower())
    return candidate


def _load_metrics(csv_path: str):
    """Read one CSV and return the enriched, renamed DataFrame.

    Returns ``None`` (after printing the reason) when the file cannot be
    parsed or lacks one of the columns in ``_REQUIRED_COLUMNS``.
    """
    try:
        df = pd.read_csv(csv_path)
    except (pd.errors.EmptyDataError, pd.errors.ParserError,
            UnicodeDecodeError) as exc:
        print(f'【{csv_path}】读取失败({exc}),跳过')
        return None

    missing = [col for col in _REQUIRED_COLUMNS if col not in df.columns]
    if missing:
        print(f'【{csv_path}】缺少列 {missing},跳过')
        return None

    # Two derived columns: 1000 / TPOT(ms) per stream, then scaled by batch.
    per_stream = (1000 / df['TPOT(ms)']).round(2)
    df['单路生成吞吐(toks/s)'] = per_stream
    df['不带首字的生成吞吐(tok/s)'] = (per_stream * df['batch']).round(2)

    return df.rename(columns=COL_MAP)


def process_folder(folder: str, suffix: str) -> None:
    """Convert every CSV in ``folder`` to styled Excel output.

    Writes ``<folder>/standardized_output/<csv-name>_<suffix>.xlsx`` for each
    usable CSV and rebuilds ``summary.xlsx`` with one sheet per CSV. CSVs
    that cannot be read or lack the required columns are reported and
    skipped. If no CSV is usable, no summary file is written.

    Args:
        folder: Directory searched (non-recursively) for ``*.csv``.
        suffix: Tag appended to each CSV's base name to form the sheet and
            file name.
    """
    # Sorted so that sheet order is the same on every run and platform.
    csv_files = sorted(glob.glob(os.path.join(folder, '*.csv')))
    if not csv_files:
        print(f'【{folder}】未找到 csv,跳过')
        return

    out_dir = os.path.join(folder, 'standardized_output')
    os.makedirs(out_dir, exist_ok=True)
    summary_path = os.path.join(out_dir, 'summary.xlsx')

    # The summary is rebuilt from scratch on each run, so start from an
    # empty workbook instead of loading the old one just to drop its sheets.
    summary_wb = Workbook()
    summary_wb.remove(summary_wb.active)

    used_names = set()
    for csv_path in csv_files:
        df = _load_metrics(csv_path)
        if df is None:
            continue

        base_name = os.path.splitext(os.path.basename(csv_path))[0]
        sheet_name = _unique_sheet_name(f'{base_name}_{suffix}', used_names)

        # 1) Stand-alone workbook: pandas writes the data below a blank
        #    first row, then the file is reopened to style it.
        single_xlsx = os.path.join(out_dir, f'{sheet_name}.xlsx')
        df.to_excel(single_xlsx, index=False, header=True, startrow=1)
        wb = load_workbook(single_xlsx)
        beautify(wb.active, sheet_name, df)
        wb.save(single_xlsx)
        print(f'独立文件:{os.path.relpath(single_xlsx)}')

        # 2) Summary workbook: header goes to row 2, data from row 3.
        ws_sum = summary_wb.create_sheet(title=sheet_name)
        rows = dataframe_to_rows(df, index=False, header=True)
        for r_idx, row in enumerate(rows, start=2):
            for c_idx, value in enumerate(row, start=1):
                ws_sum.cell(row=r_idx, column=c_idx, value=value)
        beautify(ws_sum, sheet_name, df)

    # openpyxl refuses to save a workbook without any sheet.
    if not summary_wb.sheetnames:
        print(f'【{folder}】没有包含所需列的 csv,未生成汇总文件')
        return

    summary_wb.save(summary_path)
    print(f'汇总文件:{os.path.relpath(summary_path)} 已生成!')


def main() -> None:
    """Parse the command line and process each existing folder in turn."""
    parser = argparse.ArgumentParser(description='批量处理多个文件夹内的 CSV')
    parser.add_argument('folders', nargs='+', help='要处理的文件夹列表')
    parser.add_argument('--suffix', '-s', default=None,
                        help='统一后缀(缺省用文件夹名)')
    args = parser.parse_args()

    for folder in args.folders:
        if not os.path.isdir(folder):
            print(f'目录不存在:{folder}')
            continue
        # abspath first: basename('folder1/') is '' and basename('.') is '.'.
        suffix = args.suffix or os.path.basename(os.path.abspath(folder))
        process_folder(folder, suffix)


if __name__ == '__main__':
    main()