#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import re
import os
import glob
import csv
import sys


def parse_log_file(log_file_path):
    """
    Parse a single log file and extract the key metrics.

    Args:
        log_file_path: path to the log file

    Returns:
        A dict with the parsed fields, or None if parsing failed.
    """
    try:
        with open(log_file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        result = {
            'model_name': None,
            'batch_size': None,
            'input_shape': None,
            'FPS': None,
            'MaxMemoryUsageMiB': None,
            'HCU%': None,
        }

        # 1. Model name: taken from the benchmark command line,
        #    e.g. "perf resnet50.onnx".
        model_match = re.search(r'[/\s]([a-zA-Z0-9_-]+)\.onnx', content)
        if model_match:
            result['model_name'] = model_match.group(1)
        else:
            # Fall back to the file name, e.g. "resnet50-4batch.log".
            filename = os.path.basename(log_file_path)
            filename_match = re.search(r'([a-zA-Z0-9_-]+)-\d+batch\.log', filename)
            if filename_match:
                result['model_name'] = filename_match.group(1)

        # 2. Batch size: from a line like "Batch size: 1".
        batch_match = re.search(r'Batch size:\s*(\d+)', content)
        if batch_match:
            result['batch_size'] = batch_match.group(1)

        # 3. Input shape: from "--input-dim @input 1 3 224 224".
        input_match = re.search(
            r'--input-dim\s+(?:@)?[a-zA-Z0-9_\-\.]+\s+(\d+\s+\d+\s+\d+\s+\d+)',
            content)
        if input_match:
            result['input_shape'] = input_match.group(1)

        # 4. Throughput: prefer the "Rate: N inferences/sec" line.
        rate_match = re.search(r'Rate:\s*([\d.]+)\s*inferences/sec', content)
        if rate_match:
            result['FPS'] = rate_match.group(1)
        else:
            # Fallback: use the "Total time: 2.08637ms" value instead. Note
            # this is a duration in milliseconds, not a rate; it only keeps
            # the column populated when the Rate line is missing.
            time_match = re.search(r'Total time:\s*([\d.]+)ms', content)
            if time_match:
                result['FPS'] = time_match.group(1)

        # 5. Peak device memory: the logs report it in Chinese as
        #    "最大使用: XXXX MiB" ("max usage: XXXX MiB").
        memory_match = re.search(r'最大使用:\s*([\d.]+)\s*MiB', content)
        result['MaxMemoryUsageMiB'] = memory_match.group(1) if memory_match else "null"

        # 6. Peak device utilization: reported as "峰值使用率: XX%"
        #    ("peak utilization: XX%").
        peak_usage_match = re.search(r'峰值使用率:\s*([\d.]+)%', content)
        result['HCU%'] = peak_usage_match.group(1) if peak_usage_match else "null"

        # The first four fields are mandatory; the memory/utilization columns
        # default to "null" when absent.
        required = ('model_name', 'batch_size', 'input_shape', 'FPS')
        if all(result[key] for key in required):
            return result

        print(f"Warning: file {log_file_path} parsed incompletely:")
        print(f"  model_name: {result['model_name']}")
        print(f"  batch_size: {result['batch_size']}")
        print(f"  input_shape: {result['input_shape']}")
        print(f"  FPS: {result['FPS']}")
        print(f"  MaxMemoryUsageMiB: {result['MaxMemoryUsageMiB']}")
        print(f"  HCU%: {result['HCU%']}")
        return None

    except Exception as e:
        print(f"Error: failed to read or parse {log_file_path}: {e}")
        return None


def main():
    """Parse all matching log files and append the results to a CSV file."""
    # Command-line arguments: [log glob pattern] [output CSV path].
    log_pattern = sys.argv[1] if len(sys.argv) > 1 else "*.log"
    csv_file = sys.argv[2] if len(sys.argv) > 2 else "result.csv"

    print(f"Parsing log files matching: {log_pattern}")
    print(f"Writing output to: {csv_file}")
    print("=" * 50)

    log_files = glob.glob(log_pattern)
    if not log_files:
        print(f"Error: no log files match '{log_pattern}'")
        return
    print(f"Found {len(log_files)} log file(s)")

    # Only write the CSV header when creating a new file.
    write_header = not os.path.exists(csv_file)

    # Load existing rows up front so duplicate checks work, and so we never
    # read the file while the append handle below still has buffered writes.
    existing_data = []
    if os.path.exists(csv_file) and os.path.getsize(csv_file) > 0:
        with open(csv_file, 'r', encoding='utf-8') as existing_f:
            existing_data = list(csv.DictReader(existing_f))

    # Parse every log file.
    results = []
    for log_file in sorted(log_files):
        print(f"Parsing: {log_file}")
        result = parse_log_file(log_file)
        if result:
            results.append(result)
            print(f"  ✓ parsed: {result['model_name']} (batch={result['batch_size']})")

    if not results:
        print("Error: no log file was parsed successfully")
        return

    # Sort by model_name (string, ascending), then batch_size (numeric, ascending).
    results.sort(key=lambda x: (x['model_name'], int(x['batch_size'])))

    # Append to the CSV, skipping records that already exist.
    try:
        with open(csv_file, 'a', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(
                f,
                fieldnames=['model_name', 'batch_size', 'input_shape',
                            'FPS', 'MaxMemoryUsageMiB', 'HCU%'])
            if write_header:
                writer.writeheader()
                print(f"Created new CSV file: {csv_file}")

            new_count = 0
            for result in results:
                # A record is a duplicate if the same model, batch size and
                # input shape are already present.
                is_duplicate = any(
                    existing.get('model_name') == result['model_name']
                    and existing.get('batch_size') == result['batch_size']
                    and existing.get('input_shape') == result['input_shape']
                    for existing in existing_data)
                if is_duplicate:
                    print(f"  ⚠ skipping duplicate: {result['model_name']}, "
                          f"batch={result['batch_size']}")
                    continue

                writer.writerow(result)
                # Remember the row so a second log with the same key within
                # this run is also treated as a duplicate.
                existing_data.append(result)
                new_count += 1

        print("\n" + "=" * 50)
        print("Done!")
        print(f"Log files processed: {len(log_files)}")
        print(f"Files parsed successfully: {len(results)}")
        print(f"New records written: {new_count}")
        print(f"CSV file: {csv_file}")

        # Preview the CSV contents (first 20 lines).
        print("\nCSV preview:")
        print("-" * 50)
        try:
            with open(csv_file, 'r', encoding='utf-8') as f:
                lines = f.readlines()
            for line in lines[:20]:
                print(line.rstrip())
            if len(lines) > 20:
                print(f"... ({len(lines)} lines total)")
        except Exception as e:
            print(f"Error reading CSV file: {e}")

    except Exception as e:
        print(f"Error writing CSV file: {e}")


if __name__ == "__main__":
    main()
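
# ---------------------------------------------------------------------------
# Illustrative sketch only: a hypothetical log excerpt reconstructed from the
# regexes above, not taken from a real run. The tool name "perf", the tensor
# name "@input", the sample shapes, and all metric values are assumptions;
# only the line formats are implied by the patterns this script matches.
#
#   perf resnet50.onnx --input-dim @input 1 3 224 224
#   Batch size: 1
#   Total time: 2.08637ms
#   Rate: 479.30 inferences/sec
#   最大使用: 2048 MiB      (peak memory usage)
#   峰值使用率: 87.5%        (peak utilization)
#
# Typical invocation (the script file name is an assumption):
#   python3 parse_logs.py "logs/*-4batch.log" result.csv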