#!/usr/bin/env python3
"""Summarize vLLM benchmark-serving ``bench_*.log`` files.

Each log is expected to contain a "Serving Benchmark Result" block made of
``key: value`` lines.  The benchmark case (batch size, input length, output
length) is recovered from the file name, which must end in
``_bs<N>_in<N>_out<N>.log`` (or ``.txt``).  Results can be emitted as a
markdown bullet list, CSV, or JSON Lines.
"""
import argparse
import csv
import io
import json
import re
import sys
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union

# File-name suffix that encodes the benchmark case, e.g. "..._bs8_in128_out256.log".
_FILENAME_RE = re.compile(
    r"_bs(?P<bs>\d+)_in(?P<in_len>\d+)_out(?P<out_len>\d+)\.(?P<ext>log|txt)$"
)

# Banner lines that delimit the result block inside a log.
_BLOCK_START = "============ Serving Benchmark Result ============"
_BLOCK_END = "=================================================="

# Plain (optionally negative) integers and decimals; no exponents or inf/nan.
_INT_RE = re.compile(r"-?\d+")
_FLOAT_RE = re.compile(r"-?\d+(?:\.\d+)?")

# CSV column name -> metric key as it appears in the result block.
# Insertion order defines the CSV column order.
_CSV_METRIC_COLUMNS: Dict[str, str] = {
    "successful_requests": "Successful requests",
    "benchmark_duration_s": "Benchmark duration (s)",
    "req_per_s": "Request throughput (req/s)",
    "out_tok_per_s": "Output token throughput (tok/s)",
    "total_tok_per_s": "Total Token throughput (tok/s)",
    "ttft_mean_ms": "Mean TTFT (ms)",
    "ttft_p99_ms": "P99 TTFT (ms)",
    "tpot_mean_ms": "Mean TPOT (ms)",
    "tpot_p99_ms": "P99 TPOT (ms)",
    "itl_mean_ms": "Mean ITL (ms)",
    "itl_p99_ms": "P99 ITL (ms)",
}

_MetricValue = Union[int, float, str]


def _try_parse_number(value: str) -> _MetricValue:
    """Convert *value* to ``int`` or ``float`` when it is a plain number.

    Surrounding whitespace is ignored.  Anything that is not a plain integer
    or decimal (including the empty string) is returned as the stripped text.
    """
    v = value.strip()
    if not v:
        return v
    if _INT_RE.fullmatch(v):
        try:
            return int(v)
        except ValueError:
            # e.g. the interpreter's limit on int-from-str digit count.
            return v
    if _FLOAT_RE.fullmatch(v):
        try:
            return float(v)
        except ValueError:
            return v
    return v


def _parse_serving_result_block(text: str) -> Dict[str, _MetricValue]:
    """Extract ``key: value`` metrics from the first result block in *text*.

    The block starts at the first line equal to ``_BLOCK_START`` and ends at
    the next line equal to ``_BLOCK_END`` (or at end of text if no end banner
    is found).  Both banners are compared after stripping whitespace.

    Returns an empty dict when no start banner is present.
    """
    lines = text.splitlines()

    # Compare stripped lines so a banner with trailing padding or a stray
    # "\r" is still recognised, matching how the end banner is handled.
    start_idx = next(
        (i for i, line in enumerate(lines) if line.strip() == _BLOCK_START),
        None,
    )
    if start_idx is None:
        return {}

    end_idx = next(
        (
            i
            for i in range(start_idx + 1, len(lines))
            if lines[i].strip() == _BLOCK_END
        ),
        len(lines),
    )

    metrics: Dict[str, _MetricValue] = {}
    for raw in lines[start_idx + 1 : end_idx]:
        key, sep, value = raw.partition(":")
        if not sep:
            # Section headers and separators carry no colon.
            continue
        key = key.strip()
        value = value.strip()
        if not key or not value:
            continue
        # Values are padded and may carry trailing text; keep only the first
        # whitespace-separated token and convert it when it is numeric.
        metrics[key] = _try_parse_number(value.split()[0])
    return metrics


def _extract_case_from_path(
    path: Path,
) -> Tuple[Optional[int], Optional[int], Optional[int]]:
    """Return ``(bs, in_len, out_len)`` parsed from the file name of *path*.

    Returns ``(None, None, None)`` when the name does not end in the
    ``_bs<N>_in<N>_out<N>.log|txt`` suffix.
    """
    m = _FILENAME_RE.search(path.name)
    if not m:
        return (None, None, None)
    return (int(m.group("bs")), int(m.group("in_len")), int(m.group("out_len")))


class BenchResult:
    """Parsed metrics of one log file together with its benchmark case."""

    def __init__(
        self,
        path: Path,
        bs: Optional[int],
        in_len: Optional[int],
        out_len: Optional[int],
        metrics: Dict[str, Any],
    ) -> None:
        self.path = path
        self.bs = bs
        self.in_len = in_len
        self.out_len = out_len
        self.metrics = metrics

    def key(self) -> Tuple[int, int, int, str]:
        """Return the sort key ``(bs, in_len, out_len, file name)``.

        Unknown (``None``) case values are treated as 0 so the tuple stays
        comparable.
        """
        bs = self.bs if self.bs is not None else 0
        in_len = self.in_len if self.in_len is not None else 0
        out_len = self.out_len if self.out_len is not None else 0
        return (bs, in_len, out_len, self.path.name)


def _find_logs(paths: Iterable[Path]) -> List[Path]:
    """Expand *paths* into the list of existing log files.

    A directory contributes its ``bench_*.log`` entries in sorted order; any
    other path is taken as-is.  Paths that do not exist are dropped.
    """
    logs: List[Path] = []
    for p in paths:
        if p.is_dir():
            logs.extend(sorted(p.glob("bench_*.log")))
        else:
            logs.append(p)
    return [p for p in logs if p.exists()]


def _fmt_float(v: Any) -> str:
    """Format a metric for display: floats with 2 decimals, ``None`` as "NA"."""
    if isinstance(v, float):
        return f"{v:.2f}"
    if isinstance(v, int):
        return str(v)
    if v is None:
        return "NA"
    return str(v)


def _md_line(r: BenchResult) -> str:
    """Render one result as a single markdown bullet line."""
    m = r.metrics
    return (
        f"- bs={r.bs} in={r.in_len} out={r.out_len}: "
        f"req/s={_fmt_float(m.get('Request throughput (req/s)'))}, "
        f"out_tok/s={_fmt_float(m.get('Output token throughput (tok/s)'))}, "
        f"TTFT mean/p99={_fmt_float(m.get('Mean TTFT (ms)'))}/{_fmt_float(m.get('P99 TTFT (ms)'))} ms, "
        f"TPOT mean/p99={_fmt_float(m.get('Mean TPOT (ms)'))}/{_fmt_float(m.get('P99 TPOT (ms)'))} ms, "
        f"ITL mean/p99={_fmt_float(m.get('Mean ITL (ms)'))}/{_fmt_float(m.get('P99 ITL (ms)'))} ms"
    )


def _render_markdown(results: List[BenchResult]) -> str:
    """Render all results as a markdown bullet list, one line per result."""
    return "\n".join(_md_line(r) for r in results) + "\n"


def _render_jsonl(results: List[BenchResult]) -> str:
    """Render all results as JSON Lines, keeping every parsed metric."""
    rows = (
        {
            "file": str(r.path),
            "bs": r.bs,
            "in_len": r.in_len,
            "out_len": r.out_len,
            "metrics": r.metrics,
        }
        for r in results
    )
    return "\n".join(json.dumps(row, ensure_ascii=False) for row in rows) + "\n"


def _render_csv(results: List[BenchResult]) -> str:
    """Render all results as CSV with the key metrics flattened into columns."""
    fields = ["file", "bs", "in_len", "out_len", *_CSV_METRIC_COLUMNS]
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=fields)
    writer.writeheader()
    for r in results:
        row: Dict[str, Any] = {
            "file": str(r.path),
            "bs": r.bs,
            "in_len": r.in_len,
            "out_len": r.out_len,
        }
        for out_key, metric_key in _CSV_METRIC_COLUMNS.items():
            row[out_key] = r.metrics.get(metric_key)
        writer.writerow(row)
    return buf.getvalue()


def main(argv: Optional[List[str]] = None) -> int:
    """Command-line entry point.

    Args:
        argv: Argument list without the program name.  ``None`` makes
            argparse fall back to ``sys.argv[1:]``.

    Returns:
        0 on success, 2 when none of the given paths yields a log file.
    """
    parser = argparse.ArgumentParser(
        description="Parse vLLM benchmark-serving bench_*.log files and output a request-result list."
    )
    parser.add_argument(
        "paths",
        nargs="+",
        help="One or more bench_*.log files or a directory containing them.",
    )
    parser.add_argument(
        "--format",
        choices=("markdown", "csv", "jsonl"),
        default="markdown",
        help="Output format (default: markdown).",
    )
    parser.add_argument(
        "--output",
        help="Write output to a file instead of stdout.",
    )
    args = parser.parse_args(argv)

    input_paths = [Path(p).expanduser() for p in args.paths]
    logs = _find_logs(input_paths)
    if not logs:
        print("No bench_*.log files found.", file=sys.stderr)
        return 2

    results: List[BenchResult] = []
    for log in logs:
        try:
            text = log.read_text(encoding="utf-8", errors="replace")
        except OSError as e:
            # Best effort: report the unreadable file and keep going.
            print(f"Failed to read {log}: {e}", file=sys.stderr)
            continue
        metrics = _parse_serving_result_block(text)
        bs, in_len, out_len = _extract_case_from_path(log)
        results.append(
            BenchResult(path=log, bs=bs, in_len=in_len, out_len=out_len, metrics=metrics)
        )

    results.sort(key=lambda r: r.key())

    if args.format == "markdown":
        output_text = _render_markdown(results)
    elif args.format == "jsonl":
        output_text = _render_jsonl(results)
    else:  # csv
        output_text = _render_csv(results)

    if args.output:
        out_path = Path(args.output).expanduser()
        # The csv module already emits "\r\n" row terminators; disable newline
        # translation for CSV so Windows does not turn them into "\r\r\n".
        newline = "" if args.format == "csv" else None
        with out_path.open("w", encoding="utf-8", newline=newline) as fh:
            fh.write(output_text)
    else:
        sys.stdout.write(output_text)
    return 0


if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))