Unverified commit 5c461d61, authored by Qi Wang and committed by GitHub
Browse files

feat: multimodal request_rate based sweep (#7363)


Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
parent c08034eb
......@@ -7,7 +7,7 @@ import argparse
from typing import List
def _parse_concurrencies(value: str) -> List[int]:
def _parse_int_list(value: str) -> List[int]:
    """Parse a comma-separated string such as ``"4, 8,16"`` into a list of ints.

    Whitespace around each entry is ignored and blank entries (for example
    the one left behind by a trailing comma) are skipped.

    Args:
        value: Raw command-line text, e.g. ``"4,8,16,32"``.

    Returns:
        The parsed integers, in the order they were written.

    Raises:
        ValueError: If an entry is not a valid integer, or if ``value``
            contains no integers at all.
    """
    tokens = (token.strip() for token in value.split(","))
    parsed = [int(token) for token in tokens if token]
    if not parsed:
        # Reject input with no integers rather than returning an empty list.
        raise ValueError(
            f"expected a comma-separated list of integers, got {value!r}"
        )
    return parsed
......@@ -37,12 +37,21 @@ def parse_args(argv=None) -> argparse.Namespace:
default=None,
help="Override model name from config.",
)
parser.add_argument(
sweep_group = parser.add_mutually_exclusive_group()
sweep_group.add_argument(
"--request-rates",
type=_parse_int_list,
default=None,
help="Override request rates (comma-separated, e.g. '4,8,16,32').",
)
sweep_group.add_argument(
"--concurrencies",
type=_parse_concurrencies,
type=_parse_int_list,
default=None,
help="Override concurrency levels (comma-separated, e.g. '1,2,4,8').",
help="Override concurrency levels (comma-separated, e.g. '16,32,64,128').",
)
parser.add_argument(
"--osl",
type=int,
......@@ -53,7 +62,7 @@ def parse_args(argv=None) -> argparse.Namespace:
"--request-count",
type=int,
default=None,
help="Override request count per concurrency level.",
help="Override request count per sweep value.",
)
parser.add_argument(
"--skip-plots",
......
......@@ -22,10 +22,15 @@ class BenchmarkConfig:
@dataclass
class SweepConfig:
"""Top-level sweep configuration loaded from YAML with optional CLI overrides."""
"""Top-level sweep configuration loaded from YAML with optional CLI overrides.
Exactly one of ``request_rates`` or ``concurrencies`` must be set.
The active mode is exposed via ``sweep_mode`` and ``sweep_values``.
"""
model: str = "Qwen/Qwen3-VL-30B-A3B-Instruct-FP8"
concurrencies: List[int] = field(default_factory=lambda: [1, 2, 4, 8, 16, 32])
request_rates: Optional[List[int]] = None
concurrencies: Optional[List[int]] = None
osl: int = 150
request_count: int = 1000
warmup_count: int = 5
......@@ -38,6 +43,20 @@ class SweepConfig:
restart_server_every_benchmark: bool = True
env: Dict[str, str] = field(default_factory=dict)
@property
def sweep_mode(self) -> str:
    """Name of the active sweep dimension.

    ``'concurrency'`` when ``concurrencies`` is non-empty, otherwise
    ``'request_rate'``.
    """
    return "concurrency" if self.concurrencies else "request_rate"
@property
def sweep_values(self) -> List[int]:
    """Values to sweep over for the active mode.

    A non-empty ``concurrencies`` wins; otherwise ``request_rates`` is
    used, and an empty list is returned when that is unset or empty too.
    """
    return self.concurrencies or self.request_rates or []
def validate(self, repo_root: Optional[Path] = None) -> None:
"""Validate that all referenced files and scripts exist."""
if not self.input_files:
......@@ -58,8 +77,17 @@ class SweepConfig:
f"Workflow script not found: {script} (config '{cfg.label}')"
)
if not self.concurrencies:
raise ValueError("At least one concurrency level is required.")
if self.request_rates and self.concurrencies:
raise ValueError(
"Cannot set both request_rates and concurrencies. Pick one."
)
if not self.request_rates and not self.concurrencies:
raise ValueError(
"At least one of request_rates or concurrencies is required."
)
# Request rates swept when the YAML config sets neither ``request_rates``
# nor ``concurrencies``.
_DEFAULT_REQUEST_RATES: List[int] = [4, 8, 16, 32, 64]
def _parse_benchmark_config(raw: Dict[str, Any]) -> BenchmarkConfig:
......@@ -78,25 +106,36 @@ def load_config(
with open(yaml_path) as f:
raw = yaml.safe_load(f)
defaults = SweepConfig()
configs = [_parse_benchmark_config(c) for c in raw.get("configs", [])]
# Resolve sweep mode from YAML — support both keys, default to request_rates.
yaml_request_rates = raw.get("request_rates")
yaml_concurrencies = raw.get("concurrencies")
if yaml_request_rates and yaml_concurrencies:
raise ValueError(
f"YAML config {yaml_path} sets both request_rates and concurrencies. "
"Pick one."
)
# Default to request_rates if neither is specified.
if not yaml_request_rates and not yaml_concurrencies:
yaml_request_rates = _DEFAULT_REQUEST_RATES
cfg = SweepConfig(
model=raw.get("model", defaults.model),
concurrencies=raw.get("concurrencies", defaults.concurrencies),
osl=raw.get("osl", defaults.osl),
request_count=raw.get("request_count", defaults.request_count),
warmup_count=raw.get("warmup_count", defaults.warmup_count),
port=raw.get("port", defaults.port),
timeout=raw.get("timeout", defaults.timeout),
model=raw.get("model", "Qwen/Qwen3-VL-30B-A3B-Instruct-FP8"),
request_rates=yaml_request_rates,
concurrencies=yaml_concurrencies,
osl=raw.get("osl", 150),
request_count=raw.get("request_count", 1000),
warmup_count=raw.get("warmup_count", 5),
port=raw.get("port", 8000),
timeout=raw.get("timeout", 600),
input_files=raw.get("input_files", []),
configs=configs,
output_dir=raw.get("output_dir", defaults.output_dir),
output_dir=raw.get("output_dir", "benchmarks/results/multimodal_default"),
skip_plots=raw.get("skip_plots", False),
restart_server_every_benchmark=raw.get(
"restart_server_every_benchmark",
defaults.restart_server_every_benchmark,
),
restart_server_every_benchmark=raw.get("restart_server_every_benchmark", True),
env=raw.get("env", {}),
)
......@@ -107,6 +146,18 @@ def load_config(
if hasattr(cfg, key):
setattr(cfg, key, value)
# CLI sweep mode override clears the other (mutually exclusive) field.
if (
"request_rates" in cli_overrides
and cli_overrides["request_rates"] is not None
):
cfg.concurrencies = None
elif (
"concurrencies" in cli_overrides
and cli_overrides["concurrencies"] is not None
):
cfg.request_rates = None
return cfg
......
......@@ -6,8 +6,8 @@ from __future__ import annotations
from pathlib import Path
from typing import List, Optional
from .config import SweepConfig, input_file_tag, resolve_repo_root
from .runner import run_aiperf_single, run_concurrency_sweep
from .config import BenchmarkConfig, SweepConfig, input_file_tag, resolve_repo_root
from .runner import run_aiperf_single
from .server import ServerManager
......@@ -28,13 +28,13 @@ def run_sweep(
config: SweepConfig,
repo_root: Optional[Path] = None,
) -> None:
"""Execute the full benchmark sweep: for each input file x benchmark config."""
"""Execute the full benchmark sweep: for each config x input file x sweep value."""
if repo_root is None:
repo_root = resolve_repo_root()
output_base = Path(config.output_dir)
restart = config.restart_server_every_benchmark
sweep_mode = config.sweep_mode
sweep_values = config.sweep_values
_print_banner("Multimodal Benchmark Sweep")
print(f" Model: {config.model}")
......@@ -43,10 +43,13 @@ def run_sweep(
print(f" {f}")
labels = [c.label for c in config.configs]
print(f" Configs: {labels}")
print(f" Concurrencies: {config.concurrencies}")
print(f" Sweep mode: {sweep_mode}")
print(f" Sweep values: {sweep_values}")
print(f" OSL: {config.osl}")
print(f" Requests: {config.request_count} per concurrency")
print(f" Restart every: {restart}")
print(f" Requests: {config.request_count} per {sweep_mode}")
print(
f" Restart: {'every run' if config.restart_server_every_benchmark else 'per config'}"
)
print(f" Output: {output_base}")
print(flush=True)
......@@ -54,52 +57,23 @@ def run_sweep(
env_overrides = dict(config.env) if config.env else {}
try:
for input_file in config.input_files:
file_tag = input_file_tag(input_file)
file_output_dir = output_base / file_tag
_print_banner(f"Input: {Path(input_file).name} ({file_tag})", char="#")
for bench_cfg in config.configs:
_print_banner(f"[{file_tag}] Config: {bench_cfg.label}", char="-")
workflow_abs = _resolve_workflow(bench_cfg.workflow, repo_root)
sweep_dir = file_output_dir / bench_cfg.label
if restart:
_sweep_with_restart(
server=server,
workflow_script=workflow_abs,
config=config,
bench_cfg=bench_cfg,
env_overrides=env_overrides,
input_file=input_file,
output_dir=sweep_dir,
)
else:
server.start(
workflow_script=workflow_abs,
model=config.model,
extra_args=bench_cfg.extra_args,
env_overrides=env_overrides,
)
try:
run_concurrency_sweep(
model=config.model,
port=config.port,
concurrencies=config.concurrencies,
request_count=config.request_count,
warmup_count=config.warmup_count,
input_file=input_file,
osl=config.osl,
output_dir=sweep_dir,
)
finally:
server.stop()
if not config.skip_plots:
for bench_cfg in config.configs:
_run_config(
bench_cfg=bench_cfg,
config=config,
server=server,
output_base=output_base,
sweep_mode=sweep_mode,
sweep_values=sweep_values,
env_overrides=env_overrides,
repo_root=repo_root,
)
if not config.skip_plots:
for input_file in config.input_files:
file_tag = input_file_tag(input_file)
_generate_plots_for_file(
file_output_dir,
output_base / file_tag,
[c.label for c in config.configs],
)
finally:
......@@ -109,40 +83,83 @@ def run_sweep(
_print_summary(config, output_base)
def _sweep_with_restart(
server: ServerManager,
workflow_script: str,
def _run_config(
    bench_cfg: BenchmarkConfig,
    config: SweepConfig,
    server: ServerManager,
    output_base: Path,
    sweep_mode: str,
    sweep_values: List[int],
    env_overrides: dict,
    repo_root: Path,
) -> None:
    """Run all sweep values for a single benchmark config.

    For every input file in ``config.input_files`` and every value in
    ``sweep_values`` (ascending), one aiperf run is written to
    ``output_base / <file_tag> / <label> / <sweep_mode><value>``. Runs whose
    artifact directory already contains ``profile_export_aiperf.json`` are
    skipped, and the server is not started when nothing is left to run.

    Server lifecycle follows ``config.restart_server_every_benchmark``:
    when true the server is started and stopped around each run, otherwise
    it is started once before the first run and stopped after the last.

    Args:
        bench_cfg: Benchmark config supplying the label, workflow and extra args.
        config: Sweep-wide settings (model, port, counts, input files, ...).
        server: Manager used to start and stop the serving workflow.
        output_base: Root directory for all results.
        sweep_mode: Sweep dimension name; used in directory names and passed
            through to ``run_aiperf_single``.
        sweep_values: Values to run for ``sweep_mode``.
        env_overrides: Environment overrides passed to ``server.start``.
        repo_root: Repository root used to resolve the workflow script.
    """
    workflow_abs = _resolve_workflow(bench_cfg.workflow, repo_root)
    restart_each_run = config.restart_server_every_benchmark

    _print_banner(f"Config: {bench_cfg.label}", char="#")

    # Collect pending runs, skipping those with existing results.
    pending_runs: List[tuple[str, str, int, Path]] = []
    for input_file in config.input_files:
        file_tag = input_file_tag(input_file)
        sweep_dir = output_base / file_tag / bench_cfg.label
        for value in sorted(sweep_values):
            artifact_dir = sweep_dir / f"{sweep_mode}{value}"
            if (artifact_dir / "profile_export_aiperf.json").exists():
                print(
                    f" SKIP {bench_cfg.label} {sweep_mode}={value} "
                    f"(results exist in {artifact_dir})",
                    flush=True,
                )
            else:
                pending_runs.append((input_file, file_tag, value, artifact_dir))

    if not pending_runs:
        print(f" All runs skipped for {bench_cfg.label}", flush=True)
        return

    def _start_server() -> None:
        # Single place for the start arguments shared by both lifecycle modes.
        server.start(
            workflow_script=workflow_abs,
            model=config.model,
            extra_args=bench_cfg.extra_args,
            env_overrides=env_overrides,
        )

    if not restart_each_run:
        _start_server()

    try:
        for input_file, file_tag, value, artifact_dir in pending_runs:
            _print_banner(
                f"[{file_tag}] Config: {bench_cfg.label} {sweep_mode}={value}",
                char="-",
            )

            if restart_each_run:
                _start_server()

            try:
                run_aiperf_single(
                    model=config.model,
                    port=config.port,
                    sweep_mode=sweep_mode,
                    sweep_value=value,
                    request_count=config.request_count,
                    warmup_count=config.warmup_count,
                    input_file=input_file,
                    osl=config.osl,
                    artifact_dir=artifact_dir,
                )
            finally:
                # Per-run server: stop it even when the aiperf run failed.
                if restart_each_run:
                    server.stop()
    finally:
        # Shared server: stop it once, after the last run or on failure.
        if not restart_each_run:
            server.stop()
def _generate_plots_for_file(
......
......@@ -11,13 +11,19 @@ from typing import List
def _build_aiperf_cmd(
model: str,
port: int,
concurrency: int,
sweep_mode: str,
sweep_value: int,
request_count: int,
warmup_count: int,
input_file: str,
osl: int,
artifact_dir: Path,
) -> List[str]:
if sweep_mode == "concurrency":
sweep_flag = "--concurrency"
else:
sweep_flag = "--request-rate"
return [
"aiperf",
"profile",
......@@ -25,8 +31,8 @@ def _build_aiperf_cmd(
model,
"-u",
f"http://localhost:{port}",
"--concurrency",
str(concurrency),
sweep_flag,
str(sweep_value),
"--request-count",
str(request_count),
"--warmup-request-count",
......@@ -55,7 +61,8 @@ def _build_aiperf_cmd(
def run_aiperf_single(
model: str,
port: int,
concurrency: int,
sweep_mode: str,
sweep_value: int,
request_count: int,
warmup_count: int,
input_file: str,
......@@ -67,7 +74,8 @@ def run_aiperf_single(
cmd = _build_aiperf_cmd(
model=model,
port=port,
concurrency=concurrency,
sweep_mode=sweep_mode,
sweep_value=sweep_value,
request_count=request_count,
warmup_count=warmup_count,
input_file=input_file,
......@@ -75,7 +83,7 @@ def run_aiperf_single(
artifact_dir=artifact_dir,
)
print(f" aiperf concurrency={concurrency} -> {artifact_dir}", flush=True)
print(f" aiperf {sweep_mode}={sweep_value} -> {artifact_dir}", flush=True)
proc = subprocess.run(cmd, capture_output=True, text=True)
if proc.returncode != 0:
......@@ -88,32 +96,34 @@ def run_aiperf_single(
proc.returncode, cmd, output=proc.stdout, stderr=proc.stderr
)
print(f" aiperf concurrency={concurrency} done.", flush=True)
print(f" aiperf {sweep_mode}={sweep_value} done.", flush=True)
def run_concurrency_sweep(
def run_sweep(
    model: str,
    port: int,
    sweep_mode: str,
    sweep_values: List[int],
    request_count: int,
    warmup_count: int,
    input_file: str,
    osl: int,
    output_dir: Path,
) -> None:
    """Run aiperf across all sweep values, writing results under output_dir/{mode}{N}/.

    ``output_dir`` is created if missing. Values are run in ascending order,
    one ``run_aiperf_single`` call per value, each with its own artifact
    directory named ``<sweep_mode><value>``.

    Args:
        model: Model name passed to aiperf.
        port: Port passed to aiperf.
        sweep_mode: Sweep dimension name; forwarded to ``run_aiperf_single``
            and used as the artifact directory prefix.
        sweep_values: Values to run for ``sweep_mode``.
        request_count: Request count per sweep value.
        warmup_count: Warmup request count per sweep value.
        input_file: Input file passed to aiperf.
        osl: Output sequence length setting forwarded to ``run_aiperf_single``.
        output_dir: Directory under which per-value artifact directories live.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    for value in sorted(sweep_values):
        run_aiperf_single(
            model=model,
            port=port,
            sweep_mode=sweep_mode,
            sweep_value=value,
            request_count=request_count,
            warmup_count=warmup_count,
            input_file=input_file,
            osl=osl,
            artifact_dir=output_dir / f"{sweep_mode}{value}",
        )

    print(f"Sweep complete. Results in {output_dir}", flush=True)
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import sys
from pathlib import Path
# ``benchmarks/`` is not an installed package. Put the repository root at the
# front of ``sys.path`` (once) so ``from benchmarks.multimodal.sweep...``
# imports resolve.
_repo_root = str(Path(__file__).resolve().parents[2])
if _repo_root not in sys.path:
    sys.path.insert(0, _repo_root)
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
from pathlib import Path
from typing import List
from unittest.mock import MagicMock, patch
import pytest
from benchmarks.multimodal.sweep.config import BenchmarkConfig, SweepConfig
from benchmarks.multimodal.sweep.orchestrator import run_sweep
pytestmark = [pytest.mark.unit, pytest.mark.pre_merge, pytest.mark.gpu_0]
def _make_config(
    tmp_path: Path,
    num_configs: int = 2,
    num_input_files: int = 2,
    request_rates: List[int] | None = None,
    restart_server_every_benchmark: bool = True,
) -> SweepConfig:
    """Build a request-rate ``SweepConfig`` backed by dummy files in ``tmp_path``.

    Writes ``num_configs`` executable ``workflow_<i>.sh`` scripts (labelled
    ``cfg-<i>``) and ``num_input_files`` ``input_<i>.jsonl`` files, and points
    the output directory at ``tmp_path / "results"``. ``request_rates``
    defaults to ``[4, 8]``; plots are always skipped.
    """
    rates = [4, 8] if request_rates is None else request_rates

    def _write_workflow(index: int) -> BenchmarkConfig:
        # Dummy workflow script; only its existence matters to the sweep.
        script = tmp_path / f"workflow_{index}.sh"
        script.write_text("#!/bin/bash\necho ok")
        script.chmod(0o755)
        return BenchmarkConfig(
            label=f"cfg-{index}",
            workflow=str(script),
            extra_args=[],
        )

    def _write_input(index: int) -> str:
        # Dummy single-record input file.
        path = tmp_path / f"input_{index}.jsonl"
        path.write_text("{}\n")
        return str(path)

    bench_configs: List[BenchmarkConfig] = [
        _write_workflow(i) for i in range(num_configs)
    ]
    input_paths: List[str] = [_write_input(i) for i in range(num_input_files)]

    return SweepConfig(
        model="test-model",
        request_rates=rates,
        concurrencies=None,
        osl=10,
        request_count=5,
        warmup_count=1,
        port=8000,
        timeout=60,
        input_files=input_paths,
        configs=bench_configs,
        output_dir=str(tmp_path / "results"),
        skip_plots=True,
        restart_server_every_benchmark=restart_server_every_benchmark,
    )
@patch("benchmarks.multimodal.sweep.orchestrator.run_aiperf_single")
@patch("benchmarks.multimodal.sweep.orchestrator.ServerManager")
def test_loop_order_bench_cfg_outer(
    mock_server_cls: MagicMock,
    mock_aiperf: MagicMock,
    tmp_path: Path,
) -> None:
    """bench_cfg is the outer loop, input_file middle, sweep_values inner."""
    config = _make_config(tmp_path)

    run_sweep(config, repo_root=tmp_path)

    # Extract (label, input_file_stem, sweep_value) from each aiperf call.
    # Artifact layout: results / <file_tag> / <label> / <mode><value>
    calls: list[tuple[str, str, int]] = [
        (
            Path(c.kwargs["artifact_dir"]).parent.name,
            Path(c.kwargs["input_file"]).stem,
            c.kwargs["sweep_value"],
        )
        for c in mock_aiperf.call_args_list
    ]

    labels = [label for label, _, _ in calls]
    assert labels.count("cfg-0") == 4  # 2 files x 2 rates
    assert labels.count("cfg-1") == 4

    # cfg-0 block ends before cfg-1 block starts.
    last_cfg0_idx = max(i for i, label in enumerate(labels) if label == "cfg-0")
    first_cfg1_idx = min(i for i, label in enumerate(labels) if label == "cfg-1")
    assert last_cfg0_idx < first_cfg1_idx

    # Full nesting order: config (outer), input file (middle), rate (inner).
    expected = [
        (label, stem, rate)
        for label in ("cfg-0", "cfg-1")
        for stem in ("input_0", "input_1")
        for rate in (4, 8)
    ]
    assert calls == expected
@patch("benchmarks.multimodal.sweep.orchestrator.run_aiperf_single")
@patch("benchmarks.multimodal.sweep.orchestrator.ServerManager")
def test_server_lifecycle_restart_every(
    mock_server_cls: MagicMock,
    mock_aiperf: MagicMock,
    tmp_path: Path,
) -> None:
    """restart_server_every_benchmark=True: start/stop per run."""
    server = mock_server_cls.return_value
    server.is_running = False
    config = _make_config(tmp_path, restart_server_every_benchmark=True)

    run_sweep(config, repo_root=tmp_path)

    # One start/stop pair for each of 2 configs x 2 files x 2 rates.
    expected_runs = 2 * 2 * 2
    assert server.start.call_count == expected_runs
    assert server.stop.call_count == expected_runs
@patch("benchmarks.multimodal.sweep.orchestrator.run_aiperf_single")
@patch("benchmarks.multimodal.sweep.orchestrator.ServerManager")
def test_server_lifecycle_restart_per_config(
    mock_server_cls: MagicMock,
    mock_aiperf: MagicMock,
    tmp_path: Path,
) -> None:
    """restart_server_every_benchmark=False: start/stop once per config."""
    server = mock_server_cls.return_value
    server.is_running = False
    config = _make_config(
        tmp_path,
        num_configs=3,
        restart_server_every_benchmark=False,
    )

    run_sweep(config, repo_root=tmp_path)

    # Three configs, so three start/stop pairs regardless of files and rates.
    assert (server.start.call_count, server.stop.call_count) == (3, 3)
@patch("benchmarks.multimodal.sweep.orchestrator.run_aiperf_single")
@patch("benchmarks.multimodal.sweep.orchestrator.ServerManager")
def test_skip_existing_results(
    mock_server_cls: MagicMock,
    mock_aiperf: MagicMock,
    tmp_path: Path,
) -> None:
    """Runs with existing profile_export_aiperf.json are skipped."""
    mock_server_cls.return_value.is_running = False
    config = _make_config(tmp_path, num_configs=1, num_input_files=1)

    # Seed a finished result for rate=4 so only rate=8 is still pending.
    file_tag = Path(config.input_files[0]).stem
    done_dir = tmp_path / "results" / file_tag / "cfg-0" / "request_rate4"
    done_dir.mkdir(parents=True)
    (done_dir / "profile_export_aiperf.json").write_text("{}")

    run_sweep(config, repo_root=tmp_path)

    assert mock_aiperf.call_count == 1
    assert mock_aiperf.call_args.kwargs["sweep_value"] == 8
@patch("benchmarks.multimodal.sweep.orchestrator.run_aiperf_single")
@patch("benchmarks.multimodal.sweep.orchestrator.ServerManager")
def test_skip_all_results_no_server_start(
    mock_server_cls: MagicMock,
    mock_aiperf: MagicMock,
    tmp_path: Path,
) -> None:
    """When all runs have results, the server should never start."""
    server = mock_server_cls.return_value
    server.is_running = False
    config = _make_config(
        tmp_path,
        num_configs=1,
        num_input_files=1,
        restart_server_every_benchmark=False,
    )

    # Seed finished results for both default rates so nothing is pending.
    results_root = tmp_path / "results" / Path(config.input_files[0]).stem / "cfg-0"
    for rate in (4, 8):
        done_dir = results_root / f"request_rate{rate}"
        done_dir.mkdir(parents=True)
        (done_dir / "profile_export_aiperf.json").write_text("{}")

    run_sweep(config, repo_root=tmp_path)

    assert mock_aiperf.call_count == 0
    assert server.start.call_count == 0
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment