Unverified commit 419e936a, authored by Ben Hamm, committed by GitHub

refactor: remove benchmark shim, use AIPerf directly (#7074)


Signed-off-by: Ben Hamm <ben.hamm@gmail.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: Biswa Panda <biswa.panda@gmail.com>
Co-authored-by: Saravana Periyasamy <saperiyasamy@nvidia.com>
parent 50818575
@@ -15,7 +15,7 @@
 # Benchmarks
-This directory contains benchmarking scripts and tools for performance evaluation of Dynamo deployments. The benchmarking framework is a wrapper around aiperf that makes it easy to benchmark DynamoGraphDeployments or other deployments with exposed endpoints.
+This directory contains benchmarking tools and scripts for Dynamo deployments. Benchmarking uses [AIPerf](https://github.com/ai-dynamo/aiperf) directly — a comprehensive tool for measuring generative AI inference performance.
 ## Quick Start
@@ -26,49 +26,37 @@ First, deploy your DynamoGraphDeployment using the [deployment documentation](..
 # Port-forward your deployment to http://localhost:8000
 kubectl port-forward -n <namespace> svc/<frontend-service-name> 8000:8000 > /dev/null 2>&1 &
-# Run benchmark
-python3 -m benchmarks.utils.benchmark \
-    --benchmark-name my-benchmark \
-    --endpoint-url http://localhost:8000 \
-    --model "<your-model>"
-# Generate plots
-python3 -m benchmarks.utils.plot --data-dir ./benchmarks/results
-# Or plot only specific benchmark experiments
-python3 -m benchmarks.utils.plot --data-dir ./benchmarks/results --benchmark-name my-benchmark
-```
-## Features
-Benchmark any HTTP endpoints! The benchmarking framework supports:
-**Flexible Configuration:**
-- User-defined benchmark names using `--benchmark-name` flag
-- Support for single endpoint benchmarking with `--endpoint-url` flag
-- Customizable concurrency levels (configurable via CONCURRENCIES env var), sequence lengths, and models
-- Automated performance plot generation with custom benchmark names
-**Supported Backends:**
-- DynamoGraphDeployments with port-forwarded endpoints
-- External HTTP endpoints (for comparison with non-Dynamo backends or platforms)
-## Installation
-This is already included as part of the Dynamo container images. To install locally or standalone:
-```bash
-pip install -e .
+# Run a single benchmark
+aiperf profile \
+    --model <your-model> \
+    --url http://localhost:8000 \
+    --endpoint-type chat \
+    --streaming \
+    --concurrency 10 \
+    --request-count 100
+# Run a concurrency sweep for Pareto analysis
+for c in 1 2 5 10 50 100; do
+    aiperf profile \
+        --model <your-model> \
+        --url http://localhost:8000 \
+        --endpoint-type chat \
+        --streaming \
+        --concurrency $c \
+        --request-count $(( c * 3 > 10 ? c * 3 : 10 )) \
+        --artifact-dir "artifacts/my-benchmark/c$c"
+done
+# Generate comparison plots
+aiperf plot artifacts/my-benchmark
 ```
-## Data Generation Tools
-This directory also includes lightweight tools for:
-- Analyzing prefix-structured data (`datagen analyze`)
-- Synthesizing structured data customizable for testing purposes (`datagen synthesize`)
-Detailed information is provided in the `prefix_data_generator` directory.
+## Directory Contents
+- **`incluster/`** — Kubernetes Job manifest for running benchmarks inside the cluster
+- **`router/`** — KV Router benchmarking scripts (prefix ratio, trace replay, agent, priority queue)
+- **`prefix_data_generator/`** — Tools for analyzing and synthesizing prefix-structured data
 ## Comprehensive Guide
-For detailed documentation, configuration options, and advanced usage, see the [complete benchmarking guide](../docs/benchmarks/benchmarking.md).
+For detailed documentation including server-side benchmarking, Pareto analysis, and advanced AIPerf features, see the [complete benchmarking guide](../docs/benchmarks/benchmarking.md).
@@ -37,22 +37,29 @@ spec:
             secretKeyRef:
               name: hf-token-secret
               key: HF_TOKEN
-        command: ["python3", "-m", "benchmarks.utils.benchmark"]
+        command: ["/bin/bash", "-c"]
         args:
-          - --model
-          - "Qwen/Qwen3-0.6B"
-          - --isl
-          - "2000"
-          - --std
-          - "10"
-          - --osl
-          - "256"
-          - --output-dir
-          - /data/results
-          - --benchmark-name
-          - "qwen3-0p6b-vllm-agg"
-          - --endpoint-url
-          - "vllm-agg-frontend:8000"
+          - |
+            set -euo pipefail
+            MODEL="Qwen/Qwen3-0.6B"
+            URL="http://vllm-agg-frontend:8000"
+            OUTPUT_DIR="/data/results/qwen3-0p6b-vllm-agg"
+
+            for c in 1 2 5 10 50 100; do
+              echo "=== Concurrency $c ==="
+              aiperf profile \
+                --model "$MODEL" \
+                --url "$URL" \
+                --endpoint-type chat \
+                --streaming \
+                --concurrency $c \
+                --request-count $(( c * 3 > 10 ? c * 3 : 10 )) \
+                --synthetic-input-tokens-mean 2000 \
+                --output-tokens-mean 256 \
+                --artifact-dir "$OUTPUT_DIR/c$c" \
+                --ui none
+            done
+            echo "=== Benchmark complete ==="
         volumeMounts:
           - name: data-volume
             mountPath: /data
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Package marker for benchmarks utilities
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import os
import subprocess
from pathlib import Path
from typing import List

# Default concurrency levels - can be overridden with CONCURRENCIES environment variable
DEFAULT_CONCURRENCIES: List[int] = [1, 2, 5, 10, 50, 100, 250]

# Default request count per concurrency level - can be overridden with REQUEST_COUNT env var
# When set to 0 or unset, defaults to max(concurrency * REQUEST_COUNT_SCALE_FACTOR, 10)
# to ensure the concurrency level is fully utilized and each slot runs enough requests
# for stable measurements
DEFAULT_REQUEST_COUNT: int = 0
REQUEST_COUNT_SCALE_FACTOR: int = 3


def get_concurrency_levels() -> List[int]:
    """Get concurrency levels from environment variable or use defaults"""
    concurrencies_env = os.getenv("CONCURRENCIES")
    if concurrencies_env:
        try:
            # Parse comma-separated values
            concurrencies = [int(x.strip()) for x in concurrencies_env.split(",")]
            # Validate all are positive integers
            for c in concurrencies:
                if c <= 0:
                    raise ValueError(f"Concurrency level must be positive, got: {c}")
            return sorted(concurrencies)
        except ValueError as e:
            print(f"WARNING: Invalid CONCURRENCIES environment variable: {e}")
            print(f"Using default concurrency levels: {DEFAULT_CONCURRENCIES}")
            return DEFAULT_CONCURRENCIES
    return DEFAULT_CONCURRENCIES


def get_request_count() -> int:
    """Get request count from environment variable or use default.

    Returns 0 to indicate 'auto' mode (will be computed per concurrency level).
    """
    request_count_env = os.getenv("REQUEST_COUNT")
    if request_count_env:
        try:
            count = int(request_count_env.strip())
            if count < 0:
                raise ValueError(f"Request count must be non-negative, got: {count}")
            return count
        except ValueError as e:
            print(f"WARNING: Invalid REQUEST_COUNT environment variable: {e}")
            return DEFAULT_REQUEST_COUNT
    return DEFAULT_REQUEST_COUNT


CONCURRENCIES: List[int] = get_concurrency_levels()


def run_aiperf(
    service_url: str,
    model_name: str,
    isl: int,
    osl: int,
    stddev: int,
    concurrency: int,
    output_dir: Path,
    request_count: int = 0,
) -> None:
    output_dir.mkdir(parents=True, exist_ok=True)
    # Auto-compute request count: need enough requests to fully utilize concurrency
    # and run each slot at least REQUEST_COUNT_SCALE_FACTOR times for stable measurements
    if request_count <= 0:
        request_count = max(concurrency * REQUEST_COUNT_SCALE_FACTOR, 10)
    elif request_count < concurrency:
        print(
            f"WARNING: request_count ({request_count}) < concurrency ({concurrency}). "
            f"Actual in-flight concurrency will be capped at {request_count}.",
            flush=True,
        )
    cmd = [
        "aiperf",
        "profile",
        "-m",
        model_name,
        "--endpoint-type",
        "chat",
        "--streaming",
        "-u",
        service_url,
        "--synthetic-input-tokens-mean",
        str(isl),
        "--synthetic-input-tokens-stddev",
        str(stddev),
        "--concurrency",
        str(concurrency),
        "--request-count",
        str(request_count),
        "--output-tokens-mean",
        str(osl),
        "--extra-inputs",
        f"max_tokens:{osl}",
        "--extra-inputs",
        f"min_tokens:{osl}",
        "--extra-inputs",
        "ignore_eos:true",
        "--tokenizer",
        model_name,
        "--artifact-dir",
        str(output_dir),
    ]
    print(
        f"Running aiperf with isl {isl}, osl {osl}, concurrency {concurrency}, request_count {request_count}",
        flush=True,
    )
    aip_process = subprocess.Popen(
        cmd,
        cwd=str(output_dir),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    stdout, stderr = aip_process.communicate()
    if aip_process.returncode == 0:
        print("Aiperf profiling completed successfully", flush=True)
        if stdout:
            print(stdout)
    else:
        print(f"Aiperf failed with error code: {aip_process.returncode}")
        if stderr:
            print(f"stderr: {stderr}")
        raise subprocess.CalledProcessError(
            aip_process.returncode, cmd, output=stdout, stderr=stderr
        )


def run_concurrency_sweep(
    service_url: str, model_name: str, isl: int, osl: int, stddev: int, output_dir: Path
) -> None:
    concurrency_levels = get_concurrency_levels()
    request_count = get_request_count()
    print(
        f"Running concurrency sweep for {model_name} with ISL {isl} and OSL {osl} and standard deviation {stddev}",
        flush=True,
    )
    print(f"Concurrency levels: {concurrency_levels}", flush=True)
    print(
        f"Request count: {request_count if request_count > 0 else f'auto (max(concurrency*{REQUEST_COUNT_SCALE_FACTOR}, 10))'}",
        flush=True,
    )
    for c in concurrency_levels:
        print(f"Starting concurrency level {c}", flush=True)
        run_aiperf(
            service_url,
            model_name,
            isl,
            osl,
            stddev,
            c,
            output_dir / f"c{c}",
            request_count=request_count,
        )
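The auto request-count rule in the removed `run_aiperf` appears to be the same arithmetic as the shell expression `$(( c * 3 > 10 ? c * 3 : 10 ))` used in the new README and Job manifest. A minimal sketch to check that, assuming the scale factor of 3 and floor of 10 shown above; the function names `auto_request_count` and `shell_request_count` are hypothetical and exist only for this comparison:

```python
# Value copied from the removed shim's REQUEST_COUNT_SCALE_FACTOR.
REQUEST_COUNT_SCALE_FACTOR = 3


def auto_request_count(concurrency: int) -> int:
    """Removed shim's default: max(concurrency * scale factor, 10)."""
    return max(concurrency * REQUEST_COUNT_SCALE_FACTOR, 10)


def shell_request_count(c: int) -> int:
    """Python transcription of the shell ternary $(( c * 3 > 10 ? c * 3 : 10 ))."""
    return c * 3 if c * 3 > 10 else 10


if __name__ == "__main__":
    # Concurrency levels used by the new sweep loops.
    for c in [1, 2, 5, 10, 50, 100]:
        assert auto_request_count(c) == shell_request_count(c)
        print(f"concurrency={c} request_count={auto_request_count(c)}")
```

Note that the new loops sweep `1 2 5 10 50 100`, whereas the removed `DEFAULT_CONCURRENCIES` list also included 250.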
#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import argparse
import re
import sys
from urllib.parse import urlsplit

from benchmarks.utils.workflow import has_http_scheme, run_benchmark_workflow
from deploy.utils.kubernetes import is_running_in_cluster


def validate_endpoint(endpoint: str) -> None:
    """Validate that endpoint is HTTP endpoint or internal service URL when running in cluster"""
    v = endpoint.strip()
    if is_running_in_cluster():
        # Allow HTTP(S) or internal service URLs like host[:port][/path]
        if has_http_scheme(v):
            pass
        else:
            parts = urlsplit(f"//{v}")
            host_ok = bool(parts.hostname)
            port_ok = parts.port is None or (1 <= parts.port <= 65535)
            if not (host_ok and port_ok):
                raise ValueError(
                    f"Endpoint must be HTTP(S) or internal service URL. Got: {endpoint}"
                )
    else:
        if not has_http_scheme(v):
            raise ValueError(f"Endpoint must be HTTP endpoint. Got: {endpoint}")


def validate_benchmark_name(name: str) -> None:
    """Validate benchmark name"""
    if not name.strip():
        raise ValueError("Benchmark name cannot be empty")
    name = name.strip()
    # Validate name characters
    if not re.match(r"^[a-zA-Z0-9_-]+$", name):
        raise ValueError(f"Invalid benchmark name: {name}")
    # Validate reserved names
    if name.lower() == "plots":
        raise ValueError("Benchmark name 'plots' is reserved")


def main() -> int:
    parser = argparse.ArgumentParser(description="Benchmark Orchestrator")
    parser.add_argument(
        "--benchmark-name",
        required=True,
        help="Name/label for this benchmark (used in plots and results)",
    )
    parser.add_argument(
        "--endpoint-url",
        required=True,
        help="Endpoint to benchmark: HTTP(S) URL (e.g., http://localhost:8000) or in-cluster service URL host[:port]",
    )
    parser.add_argument("--isl", type=int, default=2000, help="Input sequence length")
    parser.add_argument(
        "--std",
        type=int,
        default=10,
        help="Input sequence standard deviation",
    )
    parser.add_argument("--osl", type=int, default=256, help="Output sequence length")
    parser.add_argument(
        "--model",
        default="Qwen/Qwen3-0.6B",
        help="Model name (must match the model deployed at the endpoint)",
    )
    parser.add_argument(
        "--output-dir", type=str, default="benchmarks/results", help="Output directory"
    )
    args = parser.parse_args()

    # Validate inputs
    try:
        validate_benchmark_name(args.benchmark_name)
        validate_endpoint(args.endpoint_url)
    except ValueError as e:
        print(f"ERROR: {e}")
        return 1

    # Run the benchmark workflow with the parsed inputs
    run_benchmark_workflow(
        inputs={args.benchmark_name: args.endpoint_url},
        isl=args.isl,
        std=args.std,
        osl=args.osl,
        model=args.model,
        output_dir=args.output_dir,
    )
    return 0


if __name__ == "__main__":
    sys.exit(main())
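The in-cluster branch of the removed `validate_endpoint` relies on a small trick: prefixing a scheme-less `host[:port]` string with `//` so that `urllib.parse.urlsplit` treats it as a network location. A minimal standalone sketch of that parsing step follows; the helper names `split_host_port` and `is_valid_service_endpoint` are hypothetical, and the sketch leaves out the `is_running_in_cluster()` check that the original depended on:

```python
from typing import Optional, Tuple
from urllib.parse import urlsplit


def split_host_port(endpoint: str) -> Tuple[Optional[str], Optional[int]]:
    """Parse a scheme-less host[:port][/path] string into (hostname, port)."""
    # The leading "//" makes urlsplit read the text as a netloc rather than a path.
    parts = urlsplit(f"//{endpoint.strip()}")
    return parts.hostname, parts.port


def is_valid_service_endpoint(endpoint: str) -> bool:
    """Return True when a hostname is present and any port is in 1..65535."""
    try:
        host, port = split_host_port(endpoint)
    except ValueError:
        # Accessing .port raises ValueError for non-numeric or out-of-range ports.
        return False
    return bool(host) and (port is None or 1 <= port <= 65535)


if __name__ == "__main__":
    for candidate in ["vllm-agg-frontend:8000", "localhost", ":8000", "host:99999"]:
        print(candidate, is_valid_service_endpoint(candidate))
```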
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import json
import re
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import matplotlib.pyplot as plt


def parse_benchmark_results(result_dir: Path) -> List[Tuple[int, Dict]]:
    """
    Parse benchmark results from a deployment directory.

    Args:
        result_dir: Path to the result directory

    Returns:
        List of (concurrency_level, metrics_dict) tuples sorted by concurrency
    """
    results = []
    # Find all concurrency directories (e.g., c1, c2, c5, c10, c50, c100, c250)
    for concurrency_dir in result_dir.iterdir():
        if not concurrency_dir.is_dir() or not concurrency_dir.name.startswith("c"):
            continue
        # Extract concurrency level from directory name
        match = re.match(r"c(\d+)", concurrency_dir.name)
        if not match:
            continue
        concurrency = int(match.group(1))
        # Find the aiperf JSON file
        aiperf_json = None
        for json_file in concurrency_dir.rglob("profile_export_aiperf.json"):
            aiperf_json = json_file
            break
        if aiperf_json and aiperf_json.exists():
            try:
                with open(aiperf_json, "r") as f:
                    metrics = json.load(f)
                results.append((concurrency, metrics))
                print(f"Loaded metrics for concurrency {concurrency}")
            except Exception as e:
                print(f"Error loading {aiperf_json}: {e}")
        else:
            print(f"Warning: No aiperf JSON found for {concurrency_dir}")
    # Sort by concurrency level
    results.sort(key=lambda x: x[0])
    return results


def extract_metric_series(
    results: List[Tuple[int, Dict]], metric_path: str, stat: str = "avg"
) -> Tuple[List[int], List[float]]:
    """
    Extract a time series of a specific metric across concurrency levels.

    Args:
        results: List of (concurrency, metrics) tuples
        metric_path: Dot-separated path to the metric (e.g., 'inter_token_latency')
        stat: Statistic to extract ('avg', 'p50', 'p90', etc.)

    Returns:
        Tuple of (concurrency_levels, metric_values)
    """
    concurrencies = []
    values = []
    path_keys = metric_path.split(".")
    for concurrency, metrics in results:
        try:
            node = metrics
            for k in path_keys:
                node = node[k]
            value = node[stat]
            concurrencies.append(concurrency)
            values.append(float(value))
        except (KeyError, TypeError):
            print(
                f"Warning: {metric_path}.{stat} not found for concurrency {concurrency}"
            )
            continue
    return concurrencies, values


def create_plot(
    title: str,
    xlabel: str,
    ylabel: str,
    data_series: List[Tuple[str, List[int], List[float]]],
    output_path: Path,
    log_scale_x: bool = False,
    log_scale_y: bool = False,
) -> None:
    """
    Create a line plot with multiple series.

    Args:
        title: Plot title
        xlabel: X-axis label
        ylabel: Y-axis label
        data_series: List of (label, x_values, y_values) tuples
        output_path: Path to save the plot
        log_scale_x: Whether to use log scale for X axis
        log_scale_y: Whether to use log scale for Y axis
    """
    plt.figure(figsize=(10, 6))
    colors = ["#1f77b4", "#ff7f0e", "#2ca02c", "#d62728", "#9467bd", "#8c564b"]
    for i, (label, x_vals, y_vals) in enumerate(data_series):
        if x_vals and y_vals:  # Only plot if we have data
            plt.plot(
                x_vals,
                y_vals,
                marker="o",
                linewidth=2,
                markersize=6,
                color=colors[i % len(colors)],
                label=label,
            )
    plt.title(title, fontsize=14, fontweight="bold")
    plt.xlabel(xlabel, fontsize=12)
    plt.ylabel(ylabel, fontsize=12)
    plt.grid(True, alpha=0.3)
    if log_scale_x:
        plt.xscale("log")
    if log_scale_y:
        plt.yscale("log")
    plt.legend()
    plt.tight_layout()
    plt.savefig(output_path, dpi=300, bbox_inches="tight")
    plt.close()
    print(f"Saved plot: {output_path}")


def create_efficiency_plot(
    deployment_results: Dict, plots_dir: Path, output_tokens: int = 200
) -> None:
    """
    Create an efficiency plot showing tok/s/gpu vs tok/s/user with concurrency as labeled points.

    Args:
        deployment_results: Dict of deployment_type -> results
        plots_dir: Directory to save plots
        output_tokens: Average output tokens per request (default 200)
    """
    plt.figure(figsize=(12, 8))
    # Support for up to 12 deployments in the plots
    colors = [
        "#1f77b4",
        "#ff7f0e",
        "#2ca02c",
        "#d62728",
        "#9467bd",
        "#8c564b",
        "#e377c2",
        "#7f7f7f",
        "#bcbd22",
        "#17becf",
        "#aec7e8",
        "#ffbb78",
    ]
    markers = ["o", "s", "^", "D", "v", "<", ">", "p", "*", "h", "H", "+"]
    for deployment_type, results in deployment_results.items():
        tok_s_per_user = []
        tok_s_per_gpu = []
        concurrency_levels = []
        for concurrency, metrics in results:
            try:
                # Get request throughput (requests/sec)
                request_throughput = metrics["request_throughput"]["avg"]
                # Calculate total tokens per second
                total_tok_s = request_throughput * output_tokens
                # Guard against zero concurrency and parameterize GPU count
                if concurrency <= 0:
                    continue
                num_gpus = metrics.get("cluster", {}).get("num_gpus", 1)
                tok_s_user = total_tok_s / concurrency
                tok_s_gpu = total_tok_s / max(1, num_gpus)
                tok_s_per_user.append(tok_s_user)
                tok_s_per_gpu.append(tok_s_gpu)
                concurrency_levels.append(concurrency)
            except KeyError as e:
                print(
                    f"Warning: Missing metric for {deployment_type} concurrency {concurrency}: {e}"
                )
                continue
        if tok_s_per_user and tok_s_per_gpu:
            # Plot points
            color_idx = list(deployment_results.keys()).index(deployment_type)
            color = colors[color_idx % len(colors)]
            marker = markers[color_idx % len(markers)]
            plt.scatter(
                tok_s_per_user,
                tok_s_per_gpu,
                c=color,
                marker=marker,
                s=120,
                alpha=0.8,
                label=deployment_type.title(),
                edgecolors="black",
                linewidth=1.5,
            )
            # Add concurrency labels
            for i, (x, y, c) in enumerate(
                zip(tok_s_per_user, tok_s_per_gpu, concurrency_levels)
            ):
                plt.annotate(
                    f"{c}",
                    (x, y),
                    xytext=(8, 8),
                    textcoords="offset points",
                    fontsize=10,
                    fontweight="bold",
                    ha="left",
                )
    plt.title("GPU Efficiency vs User Experience", fontsize=14, fontweight="bold")
    plt.xlabel("Tokens/sec per User", fontsize=12)
    plt.ylabel("Tokens/sec per GPU", fontsize=12)
    plt.grid(True, alpha=0.3)
    # Add a note about what the numbers represent
    plt.figtext(
        0.02,
        0.02,
        "Note: Numbers on dots indicate concurrency level",
        fontsize=10,
        style="italic",
        alpha=0.7,
    )
    plt.legend()
    plt.tight_layout()
    output_path = plots_dir / "efficiency_tok_s_gpu_vs_user.png"
    plt.savefig(output_path, dpi=300, bbox_inches="tight")
    plt.close()
    print(f"Saved efficiency plot: {output_path}")


def generate_plots(
    base_output_dir: Path, output_dir: Path, benchmark_names: Optional[List[str]] = None
) -> None:
    """
    Generate performance plots from benchmark results.

    Args:
        base_output_dir: Base directory containing benchmark results
        output_dir: Directory to save plots
        benchmark_names: Optional list of specific benchmark names to plot. If None, plots all subdirectories.
    """
    print(f"Generating plots from results in {base_output_dir}")
    if not base_output_dir.exists():
        print(f"Results directory does not exist: {base_output_dir}")
        return
    # Create plots directory
    output_dir.mkdir(parents=True, exist_ok=True)
    # Parse results for each deployment type
    deployment_results = {}
    # Find all subdirectories that contain benchmark results
    names_set = set(benchmark_names) if benchmark_names is not None else None
    for item in base_output_dir.iterdir():
        if item.is_dir() and item.name != "plots":
            deployment_type = item.name
            # If benchmark_names is specified, only process those directories
            if names_set is not None and deployment_type not in names_set:
                print(f"Skipping {deployment_type} (not in specified benchmark names)")
                continue
            results = parse_benchmark_results(item)
            if results:
                deployment_results[deployment_type] = results
                print(f"Found {len(results)} concurrency levels for {deployment_type}")
            else:
                print(f"No valid results found for {deployment_type}")
    if not deployment_results:
        if benchmark_names:
            available = sorted(
                [
                    p.name
                    for p in base_output_dir.iterdir()
                    if p.is_dir() and p.name != "plots"
                ]
            )
            missing = sorted([n for n in benchmark_names if n not in available])
            print(f"No benchmark results found for specified names: {benchmark_names}")
            if missing:
                print(f"Missing (not found under {base_output_dir}): {missing}")
            print(f"Available experiments: {available}")
        else:
            print("No benchmark results found to plot!")

    # 1. P50 Inter-token Latency vs Concurrency
    p50_data = []
    for deployment_type, results in deployment_results.items():
        concurrencies, latencies = extract_metric_series(
            results, "inter_token_latency", "p50"
        )
        if concurrencies:
            p50_data.append((deployment_type.title(), concurrencies, latencies))
    create_plot(
        title="P50 Inter-Token Latency vs Concurrency",
        xlabel="Concurrency Level",
        ylabel="P50 Inter-Token Latency (ms)",
        data_series=p50_data,
        output_path=output_dir / "p50_inter_token_latency_vs_concurrency.png",
        log_scale_x=True,
    )

    # 2. Average Inter-token Latency vs Concurrency
    avg_latency_data = []
    for deployment_type, results in deployment_results.items():
        concurrencies, latencies = extract_metric_series(
            results, "inter_token_latency", "avg"
        )
        if concurrencies:
            avg_latency_data.append((deployment_type.title(), concurrencies, latencies))
    create_plot(
        title="Average Inter-Token Latency vs Concurrency",
        xlabel="Concurrency Level",
        ylabel="Average Inter-Token Latency (ms)",
        data_series=avg_latency_data,
        output_path=output_dir / "avg_inter_token_latency_vs_concurrency.png",
        log_scale_x=True,
    )

    # 3. Request Throughput vs Concurrency
    throughput_data = []
    for deployment_type, results in deployment_results.items():
        concurrencies, throughputs = extract_metric_series(
            results, "request_throughput", "avg"
        )
        if concurrencies:
            throughput_data.append(
                (deployment_type.title(), concurrencies, throughputs)
            )
    create_plot(
        title="Request Throughput vs Concurrency",
        xlabel="Concurrency Level",
        ylabel="Request Throughput (req/s)",
        data_series=throughput_data,
        output_path=output_dir / "request_throughput_vs_concurrency.png",
        log_scale_x=True,
    )

    # 4. Average Time to First Token vs Concurrency
    ttft_data = []
    for deployment_type, results in deployment_results.items():
        concurrencies, ttfts = extract_metric_series(
            results, "time_to_first_token", "avg"
        )
        if concurrencies:
            ttft_data.append((deployment_type.title(), concurrencies, ttfts))
    create_plot(
        title="Average Time to First Token vs Concurrency",
        xlabel="Concurrency Level",
        ylabel="Average Time to First Token (ms)",
        data_series=ttft_data,
        output_path=output_dir / "avg_time_to_first_token_vs_concurrency.png",
        log_scale_x=True,
    )

    # 5. Efficiency plot: tok/s/gpu vs tok/s/user
    create_efficiency_plot(deployment_results, output_dir)

    # Generate summary
    summary_lines = [
        "Benchmark Results Summary",
        "=" * 30,
        "",
        f"Results directory: {base_output_dir}",
        f"Plots generated: {output_dir}",
        "",
        "Deployment Types Found:",
    ]
    for deployment_type, results in deployment_results.items():
        concurrency_levels = [r[0] for r in results]
        summary_lines.append(
            f" {deployment_type}: {len(results)} concurrency levels ({min(concurrency_levels)}-{max(concurrency_levels)})"
        )
    summary_lines.extend(
        [
            "",
            "Generated Plots:",
            " - p50_inter_token_latency_vs_concurrency.png",
            " - avg_inter_token_latency_vs_concurrency.png",
            " - request_throughput_vs_concurrency.png",
            " - avg_time_to_first_token_vs_concurrency.png",
            " - efficiency_tok_s_gpu_vs_user.png",
        ]
    )
    summary_path = output_dir / "SUMMARY.txt"
    summary_path.write_text("\n".join(summary_lines))
    print(f"Generated summary: {summary_path}")
    print(f"All plots saved to: {output_dir}")


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(
        description="Generate performance plots from benchmark results"
    )
    parser.add_argument(
        "--data-dir", required=True, help="Directory containing benchmark results"
    )
    parser.add_argument(
        "--output-dir", help="Output directory for plots (defaults to data-dir/plots)"
    )
    parser.add_argument(
        "--benchmark-name",
        action="append",
        help="Specific benchmark experiment name to plot (can be specified multiple times). If not specified, plots all subdirectories.",
    )
    args = parser.parse_args()
    data_dir = Path(args.data_dir)
    benchmark_names = args.benchmark_name if args.benchmark_name else None
    if args.output_dir:
        # If output dir specified, use it as base and call generate_plots
        output_dir = Path(args.output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        generate_plots(data_dir, output_dir, benchmark_names)
    else:
        # Use data_dir as base output dir
        generate_plots(data_dir, data_dir / "plots", benchmark_names)
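The efficiency plot in the removed plotting module reduces to two divisions per concurrency level. A minimal sketch of that arithmetic, isolated from matplotlib, is shown here; `efficiency_point` is a hypothetical name, and the input values in the usage lines are made up for illustration rather than taken from a real run:

```python
from typing import Tuple


def efficiency_point(
    request_throughput: float,
    concurrency: int,
    output_tokens: int = 200,
    num_gpus: int = 1,
) -> Tuple[float, float]:
    """Return (tokens/sec per user, tokens/sec per GPU) for one concurrency level."""
    if concurrency <= 0:
        raise ValueError("concurrency must be positive")
    # Total output tokens per second, estimated as requests/sec times tokens per request.
    total_tok_s = request_throughput * output_tokens
    return total_tok_s / concurrency, total_tok_s / max(1, num_gpus)


if __name__ == "__main__":
    # Illustrative numbers only: 2.5 req/s at concurrency 10 on a single GPU.
    per_user, per_gpu = efficiency_point(2.5, 10)
    print(per_user, per_gpu)  # 50.0 500.0
```

In the removed code, `generate_plots` called `create_efficiency_plot` without passing `output_tokens`, so the estimate used 200 tokens per request even though the shim's default `--osl` was 256.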
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

from pathlib import Path
from typing import Dict, List

from benchmarks.utils.aiperf import run_concurrency_sweep
from deploy.utils.kubernetes import is_running_in_cluster


def has_http_scheme(url: str) -> bool:
    """Check if URL has HTTP or HTTPS scheme."""
    return url.lower().startswith(("http://", "https://"))


def normalize_service_url(endpoint: str) -> str:
    e = endpoint.strip()
    if has_http_scheme(e):
        return e
    if is_running_in_cluster():
        return f"http://{e}"
    return e  # Outside cluster, validation will have ensured scheme is present


def print_concurrency_start(
    label: str, model: str, isl: int, osl: int, std: int
) -> None:
    """Print concurrency sweep start messages"""
    print(f"⚙️ Starting {label} concurrency sweep!", flush=True)
    print(
        "⏱️ This may take several minutes - running through multiple concurrency levels...",
        flush=True,
    )
    print(f"🎯 Model: {model} | ISL: {isl} | OSL: {osl} | StdDev: {std}")


def run_endpoint_benchmark(
    label: str,
    endpoint: str,
    model: str,
    isl: int,
    osl: int,
    std: int,
    output_dir: Path,
) -> None:
    """Run benchmark for an existing endpoint with custom label"""
    # Normalize endpoint to a usable URL (handles in-cluster scheme-less inputs)
    service_url = normalize_service_url(endpoint)
    print(f"🚀 Starting benchmark of endpoint '{label}': {service_url}")
    print(f"📁 Results will be saved to: {output_dir / label}")
    print_concurrency_start(label, model, isl, osl, std)
    # Create output directory
    (output_dir / label).mkdir(parents=True, exist_ok=True)
    run_concurrency_sweep(
        service_url=service_url,
        model_name=model,
        isl=isl,
        osl=osl,
        stddev=std,
        output_dir=output_dir / label,
    )
    print("✅ Endpoint benchmark completed successfully!")


def print_final_summary(output_dir: Path, labels: List[str]) -> None:
    """Print final benchmark summary"""
    print("🎉 Benchmark workflow completed successfully!")
    print(f"📁 All results available at: {output_dir}")
    if labels:
        print(f"🚀 Benchmarked: {', '.join(labels)}")


def run_benchmark_workflow(
    inputs: Dict[str, str],
    isl: int = 2000,
    std: int = 10,
    osl: int = 256,
    model: str = "Qwen/Qwen3-0.6B",
    output_dir: str = "benchmarks/results",
) -> None:
    """Main benchmark workflow orchestrator for HTTP endpoints (and in-cluster internal service URLs)"""
    output_dir_path = Path(output_dir)
    output_dir_path.mkdir(parents=True, exist_ok=True)
    # Run endpoint benchmarks
    benchmarked_labels = []
    for label, endpoint in inputs.items():
        run_endpoint_benchmark(label, endpoint, model, isl, osl, std, output_dir_path)
        benchmarked_labels.append(label)
    # Generate final summary
    print_final_summary(output_dir_path, benchmarked_labels)
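With `normalize_service_url` removed, nothing adds a scheme to a bare in-cluster service name any more, which is presumably why the Job manifest in this commit changes `vllm-agg-frontend:8000` to `http://vllm-agg-frontend:8000`. A minimal sketch of the normalization step follows, assuming the caller already knows the endpoint is an in-cluster service; `with_http_scheme` is a hypothetical helper, and unlike the original it does not call `is_running_in_cluster()`:

```python
def with_http_scheme(endpoint: str) -> str:
    """Return the endpoint unchanged if it has an HTTP(S) scheme, else prefix http://."""
    e = endpoint.strip()
    if e.lower().startswith(("http://", "https://")):
        return e
    # Assumption: a scheme-less value is a plain-HTTP in-cluster service address.
    return f"http://{e}"


if __name__ == "__main__":
    print(with_http_scheme("vllm-agg-frontend:8000"))  # http://vllm-agg-frontend:8000
    print(with_http_scheme("https://example.com"))  # https://example.com
```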
@@ -4,8 +4,6 @@
 # Kubernetes and async dependencies
 aiofiles>=0.8.0
-# Benchmarking dependencies for Dynamo
-genai-perf==0.0.15
 httpx>=0.24.0
 kubernetes-asyncio>=24.0.0
......
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
This diff is collapsed.