"vllm/vscode:/vscode.git/clone" did not exist on "43ed4143c4ec00f4b587c5bcefdb3b6520fbe966"
Unverified Commit 01bf7170 authored by Biswa Panda's avatar Biswa Panda Committed by GitHub
Browse files

feat(perf): add frontend performance sweep runner with full observability (#6749)

parent 7389a369
# Frontend Performance Profiling
Unified observability and benchmarking suite for Dynamo frontend performance.
## Quick Start
```bash
cd ~/dev/dynamo
source dynamo/bin/activate
# Single run (mocker + frontend + aiperf + Prometheus)
cd benchmarks/frontend/scripts
./run_perf.sh --model Qwen/Qwen3-0.6B --concurrency 32 --num-requests 640 \
--speedup-ratio 0 --skip-bpf --skip-nsys --skip-flamegraph --skip-perf
# Sweep (multiple config points)
python3 sweep_runner.py --tokenizers hf --concurrency 32 --isl 512 \
--benchmark-duration 30 --speedup-ratio 0 \
-- --skip-bpf --skip-nsys --skip-flamegraph --skip-perf
```
## Architecture
The benchmarking suite has two layers: a Python sweep orchestrator that builds a grid of configurations, and a shell harness that executes each individual run.
```mermaid
flowchart TB
subgraph Orchestrator ["sweep_runner.py (Python orchestrator)"]
direction TB
grid["Build sweep grid<br/>(tokenizers x concurrency x ISL x workers x models x rps)"]
loop["For each config point"]
collect["Collect results into CSV + summary.md"]
report["Generate per-run report.md"]
grid --> loop --> collect --> report
end
loop -- "invokes" --> run_perf
subgraph run_perf ["run_perf.sh (per-run harness)"]
direction TB
infra["Step 0: Ensure etcd + NATS"]
mockers["Step 1: Start mocker workers<br/>(N models x M workers)"]
frontend["Step 2: Start frontend<br/>(optionally under nsys)"]
ready["Step 3: Wait for /v1/models readiness"]
captures["Step 4: Start parallel captures<br/>(perf stat, BPF, flamegraph, /proc, Prometheus)"]
load["Step 5: aiperf load generation"]
wait["Step 6: Wait for captures to finish"]
export["Step 7: Final Prometheus snapshot + nsys export"]
save["Step 8: Save config.json"]
infra --> mockers --> frontend --> ready --> captures --> load --> wait --> export --> save
end
```
### Runtime topology
During a benchmark run, the following processes are active. The frontend receives HTTP requests from aiperf, tokenizes the input, routes to a backend model via the request plane (TCP), and streams response tokens back to the client.
```mermaid
flowchart LR
aiperf["aiperf<br/>(load generator)"]
subgraph Frontend ["Frontend (Rust, port 8000)"]
direction TB
http["HTTP server<br/>/v1/chat/completions"]
preprocess["Preprocess<br/>(template + tokenize)"]
router["Router<br/>(model lookup)"]
transport["Transport<br/>(TCP request plane)"]
http --> preprocess --> router --> transport
end
subgraph Models ["Mocker Workers"]
direction TB
subgraph model1 ["model-1"]
w1a["worker 1<br/>port 8081"]
w1b["worker 2<br/>port 8082"]
end
subgraph model2 ["model-2"]
w2a["worker 1<br/>port 8083"]
w2b["worker 2<br/>port 8084"]
end
end
subgraph Infra ["Infrastructure"]
etcd["etcd<br/>(service discovery)"]
nats["NATS<br/>(event plane)"]
end
subgraph Observability ["Parallel Captures"]
prom["Prometheus<br/>(/metrics scraping)"]
perf["perf stat<br/>(HW counters)"]
nsys["nsys<br/>(NVTX + OS runtime)"]
flame["flamegraph<br/>(CPU + off-CPU)"]
bpf["BPF traces<br/>(kernel-level)"]
end
aiperf -- "HTTP/SSE" --> http
transport -- "TCP" --> w1a & w1b & w2a & w2b
Frontend -. "register/discover" .-> etcd
Models -. "register/discover" .-> etcd
Models -. "events" .-> nats
Frontend -. "events" .-> nats
prom -. "scrape" .-> Frontend & Models
perf -. "attach" .-> Frontend
nsys -. "profile" .-> Frontend
flame -. "sample" .-> Frontend
bpf -. "trace" .-> Frontend
```
### Multi-model naming
When `--num-models` is 1, the served model name matches the HF model path (e.g., `Qwen/Qwen3-0.6B`). When `--num-models` is greater than 1, each model instance gets a synthetic name (`model-1`, `model-2`, ...) but all share the same underlying `--model-path` for weights and tokenizer config.
## Prerequisites
| Tool | Required | Install |
|------|----------|---------|
| etcd | Yes | `apt install etcd` or [releases](https://github.com/etcd-io/etcd/releases) |
| nats-server | Yes | `apt install nats-server` or [nats.io](https://nats.io/download/) |
| aiperf | Yes | `uv pip install "git+https://github.com/ai-dynamo/aiperf.git@main"` (in dynamo venv) |
| jq | Yes | `apt install jq` |
| perf | Optional | `apt install linux-tools-$(uname -r)` |
| bpftrace | Optional | `apt install bpftrace` (needs root or CAP_BPF + CAP_PERFMON) |
| inferno | Optional | `cargo install inferno` (for flamegraphs) |
| nsys | Optional | NVIDIA Nsight Systems |
## sweep_runner.py
The main entry point for running performance sweeps. Iterates over a grid of configurations and delegates each point to `run_perf.sh`.
### Basic Usage
```bash
# Smoke test (1 run)
python3 sweep_runner.py --tokenizers hf --concurrency 32 --isl 512 \
--benchmark-duration 30 --speedup-ratio 0 \
-- --skip-bpf --skip-nsys --skip-flamegraph --skip-perf
# Full tokenizer comparison
python3 sweep_runner.py --tokenizers hf,fastokens \
--concurrency 32,64 --isl 512,1024,2048 \
--benchmark-duration 60 --speedup-ratio 0
# Transport saturation (vary workers and request count)
python3 sweep_runner.py --tokenizers hf --concurrency 4096 \
--num-requests 16384,32768 --workers 1,2,4,8 --speedup-ratio 0
# Preview sweep plan without running
python3 sweep_runner.py --dry-run --tokenizers hf,fastokens \
--concurrency 32,64 --isl 512,1024
```
### Multi-Model and Worker Sweeps
The `--num-models` and `--workers` flags control how many model instances and backend workers per model are launched. These are the primary knobs for studying frontend scalability under multi-tenant and parallel-worker configurations.
#### Scaling models (fixed workers per model)
Useful for measuring how adding more served models affects frontend routing, transport fan-out, and per-model latency.
```bash
# Sweep across 1, 2, 3, 4 model instances, 1 worker each, at 75 rps
for m in 1 2 3 4; do
python3 sweep_runner.py \
--tokenizers hf \
--concurrency 512 \
--isl 512 \
--workers 1 \
--num-models $m \
--rps 75 \
--benchmark-duration 60 \
--speedup-ratio 0 \
--output-dir artifacts/sweep_models/m${m} \
-- --skip-bpf
done
# Compare results
for m in 1 2 3 4; do
echo "=== m=$m ==="
cat artifacts/sweep_models/m${m}/summary.md
echo
done
```
#### Scaling workers per model (fixed model count)
Useful for measuring whether adding more backend workers relieves transport bottlenecks for a single model under heavy load.
```bash
# Sweep across 1, 2, 4, 8 workers for a single model
python3 sweep_runner.py \
--tokenizers hf \
--concurrency 512 \
--isl 512 \
--workers 1,2,4,8 \
--num-models 1 \
--rps 75 \
--benchmark-duration 60 \
--speedup-ratio 0 \
--output-dir artifacts/sweep_workers \
-- --skip-bpf
```
#### Combined model + worker grid
For a full factorial sweep over both dimensions, supply multiple values for both flags. Each combination produces a separate run.
```bash
# 2x3 grid: (1 model, 2 models) x (1, 2, 4 workers)
python3 sweep_runner.py \
--tokenizers hf \
--concurrency 256 \
--isl 512 \
--workers 1,2,4 \
--num-models 2 \
--rps 50 \
--benchmark-duration 60 \
--speedup-ratio 0 \
--output-dir artifacts/sweep_grid \
-- --skip-bpf
```
> **Note:** `--num-models` is a single integer (not comma-separated). To sweep across model counts, loop externally as shown in the "Scaling models" example above.
#### What to look for in the results
| Metric | Where to find it | What it tells you |
|--------|-----------------|-------------------|
| Req/s and Tok/s | `summary.md` | Whether the frontend can sustain the target load |
| TTFT p50/p99 | `summary.md` | End-to-end first-token latency (includes preprocess + routing + transport) |
| `transport_roundtrip` p50 | `report.md` section 4 | Time spent in the TCP request plane; grows when workers are saturated |
| Tokio worker busy ratio | `report.md` section 7 | Fraction of time each async worker is busy; values above 0.95 indicate saturation |
| Event loop stalls | `report.md` section 7 | How often the Tokio runtime stalled; high counts suggest blocking work on the async executor |
| `preprocess.tokenize` | `report.md` section 5 (NVTX) | Per-request tokenization cost; varies by tokenizer backend |
### With Profilers
```bash
# With perf stat + flamegraphs (no root needed)
python3 sweep_runner.py --tokenizers hf --concurrency 64 --isl 1024 \
--benchmark-duration 60 --speedup-ratio 0
# With everything including BPF (needs sudo)
sudo -E python3 sweep_runner.py --tokenizers hf --concurrency 64 --isl 1024 \
--benchmark-duration 60 --speedup-ratio 0
# nsys profiling (needs nsys in PATH)
python3 sweep_runner.py --tokenizers hf --concurrency 64 --isl 1024 \
--benchmark-duration 60 --speedup-ratio 0 \
-- --nsys-path /opt/nvidia/nsight-systems/bin/nsys
```
Profiler controls are passed through to run_perf.sh after `--`:
| Flag | Effect |
|------|--------|
| `--skip-bpf` | Skip BPF tracing |
| `--skip-nsys` | Skip Nsight Systems |
| `--skip-flamegraph` | Skip CPU/off-CPU flamegraphs |
| `--skip-perf` | Skip perf stat hardware counters |
### All Options
| Option | Default | Description |
|--------|---------|-------------|
| `--model` | `Qwen/Qwen3-0.6B` | HF model path |
| `--backend` | `mocker` | Engine: `mocker` (synthetic) or `vllm` |
| `--tokenizers` | `hf,fastokens` | Comma-separated tokenizer backends |
| `--concurrency` | `50,100,200` | Comma-separated concurrency levels |
| `--isl` | `512,1024,2048` | Comma-separated input sequence lengths |
| `--osl` | `256` | Output sequence length |
| `--workers` | `2` | Comma-separated worker counts per model |
| `--num-models` | `1` | Number of model instances (each gets `--workers` workers) |
| `--rps` | - | Comma-separated target request rates (req/s) |
| `--aiperf-targets` | `first` | `first`: model-1 only. `all`: run aiperf for each model |
| `--speedup-ratio` | `1.0` | Mocker speedup (0 = infinite) |
| `--benchmark-duration` | `60` | aiperf run duration (seconds) |
| `--num-requests` | - | Comma-separated request counts (overrides duration) |
| `--output-dir` | auto | Output directory |
| `--max-consecutive-fails` | `2` | Skip remaining ISLs after N failures |
| `--cooldown` | `3` | Seconds between runs |
| `--dry-run` | - | Print plan without executing |
| `--no-report` | - | Skip per-run report generation |
## run_perf.sh
Low-level per-run harness. Normally called by sweep_runner.py, but can be used directly for single runs.
```bash
# Minimal (no profilers)
./run_perf.sh --model Qwen/Qwen3-0.6B --concurrency 32 --num-requests 640 \
--speedup-ratio 0 --skip-bpf --skip-nsys --skip-flamegraph --skip-perf
# Full observability (needs sudo for BPF)
sudo -E ./run_perf.sh --model Qwen/Qwen3-0.6B --concurrency 64 \
--benchmark-duration 60 --speedup-ratio 0
# Multi-model with 2 workers each
./run_perf.sh --model Qwen/Qwen3-0.6B --num-models 2 --workers 2 \
--concurrency 32 --benchmark-duration 30 --speedup-ratio 0 \
--skip-bpf --skip-nsys --skip-flamegraph --skip-perf
# 4 models, 1 worker each, rate-limited to 75 rps
./run_perf.sh --model Qwen/Qwen3-0.6B --num-models 4 --workers 1 \
--concurrency 512 --benchmark-duration 60 --request-rate 75 \
--speedup-ratio 0 --skip-bpf
```
## Analyzing Results
```bash
# Per-run report (generated automatically by sweep_runner.py)
python3 analysis/create_report.py analyze artifacts/sweep_<ts>/hf_c32_isl512_w2
# Auto-find latest run
python3 analysis/create_report.py analyze
# Prometheus delta (initial vs final snapshot)
diff <(grep "^dynamo_frontend" artifacts/.../prometheus/initial_snapshot.txt | sort) \
<(grep "^dynamo_frontend" artifacts/.../prometheus/final_snapshot.txt | sort)
# nsys SQLite queries (when nsys was enabled)
sqlite3 artifacts/.../nsys/frontend.sqlite \
"SELECT name, COUNT(*), ROUND(AVG(end-start)/1e3,1) as avg_us
FROM NVTX_EVENTS WHERE end > start GROUP BY name ORDER BY avg_us DESC"
```
## Output Structure
```text
artifacts/sweep_YYYYMMDD_HHMMSS/
results.csv Sweep results (all runs)
summary.md Comparison table
hf_c32_isl512_w2/ Per-run directory
config.json Run parameters
report.md Analysis report
aiperf/
profile_export_aiperf.json aiperf metrics
prometheus/
initial_snapshot.txt Pre-load metrics
final_snapshot.txt Post-load metrics
timeseries.jsonl Per-second scrapes
system/
thread_count.txt Thread count over time
fd_count.txt FD count over time
proc_status.txt /proc/PID/status snapshots
logs/
frontend.log
mocker_*.log
perf/ (if --with-perf)
perf_stat.txt
cpu_flamegraph.svg
bpf/ (if --with-bpf, needs root)
runqlat.txt
syscall_latency.txt
...
nsys/ (if --with-nsys)
frontend.nsys-rep
frontend.sqlite
```
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Unified observability analysis script.
Ingests the entire output directory from run_perf.sh and
produces a comprehensive report covering:
- aiperf throughput & latency
- Server-side Prometheus metrics (stage durations, tokio, transport, compute)
- NVTX pipeline stages (from nsys SQLite export)
- Syscall profile (from nsys OSRT_API)
- Hardware counters (from perf stat)
- CPU flamegraph pointer
- BPF insights (runqlat, syscall latency, transport latency, context switches)
- System resource trends (thread count, FD count)
- Auto-generated key findings
Usage:
python create_report.py analyze <obs_directory>
python create_report.py analyze # auto-finds latest obs_* dir
"""
import argparse
import json
import sys
from pathlib import Path
from typing import Optional
# Reuse parsers from the existing analysis module (same directory)
_SCRIPT_DIR = Path(__file__).resolve().parent
sys.path.insert(0, str(_SCRIPT_DIR))
from frontend_perf_analysis import AiperfResult, PrometheusSnapshot # noqa: E402
from parsing_util import ( # noqa: E402
find_latest_obs_dir,
load_aiperf,
load_prometheus,
parse_bpftrace_histograms,
parse_nsys_context_switches,
parse_nvtx_stages,
parse_perf_stat,
parse_syscall_profile,
parse_timeseries,
summarize_histogram,
)
# ─── Section helpers ─────────────────────────────────────────────────────────
def _section(title: str) -> str:
"""Format a report section header."""
return f"\n## {title}\n"
def _subsection(title: str) -> str:
return f"\n### {title}\n"
# ─── 1. Configuration ─────────────────────────────────────────────────────
def section_config(obs_dir: Path) -> Optional[str]:
config_path = obs_dir / "config.json"
if not config_path.exists():
return None
try:
with open(config_path) as f:
config = json.load(f)
except (json.JSONDecodeError, OSError):
return None
lines = [_section("1. Capture Configuration")]
key_fields = [
("Model", "model"),
("Concurrency", "concurrency"),
("Num Requests", "num_requests"),
("ISL (tokens)", "isl"),
("OSL (tokens)", "osl"),
("Speedup Ratio", "speedup_ratio"),
("Workers", "num_workers"),
("Event Plane", "event_plane"),
]
lines.append("| Parameter | Value |")
lines.append("|---|---|")
for label, key in key_fields:
val = config.get(key, "N/A")
lines.append(f"| {label} | {val} |")
# Show which profilers were active (keys match run_perf.sh config.json)
profilers = []
if config.get("has_nsys"):
profilers.append("nsys")
if config.get("has_perf"):
profilers.append("perf")
if config.get("has_bpf"):
profilers.append("bpf")
lines.append(f"| Profilers | {', '.join(profilers) if profilers else 'none'} |")
return "\n".join(lines)
# ─── 2. Throughput ─────────────────────────────────────────────────────────
def section_throughput(aiperf: Optional[AiperfResult]) -> Optional[str]:
if not aiperf:
return None
lines = [_section("2. Throughput")]
lines.append("| Metric | Value |")
lines.append("|---|---|")
lines.append(f"| Requests/sec | {aiperf.request_throughput_rps:.2f} |")
lines.append(f"| Output tokens/sec | {aiperf.throughput_tok_s:.2f} |")
return "\n".join(lines)
# ─── 3. Latency ───────────────────────────────────────────────────────────
def section_latency(aiperf: Optional[AiperfResult]) -> Optional[str]:
if not aiperf:
return None
lines = [_section("3. End-to-End Latency")]
lines.append("| Metric | p50 (ms) | p95 (ms) | p99 (ms) |")
lines.append("|---|---:|---:|---:|")
lines.append(
f"| TTFT | {aiperf.ttft_p50_ms:.1f} | {aiperf.ttft_p95_ms:.1f} | {aiperf.ttft_p99_ms:.1f} |"
)
lines.append(
f"| ITL | {aiperf.itl_p50_ms:.1f} | {aiperf.itl_p95_ms:.1f} | {aiperf.itl_p99_ms:.1f} |"
)
return "\n".join(lines)
# ─── 4. Pipeline Stage Durations ──────────────────────────────────────────
def section_lifecycle(prom: Optional[PrometheusSnapshot]) -> Optional[str]:
if not prom or not prom.stage_durations:
return None
lines = [_section("4. Pipeline Stage Durations (Prometheus histograms)")]
lines.append("| Stage | p50 (ms) | p95 (ms) | p99 (ms) |")
lines.append("|---|---:|---:|---:|")
total_p50 = 0
for stage, vals in prom.stage_durations.items():
p50_ms = vals["p50"] * 1000
p95_ms = vals["p95"] * 1000
p99_ms = vals["p99"] * 1000
lines.append(f"| {stage} | {p50_ms:.2f} | {p95_ms:.2f} | {p99_ms:.2f} |")
total_p50 += p50_ms
lines.append(f"| **TOTAL (sum of p50)** | **{total_p50:.2f}** | | |")
return "\n".join(lines)
# ─── 4b. Transport Breakdown ─────────────────────────────────────────────
def section_transport_breakdown(prom: Optional[PrometheusSnapshot]) -> Optional[str]:
if not prom:
return None
has_request_plane = (
prom.request_plane_queue_p50 > 0
or prom.request_plane_send_p50 > 0
or prom.request_plane_roundtrip_ttft_p50 > 0
)
has_work_handler = hasattr(prom, "work_handler_network_transit") and (
prom.work_handler_network_transit or prom.work_handler_time_to_first_response
)
if not has_request_plane and not has_work_handler:
return None
lines = [_section("4b. Transport Breakdown")]
if has_request_plane:
lines.append(_subsection("Frontend View (AddressedPushRouter)"))
queue_ms = prom.request_plane_queue_p50 * 1000
send_ms = prom.request_plane_send_p50 * 1000
roundtrip_ttft_ms = prom.request_plane_roundtrip_ttft_p50 * 1000
lines.append("| Metric | p50 (ms) |")
lines.append("|---|---:|")
lines.append(f"| Queue (serialize+encode) | {queue_ms:.2f} |")
lines.append(f"| Send (network+ack) | {send_ms:.2f} |")
lines.append(f"| Roundtrip TTFT | {roundtrip_ttft_ms:.2f} |")
lines.append(f"| Inflight gauge | {prom.request_plane_inflight:.0f} |")
if has_work_handler:
lines.append(_subsection("Backend View (WorkHandler)"))
parts_rows = []
if prom.work_handler_network_transit:
t = prom.work_handler_network_transit
parts_rows.append(
f"| Part 1 - Network transit (T2-T1) | {t['p50']*1000:.2f} | {t['p95']*1000:.2f} | {t['p99']*1000:.2f} |"
)
if prom.work_handler_time_to_first_response:
t = prom.work_handler_time_to_first_response
parts_rows.append(
f"| Part 2 - Processing (T3-T2) | {t['p50']*1000:.2f} | {t['p95']*1000:.2f} | {t['p99']*1000:.2f} |"
)
if parts_rows:
lines.append("| Phase | p50 (ms) | p95 (ms) | p99 (ms) |")
lines.append("|---|---:|---:|---:|")
lines.extend(parts_rows)
# Note: Part 3 (response return) cannot be reliably derived by
# subtracting p50s because quantiles are not additive across
# independent distributions. A distribution-based breakdown
# would require access to raw sample histograms.
return "\n".join(lines)
# ─── 5. NVTX Pipeline Stages ────────────────────────────────────────────────
def section_nvtx(obs_dir: Path) -> Optional[str]:
"""Format NVTX_EVENTS from nsys SQLite export."""
stages = parse_nvtx_stages(obs_dir)
if not stages:
return None
lines = [_section("5. NVTX Pipeline Stages (from nsys SQLite)")]
lines.append("| Range Name | Count | Avg (us) | Min (us) | Max (us) |")
lines.append("|---|---:|---:|---:|---:|")
for s in stages:
name = s["name"][:40]
lines.append(
f"| {name} | {s['count']:d} | {s['avg_us']:.1f} | {s['min_us']:.1f} | {s['max_us']:.1f} |"
)
return "\n".join(lines)
# ─── 6. Syscall Profile ─────────────────────────────────────────────────────
def section_syscall_profile(obs_dir: Path) -> Optional[str]:
"""Format OSRT_API from nsys SQLite (OS runtime API calls)."""
profile = parse_syscall_profile(obs_dir)
if not profile:
return None
lines = [_section("6. Syscall / OS Runtime Profile (from nsys)")]
lines.append("| API Call | Count | Avg (us) | Total (ms) |")
lines.append("|---|---:|---:|---:|")
for entry in profile:
name = entry["name"][:40]
lines.append(
f"| {name} | {entry['count']:d} | {entry['avg_us']:.1f} | {entry['total_ms']:.1f} |"
)
return "\n".join(lines)
# ─── 7. Tokio Runtime Health ────────────────────────────────────────────────
def _worker_distribution(values: list, unit: str, warn_threshold: float) -> list:
"""Summarize a per-worker metric list into a compact distribution table."""
if not values:
return []
n = len(values)
sorted_vals = sorted(values)
avg = sum(values) / n
p50 = sorted_vals[n // 2]
p99 = sorted_vals[min(int(n * 0.99), n - 1)]
mn, mx = sorted_vals[0], sorted_vals[-1]
n_warn = sum(1 for v in values if v > warn_threshold)
n_ok = n - n_warn
lines = []
lines.append(f"| Workers | {n} |")
lines.append(f"| Avg | {avg:,.0f} {unit} |")
lines.append(f"| p50 | {p50:,.0f} {unit} |")
lines.append(f"| p99 | {p99:,.0f} {unit} |")
lines.append(f"| Min / Max | {mn:,.0f} / {mx:,.0f} {unit} |")
lines.append(
f"| Health | {n_ok} ok, {n_warn} warn (threshold: {warn_threshold:,.0f} {unit}) |"
)
return lines
def section_tokio(prom: Optional[PrometheusSnapshot]) -> Optional[str]:
if not prom:
return None
if not prom.tokio_worker_mean_poll_time_ns and not prom.tokio_worker_busy_ratio:
return None
lines = [_section("7. Tokio Runtime Health")]
# --- Poll Time Summary ---
if prom.tokio_worker_mean_poll_time_ns:
lines.append(_subsection("Worker Poll Time"))
lines.append("| Metric | Value |")
lines.append("|---|---|")
lines.extend(
_worker_distribution(prom.tokio_worker_mean_poll_time_ns, "ns", 100_000)
)
# --- Busy Ratio Summary ---
if prom.tokio_worker_busy_ratio:
n = len(prom.tokio_worker_busy_ratio)
sorted_br = sorted(prom.tokio_worker_busy_ratio)
avg_br = sum(prom.tokio_worker_busy_ratio) / n
p50_br = sorted_br[n // 2]
p99_br = sorted_br[min(int(n * 0.99), n - 1)]
n_saturated = sum(1 for v in prom.tokio_worker_busy_ratio if v >= 0.95)
n_hot = sum(1 for v in prom.tokio_worker_busy_ratio if 0.5 <= v < 0.95)
n_idle = sum(1 for v in prom.tokio_worker_busy_ratio if v < 0.5)
lines.append(_subsection("Worker Busy Ratio"))
lines.append("| Metric | Value |")
lines.append("|---|---|")
lines.append(f"| Workers | {n} |")
lines.append(f"| Avg | {avg_br:.3f} |")
lines.append(f"| p50 / p99 | {p50_br:.3f} / {p99_br:.3f} |")
lines.append(f"| Min / Max | {sorted_br[0]:.3f} / {sorted_br[-1]:.3f} |")
lines.append(
f"| Distribution | {n_idle} idle (<0.5), {n_hot} hot (0.5-0.95), {n_saturated} saturated (>=0.95) |"
)
# --- Event Loop ---
lines.append(_subsection("Event Loop"))
lines.append("| Metric | Value |")
lines.append("|---|---:|")
lines.append(f"| Stall count | {prom.tokio_event_loop_stall_total:.0f} |")
lines.append(f"| Global queue depth | {prom.tokio_global_queue_depth:.0f} |")
lines.append(
f"| Budget forced yields | {prom.tokio_budget_forced_yield_total:.0f} |"
)
# --- Assessment ---
lines.append(_subsection("Assessment"))
issues = []
if prom.tokio_worker_mean_poll_time_ns:
avg_pt = sum(prom.tokio_worker_mean_poll_time_ns) / len(
prom.tokio_worker_mean_poll_time_ns
)
if avg_pt > 100_000:
issues.append(f"High avg poll time: {avg_pt:,.0f}ns (threshold: 100,000ns)")
if prom.tokio_worker_busy_ratio:
n_saturated = sum(1 for v in prom.tokio_worker_busy_ratio if v >= 0.95)
avg_br = sum(prom.tokio_worker_busy_ratio) / len(prom.tokio_worker_busy_ratio)
if avg_br > 0.8:
issues.append(f"High avg busy ratio: {avg_br:.3f} (threshold: 0.8)")
if n_saturated > 0:
issues.append(f"{n_saturated} worker(s) saturated (busy ratio >= 0.95)")
if prom.tokio_event_loop_stall_total > 0:
issues.append(
f"Event loop stalls detected: {prom.tokio_event_loop_stall_total:.0f}"
)
if issues:
for issue in issues:
lines.append(f"- **warn** {issue}")
else:
lines.append("- **ok** Tokio runtime healthy")
return "\n".join(lines)
# ─── 8. Transport Gauges ─────────────────────────────────────────────────
def section_transport(prom: Optional[PrometheusSnapshot]) -> Optional[str]:
if not prom:
return None
if prom.tcp_pool_active == 0 and prom.tcp_pool_idle == 0:
return None
lines = [_section("8. Transport Layer")]
lines.append("| Metric | Value |")
lines.append("|---|---:|")
lines.append(f"| TCP Pool Active | {prom.tcp_pool_active:.0f} |")
lines.append(f"| TCP Pool Idle | {prom.tcp_pool_idle:.0f} |")
total = prom.tcp_pool_active + prom.tcp_pool_idle
if total > 0:
utilization = prom.tcp_pool_active / total * 100
lines.append(f"| Utilization | {utilization:.1f}% |")
return "\n".join(lines)
# ─── 9. Compute Pool ─────────────────────────────────────────────────────
def section_compute(prom: Optional[PrometheusSnapshot]) -> Optional[str]:
if not prom or prom.compute_pool_active == 0:
return None
lines = [_section("9. Compute Pool")]
lines.append(f"Active tasks: {prom.compute_pool_active:.0f}")
return "\n".join(lines)
# ─── 10. Hardware Counters ──────────────────────────────────────────────────
def section_hw_counters(obs_dir: Path) -> Optional[str]:
counters = parse_perf_stat(obs_dir)
if not counters:
return None
lines = [_section("10. Hardware Counters (perf stat)")]
lines.append("| Counter | Value |")
lines.append("|---|---:|")
for key in [
"task-clock",
"context-switches",
"cpu-migrations",
"page-faults",
"cycles",
"instructions",
"branches",
"branch-misses",
"cache-references",
"cache-misses",
]:
if key in counters:
lines.append(f"| {key} | {counters[key]:,.0f} |")
if "ipc" in counters:
lines.append(f"| IPC | {counters['ipc']:.2f} |")
if "cache-miss-rate" in counters:
lines.append(f"| Cache miss rate | {counters['cache-miss-rate']:.2f}% |")
if "branch-miss-rate" in counters:
lines.append(f"| Branch miss rate | {counters['branch-miss-rate']:.2f}% |")
return "\n".join(lines)
# ─── 11. Flamegraph ─────────────────────────────────────────────────────
def section_flamegraph(obs_dir: Path) -> Optional[str]:
lines = [_section("11. Flamegraphs")]
found = False
svg_entries = [
("cpu_flamegraph.svg", "CPU Flamegraph"),
("offcpu_flamegraph.svg", "Off-CPU Flamegraph"),
]
for filename, label in svg_entries:
path = obs_dir / "perf" / filename
if path.exists() and path.stat().st_size > 0:
# Use relative path from report.md location (obs_dir/)
rel_path = f"perf/{filename}"
lines.append(f"### {label}")
lines.append("")
lines.append(f'<img src="{rel_path}" alt="{label}" width="100%">')
lines.append("")
lines.append(f"*File: `{path}`*")
lines.append("")
found = True
return "\n".join(lines) if found else None
# ─── 12. BPF Insights ───────────────────────────────────────────────────────
def section_bpf(obs_dir: Path) -> Optional[str]:
bpf_dir = obs_dir / "bpf"
if not bpf_dir.exists():
return None
parts = []
def _bpf_summary(bpf_path: Path, title: str, desc: str) -> list:
"""Parse a BPF histogram file and return a compact summary."""
if not bpf_path.exists() or bpf_path.stat().st_size == 0:
return []
text = bpf_path.read_text()
hists = parse_bpftrace_histograms(text)
if not hists:
return []
# Aggregate across all histogram labels for a single summary
total_samples = 0
weighted_p50_sum = 0.0
weighted_p99_sum = 0.0
max_p99 = 0.0
max_bucket_seen = 0
for h in hists[:10]:
stats = summarize_histogram(h["buckets"])
n = stats["total"]
if n == 0:
continue
total_samples += n
weighted_p50_sum += stats["p50"] * n
weighted_p99_sum += stats["p99"] * n
max_p99 = max(max_p99, stats["p99"])
max_bucket_seen = max(max_bucket_seen, stats["max_bucket"])
if total_samples == 0:
return []
avg_p50 = weighted_p50_sum / total_samples
avg_p99 = weighted_p99_sum / total_samples
rows = [_subsection(title)]
rows.append(f"> {desc}")
rows.append("")
rows.append("| Metric | Value |")
rows.append("|---|---|")
rows.append(f"| Total samples | {total_samples:,d} |")
rows.append(f"| p50 (weighted avg) | {avg_p50:,.0f} us |")
rows.append(f"| p99 (weighted avg) | {avg_p99:,.0f} us |")
rows.append(f"| Worst p99 | {max_p99:,.0f} us |")
rows.append(f"| Max bucket | {max_bucket_seen:,d} us |")
return rows
parts.extend(
_bpf_summary(
bpf_dir / "runqlat.txt",
"Run Queue Latency",
"Time tasks spent waiting in the CPU run queue before being scheduled. "
"High values indicate CPU contention or oversubscription.",
)
)
parts.extend(
_bpf_summary(
bpf_dir / "syscall_latency.txt",
"Syscall Latency",
"Latency of key syscalls (futex, epoll_wait, read, write, recvmsg). "
"High futex latency suggests lock contention; high epoll_wait means idle polling.",
)
)
parts.extend(
_bpf_summary(
bpf_dir / "transport_latency.txt",
"Transport Latency (BPF)",
"Kernel-observed latency for TCP send/recv operations on the transport plane. "
"High values may indicate network congestion or small-buffer stalls.",
)
)
parts.extend(
_bpf_summary(
bpf_dir / "context_switches.txt",
"Context Switch Overhead",
"Voluntary and involuntary context switch overhead per event. "
"High involuntary switches suggest CPU pressure.",
)
)
if not parts:
return None
return _section("12. BPF Insights") + "\n".join(parts)
# ─── 13. System Resources ───────────────────────────────────────────────────
def section_system_resources(obs_dir: Path) -> Optional[str]:
system_dir = obs_dir / "system"
if not system_dir.exists():
return None
rows = []
# Thread count
thread_data = parse_timeseries(system_dir / "thread_count.txt", "threads")
if thread_data:
values = [v for _, v in thread_data]
rows.append(
f"| Threads | {min(values):.0f} | {max(values):.0f} | {sum(values)/len(values):.0f} | {len(values)} |"
)
# FD count
fd_data = parse_timeseries(system_dir / "fd_count.txt", "fds")
if fd_data:
values = [v for _, v in fd_data]
rows.append(
f"| FDs | {min(values):.0f} | {max(values):.0f} | {sum(values)/len(values):.0f} | {len(values)} |"
)
if not rows:
return None
lines = [_section("13. System Resources")]
lines.append("| Resource | Min | Max | Avg | Samples |")
lines.append("|---|---:|---:|---:|---:|")
lines.extend(rows)
return "\n".join(lines)
# ─── 14. Context Switches (nsys) ────────────────────────────────────────────
def section_nsys_context_switches(obs_dir: Path) -> Optional[str]:
data = parse_nsys_context_switches(obs_dir)
if not data:
return None
lines = [_section("14. Context Switches (nsys SCHED_EVENTS)")]
lines.append("| Metric | Value |")
lines.append("|---|---:|")
lines.append(f"| Total Events | {data['total']:,d} |")
if data["avg_duration"]:
lines.append(f"| Avg Duration | {data['avg_duration']:.0f} (tid units) |")
return "\n".join(lines)
# ─── 15. Key Findings ───────────────────────────────────────────────────────
def section_key_findings(
aiperf: Optional[AiperfResult],
prom: Optional[PrometheusSnapshot],
hw: Optional[dict],
) -> str:
"""Auto-generate insights based on thresholds."""
findings = []
if aiperf:
if aiperf.ttft_p99_ms > 500:
findings.append(f"High TTFT p99: {aiperf.ttft_p99_ms:.0f}ms (> 500ms)")
if prom:
# Check transport breakdown
if prom.request_plane_roundtrip_ttft_p50 > 0.1:
findings.append(
f"High roundtrip TTFT p50: {prom.request_plane_roundtrip_ttft_p50*1000:.0f}ms (> 100ms)"
)
# Tokio health
if prom.tokio_worker_mean_poll_time_ns:
avg_pt = sum(prom.tokio_worker_mean_poll_time_ns) / len(
prom.tokio_worker_mean_poll_time_ns
)
if avg_pt > 100_000:
findings.append(
f"Tokio avg poll time elevated: {avg_pt:.0f}ns (> 100μs)"
)
if prom.tokio_event_loop_stall_total > 10:
findings.append(
f"Multiple event loop stalls: {prom.tokio_event_loop_stall_total:.0f}"
)
if hw:
if hw.get("cache-miss-rate", 0) > 5:
findings.append(f"Cache miss rate: {hw['cache-miss-rate']:.1f}% (> 5%)")
if hw.get("ipc", 999) < 0.5:
findings.append(f"Low IPC: {hw['ipc']:.2f} (< 0.5)")
lines = [_section("15. Key Findings")]
if not findings:
lines.append("No anomalies detected - all metrics within expected ranges.")
else:
lines.append(f"Found {len(findings)} notable item(s):")
lines.append("")
for i, f in enumerate(findings, 1):
lines.append(f"{i}. {f}")
return "\n".join(lines)
# ─── Main: assemble report ──────────────────────────────────────────────────
def run_analysis(obs_dir: Path) -> str:
"""Run full analysis and return the report as a string."""
print(f"Analyzing: {obs_dir}")
print("")
# Load data sources
aiperf = load_aiperf(obs_dir)
prom = load_prometheus(obs_dir)
hw = parse_perf_stat(obs_dir)
# Build report
sections = [
section_config(obs_dir),
section_throughput(aiperf),
section_latency(aiperf),
section_lifecycle(prom),
section_transport_breakdown(prom),
section_nvtx(obs_dir),
section_syscall_profile(obs_dir),
section_tokio(prom),
section_transport(prom),
section_compute(prom),
section_hw_counters(obs_dir),
section_flamegraph(obs_dir),
section_bpf(obs_dir),
section_system_resources(obs_dir),
section_nsys_context_switches(obs_dir),
section_key_findings(aiperf, prom, hw),
]
# Filter out None sections
report_parts = [s for s in sections if s is not None]
# Header
header = (
"# Unified Observability Report\n\n"
f"**Directory:** `{obs_dir}`\n\n"
f"**Generated:** {__import__('datetime').datetime.now().isoformat()}\n"
)
report = header + "\n".join(report_parts)
# Summary of what's missing
missing = []
if aiperf is None:
missing.append("aiperf results")
if prom is None:
missing.append("Prometheus snapshot")
if hw is None:
missing.append("perf stat")
if not (obs_dir / "nsys" / "frontend.sqlite").exists():
missing.append("nsys SQLite")
if not (obs_dir / "bpf").exists() or not any((obs_dir / "bpf").glob("*.txt")):
missing.append("BPF data")
if missing:
report += f"\n\n[Skipped sections — missing data: {', '.join(missing)}]\n"
return report
def cmd_analyze(args: argparse.Namespace) -> None:
"""Analyze an observability capture directory."""
if args.obs_dir:
obs_dir = Path(args.obs_dir)
else:
# Auto-find latest
script_dir = Path(__file__).resolve().parent # .../analysis/
repo_root = script_dir.parents[3] # .../dynamo/ (repo root)
obs_dir_found = find_latest_obs_dir(repo_root)
if obs_dir_found is None:
print("ERROR: No artifacts/obs_* directory found. Specify path explicitly.")
sys.exit(1)
obs_dir = obs_dir_found
print(f"Auto-detected: {obs_dir}")
if not obs_dir.exists():
print(f"ERROR: Directory not found: {obs_dir}")
sys.exit(1)
report = run_analysis(obs_dir)
# Print to stdout
print(report)
# Also write to report.md
report_path = obs_dir / "report.md"
report_path.write_text(report)
print(f"\nReport saved to: {report_path}")
def main() -> None:
parser = argparse.ArgumentParser(
description="Unified observability analysis for dynamo frontend"
)
subparsers = parser.add_subparsers(dest="command", required=True)
p_analyze = subparsers.add_parser(
"analyze", help="Analyze an observability capture directory"
)
p_analyze.add_argument(
"obs_dir",
nargs="?",
default=None,
help="Path to obs_* directory (default: auto-find latest)",
)
p_analyze.set_defaults(func=cmd_analyze)
args = parser.parse_args()
args.func(args)
if __name__ == "__main__":
main()
#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Frontend performance analysis script.
Ingests aiperf JSON + Prometheus snapshots and produces:
- Scalability curves (TTFT/ITL/throughput vs concurrency for each ISL)
- KV router A/B comparison
- ISL scaling heatmaps
- Stage waterfall breakdown
- Transport overhead analysis
- Tokio health correlation
- Regression detection (Mann-Whitney U test)
Usage:
# Analyze a single run
python frontend_perf_analysis.py analyze <artifact_dir>
# Compare two runs (A/B or regression)
python frontend_perf_analysis.py compare <baseline_dir> <candidate_dir>
# Generate ISL heatmap
python frontend_perf_analysis.py heatmap <artifact_dir>
"""
import argparse
import json
import re
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
@dataclass
class AiperfResult:
"""Parsed aiperf profile result."""
concurrency: int = 0
isl: int = 0
ttft_p50_ms: float = 0.0
ttft_p95_ms: float = 0.0
ttft_p99_ms: float = 0.0
itl_p50_ms: float = 0.0
itl_p95_ms: float = 0.0
itl_p99_ms: float = 0.0
throughput_tok_s: float = 0.0
request_throughput_rps: float = 0.0
@dataclass
class PrometheusSnapshot:
"""Parsed Prometheus /metrics snapshot."""
stage_durations: dict = field(default_factory=dict) # stage -> {p50, p95, p99}
request_plane_queue_p50: float = 0.0
request_plane_send_p50: float = 0.0
request_plane_roundtrip_ttft_p50: float = 0.0
request_plane_inflight: float = 0.0
# Transport breakdown (backend-side, cross-process wall-clock)
work_handler_network_transit: dict = field(default_factory=dict) # {p50, p95, p99}
work_handler_time_to_first_response: dict = field(
default_factory=dict
) # {p50, p95, p99}
tokio_worker_mean_poll_time_ns: list = field(default_factory=list)
tokio_event_loop_stall_total: float = 0.0
tokio_global_queue_depth: float = 0.0
tokio_budget_forced_yield_total: float = 0.0
tokio_worker_busy_ratio: list = field(default_factory=list)
tcp_pool_active: float = 0.0
tcp_pool_idle: float = 0.0
compute_pool_active: float = 0.0
@dataclass
class TestPoint:
"""A single test point (concurrency x ISL)."""
key: str
concurrency: int
isl: int
aiperf: Optional[AiperfResult] = None
prometheus: Optional[PrometheusSnapshot] = None
def parse_aiperf_json(path: Path) -> Optional[AiperfResult]:
"""Parse aiperf profile_export_aiperf.json."""
# Look for the aiperf JSON output in the directory
candidates = [
path / "profile_export_aiperf.json",
path / "profile_results.json",
]
# Also try any .json file in the directory
if path.is_dir():
candidates.extend(sorted(path.glob("*.json")))
for candidate in candidates:
if candidate.exists():
try:
with open(candidate) as f:
data = json.load(f)
return _extract_aiperf_metrics(data)
except (json.JSONDecodeError, KeyError):
continue
return None
def _extract_aiperf_metrics(data: dict) -> AiperfResult:
"""Extract metrics from aiperf v0.6+ JSON structure.
Top-level keys: time_to_first_token, inter_token_latency,
output_token_throughput, request_throughput, input_config, etc.
Each latency key is a dict with p50/p95/p99/avg/min/max.
"""
result = AiperfResult()
# TTFT — aiperf v0.6 key: time_to_first_token (unit: ms)
ttft = data.get("time_to_first_token", data.get("ttft", {}))
if isinstance(ttft, dict):
result.ttft_p50_ms = float(ttft.get("p50", ttft.get("avg", 0)) or 0)
result.ttft_p95_ms = float(ttft.get("p95", 0) or 0)
result.ttft_p99_ms = float(ttft.get("p99", 0) or 0)
# ITL — aiperf v0.6 key: inter_token_latency (unit: ms)
itl = data.get("inter_token_latency", data.get("itl", {}))
if isinstance(itl, dict):
result.itl_p50_ms = float(itl.get("p50", itl.get("avg", 0)) or 0)
result.itl_p95_ms = float(itl.get("p95", 0) or 0)
result.itl_p99_ms = float(itl.get("p99", 0) or 0)
# Throughput — output_token_throughput.avg (tok/s)
otput = data.get("output_token_throughput", {})
result.throughput_tok_s = float(otput.get("avg", 0) or 0)
# Request throughput — request_throughput.avg (req/s)
rtput = data.get("request_throughput", {})
result.request_throughput_rps = float(rtput.get("avg", 0) or 0)
# Config: concurrency from input_config.loadgen; ISL from input_sequence_length
cfg = data.get("input_config", {})
loadgen = cfg.get("loadgen", {})
result.concurrency = int(loadgen.get("concurrency", 0))
isl_field = data.get("input_sequence_length", {})
result.isl = int(isl_field.get("avg", 0) or 0)
return result
def parse_prometheus_snapshot(path: Path) -> Optional[PrometheusSnapshot]:
"""Parse Prometheus text format snapshot.
Delegates to parsing_util.parse_prometheus_text() with the conventional
``prometheus_snapshot.txt`` filename.
"""
from parsing_util import parse_prometheus_text
snapshot_path = path / "prometheus_snapshot.txt"
if not snapshot_path.exists():
return None
return parse_prometheus_text(snapshot_path)
def load_test_points(artifact_dir: Path) -> list[TestPoint]:
"""Load all test points from an artifact directory."""
points = []
for subdir in sorted(artifact_dir.iterdir()):
if not subdir.is_dir():
continue
# Parse key like "c10_isl4096"
match = re.match(r"c(\d+)_isl(\d+)", subdir.name)
if not match:
# Try nested epoch directories
for nested in sorted(subdir.iterdir()):
if nested.is_dir():
match = re.match(r"c(\d+)_isl(\d+)", nested.name)
if match:
c, isl = int(match.group(1)), int(match.group(2))
point = TestPoint(key=nested.name, concurrency=c, isl=isl)
point.aiperf = parse_aiperf_json(nested)
point.prometheus = parse_prometheus_snapshot(nested)
points.append(point)
continue
c, isl = int(match.group(1)), int(match.group(2))
point = TestPoint(key=subdir.name, concurrency=c, isl=isl)
point.aiperf = parse_aiperf_json(subdir)
point.prometheus = parse_prometheus_snapshot(subdir)
points.append(point)
return points
def print_scalability_table(points: list[TestPoint]) -> None:
"""Print scalability table: TTFT/ITL/throughput vs concurrency for each ISL."""
isls = sorted(set(p.isl for p in points))
concurrencies = sorted(set(p.concurrency for p in points))
for isl in isls:
print(f"\n{'='*80}")
print(f"ISL = {isl}")
print(f"{'='*80}")
print(
f"{'Conc':>6} {'TTFT p50':>10} {'TTFT p95':>10} {'ITL p50':>10} "
f"{'ITL p95':>10} {'Tput tok/s':>12} {'RPS':>8}"
)
print("-" * 80)
for c in concurrencies:
matching = [
p for p in points if p.concurrency == c and p.isl == isl and p.aiperf
]
if not matching:
continue
a = matching[0].aiperf
print(
f"{c:>6} {a.ttft_p50_ms:>10.2f} {a.ttft_p95_ms:>10.2f} "
f"{a.itl_p50_ms:>10.2f} {a.itl_p95_ms:>10.2f} "
f"{a.throughput_tok_s:>12.1f} {a.request_throughput_rps:>8.2f}"
)
def print_stage_waterfall(points: list[TestPoint]) -> None:
"""Print stage breakdown at each load level."""
print(f"\n{'='*80}")
print("Pipeline Stage Breakdown (p50 seconds)")
print(f"{'='*80}")
stages = ["preprocess", "route", "transport_roundtrip", "postprocess"]
header = f"{'Key':>20}"
for s in stages:
header += f" {s:>18}"
print(header)
print("-" * 100)
for p in sorted(points, key=lambda x: (x.isl, x.concurrency)):
if not p.prometheus or not p.prometheus.stage_durations:
continue
line = f"{p.key:>20}"
for s in stages:
val = p.prometheus.stage_durations.get(s, {}).get("p50", 0)
line += f" {val:>18.6f}"
print(line)
def print_transport_breakdown(points: list[TestPoint]) -> None:
"""Print transport overhead: queue_seconds vs roundtrip_ttft_seconds."""
print(f"\n{'='*80}")
print("Transport Overhead (p50 seconds)")
print(f"{'='*80}")
print(f"{'Key':>20} {'Queue (encode)':>16} {'RT TTFT (net)':>16} {'Inflight':>10}")
print("-" * 70)
for p in sorted(points, key=lambda x: (x.isl, x.concurrency)):
if not p.prometheus:
continue
print(
f"{p.key:>20} {p.prometheus.request_plane_queue_p50:>16.6f} "
f"{p.prometheus.request_plane_roundtrip_ttft_p50:>16.6f} "
f"{p.prometheus.request_plane_inflight:>10.0f}"
)
def print_tokio_health(points: list[TestPoint]) -> None:
"""Print tokio health indicators."""
print(f"\n{'='*80}")
print("Tokio Health")
print(f"{'='*80}")
print(
f"{'Key':>20} {'Avg Poll ns':>12} {'Max Poll ns':>12} "
f"{'Stalls':>8} {'Queue':>8} {'Yields':>8} {'Busy Avg':>10}"
)
print("-" * 90)
for p in sorted(points, key=lambda x: (x.isl, x.concurrency)):
if not p.prometheus:
continue
pm = p.prometheus
avg_poll = (
sum(pm.tokio_worker_mean_poll_time_ns)
/ len(pm.tokio_worker_mean_poll_time_ns)
if pm.tokio_worker_mean_poll_time_ns
else 0
)
max_poll = max(pm.tokio_worker_mean_poll_time_ns, default=0)
avg_busy = (
sum(pm.tokio_worker_busy_ratio) / len(pm.tokio_worker_busy_ratio)
if pm.tokio_worker_busy_ratio
else 0
)
# Health indicators
poll_status = ""
if avg_poll > 1_000_000:
poll_status = " STARVING"
elif avg_poll > 100_000:
poll_status = " WARN"
print(
f"{p.key:>20} {avg_poll:>12.0f} {max_poll:>12.0f} "
f"{pm.tokio_event_loop_stall_total:>8.0f} "
f"{pm.tokio_global_queue_depth:>8.0f} "
f"{pm.tokio_budget_forced_yield_total:>8.0f} "
f"{avg_busy:>10.3f}{poll_status}"
)
def compare_runs(
baseline_dir: Path, candidate_dir: Path, threshold_pct: float = 10.0
) -> None:
"""Compare two runs and flag regressions."""
baseline_points = load_test_points(baseline_dir)
candidate_points = load_test_points(candidate_dir)
if not baseline_points or not candidate_points:
print("ERROR: No test points found in one or both directories.")
return
# Index by key
baseline_map = {(p.concurrency, p.isl): p for p in baseline_points}
candidate_map = {(p.concurrency, p.isl): p for p in candidate_points}
common_keys = sorted(set(baseline_map.keys()) & set(candidate_map.keys()))
if not common_keys:
print("ERROR: No matching test points between baseline and candidate.")
return
print(f"\n{'='*100}")
print(
f"A/B Comparison: {baseline_dir.name} (baseline) vs {candidate_dir.name} (candidate)"
)
print(f"Regression threshold: {threshold_pct}%")
print(f"{'='*100}")
print(
f"{'Key':>20} {'TTFT p50 B':>12} {'TTFT p50 C':>12} {'Delta%':>8} "
f"{'ITL p50 B':>12} {'ITL p50 C':>12} {'Delta%':>8} {'Status':>12}"
)
print("-" * 100)
regressions = []
for key in common_keys:
bp = baseline_map[key]
cp = candidate_map[key]
if not bp.aiperf or not cp.aiperf:
continue
ttft_delta = 0
if bp.aiperf.ttft_p50_ms > 0:
ttft_delta = (
(cp.aiperf.ttft_p50_ms - bp.aiperf.ttft_p50_ms)
/ bp.aiperf.ttft_p50_ms
* 100
)
itl_delta = 0
if bp.aiperf.itl_p50_ms > 0:
itl_delta = (
(cp.aiperf.itl_p50_ms - bp.aiperf.itl_p50_ms)
/ bp.aiperf.itl_p50_ms
* 100
)
status = "OK"
if ttft_delta > threshold_pct or itl_delta > threshold_pct:
status = "REGRESSION"
regressions.append((f"c{key[0]}_isl{key[1]}", ttft_delta, itl_delta))
elif ttft_delta < -threshold_pct or itl_delta < -threshold_pct:
status = "IMPROVED"
print(
f"{'c' + str(key[0]) + '_isl' + str(key[1]):>20} "
f"{bp.aiperf.ttft_p50_ms:>12.2f} {cp.aiperf.ttft_p50_ms:>12.2f} "
f"{ttft_delta:>+7.1f}% "
f"{bp.aiperf.itl_p50_ms:>12.2f} {cp.aiperf.itl_p50_ms:>12.2f} "
f"{itl_delta:>+7.1f}% "
f"{status:>12}"
)
# Note: Mann-Whitney U on aggregated p50 values is statistically invalid
# because p50s across (concurrency, ISL) configs are not IID samples.
# A valid test would require raw per-request TTFT samples for matched
# configurations. Skipped to avoid reporting misleading p-values.
if regressions:
print(f"\nWARNING: {len(regressions)} regression(s) detected:")
for key, ttft_d, itl_d in regressions:
print(f" {key}: TTFT {ttft_d:+.1f}%, ITL {itl_d:+.1f}%")
def print_heatmap(points: list[TestPoint]) -> None:
"""Print ISL x concurrency heatmap of TTFT p95."""
isls = sorted(set(p.isl for p in points))
concurrencies = sorted(set(p.concurrency for p in points))
point_map = {(p.concurrency, p.isl): p for p in points}
print(f"\n{'='*80}")
print("TTFT p95 Heatmap (ms) — Concurrency x ISL")
print(f"{'='*80}")
# Header
header = f"{'Conc':>8}"
for isl in isls:
header += f" {'ISL=' + str(isl):>12}"
print(header)
print("-" * (8 + 13 * len(isls)))
for c in concurrencies:
line = f"{c:>8}"
for isl in isls:
key = (c, isl)
if key in point_map and point_map[key].aiperf:
val = point_map[key].aiperf.ttft_p95_ms
line += f" {val:>12.2f}"
else:
line += f" {'---':>12}"
print(line)
def generate_plots(points: list[TestPoint], output_dir: Path) -> None:
"""Generate matplotlib plots if available."""
try:
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import numpy as np
except ImportError:
print("\nSKIP: matplotlib/numpy not available for plot generation")
print("Install: pip install matplotlib numpy")
return
output_dir.mkdir(parents=True, exist_ok=True)
isls = sorted(set(p.isl for p in points))
concurrencies = sorted(set(p.concurrency for p in points))
# --- Scalability curves ---
fig, axes = plt.subplots(1, 3, figsize=(18, 6))
for isl in isls:
isl_points = sorted(
[p for p in points if p.isl == isl and p.aiperf],
key=lambda x: x.concurrency,
)
if not isl_points:
continue
cs = [p.concurrency for p in isl_points]
ttft_p50 = [p.aiperf.ttft_p50_ms for p in isl_points]
itl_p50 = [p.aiperf.itl_p50_ms for p in isl_points]
tput = [p.aiperf.throughput_tok_s for p in isl_points]
axes[0].plot(cs, ttft_p50, "o-", label=f"ISL={isl}")
axes[1].plot(cs, itl_p50, "o-", label=f"ISL={isl}")
axes[2].plot(cs, tput, "o-", label=f"ISL={isl}")
axes[0].set_title("TTFT p50 vs Concurrency")
axes[0].set_xlabel("Concurrency")
axes[0].set_ylabel("TTFT p50 (ms)")
axes[0].legend()
axes[0].grid(True)
axes[1].set_title("ITL p50 vs Concurrency")
axes[1].set_xlabel("Concurrency")
axes[1].set_ylabel("ITL p50 (ms)")
axes[1].legend()
axes[1].grid(True)
axes[2].set_title("Throughput vs Concurrency")
axes[2].set_xlabel("Concurrency")
axes[2].set_ylabel("Throughput (tok/s)")
axes[2].legend()
axes[2].grid(True)
plt.tight_layout()
plt.savefig(output_dir / "scalability_curves.png", dpi=150)
plt.close()
print(f" Saved: {output_dir / 'scalability_curves.png'}")
# --- ISL Heatmap ---
if len(isls) > 1 and len(concurrencies) > 1:
point_map = {(p.concurrency, p.isl): p for p in points}
matrix = np.full((len(concurrencies), len(isls)), np.nan)
for i, c in enumerate(concurrencies):
for j, isl in enumerate(isls):
key = (c, isl)
if key in point_map and point_map[key].aiperf:
matrix[i, j] = point_map[key].aiperf.ttft_p95_ms
fig, ax = plt.subplots(figsize=(10, 8))
im = ax.imshow(matrix, aspect="auto", cmap="YlOrRd")
ax.set_xticks(range(len(isls)))
ax.set_xticklabels([str(x) for x in isls])
ax.set_yticks(range(len(concurrencies)))
ax.set_yticklabels([str(x) for x in concurrencies])
ax.set_xlabel("Input Sequence Length")
ax.set_ylabel("Concurrency")
ax.set_title("TTFT p95 (ms) — Concurrency x ISL")
plt.colorbar(im, label="TTFT p95 (ms)")
# Annotate cells
for i in range(len(concurrencies)):
for j in range(len(isls)):
if not np.isnan(matrix[i, j]):
ax.text(
j,
i,
f"{matrix[i, j]:.1f}",
ha="center",
va="center",
fontsize=8,
)
plt.tight_layout()
plt.savefig(output_dir / "isl_heatmap.png", dpi=150)
plt.close()
print(f" Saved: {output_dir / 'isl_heatmap.png'}")
# --- Stage waterfall ---
stages = ["preprocess", "route", "transport_roundtrip", "postprocess"]
stage_data = {}
labels = []
for p in sorted(points, key=lambda x: (x.isl, x.concurrency)):
if not p.prometheus or not p.prometheus.stage_durations:
continue
labels.append(p.key)
for s in stages:
stage_data.setdefault(s, []).append(
p.prometheus.stage_durations.get(s, {}).get("p50", 0) * 1000
)
if labels and any(stage_data.values()):
fig, ax = plt.subplots(figsize=(14, 6))
bottoms = np.zeros(len(labels))
colors = ["#2196F3", "#4CAF50", "#FF9800", "#9C27B0"]
for s, color in zip(stages, colors):
vals = np.array(stage_data.get(s, [0] * len(labels)))
ax.bar(labels, vals, bottom=bottoms, label=s, color=color)
bottoms += vals
ax.set_xlabel("Test Point")
ax.set_ylabel("Duration (ms)")
ax.set_title("Pipeline Stage Breakdown (p50)")
ax.legend()
plt.xticks(rotation=45, ha="right")
plt.tight_layout()
plt.savefig(output_dir / "stage_waterfall.png", dpi=150)
plt.close()
print(f" Saved: {output_dir / 'stage_waterfall.png'}")
def cmd_analyze(args: argparse.Namespace) -> None:
"""Analyze a single run directory."""
artifact_dir = Path(args.artifact_dir)
if not artifact_dir.exists():
print(f"ERROR: Directory not found: {artifact_dir}")
sys.exit(1)
print(f"Loading test points from: {artifact_dir}")
points = load_test_points(artifact_dir)
if not points:
print("ERROR: No test points found. Expected directories like c10_isl4096/")
sys.exit(1)
print(f"Found {len(points)} test point(s)")
print_scalability_table(points)
print_stage_waterfall(points)
print_transport_breakdown(points)
print_tokio_health(points)
print_heatmap(points)
if not args.no_plots:
plot_dir = artifact_dir / "plots"
print(f"\nGenerating plots in: {plot_dir}")
generate_plots(points, plot_dir)
def cmd_compare(args: argparse.Namespace) -> None:
"""Compare two run directories."""
baseline = Path(args.baseline_dir)
candidate = Path(args.candidate_dir)
for d in [baseline, candidate]:
if not d.exists():
print(f"ERROR: Directory not found: {d}")
sys.exit(1)
compare_runs(baseline, candidate, threshold_pct=args.threshold)
def cmd_heatmap(args: argparse.Namespace) -> None:
"""Generate ISL heatmap."""
artifact_dir = Path(args.artifact_dir)
points = load_test_points(artifact_dir)
if not points:
print("ERROR: No test points found.")
sys.exit(1)
print_heatmap(points)
if not args.no_plots:
plot_dir = artifact_dir / "plots"
generate_plots(points, plot_dir)
def main() -> None:
parser = argparse.ArgumentParser(description="Dynamo frontend performance analysis")
subparsers = parser.add_subparsers(dest="command", required=True)
# analyze
p_analyze = subparsers.add_parser("analyze", help="Analyze a single run")
p_analyze.add_argument("artifact_dir", help="Directory with test point subdirs")
p_analyze.add_argument(
"--no-plots", action="store_true", help="Skip plot generation"
)
p_analyze.set_defaults(func=cmd_analyze)
# compare
p_compare = subparsers.add_parser("compare", help="Compare two runs (A/B)")
p_compare.add_argument("baseline_dir", help="Baseline run directory")
p_compare.add_argument("candidate_dir", help="Candidate run directory")
p_compare.add_argument(
"--threshold",
type=float,
default=10.0,
help="Regression threshold percent (default: 10)",
)
p_compare.set_defaults(func=cmd_compare)
# heatmap
p_heatmap = subparsers.add_parser("heatmap", help="Generate ISL heatmap")
p_heatmap.add_argument("artifact_dir", help="Directory with test point subdirs")
p_heatmap.add_argument(
"--no-plots", action="store_true", help="Skip plot generation"
)
p_heatmap.set_defaults(func=cmd_heatmap)
args = parser.parse_args()
args.func(args)
if __name__ == "__main__":
main()
#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
data parsing utilities.
Pure data-extraction functions for Prometheus histograms, nsys SQLite databases,
perf stat output, bpftrace histograms, and timeseries files. Returns structured
Python objects — no formatting or report generation.
Used by create_report.py for report generation.
"""
import json
import logging
import re
import sqlite3
import sys
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
# Reuse parsers from the existing analysis module (same directory)
_SCRIPT_DIR = Path(__file__).resolve().parent
sys.path.insert(0, str(_SCRIPT_DIR))
from frontend_perf_analysis import ( # noqa: E402
AiperfResult,
PrometheusSnapshot,
_extract_aiperf_metrics,
)
# ─── Prometheus parsing ────────────────────────────────────────────────────
def parse_prometheus_text(path: Path) -> Optional[PrometheusSnapshot]:
"""Parse Prometheus text format from a specific file path.
Extracts stage durations, request plane metrics, transport breakdown,
Tokio runtime metrics, and transport/compute gauges into a PrometheusSnapshot.
"""
if not path.exists() or path.stat().st_size == 0:
return None
text = path.read_text()
snap = PrometheusSnapshot()
def get_gauge(name: str) -> Optional[float]:
m = re.search(rf"^{re.escape(name)}\s+(\S+)", text, re.MULTILINE)
return float(m.group(1)) if m else None
def get_gauge_by_label(name: str, label_key: str) -> dict:
pattern = rf'^{re.escape(name)}\{{[^}}]*{re.escape(label_key)}="([^"]+)"[^}}]*\}}\s+(\S+)'
return {
m.group(1): float(m.group(2))
for m in re.finditer(pattern, text, re.MULTILINE)
}
def histogram_quantile(name: str, quantile: float, filter_label: str = "") -> float:
bucket_pattern = rf"^{re.escape(name)}_bucket\{{[^}}]*{re.escape(filter_label)}[^}}]*le=\"([^\"]+)\"[^}}]*\}}\s+(\S+)"
buckets = []
for m in re.finditer(bucket_pattern, text, re.MULTILINE):
le_str, count_str = m.group(1), m.group(2)
le = float("inf") if le_str == "+Inf" else float(le_str)
buckets.append((le, float(count_str)))
if not buckets:
return 0.0
buckets.sort(key=lambda x: x[0])
count_m = re.search(
rf"^{re.escape(name)}_count\{{{re.escape(filter_label)}[^}}]*\}}\s+(\S+)",
text,
re.MULTILINE,
)
total = float(count_m.group(1)) if count_m else buckets[-1][1]
if total == 0:
return 0.0
target = quantile * total
prev_le, prev_count = 0.0, 0.0
for le, count in buckets:
if count >= target:
if count == prev_count:
return prev_le
frac = (target - prev_count) / (count - prev_count)
return prev_le + frac * (le - prev_le)
prev_le, prev_count = le, count
return buckets[-1][0] if buckets else 0.0
# Stage durations
for stage in ["preprocess", "route", "transport_roundtrip", "postprocess"]:
label_filter = f'stage="{stage}"'
p50 = histogram_quantile(
"dynamo_frontend_stage_duration_seconds", 0.50, label_filter
)
p95 = histogram_quantile(
"dynamo_frontend_stage_duration_seconds", 0.95, label_filter
)
p99 = histogram_quantile(
"dynamo_frontend_stage_duration_seconds", 0.99, label_filter
)
count_m = re.search(
rf"^dynamo_frontend_stage_duration_seconds_count\{{[^}}]*stage=\"{re.escape(stage)}\"[^}}]*\}}\s+(\S+)",
text,
re.MULTILINE,
)
if count_m and float(count_m.group(1)) > 0:
snap.stage_durations[stage] = {"p50": p50, "p95": p95, "p99": p99}
snap.request_plane_queue_p50 = histogram_quantile(
"dynamo_request_plane_queue_seconds", 0.50
)
snap.request_plane_send_p50 = histogram_quantile(
"dynamo_request_plane_send_seconds", 0.50
)
snap.request_plane_roundtrip_ttft_p50 = histogram_quantile(
"dynamo_request_plane_roundtrip_ttft_seconds", 0.50
)
snap.request_plane_inflight = get_gauge("dynamo_request_plane_inflight") or 0
# Transport breakdown (backend-side metrics)
for metric_name, attr_name in [
("dynamo_component_network_transit_seconds", "work_handler_network_transit"),
(
"dynamo_component_time_to_first_response_seconds",
"work_handler_time_to_first_response",
),
]:
p50 = histogram_quantile(metric_name, 0.50)
p95 = histogram_quantile(metric_name, 0.95)
p99 = histogram_quantile(metric_name, 0.99)
if p50 > 0 or p95 > 0 or p99 > 0:
setattr(snap, attr_name, {"p50": p50, "p95": p95, "p99": p99})
poll_times = get_gauge_by_label("dynamo_tokio_worker_mean_poll_time_ns", "worker")
snap.tokio_worker_mean_poll_time_ns = list(poll_times.values())
snap.tokio_event_loop_stall_total = (
get_gauge("dynamo_frontend_event_loop_stall_total") or 0
)
snap.tokio_global_queue_depth = get_gauge("dynamo_tokio_global_queue_depth") or 0
snap.tokio_budget_forced_yield_total = (
get_gauge("dynamo_tokio_budget_forced_yield_total") or 0
)
busy_ratios_raw = get_gauge_by_label("dynamo_tokio_worker_busy_ratio", "worker")
snap.tokio_worker_busy_ratio = [v / 1000.0 for v in busy_ratios_raw.values()]
snap.tcp_pool_active = get_gauge("dynamo_transport_tcp_pool_active") or 0
snap.tcp_pool_idle = get_gauge("dynamo_transport_tcp_pool_idle") or 0
snap.compute_pool_active = (
get_gauge("dynamo_compute_compute_pool_active_tasks") or 0
)
return snap
# ─── aiperf loading ────────────────────────────────────────────────────────
def load_aiperf(obs_dir: Path) -> Optional[AiperfResult]:
"""Load aiperf results from the aiperf subdir."""
aiperf_dir = obs_dir / "aiperf"
for candidate in [
aiperf_dir / "profile_export_aiperf.json",
aiperf_dir / "profile_results.json",
]:
if candidate.exists():
try:
with open(candidate) as f:
data = json.load(f)
return _extract_aiperf_metrics(data)
except (json.JSONDecodeError, KeyError):
continue
# Try any json file in aiperf dir
if aiperf_dir.is_dir():
for jf in sorted(aiperf_dir.glob("*.json")):
try:
with open(jf) as f:
data = json.load(f)
if "time_to_first_token" in data or "ttft" in data:
return _extract_aiperf_metrics(data)
except (json.JSONDecodeError, KeyError):
continue
return None
def load_prometheus(obs_dir: Path) -> Optional[PrometheusSnapshot]:
"""Load Prometheus snapshot — try final_snapshot.txt first, then aiperf dir."""
prom_dir = obs_dir / "prometheus"
final_path = prom_dir / "final_snapshot.txt"
if final_path.exists() and final_path.stat().st_size > 0:
return parse_prometheus_text(final_path)
# Fallback: check aiperf dir
aiperf_prom = obs_dir / "aiperf" / "prometheus_snapshot.txt"
if aiperf_prom.exists():
return parse_prometheus_text(aiperf_prom)
return None
# ─── perf stat parsing ─────────────────────────────────────────────────────
def parse_perf_stat(obs_dir: Path) -> Optional[dict]:
"""Parse perf stat output into a dict of counter name -> value."""
path = obs_dir / "perf" / "perf_stat.txt"
if not path.exists():
return None
text = path.read_text()
counters = {}
patterns = {
"task-clock": r"([\d,\.]+)\s+msec\s+task-clock",
"context-switches": r"([\d,\.]+)\s+context-switches",
"cpu-migrations": r"([\d,\.]+)\s+cpu-migrations",
"page-faults": r"([\d,\.]+)\s+page-faults",
"cycles": r"([\d,\.]+)\s+cycles",
"instructions": r"([\d,\.]+)\s+instructions",
"branches": r"([\d,\.]+)\s+branches",
"branch-misses": r"([\d,\.]+)\s+branch-misses",
"cache-references": r"([\d,\.]+)\s+cache-references",
"cache-misses": r"([\d,\.]+)\s+cache-misses",
}
for name, pattern in patterns.items():
m = re.search(pattern, text)
if m:
counters[name] = float(m.group(1).replace(",", ""))
# Extract IPC if present
ipc_m = re.search(r"([\d,\.]+)\s+insn per cycle", text)
if ipc_m:
counters["ipc"] = float(ipc_m.group(1).replace(",", ""))
# Cache miss rate
cache_refs = counters.get("cache-references", 0)
cache_misses = counters.get("cache-misses", 0)
if cache_refs > 0:
counters["cache-miss-rate"] = cache_misses / cache_refs * 100
# Branch miss rate
branches = counters.get("branches", 0)
branch_misses = counters.get("branch-misses", 0)
if branches > 0:
counters["branch-miss-rate"] = branch_misses / branches * 100
return counters if counters else None
# ─── bpftrace histogram parsing ────────────────────────────────────────────
def parse_bpftrace_histograms(text: str) -> list[dict]:
"""Parse bpftrace histogram output blocks.
Each block looks like:
@label_name[key]:
[1, 2) 123 |@@@@@@@@@@ |
[2, 4) 456 |@@@@@@@@@@@@@@@@@@@@|
"""
histograms = []
current_label = None
current_buckets = []
for line in text.split("\n"):
# Match label line
label_m = re.match(r"^@(\w+)(?:\[([^\]]*)\])?:", line)
if label_m:
if current_label and current_buckets:
histograms.append({"label": current_label, "buckets": current_buckets})
current_label = label_m.group(1)
if label_m.group(2):
current_label += f"[{label_m.group(2)}]"
current_buckets = []
continue
# Match bucket line: [lo, hi) count |bars|
# Handles optional unit suffixes: K (1024), M (1024^2)
bucket_m = re.match(r"\s*\[(\d+)([KkMm])?\s*,\s*(\d+)([KkMm])?\)\s+(\d+)", line)
if bucket_m and current_label:
_unit_mult = {"K": 1024, "k": 1024, "M": 1048576, "m": 1048576}
lo = int(bucket_m.group(1)) * _unit_mult.get(bucket_m.group(2) or "", 1)
hi = int(bucket_m.group(3)) * _unit_mult.get(bucket_m.group(4) or "", 1)
count = int(bucket_m.group(5))
current_buckets.append({"lo": lo, "hi": hi, "count": count})
if current_label and current_buckets:
histograms.append({"label": current_label, "buckets": current_buckets})
return histograms
def summarize_histogram(buckets: list[dict]) -> dict:
"""Compute basic stats (p50, p99, total, max_bucket) from histogram buckets."""
total = sum(b["count"] for b in buckets)
if total == 0:
return {"total": 0, "p50": 0, "p99": 0, "max_bucket": 0}
cumulative = 0
p50 = p99 = 0
max_bucket = 0
for b in buckets:
cumulative += b["count"]
mid = (b["lo"] + b["hi"]) / 2
if cumulative >= total * 0.50 and p50 == 0:
p50 = mid
if cumulative >= total * 0.99 and p99 == 0:
p99 = mid
if b["count"] > 0:
max_bucket = b["hi"]
return {"total": total, "p50": p50, "p99": p99, "max_bucket": max_bucket}
# ─── timeseries parsing ────────────────────────────────────────────────────
def parse_timeseries(path: Path, key: str) -> list[tuple[str, float]]:
"""Parse lines like '2025-01-01T00:00:00+00:00 key=value'."""
if not path.exists():
return []
points = []
for line in path.read_text().strip().split("\n"):
m = re.match(rf"(\S+)\s+{re.escape(key)}=(\d+)", line)
if m:
points.append((m.group(1), float(m.group(2))))
return points
# ─── nsys SQLite queries ───────────────────────────────────────────────────
def parse_nvtx_stages(
obs_dir: Path,
) -> Optional[list[dict]]:
"""Parse NVTX_EVENTS from nsys SQLite, return list of stage dicts.
Each dict has keys: name, count, avg_us, min_us, max_us.
"""
sqlite_path = obs_dir / "nsys" / "frontend.sqlite"
if not sqlite_path.exists():
return None
try:
conn = sqlite3.connect(str(sqlite_path))
tables = [
r[0]
for r in conn.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
).fetchall()
]
if "NVTX_EVENTS" not in tables:
conn.close()
return None
rows = conn.execute(
"""
SELECT text, COUNT(*) as cnt,
AVG(end - start) as avg_ns,
MIN(end - start) as min_ns,
MAX(end - start) as max_ns
FROM NVTX_EVENTS
WHERE text IS NOT NULL AND end > start
GROUP BY text
ORDER BY avg_ns DESC
"""
).fetchall()
conn.close()
if not rows:
return None
return [
{
"name": text or "?",
"count": cnt,
"avg_us": avg_ns / 1000,
"min_us": min_ns / 1000,
"max_us": max_ns / 1000,
}
for text, cnt, avg_ns, min_ns, max_ns in rows
]
except sqlite3.Error as e:
logger.debug("parse_nvtx_stages: sqlite error: %s", e)
return None
def parse_syscall_profile(
obs_dir: Path,
) -> Optional[list[dict]]:
"""Parse OSRT_API from nsys SQLite (OS runtime API calls).
Each dict has keys: name, count, avg_us, total_ms.
"""
sqlite_path = obs_dir / "nsys" / "frontend.sqlite"
if not sqlite_path.exists():
return None
try:
conn = sqlite3.connect(str(sqlite_path))
tables = [
r[0]
for r in conn.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
).fetchall()
]
if "OSRT_API" not in tables:
conn.close()
return None
rows = conn.execute(
"""
SELECT nameId, COUNT(*) as cnt,
AVG(end - start) as avg_ns,
SUM(end - start) as total_ns
FROM OSRT_API
WHERE end > start
GROUP BY nameId
ORDER BY total_ns DESC
LIMIT 20
"""
).fetchall()
# Try to resolve names from StringIds table
name_map = {}
if "StringIds" in tables:
for row in conn.execute("SELECT id, value FROM StringIds").fetchall():
name_map[row[0]] = row[1]
conn.close()
if not rows:
return None
return [
{
"name": name_map.get(name_id, f"id={name_id}"),
"count": cnt,
"avg_us": avg_ns / 1000,
"total_ms": total_ns / 1e6,
}
for name_id, cnt, avg_ns, total_ns in rows
]
except sqlite3.Error as e:
logger.debug("parse_syscall_profile: sqlite error: %s", e)
return None
def parse_nsys_context_switches(
obs_dir: Path,
) -> Optional[dict]:
"""Parse SCHED_EVENTS from nsys SQLite.
Returns dict with keys: total, avg_duration.
"""
sqlite_path = obs_dir / "nsys" / "frontend.sqlite"
if not sqlite_path.exists():
return None
try:
conn = sqlite3.connect(str(sqlite_path))
tables = [
r[0]
for r in conn.execute(
"SELECT name FROM sqlite_master WHERE type='table'"
).fetchall()
]
if "SCHED_EVENTS" not in tables:
conn.close()
return None
row = conn.execute(
"""
SELECT COUNT(*) as total,
AVG(endGlobalTid - startGlobalTid) as avg_duration
FROM SCHED_EVENTS
"""
).fetchone()
conn.close()
if not row or row[0] == 0:
return None
return {"total": row[0], "avg_duration": row[1]}
except sqlite3.Error as e:
logger.debug("parse_nsys_context_switches: sqlite error: %s", e)
return None
# ─── Directory utilities ───────────────────────────────────────────────────
def find_latest_obs_dir(repo_root: Path) -> Optional[Path]:
"""Find the most recent artifacts/obs_* directory."""
artifacts = repo_root / "artifacts"
if not artifacts.exists():
return None
dirs = sorted(artifacts.glob("obs_*"), reverse=True)
return dirs[0] if dirs else None
#!/bin/bash
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Unified observability capture — starts services, collects ALL observability
# data (nsys, perf, BPF, system stats, Prometheus), runs aiperf load, then
# shuts down and exports results.
#
# Usage:
# ./run_perf.sh --model Qwen/Qwen3-0.6B --concurrency 64 --num-requests 4096
# ./run_perf.sh --skip-bpf --skip-nsys # opt-out of heavy tools
# sudo ./run_perf.sh --skip-nsys --skip-perf \ # run bpf tracing as root
# --model Qwen/Qwen3-0.6B --concurrency 64 --num-requests 4096
#
# Output: artifacts/obs_YYYYMMDD_HHMMSS/ with subdirs for each data source.
#
# Prerequisites:
# - dynamo.mocker and dynamo.frontend installed
# - aiperf installed
# - Optional: nsys, perf, bpftrace, flamegraph tools (auto-detected)
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
REPO_ROOT="${SCRIPT_DIR}/../../.."
# Raise file descriptor limit — high concurrency runs open many sockets
# (aiperf connections + frontend + mocker workers + /proc polling + captures).
# The default 1024 is insufficient at concurrency >= 100.
ulimit -n 65536 2>/dev/null || ulimit -n 8192 2>/dev/null || true
# When running under sudo, preserve the invoking user's environment:
# - HF cache (so models downloaded as the regular user are visible)
# - PATH (so aiperf, python, etc. are found)
if [[ -n "${SUDO_USER:-}" ]]; then
REAL_HOME=$(getent passwd "$SUDO_USER" | cut -d: -f6)
export HF_HOME="${HF_HOME:-${REAL_HOME}/.cache/huggingface}"
# Add common user binary paths that sudo strips
for p in "${REAL_HOME}/.local/bin" "${REPO_ROOT}/dynamo/bin"; do
[[ -d "$p" ]] && [[ ":$PATH:" != *":$p:"* ]] && export PATH="$p:$PATH"
done
fi
# ─── Defaults ───────────────────────────────────────────────────────────────
MODEL="${MODEL:-nvidia/Llama-3.1-8B-Instruct-FP8}"
MODEL_NAME=""
NUM_WORKERS="${NUM_WORKERS:-2}"
SPEEDUP_RATIO="${SPEEDUP_RATIO:-1.0}"
CONCURRENCY="${CONCURRENCY:-64}"
NUM_REQUESTS="${NUM_REQUESTS:-}"
ISL="${ISL:-1024}"
OSL="${OSL:-256}"
REQUEST_PLANE="${REQUEST_PLANE:-tcp}"
EVENT_PLANE="${EVENT_PLANE:-nats}"
CAPTURE_DURATION="${CAPTURE_DURATION:-60}"
OUTPUT_DIR=""
DATA_PARALLEL_SIZE="${DATA_PARALLEL_SIZE:-1}"
FRONTEND_PORT="${FRONTEND_PORT:-8000}"
PLANNER_PROFILE="${PLANNER_PROFILE:-}"
TOKENIZER_BACKEND="${TOKENIZER_BACKEND:-}"
NUM_MODELS="${NUM_MODELS:-1}" # Number of model instances (each gets NUM_WORKERS workers)
AIPERF_TARGETS="${AIPERF_TARGETS:-first}" # "first" = model-1 only, "all" = one aiperf run per model
BENCHMARK_DURATION="${BENCHMARK_DURATION:-}" # aiperf --benchmark-duration (seconds)
REQUEST_RATE="${REQUEST_RATE:-}" # aiperf --request-rate (requests/sec)
WARMUP_DURATION="${WARMUP_DURATION:-}" # aiperf --warmup-duration (seconds)
WARMUP_COUNT="${WARMUP_COUNT:-}" # aiperf --warmup-request-count
# Opt-out flags
SKIP_BPF=false
SKIP_NSYS=false
SKIP_FLAMEGRAPH=false
SKIP_PERF=false
# Optional captures
ENABLE_TCPDUMP=false
TCPDUMP_PORT=""
# Custom tool paths
NSYS_PATH=""
# ─── Argument parsing ───────────────────────────────────────────────────────
while [[ $# -gt 0 ]]; do
case $1 in
--model) MODEL="$2"; shift 2 ;;
--model-name) MODEL_NAME="$2"; shift 2 ;;
--workers) NUM_WORKERS="$2"; shift 2 ;;
--speedup-ratio) SPEEDUP_RATIO="$2"; shift 2 ;;
--concurrency) CONCURRENCY="$2"; shift 2 ;;
--num-requests) NUM_REQUESTS="$2"; shift 2 ;;
--isl) ISL="$2"; shift 2 ;;
--osl) OSL="$2"; shift 2 ;;
--request-plane) REQUEST_PLANE="$2"; shift 2 ;;
--event-plane) EVENT_PLANE="$2"; shift 2 ;;
--capture-duration) CAPTURE_DURATION="$2"; shift 2 ;;
--output-dir) OUTPUT_DIR="$2"; shift 2 ;;
--data-parallel-size) DATA_PARALLEL_SIZE="$2"; shift 2 ;;
--frontend-port) FRONTEND_PORT="$2"; shift 2 ;;
--planner-profile) PLANNER_PROFILE="$2"; shift 2 ;;
--tokenizer-backend) TOKENIZER_BACKEND="$2"; shift 2 ;;
--fast-tokens) TOKENIZER_BACKEND="fast"; shift ;;
--num-models) NUM_MODELS="$2"; shift 2 ;;
--aiperf-targets) AIPERF_TARGETS="$2"; shift 2 ;;
--benchmark-duration) BENCHMARK_DURATION="$2"; shift 2 ;;
--request-rate) REQUEST_RATE="$2"; shift 2 ;;
--warmup-duration) WARMUP_DURATION="$2"; shift 2 ;;
--warmup-count) WARMUP_COUNT="$2"; shift 2 ;;
--skip-bpf) SKIP_BPF=true; shift ;;
--skip-nsys) SKIP_NSYS=true; shift ;;
--skip-flamegraph) SKIP_FLAMEGRAPH=true; shift ;;
--skip-perf) SKIP_PERF=true; shift ;;
--tcpdump) ENABLE_TCPDUMP=true; shift ;;
--tcpdump-port) TCPDUMP_PORT="$2"; shift 2 ;;
--nsys-path) NSYS_PATH="$2"; shift 2 ;;
-h|--help)
cat <<'USAGE'
Usage: run_perf.sh [OPTIONS]
Starts mocker + frontend, runs parallel observability capture, then aiperf load.
Service Options:
--model PATH Model path (default: nvidia/Llama-3.1-8B-Instruct-FP8)
--model-name NAME Served model name (default: same as --model)
--workers N Number of mocker workers (default: 2)
--speedup-ratio RATIO Mocker speedup ratio (default: 1.0; 0 = infinite)
--data-parallel-size N Mocker DP workers (default: 1)
--request-plane PLANE nats|http|tcp (default: tcp)
--event-plane PLANE nats|zmq (default: nats)
--frontend-port PORT Frontend HTTP port (default: 8000)
--planner-profile PATH Planner profile data path
--tokenizer-backend NAME Tokenizer backend: "fast" or "hf" (default: unset, uses HF)
--fast-tokens Shorthand for --tokenizer-backend fast
--num-models N Number of model instances (default: 1). Each gets --workers workers
with names model-1, model-2, ...
--aiperf-targets MODE "first" (default): aiperf targets model-1 only.
"all": run aiperf sequentially for each model.
--benchmark-duration N aiperf run duration in seconds (default: use --num-requests)
--request-rate N Target requests per second (aiperf --request-rate)
--warmup-duration N aiperf warmup phase duration in seconds
--warmup-count N aiperf warmup request count (default: concurrency)
Load Options:
--concurrency N aiperf concurrency (default: 64)
--num-requests N Total requests (default: 640)
--isl N Input sequence length (default: 1024)
--osl N Output sequence length (default: 256)
Capture Options:
--capture-duration N Duration for parallel captures in seconds (default: 60)
--output-dir DIR Output directory (default: auto timestamped)
--skip-bpf Skip BPF tracing
--skip-nsys Skip Nsight Systems profiling
--skip-flamegraph Skip flamegraph generation
--skip-perf Skip perf record/stat
--tcpdump Enable packet capture via tcpdump
--tcpdump-port PORT Port filter for tcpdump (default: --frontend-port)
--nsys-path PATH Path to nsys binary (default: auto-detected from PATH)
USAGE
exit 0
;;
*) echo "Unknown option: $1"; exit 1 ;;
esac
done
# Default model-name to model if not set
[[ -z "$MODEL_NAME" ]] && MODEL_NAME="$MODEL"
# For multi-model: build array of model names.
# Each model group registers under its own served name (model-1, model-2, ...)
# but uses the same --model-path for weights. The mocker publishes source_path
# (the real HF model ID) in the ModelDeploymentCard so the frontend can resolve
# tokenizer/config even when the served name differs.
MODEL_NAMES=()
if [[ "$NUM_MODELS" -le 1 ]]; then
MODEL_NAMES=("$MODEL_NAME")
else
for m in $(seq 1 "$NUM_MODELS"); do
MODEL_NAMES+=("model-${m}")
done
fi
# Default tcpdump port to frontend port
[[ -z "$TCPDUMP_PORT" ]] && TCPDUMP_PORT="$FRONTEND_PORT"
# Auto-sync capture-duration to cover the full benchmark window.
# If benchmark-duration is set and capture-duration wasn't explicitly overridden,
# extend captures to benchmark-duration + warmup headroom + buffer.
if [[ -n "$BENCHMARK_DURATION" ]] && [[ "$CAPTURE_DURATION" -eq 60 ]]; then
_WARMUP_HEADROOM=${WARMUP_DURATION:-10}
CAPTURE_DURATION=$(( BENCHMARK_DURATION + _WARMUP_HEADROOM + 5 ))
echo " Auto-adjusted capture-duration to ${CAPTURE_DURATION}s (benchmark=${BENCHMARK_DURATION}s + warmup + 5s buffer)"
fi
# Resolve nsys binary path
if [[ -n "$NSYS_PATH" ]]; then
if [[ ! -x "$NSYS_PATH" ]]; then
echo "ERROR: --nsys-path '$NSYS_PATH' is not executable"; exit 1
fi
NSYS_CMD="$NSYS_PATH"
else
NSYS_CMD="nsys"
fi
# ─── Output directory ────────────────────────────────────────────────────────
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
if [[ -z "$OUTPUT_DIR" ]]; then
OUTPUT_DIR="${REPO_ROOT}/artifacts/obs_${TIMESTAMP}"
fi
mkdir -p "$OUTPUT_DIR"/{aiperf,nsys,perf,bpf,system,prometheus,logs}
echo "╔══════════════════════════════════════════════════════════════╗"
echo "║ Unified Observability Capture ║"
echo "╚══════════════════════════════════════════════════════════════╝"
echo ""
echo "Output: $OUTPUT_DIR"
echo "Tokenizer: ${TOKENIZER_BACKEND:-hf (default)}"
echo ""
# ─── Pre-flight: detect available tools ──────────────────────────────────────
echo "--- Pre-flight checks ---"
HAS_NSYS=false
HAS_PERF=false
HAS_BPF=false
HAS_FLAMEGRAPH_RENDERER=false
if command -v "$NSYS_CMD" &>/dev/null && [[ "$SKIP_NSYS" == false ]]; then
HAS_NSYS=true
echo " nsys: $("$NSYS_CMD" --version 2>/dev/null | head -1)"
else
echo " nsys: SKIP"
fi
if command -v perf &>/dev/null && [[ "$SKIP_PERF" == false ]]; then
HAS_PERF=true
echo " perf: $(perf version 2>/dev/null | head -1)"
else
echo " perf: SKIP"
fi
if command -v bpftrace &>/dev/null && [[ "$SKIP_BPF" == false ]]; then
# BPF needs root or both CAP_BPF and CAP_PERFMON
if [[ $(id -u) -eq 0 ]]; then
HAS_BPF=true
echo " bpftrace: available (root)"
elif command -v capsh &>/dev/null \
&& capsh --print 2>/dev/null | grep '^Current:' | grep -q cap_bpf \
&& capsh --print 2>/dev/null | grep '^Current:' | grep -q cap_perfmon; then
HAS_BPF=true
echo " bpftrace: available (CAP_BPF + CAP_PERFMON)"
else
echo " bpftrace: SKIP (needs root or CAP_BPF + CAP_PERFMON)"
fi
else
echo " bpftrace: SKIP"
fi
if command -v flamegraph.pl &>/dev/null || command -v inferno-flamegraph &>/dev/null; then
HAS_FLAMEGRAPH_RENDERER=true
echo " flamegraph: available"
else
echo " flamegraph: SKIP (install inferno: cargo install inferno)"
fi
if ! command -v jq &>/dev/null; then
echo "ERROR: jq not found. Install: apt-get install -y jq (or brew install jq)"
exit 1
fi
echo " jq: available"
if ! command -v aiperf &>/dev/null && ! python3 -c "import aiperf" 2>/dev/null; then
echo "ERROR: aiperf not found. Install: pip install git+https://github.com/ai-dynamo/aiperf.git"
exit 1
fi
echo " aiperf: available"
echo ""
# ─── Tracked PIDs for cleanup ───────────────────────────────────────────────
ALL_PIDS=() # everything we need to kill on exit
CAPTURE_PIDS=() # capture processes we wait for
ETCD_PID="" # etcd PID if we started it
ETCD_DATA_DIR=""
NATS_PID="" # nats-server PID if we started it
cleanup() {
echo ""
echo "--- Cleaning up ---"
# Stop capture processes with SIGINT first (bpftrace needs INT to flush maps)
for pid in "${CAPTURE_PIDS[@]}"; do
kill -INT "$pid" 2>/dev/null || true
done
# Give bpftrace a moment to flush aggregation output
sleep 2
# SIGTERM any captures still running
for pid in "${CAPTURE_PIDS[@]}"; do
kill "$pid" 2>/dev/null || true
done
# Then service processes
for pid in "${ALL_PIDS[@]}"; do
kill "$pid" 2>/dev/null || true
done
# Give processes a moment to exit, then force kill
sleep 1
for pid in "${ALL_PIDS[@]}" "${CAPTURE_PIDS[@]}"; do
kill -9 "$pid" 2>/dev/null || true
done
# Stop infrastructure daemons if we started them
if [[ -n "$ETCD_PID" ]]; then
kill "$ETCD_PID" 2>/dev/null || true
sleep 1
kill -9 "$ETCD_PID" 2>/dev/null || true
[[ -n "$ETCD_DATA_DIR" ]] && rm -rf "$ETCD_DATA_DIR"
fi
if [[ -n "$NATS_PID" ]]; then
kill "$NATS_PID" 2>/dev/null || true
sleep 1
kill -9 "$NATS_PID" 2>/dev/null || true
fi
wait 2>/dev/null || true
# Wait for the frontend port to be fully released (TIME_WAIT, etc.)
_port_wait=0
while ss -tlnp 2>/dev/null | grep -q ":${FRONTEND_PORT} "; do
sleep 1
_port_wait=$((_port_wait + 1))
if [[ $_port_wait -ge 15 ]]; then
break
fi
done
echo "Cleanup complete."
}
trap cleanup EXIT INT TERM
# ─── Step 0: Ensure etcd + NATS are running ──────────────────────────────────
echo "--- Checking infrastructure (etcd + NATS) ---"
# etcd
if ! curl -sf http://localhost:2379/health >/dev/null 2>&1; then
if command -v etcd &>/dev/null; then
echo " etcd not running — starting it..."
ETCD_DATA_DIR=$(mktemp -d)
etcd --data-dir="$ETCD_DATA_DIR" \
--listen-client-urls=http://localhost:2379 \
--advertise-client-urls=http://localhost:2379 \
--listen-peer-urls=http://localhost:2380 \
--initial-advertise-peer-urls=http://localhost:2380 \
--initial-cluster=default=http://localhost:2380 \
> /dev/null 2>&1 &
ETCD_PID=$!
for i in $(seq 1 30); do
if curl -sf http://localhost:2379/health >/dev/null 2>&1; then
echo " etcd ready (PID $ETCD_PID)"
break
fi
sleep 1
if [[ $i -eq 30 ]]; then
echo "ERROR: etcd failed to start after 30s"; exit 1
fi
done
else
echo "ERROR: etcd not running and not found in PATH."; exit 1
fi
else
echo " etcd already running"
fi
# NATS
if ! nc -z localhost 4222 2>/dev/null; then
if command -v nats-server &>/dev/null; then
echo " nats-server not running — starting it..."
nats-server > /dev/null 2>&1 &
NATS_PID=$!
for i in $(seq 1 30); do
if nc -z localhost 4222 2>/dev/null; then
echo " nats-server ready (PID $NATS_PID)"
break
fi
sleep 1
if [[ $i -eq 30 ]]; then
echo "ERROR: nats-server failed to start after 30s"; exit 1
fi
done
else
echo "ERROR: nats-server not running and not found in PATH."; exit 1
fi
else
echo " nats-server already running"
fi
echo ""
# ─── Step 1: Start mocker workers ───────────────────────────────────────────
TOTAL_WORKERS=$(( ${#MODEL_NAMES[@]} * NUM_WORKERS ))
echo "--- Starting $TOTAL_WORKERS mocker worker(s) (${#MODEL_NAMES[@]} model(s) x $NUM_WORKERS worker(s)) ---"
BASE_SYSTEM_PORT=8081
WORKER_IDX=0
for MN in "${MODEL_NAMES[@]}"; do
for i in $(seq 1 "$NUM_WORKERS"); do
WORKER_IDX=$((WORKER_IDX + 1))
WORKER_PORT=$((BASE_SYSTEM_PORT + WORKER_IDX - 1))
MOCKER_ARGS=(
--model-path "$MODEL"
--model-name "$MN"
--speedup-ratio "$SPEEDUP_RATIO"
--request-plane "$REQUEST_PLANE"
)
if [[ "$DATA_PARALLEL_SIZE" -gt 1 ]]; then
MOCKER_ARGS+=(--data-parallel-size "$DATA_PARALLEL_SIZE")
fi
if [[ -n "$PLANNER_PROFILE" ]]; then
MOCKER_ARGS+=(--planner-profile-data "$PLANNER_PROFILE")
fi
MN_SAFE="${MN//\//_}"
HF_HUB_OFFLINE=1 DYN_SYSTEM_PORT=$WORKER_PORT DYN_EVENT_PLANE="$EVENT_PLANE" python -m dynamo.mocker "${MOCKER_ARGS[@]}" \
> "$OUTPUT_DIR/logs/mocker_${MN_SAFE}_${i}.log" 2>&1 &
ALL_PIDS+=($!)
echo " Worker $WORKER_IDX ($MN #$i): PID ${ALL_PIDS[-1]}, port $WORKER_PORT"
done
done
# Update NUM_WORKERS to total for Prometheus scraping later
NUM_WORKERS_TOTAL=$WORKER_IDX
# ─── Step 2: Start frontend (optionally under nsys) ─────────────────────────
echo ""
echo "--- Starting frontend ---"
FRONTEND_ENV=(
HF_HUB_OFFLINE=1
DYN_HTTP_PORT="$FRONTEND_PORT"
DYN_PERF_DIAG=1
DYN_ENABLE_NVTX=1
DYN_REQUEST_PLANE="$REQUEST_PLANE"
DYN_EVENT_PLANE="$EVENT_PLANE"
)
if [[ -n "$TOKENIZER_BACKEND" ]]; then
# Map human-readable CLI values to DYN_TOKENIZER env var values
# (Rust model_card.rs reads DYN_TOKENIZER, expects "fastokens" or "default")
case "$TOKENIZER_BACKEND" in
fast|fastokens) _DYN_TOK_VAL="fastokens" ;;
hf|default|"") _DYN_TOK_VAL="default" ;;
*) _DYN_TOK_VAL="$TOKENIZER_BACKEND" ;;
esac
FRONTEND_ENV+=(DYN_TOKENIZER="$_DYN_TOK_VAL")
echo " Tokenizer backend: $TOKENIZER_BACKEND (DYN_TOKENIZER=$_DYN_TOK_VAL)"
fi
# Enable Rust NVTX annotations when nsys profiling is active.
# The Rust NVTX subsystem (lib/runtime/src/nvtx.rs) requires both the
# compile-time "nvtx" feature AND this runtime env var. We only set it
# when nsys is active to avoid the ~50ns/annotation overhead during
# clean throughput runs.
if [[ "$HAS_NSYS" == true ]]; then
FRONTEND_ENV+=(DYN_ENABLE_RUST_NVTX=1)
echo " Rust NVTX annotations: enabled"
fi
if [[ "$HAS_NSYS" == true ]]; then
echo " (under nsys profiling)"
env "${FRONTEND_ENV[@]}" \
"$NSYS_CMD" profile \
--trace=osrt,nvtx \
--sample=cpu \
--cpuctxsw=none \
--output="${OUTPUT_DIR}/nsys/frontend" \
--force-overwrite=true \
python -m dynamo.frontend \
> "$OUTPUT_DIR/logs/frontend.log" 2>&1 &
NSYS_WRAPPER_PID=$!
ALL_PIDS+=($NSYS_WRAPPER_PID)
# Resolve the actual python child PID for perf stat / proc polling.
# nsys spawns python as a child; we need the real PID for --pid attachments.
FRONTEND_PID=""
for _try in $(seq 1 30); do
sleep 1
_child=$(pgrep -P "$NSYS_WRAPPER_PID" -f "python.*dynamo.frontend" 2>/dev/null | head -1 || true)
if [[ -n "$_child" ]]; then
FRONTEND_PID="$_child"
break
fi
done
if [[ -z "$FRONTEND_PID" ]]; then
echo " WARNING: could not resolve python child PID under nsys; using nsys wrapper PID"
echo " (perf stat and /proc polling will attach to nsys, not the frontend process)"
FRONTEND_PID="$NSYS_WRAPPER_PID"
fi
echo " nsys wrapper PID: $NSYS_WRAPPER_PID"
echo " Frontend PID: $FRONTEND_PID"
else
env "${FRONTEND_ENV[@]}" python -m dynamo.frontend \
> "$OUTPUT_DIR/logs/frontend.log" 2>&1 &
FRONTEND_PID=$!
ALL_PIDS+=($FRONTEND_PID)
echo " Frontend PID: $FRONTEND_PID"
fi
# ─── Step 3: Wait for readiness ─────────────────────────────────────────────
echo ""
echo "Waiting for ${#MODEL_NAMES[@]} model(s) to be ready: ${MODEL_NAMES[*]}..."
MAX_WAIT=180
WAITED=0
_all_models_ready=false
while [[ "$_all_models_ready" == false ]]; do
_all_models_ready=true
for _mn in "${MODEL_NAMES[@]}"; do
if ! curl -s --max-time 5 "http://127.0.0.1:$FRONTEND_PORT/v1/models" 2>/dev/null | \
jq -e --arg model "$_mn" '.data[]? | select(.id == $model)' >/dev/null 2>&1; then
_all_models_ready=false
break
fi
done
if [[ "$_all_models_ready" == true ]]; then
break
fi
sleep 2
WAITED=$((WAITED + 2))
if [[ $WAITED -ge $MAX_WAIT ]]; then
echo "ERROR: Models not ready after ${MAX_WAIT}s."
echo "Last 20 lines of frontend log:"
tail -20 "$OUTPUT_DIR/logs/frontend.log" 2>/dev/null || true
exit 1
fi
if ! kill -0 "$FRONTEND_PID" 2>/dev/null; then
echo "ERROR: Frontend process died."
echo "Last 20 lines of frontend log:"
tail -20 "$OUTPUT_DIR/logs/frontend.log" 2>/dev/null || true
exit 1
fi
done
echo "All ${#MODEL_NAMES[@]} model(s) ready (waited ${WAITED}s)"
# Capture initial Prometheus snapshot (baseline for histogram delta analysis)
echo " Capturing initial Prometheus snapshot..."
{
curl -s --max-time 5 "http://127.0.0.1:$FRONTEND_PORT/metrics" 2>/dev/null || true
for wi in $(seq 1 "$NUM_WORKERS_TOTAL"); do
WPORT=$((BASE_SYSTEM_PORT + wi - 1))
curl -s --max-time 5 "http://127.0.0.1:$WPORT/metrics" 2>/dev/null || true
done
} > "$OUTPUT_DIR/prometheus/initial_snapshot.txt"
# ─── Step 4: Start parallel captures ────────────────────────────────────────
echo ""
echo "--- Starting parallel captures (${CAPTURE_DURATION}s) ---"
# 4a. perf stat — with NMI watchdog fallback
if [[ "$HAS_PERF" == true ]]; then
NMI_WATCHDOG=$(cat /proc/sys/kernel/nmi_watchdog 2>/dev/null || echo "0")
if [[ "$NMI_WATCHDOG" == "1" ]]; then
# NMI watchdog holds a HW PMU counter, causing <not counted> for
# most hardware events. Fall back to software counters + userspace-
# qualified cycles/instructions which work on the remaining PMUs.
echo " [perf stat] pid=$FRONTEND_PID (NMI watchdog active — using software + :u counters)"
echo " TIP: disable watchdog for full HW counters: sudo sysctl kernel.nmi_watchdog=0"
perf stat --pid "$FRONTEND_PID" \
-e task-clock,context-switches,cpu-migrations,page-faults,cycles:u,instructions:u \
-o "$OUTPUT_DIR/perf/perf_stat.txt" \
-- sleep "$CAPTURE_DURATION" &
CAPTURE_PIDS+=($!)
else
echo " [perf stat] pid=$FRONTEND_PID"
perf stat --pid "$FRONTEND_PID" \
-o "$OUTPUT_DIR/perf/perf_stat.txt" \
-- sleep "$CAPTURE_DURATION" &
CAPTURE_PIDS+=($!)
fi
fi
# 4b. BPF scripts (if available) — delegate to bpf/run.sh
# run.sh --batch waits for its children internally and forwards TERM→INT
# so bpftrace flushes aggregation maps before exiting.
if [[ "$HAS_BPF" == true ]]; then
"${SCRIPT_DIR}/bpf/run.sh" --batch \
--pid "$FRONTEND_PID" \
--output-dir "$OUTPUT_DIR/bpf" \
--duration "$CAPTURE_DURATION" &
CAPTURE_PIDS+=($!)
fi
# 4e. Flamegraph captures — delegate to flamegraph/ scripts
if [[ "$SKIP_FLAMEGRAPH" == false ]]; then
if command -v perf &>/dev/null || command -v samply &>/dev/null; then
echo " [flamegraph] CPU, pid=$FRONTEND_PID"
"${SCRIPT_DIR}/flamegraph/cpu_flamegraph.sh" \
--pid "$FRONTEND_PID" \
--duration "$CAPTURE_DURATION" \
--output-dir "$OUTPUT_DIR/perf" \
--output cpu_flamegraph &
CAPTURE_PIDS+=($!)
else
echo " [flamegraph] SKIP CPU — install perf or samply:"
echo " apt install linux-tools-\$(uname -r) OR cargo install samply"
fi
if [[ "$HAS_BPF" == true ]]; then
echo " [flamegraph] Off-CPU, pid=$FRONTEND_PID"
"${SCRIPT_DIR}/flamegraph/offcpu_flamegraph.sh" \
--pid "$FRONTEND_PID" \
--duration "$CAPTURE_DURATION" \
--output-dir "$OUTPUT_DIR/perf" \
--output offcpu_flamegraph &
CAPTURE_PIDS+=($!)
else
echo " [flamegraph] SKIP Off-CPU — needs bpftrace with root or CAP_BPF"
fi
fi
# 4c. System capture (/proc polling, thread/fd counts, socket stats)
echo " [system] /proc stats, thread/fd count, ss"
(
INTERVAL=1
for _i in $(seq 1 "$CAPTURE_DURATION"); do
TS=$(date -Iseconds)
# /proc status
echo "--- $TS ---" >> "$OUTPUT_DIR/system/proc_status.txt"
cat "/proc/$FRONTEND_PID/status" >> "$OUTPUT_DIR/system/proc_status.txt" 2>/dev/null || true
# /proc stat — raw scheduler/CPU time info
echo "--- $TS ---" >> "$OUTPUT_DIR/system/proc_stat.txt"
cat "/proc/$FRONTEND_PID/stat" >> "$OUTPUT_DIR/system/proc_stat.txt" 2>/dev/null || true
# /proc statm — page-level memory info
echo "--- $TS ---" >> "$OUTPUT_DIR/system/proc_statm.txt"
cat "/proc/$FRONTEND_PID/statm" >> "$OUTPUT_DIR/system/proc_statm.txt" 2>/dev/null || true
# Thread count
THREADS=$(ls -1 "/proc/$FRONTEND_PID/task/" 2>/dev/null | wc -l)
echo "$TS threads=$THREADS" >> "$OUTPUT_DIR/system/thread_count.txt"
# FD count
FDS=$(ls -1 "/proc/$FRONTEND_PID/fd/" 2>/dev/null | wc -l)
echo "$TS fds=$FDS" >> "$OUTPUT_DIR/system/fd_count.txt"
# Socket stats
echo "--- $TS ---" >> "$OUTPUT_DIR/system/ss_stats.txt"
ss -tin >> "$OUTPUT_DIR/system/ss_stats.txt" 2>/dev/null || true
sleep "$INTERVAL"
done
) &
CAPTURE_PIDS+=($!)
# 4c-2. tcpdump packet capture (optional)
if [[ "$ENABLE_TCPDUMP" == true ]]; then
if command -v tcpdump &>/dev/null; then
echo " [tcpdump] port=$TCPDUMP_PORT"
timeout "$CAPTURE_DURATION" tcpdump -i any \
-w "$OUTPUT_DIR/system/capture.pcap" \
"port $TCPDUMP_PORT" \
-s 96 -c 100000 &>/dev/null &
CAPTURE_PIDS+=($!)
else
echo " [tcpdump] SKIP — tcpdump not found (apt install tcpdump)"
fi
fi
# 4d. Periodic Prometheus /metrics scraping (frontend + all mocker workers)
echo " [prometheus] scraping every 1s"
(
for _i in $(seq 1 "$CAPTURE_DURATION"); do
METRICS=$(curl -s --max-time 3 "http://127.0.0.1:$FRONTEND_PORT/metrics" 2>/dev/null || echo "")
# Append mocker worker metrics (ports BASE_SYSTEM_PORT .. BASE_SYSTEM_PORT+NUM_WORKERS-1)
for wi in $(seq 1 "$NUM_WORKERS_TOTAL"); do
WPORT=$((BASE_SYSTEM_PORT + wi - 1))
WMETRICS=$(curl -s --max-time 3 "http://127.0.0.1:$WPORT/metrics" 2>/dev/null || echo "")
if [[ -n "$WMETRICS" ]]; then
METRICS="${METRICS}"$'\n'"${WMETRICS}"
fi
done
if [[ -n "$METRICS" ]]; then
# JSONL: one line per scrape with timestamp
TS=$(date -Iseconds)
printf '{"ts":"%s","metrics":%s}\n' "$TS" "$(echo "$METRICS" | python3 -c '
import sys, json
lines = sys.stdin.read().strip().split("\n")
out = {}
for line in lines:
if line.startswith("#") or not line.strip():
continue
parts = line.split()
if len(parts) >= 2:
out[parts[0]] = parts[1]
print(json.dumps(out))
' 2>/dev/null || echo '"{}"')" >> "$OUTPUT_DIR/prometheus/timeseries.jsonl"
fi
sleep 1
done
) &
CAPTURE_PIDS+=($!)
# ─── Step 5: Run aiperf load ────────────────────────────────────────────────
echo ""
echo "--- Running aiperf load ---"
echo " concurrency=$CONCURRENCY requests=${NUM_REQUESTS:-auto} isl=$ISL osl=$OSL"
[[ -n "$BENCHMARK_DURATION" ]] && echo " benchmark-duration=${BENCHMARK_DURATION}s"
[[ -n "$REQUEST_RATE" ]] && echo " request-rate=${REQUEST_RATE} req/s"
# Build load-control args: prefer --benchmark-duration (time-based) over
# --request-count (count-based) so each run gets a consistent measurement
# window regardless of throughput. This mirrors k8s_run_aiperf() in sweep.sh.
_LOAD_ARGS=()
_EFFECTIVE_REQUESTS="null"
if [[ -n "$BENCHMARK_DURATION" ]]; then
_LOAD_ARGS+=(--benchmark-duration "$BENCHMARK_DURATION")
fi
if [[ -n "$REQUEST_RATE" ]]; then
_LOAD_ARGS+=(--request-rate "$REQUEST_RATE")
fi
if [[ -n "$NUM_REQUESTS" ]]; then
_LOAD_ARGS+=(--request-count "$NUM_REQUESTS")
_EFFECTIVE_REQUESTS="$NUM_REQUESTS"
fi
# If neither was specified, default to concurrency * 20 (min 640)
if [[ ${#_LOAD_ARGS[@]} -eq 0 ]]; then
_AUTO=$(( CONCURRENCY * 20 ))
[[ "$_AUTO" -lt 640 ]] && _AUTO=640
_LOAD_ARGS+=(--request-count "$_AUTO")
_EFFECTIVE_REQUESTS="$_AUTO"
fi
_WARMUP_ARGS=()
if [[ -n "$WARMUP_DURATION" ]]; then
_WARMUP_ARGS+=(--warmup-duration "$WARMUP_DURATION")
elif [[ -n "$WARMUP_COUNT" ]]; then
_WARMUP_ARGS+=(--warmup-request-count "$WARMUP_COUNT")
else
_WARMUP_ARGS+=(--warmup-request-count "$CONCURRENCY")
fi
# Build the list of models to target
_AIPERF_MODELS=()
if [[ "$AIPERF_TARGETS" == "all" && ${#MODEL_NAMES[@]} -gt 1 ]]; then
_AIPERF_MODELS=("${MODEL_NAMES[@]}")
else
_AIPERF_MODELS=("${MODEL_NAMES[0]}")
fi
echo " aiperf targets: ${_AIPERF_MODELS[*]} (mode=$AIPERF_TARGETS)"
for _AIPERF_MODEL in "${_AIPERF_MODELS[@]}"; do
if [[ ${#_AIPERF_MODELS[@]} -gt 1 ]]; then
AIPERF_ARTIFACT_DIR="$OUTPUT_DIR/aiperf/${_AIPERF_MODEL}"
echo ""
echo " --- aiperf: model=${_AIPERF_MODEL} ---"
else
AIPERF_ARTIFACT_DIR="$OUTPUT_DIR/aiperf"
fi
mkdir -p "$AIPERF_ARTIFACT_DIR"
# When the served model name differs from the HF model path (multi-model),
# tell aiperf where to find the tokenizer.
_AIPERF_TOK_ARGS=()
if [[ "$_AIPERF_MODEL" != "$MODEL" ]]; then
_AIPERF_TOK_ARGS=(--tokenizer "$MODEL")
fi
HF_HUB_OFFLINE=1 aiperf profile --artifact-dir "$AIPERF_ARTIFACT_DIR" \
--model "$_AIPERF_MODEL" \
"${_AIPERF_TOK_ARGS[@]}" \
--endpoint-type chat \
--endpoint /v1/chat/completions \
--streaming \
--url "http://127.0.0.1:$FRONTEND_PORT" \
--synthetic-input-tokens-mean "$ISL" \
--synthetic-input-tokens-stddev 0 \
--output-tokens-mean "$OSL" \
--output-tokens-stddev 0 \
--extra-inputs max_tokens:"$OSL" \
--extra-inputs min_tokens:"$OSL" \
--extra-inputs ignore_eos:true \
--extra-inputs repetition_penalty:1.0 \
--extra-inputs temperature:0.0 \
--concurrency "$CONCURRENCY" \
"${_LOAD_ARGS[@]}" \
"${_WARMUP_ARGS[@]}" \
--num-dataset-entries 12800 \
--random-seed 100 \
--workers-max "$CONCURRENCY" \
--record-processors 32 \
--ui simple || echo "WARNING: aiperf failed for model ${_AIPERF_MODEL}"
done
# Check for server_metrics_export.json in the primary aiperf dir
_PRIMARY_AIPERF_DIR="$OUTPUT_DIR/aiperf"
[[ ${#_AIPERF_MODELS[@]} -gt 1 ]] && _PRIMARY_AIPERF_DIR="$OUTPUT_DIR/aiperf/${_AIPERF_MODELS[0]}"
if [[ -f "$_PRIMARY_AIPERF_DIR/server_metrics_export.json" ]]; then
echo " Found server_metrics_export.json"
fi
# ─── Step 6: Wait for captures to finish ─────────────────────────────────────
echo ""
echo "--- Waiting for captures to finish ---"
# Give captures CAPTURE_DURATION + 15s grace period, then force-kill stragglers.
# Profiler processes (perf record, flamegraph, bpftrace) can hang if the target
# PID becomes idle or exits, or if hardware counters can't be released cleanly.
_CAPTURE_DEADLINE=$(( CAPTURE_DURATION + 15 ))
_capture_waited=0
_all_done=false
while [[ "$_all_done" == false && $_capture_waited -lt $_CAPTURE_DEADLINE ]]; do
_all_done=true
for pid in "${CAPTURE_PIDS[@]}"; do
if kill -0 "$pid" 2>/dev/null; then
_all_done=false
break
fi
done
if [[ "$_all_done" == false ]]; then
sleep 1
_capture_waited=$((_capture_waited + 1))
fi
done
if [[ "$_all_done" == false ]]; then
echo " WARNING: captures still running after ${_CAPTURE_DEADLINE}s — sending SIGTERM"
for pid in "${CAPTURE_PIDS[@]}"; do
kill "$pid" 2>/dev/null || true
done
sleep 2
# Force kill any remaining
for pid in "${CAPTURE_PIDS[@]}"; do
kill -9 "$pid" 2>/dev/null || true
done
fi
# Reap all child statuses
for pid in "${CAPTURE_PIDS[@]}"; do
wait "$pid" 2>/dev/null || true
done
# Capture final Prometheus snapshot AFTER all aiperf requests have completed.
# This must happen after aiperf (Step 5) and after captures (Step 6) to ensure
# all request metrics are reflected in the snapshot.
echo " Capturing final Prometheus snapshot..."
{
curl -s --max-time 5 "http://127.0.0.1:$FRONTEND_PORT/metrics" 2>/dev/null || true
for wi in $(seq 1 "$NUM_WORKERS_TOTAL"); do
WPORT=$((BASE_SYSTEM_PORT + wi - 1))
curl -s --max-time 5 "http://127.0.0.1:$WPORT/metrics" 2>/dev/null || true
done
} > "$OUTPUT_DIR/prometheus/final_snapshot.txt"
# Save individual worker snapshots for debugging
for wi in $(seq 1 "$NUM_WORKERS_TOTAL"); do
WPORT=$((BASE_SYSTEM_PORT + wi - 1))
curl -s --max-time 5 "http://127.0.0.1:$WPORT/metrics" \
> "$OUTPUT_DIR/prometheus/mocker_${wi}_snapshot.txt" 2>/dev/null || true
done
# ─── Step 7: Stop frontend and post-process nsys ─────────────────────────────
echo ""
echo "--- Post-processing ---"
# 7a. Stop frontend so nsys can finalize the .nsys-rep file.
# When running under nsys, the .nsys-rep is only written after the wrapped
# process exits. We must stop the frontend before attempting the export.
if [[ "$HAS_NSYS" == true ]]; then
echo " Stopping frontend (PID $FRONTEND_PID) so nsys can finalize..."
kill -INT "$FRONTEND_PID" 2>/dev/null || true
# Wait for the nsys-wrapped frontend to exit and write the .nsys-rep
NSYS_WAIT=0
NSYS_WAIT_MAX=30
while kill -0 "$FRONTEND_PID" 2>/dev/null; do
sleep 1
NSYS_WAIT=$((NSYS_WAIT + 1))
if [[ $NSYS_WAIT -ge $NSYS_WAIT_MAX ]]; then
echo " WARNING: frontend did not exit after ${NSYS_WAIT_MAX}s, sending SIGTERM"
kill "$FRONTEND_PID" 2>/dev/null || true
sleep 2
break
fi
done
# Remove frontend from ALL_PIDS so cleanup() doesn't try to kill it again
NEW_ALL_PIDS=()
for pid in "${ALL_PIDS[@]}"; do
[[ "$pid" != "$FRONTEND_PID" ]] && NEW_ALL_PIDS+=("$pid")
done
ALL_PIDS=("${NEW_ALL_PIDS[@]}")
if [[ -f "$OUTPUT_DIR/nsys/frontend.nsys-rep" ]]; then
echo " Exporting nsys to SQLite..."
"$NSYS_CMD" export --type sqlite \
--output "$OUTPUT_DIR/nsys/frontend.sqlite" \
"$OUTPUT_DIR/nsys/frontend.nsys-rep" 2>/dev/null || \
echo " WARNING: nsys sqlite export failed"
if [[ -f "$OUTPUT_DIR/nsys/frontend.sqlite" ]]; then
echo " nsys SQLite: $OUTPUT_DIR/nsys/frontend.sqlite"
fi
else
echo " WARNING: $OUTPUT_DIR/nsys/frontend.nsys-rep not found after frontend exit"
fi
else
# No nsys — stop frontend normally (cleanup will handle it, but stop early
# to avoid holding ports during config save)
kill -INT "$FRONTEND_PID" 2>/dev/null || true
fi
# ─── Step 8: Save config ────────────────────────────────────────────────────
cat > "$OUTPUT_DIR/config.json" <<EOF
{
"timestamp": "$TIMESTAMP",
"model": "$MODEL",
"model_name": "$MODEL_NAME",
"model_names": $(printf '%s\n' "${MODEL_NAMES[@]}" | jq -R . | jq -s .),
"num_models": $NUM_MODELS,
"num_workers_per_model": $NUM_WORKERS,
"num_workers_total": $NUM_WORKERS_TOTAL,
"speedup_ratio": "$SPEEDUP_RATIO",
"data_parallel_size": $DATA_PARALLEL_SIZE,
"request_plane": "$REQUEST_PLANE",
"event_plane": "$EVENT_PLANE",
"frontend_port": $FRONTEND_PORT,
"tokenizer_backend": "${TOKENIZER_BACKEND:-hf}",
"concurrency": $CONCURRENCY,
"num_requests": ${_EFFECTIVE_REQUESTS:-null},
"benchmark_duration": ${BENCHMARK_DURATION:-null},
"request_rate": ${REQUEST_RATE:-null},
"isl": $ISL,
"osl": $OSL,
"capture_duration": $CAPTURE_DURATION,
"frontend_pid": $FRONTEND_PID,
"has_nsys": $HAS_NSYS,
"has_perf": $HAS_PERF,
"has_bpf": $HAS_BPF,
"skip_bpf": $SKIP_BPF,
"skip_nsys": $SKIP_NSYS,
"skip_flamegraph": $SKIP_FLAMEGRAPH,
"tcpdump": $ENABLE_TCPDUMP,
"tcpdump_port": $TCPDUMP_PORT,
"nsys_path": "${NSYS_PATH:-auto}"
}
EOF
# ─── Done ────────────────────────────────────────────────────────────────────
echo ""
echo "╔══════════════════════════════════════════════════════════════╗"
echo "║ Capture Complete ║"
echo "╚══════════════════════════════════════════════════════════════╝"
echo ""
echo "Output directory: $OUTPUT_DIR"
echo ""
echo "Contents:"
find "$OUTPUT_DIR" -type f | sort | while read -r f; do
SIZE=$(stat -c%s "$f" 2>/dev/null || echo "?")
REL="${f#$OUTPUT_DIR/}"
printf " %-50s %s bytes\n" "$REL" "$SIZE"
done
echo ""
echo "Run analysis:"
echo " python3 ${SCRIPT_DIR}/analysis/create_report.py analyze $OUTPUT_DIR"
#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Frontend performance sweep runner.
Standalone Python script that orchestrates performance sweeps by delegating
each run to run_perf.sh. Combines the sweep grid logic of sweep.sh with
the saturation analysis of tasks/sweep.py, and the Prometheus/report
integration of the analysis scripts.
Sweep dimensions (all configurable):
- tokenizers (hf, fastokens)
- concurrency levels
- ISL values
- worker counts
Backends:
- mocker (default): fast synthetic backend, no real inference
- vllm: real vLLM inference server (produces TTFT/ITL metrics)
Each (tokenizer, concurrency, ISL) point is a separate run_perf.sh invocation.
Results are collected into CSV + summary.md + per-run reports.
Usage:
# Smoke test (2 runs)
python3 sweep_runner.py --tokenizers hf,fastokens --concurrency 32 --isl 512 \\
--benchmark-duration 30 --speedup-ratio 0
# Full sweep with mocker
python3 sweep_runner.py --tokenizers hf,fastokens --concurrency 32,64 --isl 512,1024,2048
# vLLM backend (real inference)
python3 sweep_runner.py --backend vllm --tokenizers hf --concurrency 128 --isl 1024
# Transport saturation sweep (tasks/sweep.py style)
python3 sweep_runner.py --tokenizers hf --concurrency 4096 \\
--num-requests 16384,32768 --workers 1,2,4,8 --speedup-ratio 0
# Dry run
python3 sweep_runner.py --dry-run --tokenizers hf,fastokens --concurrency 32,64 --isl 512,1024
"""
import argparse
import csv
import json
import os
import signal
import subprocess
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
SCRIPT_DIR = Path(__file__).resolve().parent
REPO_ROOT = SCRIPT_DIR.parent.parent.parent
ANALYSIS_DIR = SCRIPT_DIR / "analysis"
# ── Defaults ─────────────────────────────────────────────────────────────────
DEFAULT_MODEL = "Qwen/Qwen3-0.6B"
DEFAULT_OSL = 256
DEFAULT_SPEEDUP = 1.0
DEFAULT_BENCHMARK_DURATION = 60
DEFAULT_MAX_CONSECUTIVE_FAILS = 2
DEFAULT_COOLDOWN = 3
TOKENIZER_MAP = {
"fast": "fastokens",
"fastokens": "fastokens",
"hf": "default",
"default": "default",
}
# ── Data ─────────────────────────────────────────────────────────────────────
@dataclass
class RunConfig:
"""Configuration for a single sweep point."""
backend: str # "mocker" or "vllm"
tokenizer: str # "hf" or "fastokens"
concurrency: int
isl: int
osl: int
workers: int
num_models: int
aiperf_targets: str # "first" or "all"
speedup_ratio: float
model: str
benchmark_duration: Optional[int]
num_requests: Optional[int]
request_rate: Optional[int]
@property
def run_id(self) -> str:
base = f"{self.tokenizer}_c{self.concurrency}_isl{self.isl}_w{self.workers}"
if self.num_models > 1:
base += f"_m{self.num_models}"
if self.request_rate:
base += f"_rps{self.request_rate}"
return base
@dataclass
class RunResult:
"""Result from a single sweep point."""
config: RunConfig
status: str = "pending" # ok, fail, skipped
req_per_sec: float = 0.0
output_tok_per_sec: float = 0.0
ttft_p50_ms: float = 0.0
ttft_p99_ms: float = 0.0
itl_p50_ms: float = 0.0
itl_p99_ms: float = 0.0
duration_sec: float = 0.0
run_dir: str = ""
# ── Helpers ──────────────────────────────────────────────────────────────────
def _kill_port(port: int):
"""Kill any process holding a port (SIGTERM first, then SIGKILL)."""
subprocess.run(
f"fuser -k -TERM {port}/tcp", shell=True, capture_output=True, timeout=5
)
time.sleep(2)
subprocess.run(
f"fuser -k -KILL {port}/tcp", shell=True, capture_output=True, timeout=5
)
def _port_free(port: int) -> bool:
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
return s.connect_ex(("127.0.0.1", port)) != 0
finally:
s.close()
def _wait_port_free(port: int, timeout: int = 30):
"""Wait for a port to become free."""
for i in range(timeout):
if _port_free(port):
return
if i == 0:
print(f" Waiting for port {port} to free...")
time.sleep(1)
print(f" Forcing port {port} release...")
_kill_port(port)
time.sleep(2)
def _parse_aiperf_json(json_path: Path) -> dict:
"""Parse aiperf profile_export_aiperf.json."""
if not json_path.exists():
return {}
try:
data = json.loads(json_path.read_text())
result = {}
# Request throughput
rt = data.get("request_throughput", {})
result["req_per_sec"] = rt.get("avg", 0)
# Output token throughput
ot = data.get("output_token_throughput", {})
result["output_tok_per_sec"] = ot.get("avg", 0)
# TTFT (aiperf exports in ms already)
ttft = data.get("time_to_first_token", data.get("ttft", {}))
if isinstance(ttft, dict):
result["ttft_p50_ms"] = ttft.get("p50", 0) or 0
result["ttft_p99_ms"] = ttft.get("p99", 0) or 0
# ITL
itl = data.get("inter_token_latency", data.get("itl", {}))
if isinstance(itl, dict):
result["itl_p50_ms"] = itl.get("p50", 0) or 0
result["itl_p99_ms"] = itl.get("p99", 0) or 0
# Duration (can be dict with .avg or raw float)
bd = data.get("benchmark_duration", 0)
result["duration_sec"] = bd.get("avg", 0) if isinstance(bd, dict) else (bd or 0)
return result
except (json.JSONDecodeError, KeyError, TypeError):
return {}
def _run_single(
cfg: RunConfig,
run_dir: Path,
passthrough_args: list[str],
) -> RunResult:
"""Execute a single run_perf.sh invocation."""
result = RunResult(config=cfg, run_dir=str(run_dir))
cmd = [
str(SCRIPT_DIR / "run_perf.sh"),
"--model",
cfg.model,
"--isl",
str(cfg.isl),
"--osl",
str(cfg.osl),
"--concurrency",
str(cfg.concurrency),
"--workers",
str(cfg.workers),
"--speedup-ratio",
str(cfg.speedup_ratio),
"--num-models",
str(cfg.num_models),
"--aiperf-targets",
cfg.aiperf_targets,
"--output-dir",
str(run_dir),
]
if cfg.benchmark_duration:
cmd.extend(["--benchmark-duration", str(cfg.benchmark_duration)])
if cfg.num_requests:
cmd.extend(["--num-requests", str(cfg.num_requests)])
if cfg.request_rate:
cmd.extend(["--request-rate", str(cfg.request_rate)])
if cfg.tokenizer in ("fast", "fastokens"):
cmd.append("--fast-tokens")
# TODO: when run_perf.sh gains --backend vllm support, pass it here
if cfg.backend == "vllm":
print(
" WARNING: vllm backend not yet supported by run_perf.sh; using mocker"
)
cmd.extend(passthrough_args)
print(f" cmd: {' '.join(cmd[:6])}...")
try:
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
start_new_session=True,
)
stdout, _ = proc.communicate(timeout=600)
if proc.returncode == 0:
result.status = "ok"
else:
result.status = "fail"
print(f" run_perf.sh failed (rc={proc.returncode})")
# Print last few lines of output for debugging
lines = (stdout or "").strip().split("\n")
for line in lines[-5:]:
print(f" {line}")
except subprocess.TimeoutExpired:
result.status = "fail"
print(" TIMEOUT after 600s")
try:
pgid = os.getpgid(proc.pid)
os.killpg(pgid, signal.SIGTERM)
time.sleep(2)
os.killpg(pgid, signal.SIGKILL)
except ProcessLookupError:
pass # already exited
except Exception as e:
result.status = "fail"
print(f" ERROR: {e}")
# Parse aiperf results -- check both flat and multi-model layouts
aiperf_json = run_dir / "aiperf" / "profile_export_aiperf.json"
if not aiperf_json.exists():
# Multi-model: results are in aiperf/<model-name>/
for candidate in sorted(
(run_dir / "aiperf").glob("*/profile_export_aiperf.json")
):
aiperf_json = candidate
break # Use the first model's results for the summary row
metrics = _parse_aiperf_json(aiperf_json)
if metrics:
result.req_per_sec = metrics.get("req_per_sec", 0)
result.output_tok_per_sec = metrics.get("output_tok_per_sec", 0)
result.ttft_p50_ms = metrics.get("ttft_p50_ms", 0)
result.ttft_p99_ms = metrics.get("ttft_p99_ms", 0)
result.itl_p50_ms = metrics.get("itl_p50_ms", 0)
result.itl_p99_ms = metrics.get("itl_p99_ms", 0)
result.duration_sec = metrics.get("duration_sec", 0)
return result
def _generate_report(run_dir: Path):
"""Run create_report.py on a single run directory."""
try:
sys.path.insert(0, str(ANALYSIS_DIR))
from create_report import run_analysis
report = run_analysis(run_dir)
(run_dir / "report.md").write_text(report)
except Exception as e:
print(f" Report generation failed: {e}")
# ── Output ───────────────────────────────────────────────────────────────────
def _write_csv(results: list[RunResult], csv_path: Path):
"""Write incremental CSV (called after each run)."""
fieldnames = [
"run_id",
"backend",
"tokenizer",
"concurrency",
"isl",
"osl",
"workers",
"speedup_ratio",
"status",
"req_per_sec",
"output_tok_per_sec",
"ttft_p50_ms",
"ttft_p99_ms",
"itl_p50_ms",
"itl_p99_ms",
"duration_sec",
"run_dir",
]
with open(csv_path, "w", newline="") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore")
writer.writeheader()
for r in results:
row = {
"run_id": r.config.run_id,
"backend": r.config.backend,
"tokenizer": r.config.tokenizer,
"concurrency": r.config.concurrency,
"isl": r.config.isl,
"osl": r.config.osl,
"workers": r.config.workers,
"speedup_ratio": r.config.speedup_ratio,
"status": r.status,
"req_per_sec": f"{r.req_per_sec:.2f}" if r.req_per_sec else "",
"output_tok_per_sec": f"{r.output_tok_per_sec:.1f}"
if r.output_tok_per_sec
else "",
"ttft_p50_ms": f"{r.ttft_p50_ms:.1f}" if r.ttft_p50_ms else "",
"ttft_p99_ms": f"{r.ttft_p99_ms:.1f}" if r.ttft_p99_ms else "",
"itl_p50_ms": f"{r.itl_p50_ms:.1f}" if r.itl_p50_ms else "",
"itl_p99_ms": f"{r.itl_p99_ms:.1f}" if r.itl_p99_ms else "",
"duration_sec": f"{r.duration_sec:.1f}" if r.duration_sec else "",
"run_dir": r.run_dir,
}
writer.writerow(row)
def _write_summary(results: list[RunResult], summary_path: Path):
"""Write markdown summary table."""
lines = ["# Sweep Summary\n"]
lines.append(f"**Generated:** {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
lines.append(
"| Run ID | Req/s | Tok/s | TTFT p50 | TTFT p99 | ITL p50 | Duration | Status |"
)
lines.append(
"|--------|------:|------:|---------:|---------:|--------:|---------:|--------|"
)
for r in results:
rps = f"{r.req_per_sec:.1f}" if r.req_per_sec else "-"
tps = f"{r.output_tok_per_sec:.0f}" if r.output_tok_per_sec else "-"
tp50 = f"{r.ttft_p50_ms:.1f}ms" if r.ttft_p50_ms else "-"
tp99 = f"{r.ttft_p99_ms:.1f}ms" if r.ttft_p99_ms else "-"
ip50 = f"{r.itl_p50_ms:.1f}ms" if r.itl_p50_ms else "-"
dur = f"{r.duration_sec:.0f}s" if r.duration_sec else "-"
lines.append(
f"| {r.config.run_id} | {rps} | {tps} | {tp50} | {tp99} | {ip50} | {dur} | {r.status} |"
)
lines.append("")
ok = sum(1 for r in results if r.status == "ok")
fail = sum(1 for r in results if r.status == "fail")
skip = sum(1 for r in results if r.status == "skipped")
lines.append(
f"**Totals:** {ok} passed, {fail} failed, {skip} skipped out of {len(results)}"
)
summary_path.write_text("\n".join(lines) + "\n")
def _print_results_table(results: list[RunResult]):
"""Print a compact results table to stdout."""
print(f"\n{'='*90}")
print(
f" {'Run ID':<30} {'Req/s':>8} {'Tok/s':>8} {'TTFT p50':>10} {'TTFT p99':>10} {'Status':>8}"
)
print(f" {'-'*30} {'-'*8} {'-'*8} {'-'*10} {'-'*10} {'-'*8}")
for r in results:
rps = f"{r.req_per_sec:.1f}" if r.req_per_sec else "N/A"
tps = f"{r.output_tok_per_sec:.0f}" if r.output_tok_per_sec else "N/A"
tp50 = f"{r.ttft_p50_ms:.1f}ms" if r.ttft_p50_ms else "N/A"
tp99 = f"{r.ttft_p99_ms:.1f}ms" if r.ttft_p99_ms else "N/A"
print(
f" {r.config.run_id:<30} {rps:>8} {tps:>8} {tp50:>10} {tp99:>10} {r.status:>8}"
)
print(f"{'='*90}")
# ── Main ─────────────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(
description="Frontend performance sweep runner",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""Examples:
# Smoke test
python3 sweep_runner.py --tokenizers hf,fastokens --concurrency 32 --isl 512 \\
--benchmark-duration 30 --speedup-ratio 0
# Full tokenizer comparison
python3 sweep_runner.py --tokenizers hf,fastokens --concurrency 32,64 --isl 512,1024,2048
# vLLM backend (real inference)
python3 sweep_runner.py --backend vllm --tokenizers hf --concurrency 128 --isl 1024
# Transport saturation (high concurrency, vary workers)
python3 sweep_runner.py --tokenizers hf --concurrency 4096 \\
--num-requests 16384,32768 --workers 1,2,4,8 --speedup-ratio 0
# With profilers (needs sudo for BPF)
sudo -E python3 sweep_runner.py --tokenizers hf --concurrency 64 --isl 1024 \\
-- --with-nsys --with-perf --with-bpf
""",
)
parser.add_argument("--model", default=DEFAULT_MODEL, help="HF model path")
parser.add_argument(
"--backend",
choices=["mocker", "vllm"],
default="mocker",
help="Engine backend: mocker (synthetic) or vllm (real inference)",
)
parser.add_argument(
"--tokenizers",
default="hf,fastokens",
help="Comma-separated tokenizer backends (hf, fastokens)",
)
parser.add_argument(
"--concurrency", default="50,100,200", help="Comma-separated concurrency levels"
)
parser.add_argument(
"--isl", default="512,1024,2048", help="Comma-separated ISL values"
)
parser.add_argument(
"--osl", type=int, default=DEFAULT_OSL, help="Output sequence length"
)
parser.add_argument(
"--workers", default="2", help="Comma-separated worker counts per model"
)
parser.add_argument(
"--num-models",
type=int,
default=1,
help="Number of model instances (each gets --workers workers, named model-1, model-2, ...)",
)
parser.add_argument(
"--aiperf-targets",
choices=["first", "all"],
default="first",
help="'first': aiperf targets model-1 only (default). 'all': run aiperf for each model.",
)
parser.add_argument(
"--speedup-ratio",
type=float,
default=DEFAULT_SPEEDUP,
help="Mocker speedup (0=infinite)",
)
parser.add_argument(
"--benchmark-duration",
type=int,
default=DEFAULT_BENCHMARK_DURATION,
help="aiperf duration (seconds)",
)
parser.add_argument(
"--num-requests",
default=None,
help="Comma-separated request counts (overrides --benchmark-duration)",
)
parser.add_argument(
"--rps",
default=None,
help="Comma-separated target request rates (req/s). Sweep dimension when multiple values given.",
)
parser.add_argument(
"--output-dir",
default=None,
help="Output directory (default: auto timestamped)",
)
parser.add_argument(
"--max-consecutive-fails", type=int, default=DEFAULT_MAX_CONSECUTIVE_FAILS
)
parser.add_argument(
"--cooldown", type=int, default=DEFAULT_COOLDOWN, help="Seconds between runs"
)
parser.add_argument(
"--dry-run", action="store_true", help="Print plan without executing"
)
parser.add_argument(
"--no-report", action="store_true", help="Skip per-run report generation"
)
parser.add_argument(
"passthrough", nargs="*", help="Extra args passed to run_perf.sh (after --)"
)
args = parser.parse_args()
# Parse lists
tokenizers = [t.strip() for t in args.tokenizers.split(",")]
concurrencies = [int(c) for c in args.concurrency.split(",")]
isls = [int(i) for i in args.isl.split(",")]
worker_counts = [int(w) for w in args.workers.split(",")]
num_requests_list = (
[int(n) for n in args.num_requests.split(",")] if args.num_requests else [None]
)
rps_list = [int(r) for r in args.rps.split(",")] if args.rps else [None]
# Build sweep grid
configs: list[RunConfig] = []
for tokenizer in tokenizers:
for workers in worker_counts:
for concurrency in concurrencies:
for isl in isls:
for nr in num_requests_list:
for rps in rps_list:
configs.append(
RunConfig(
backend=args.backend,
tokenizer=tokenizer,
concurrency=concurrency,
isl=isl,
osl=args.osl,
workers=workers,
num_models=args.num_models,
aiperf_targets=args.aiperf_targets,
speedup_ratio=args.speedup_ratio,
model=args.model,
benchmark_duration=args.benchmark_duration
if nr is None
else None,
num_requests=nr,
request_rate=rps,
)
)
# Output directory
if args.output_dir:
output_root = Path(args.output_dir)
else:
ts = time.strftime("%Y%m%d_%H%M%S")
output_root = REPO_ROOT / "artifacts" / f"sweep_{ts}"
total = len(configs)
print(f"Sweep plan: {total} runs")
print(f" Model: {args.model}")
print(f" Backend: {args.backend}")
print(f" Tokenizers: {tokenizers}")
print(f" Concurrencies: {concurrencies}")
print(f" ISLs: {isls}")
print(f" Workers/model: {worker_counts}")
print(f" Models: {args.num_models}")
print(f" Benchmark dur: {args.benchmark_duration}s")
if args.num_requests:
print(f" Num requests: {[int(n) for n in args.num_requests.split(',')]}")
if args.rps:
print(f" Request rates: {[int(r) for r in args.rps.split(',')]} req/s")
print(f" Output: {output_root}")
print()
if args.dry_run:
for i, cfg in enumerate(configs, 1):
print(f" [{i}/{total}] {cfg.run_id}")
return
output_root.mkdir(parents=True, exist_ok=True)
csv_path = output_root / "results.csv"
summary_path = output_root / "summary.md"
# Passthrough args for run_perf.sh (e.g., --skip-bpf --skip-nsys)
passthrough = args.passthrough or []
results: list[RunResult] = []
consecutive_fails: dict[tuple, int] = {} # (backend, concurrency, workers) -> count
try:
for i, cfg in enumerate(configs, 1):
key = (cfg.backend, cfg.concurrency, cfg.workers)
run_dir = output_root / cfg.run_id
# Skip after consecutive failures
if consecutive_fails.get(key, 0) >= args.max_consecutive_fails:
result = RunResult(config=cfg, status="skipped", run_dir=str(run_dir))
results.append(result)
print(
f"\n [{i}/{total}] SKIPPED {cfg.run_id} ({args.max_consecutive_fails} consecutive failures)"
)
continue
print(f"\n{'='*60}")
print(f" [{i}/{total}] {cfg.run_id}")
print(f"{'='*60}")
# Wait for port from previous run
_wait_port_free(8000)
# Run
result = _run_single(cfg, run_dir, passthrough)
results.append(result)
# Update consecutive failure tracking
if result.status == "ok":
consecutive_fails[key] = 0
rps = f"{result.req_per_sec:.1f}" if result.req_per_sec else "N/A"
tp50 = f"{result.ttft_p50_ms:.1f}ms" if result.ttft_p50_ms else "N/A"
print(f" OK: {rps} req/s, TTFT p50={tp50}")
else:
consecutive_fails[key] = consecutive_fails.get(key, 0) + 1
print(
f" FAIL (consecutive: {consecutive_fails[key]}/{args.max_consecutive_fails})"
)
# Generate per-run report
if not args.no_report and result.status == "ok":
_generate_report(run_dir)
# Write incremental CSV + summary
_write_csv(results, csv_path)
_write_summary(results, summary_path)
# Cooldown
if i < total:
time.sleep(args.cooldown)
except KeyboardInterrupt:
print("\n\nInterrupted! Partial results saved.")
finally:
_write_csv(results, csv_path)
_write_summary(results, summary_path)
# Final output
_print_results_table(results)
print(f"\nResults: {csv_path}")
print(f"Summary: {summary_path}")
print(f"Per-run: {output_root}/<run_id>/report.md")
if __name__ == "__main__":
main()
...@@ -339,6 +339,11 @@ pub fn make_engine<'p>( ...@@ -339,6 +339,11 @@ pub fn make_engine<'p>(
} else { } else {
// Mocker only needs tokenizer, not weights // Mocker only needs tokenizer, not weights
let ignore_weights = matches!(args.engine_type, EngineType::Mocker); let ignore_weights = matches!(args.engine_type, EngineType::Mocker);
// Preserve the original HF model ID as source_path so the
// frontend can resolve model metadata even when the served
// model name differs (e.g., --model-name model-1 --model-path
// Qwen/Qwen3-0.6B).
builder.source_path(model_path.clone());
LocalModel::fetch(&model_path.display().to_string(), ignore_weights) LocalModel::fetch(&model_path.display().to_string(), ignore_weights)
.await .await
.map_err(to_pyerr)? .map_err(to_pyerr)?
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment