"lib/vscode:/vscode.git/clone" did not exist on "4224e57da0b0239575f4c2e3acf3f11adaf102a1"
Unverified Commit 9780bf3a authored by Qi Wang's avatar Qi Wang Committed by GitHub
Browse files

perf: multimodal benchmark sweep (#6795)

parent f0bfda1e
...@@ -59,18 +59,41 @@ def sample_slots( ...@@ -59,18 +59,41 @@ def sample_slots(
num_requests: int, num_requests: int,
images_per_request: int, images_per_request: int,
) -> list[str]: ) -> list[str]:
"""Sample image slots from a fixed pool, no duplicates within each request.""" """Sample image slots from a fixed pool, no duplicates within each request.
assert (
len(pool) >= images_per_request Every image in the pool is guaranteed to appear at least once.
), f"images-pool ({len(pool)}) must be >= images-per-request ({images_per_request})" """
pool_size = len(pool)
total_slots = num_requests * images_per_request total_slots = num_requests * images_per_request
slot_refs: list[str] = [] assert (
for _ in range(num_requests): pool_size >= images_per_request
slot_refs.extend(py_rng.sample(pool, images_per_request)) ), f"images-pool ({pool_size}) must be >= images-per-request ({images_per_request})"
assert total_slots >= pool_size, (
f"total slots ({num_requests}×{images_per_request}={total_slots}) < "
f"images-pool ({pool_size}). Increase --num-requests or --images-per-request, "
f"or reduce --images-pool."
)
# Round-robin every pool image into requests so each appears at least once
shuffled = list(pool)
py_rng.shuffle(shuffled)
requests: list[list[str]] = [[] for _ in range(num_requests)]
for i, img in enumerate(shuffled):
requests[i % num_requests].append(img)
# Fill remaining slots with random pool samples (no intra-request duplicates)
for req in requests:
remaining = images_per_request - len(req)
if remaining > 0:
used = set(req)
available = [img for img in pool if img not in used]
req.extend(py_rng.sample(available, remaining))
py_rng.shuffle(req)
slot_refs = [img for req in requests for img in req]
num_unique = len(set(slot_refs)) num_unique = len(set(slot_refs))
print( print(
f"Generated {total_slots} image slots from pool of {len(pool)}: " f"Generated {total_slots} image slots from pool of {pool_size}: "
f"{num_unique} unique in use, " f"{num_unique} unique in use, "
f"{total_slots - num_unique} duplicate references " f"{total_slots - num_unique} duplicate references "
f"({(total_slots - num_unique) / total_slots:.1%} reuse)" f"({(total_slots - num_unique) / total_slots:.1%} reuse)"
......
...@@ -46,12 +46,11 @@ def main() -> None: ...@@ -46,12 +46,11 @@ def main() -> None:
) )
slot_refs = sample_slots(py_rng, pool, num_requests, images_per_request) slot_refs = sample_slots(py_rng, pool, num_requests, images_per_request)
unique_images = len(set(slot_refs))
output_path = args.output output_path = args.output
if output_path is None: if output_path is None:
output_path = ( output_path = (
Path(__file__).parent Path(__file__).parent
/ f"{num_requests}req_{images_per_request}img_{unique_images}pool_{args.user_text_tokens}word_{args.image_mode}.jsonl" / f"{num_requests}req_{images_per_request}img_{image_pool}pool_{args.user_text_tokens}word_{args.image_mode}.jsonl"
) )
with open(output_path, "w") as f: with open(output_path, "w") as f:
......
results
\ No newline at end of file
# Multimodal Benchmark Sweep
YAML-driven benchmark orchestrator that launches serving backends, runs
[aiperf](https://github.com/triton-inference-server/perf_analyzer) concurrency
sweeps, and optionally generates comparison plots.
## Quick Start
```bash
# from the repo root
python -m benchmarks.multimodal.sweep \
--config benchmarks/multimodal/sweep/experiments/embedding_cache/vllm_serve.yaml
```
## How It Works
1. Parse the YAML experiment config.
2. For each **input file** × each **benchmark config**:
- Launch the serving backend via the workflow script.
- Run `aiperf profile` at every concurrency level.
- Stop the server (by default the server restarts between concurrency
levels to avoid warm-cache bias — controlled by
`restart_server_every_benchmark`).
3. Generate comparison plots across configs for each input file.
## YAML Config Reference
```yaml
model: Qwen/Qwen3-VL-30B-A3B-Instruct-FP8
concurrencies: [16, 32, 64, 128, 256]
osl: 150 # output sequence length
request_count: 1000 # requests per concurrency level
warmup_count: 5
port: 8000
timeout: 900 # seconds to wait for server readiness
output_dir: benchmarks/multimodal/sweep/results/vllm_serve
# Optional env vars injected into the server process
env:
ENABLE_ENCODER_CACHE: "0"
# JSONL files produced by benchmarks/multimodal/jsonl/
input_files:
- benchmarks/multimodal/jsonl/1000req_1img_200pool_400word_http.jsonl
- benchmarks/multimodal/jsonl/1000req_4img_200pool_400word_http.jsonl
# Each config launches the workflow with its own extra_args
configs:
- label: cache-off
workflow: examples/backends/vllm/launch/vllm_serve_embedding_cache.sh
extra_args: [--no-enable-prefix-caching, --multimodal-embedding-cache-capacity-gb, "0"]
- label: cache-on
workflow: examples/backends/vllm/launch/vllm_serve_embedding_cache.sh
extra_args: [--no-enable-prefix-caching, --multimodal-embedding-cache-capacity-gb, "10"]
```
## CLI Overrides
Any top-level YAML field can be overridden from the command line:
```bash
python -m benchmarks.multimodal.sweep \
--config experiments/embedding_cache/vllm_serve.yaml \
--concurrencies 1,2,4 \
--osl 200 \
--request-count 50 \
--skip-plots
```
## Output Directory Structure
Given the config above with two input files and two configs (`cache-off`,
`cache-on`) at concurrencies `[16, 32]`, the output tree looks like:
```
<output_dir>/
├── 1000req_1img_200pool_400word_http/ # ← derived from input filename
│ ├── cache-off/ # ← config label
│ │ ├── c16/ # ← concurrency level
│ │ │ ├── profile_export.jsonl
│ │ │ ├── profile_export_aiperf.json
│ │ │ ├── profile_export_aiperf.csv
│ │ │ ├── gpu_telemetry_export.jsonl
│ │ │ ├── inputs.json
│ │ │ └── logs/
│ │ │ └── aiperf.log
│ │ └── c32/
│ │ └── ...
│ ├── cache-on/
│ │ ├── c16/
│ │ │ └── ...
│ │ └── c32/
│ │ └── ...
│ └── plots/ # ← comparison plots across configs
│ └── ...
└── 1000req_4img_200pool_400word_http/
├── cache-off/
│ └── ...
├── cache-on/
│ └── ...
└── plots/
└── ...
```
## Existing Experiments
| Experiment | Config | Backend |
|---|---|---|
| Embedding cache (vLLM serve) | `experiments/embedding_cache/vllm_serve.yaml` | Single-node vLLM |
| Embedding cache (vLLM E+PD) | `experiments/embedding_cache/vllm_e_pd.yaml` | Disaggregated vLLM E+PD |
| Embedding cache (TRT-LLM E+PD) | `experiments/embedding_cache/trtllm_e_pd.yaml` | Disaggregated TRT-LLM E+PD |
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
CLI entry point for the multimodal benchmark sweep.
Usage:
python -m benchmarks.multimodal.sweep --config experiment.yaml
python -m benchmarks.multimodal.sweep --config experiment.yaml --output-dir /tmp/results
python -m benchmarks.multimodal.sweep --config experiment.yaml --model MyModel --osl 200
"""
from __future__ import annotations
from .args import parse_args
from .config import load_config, resolve_repo_root
from .orchestrator import run_sweep
def main(argv=None) -> None:
args = parse_args(argv)
overrides = {k: v for k, v in vars(args).items() if k != "config" and v is not None}
config = load_config(args.config, cli_overrides=overrides or None)
repo_root = resolve_repo_root()
config.validate(repo_root=repo_root)
run_sweep(config, repo_root=repo_root)
if __name__ == "__main__":
main()
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import argparse
from typing import List
def _parse_concurrencies(value: str) -> List[int]:
return [int(x.strip()) for x in value.split(",")]
def parse_args(argv=None) -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Run a multimodal benchmark sweep from a YAML config.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=(
"Examples:\n"
" python -m benchmarks.multimodal.sweep --config experiments/cache_sweep.yaml\n"
" python -m benchmarks.multimodal.sweep --config exp.yaml --osl 200 --skip-plots\n"
),
)
parser.add_argument(
"--config",
required=True,
help="Path to YAML experiment config file.",
)
parser.add_argument(
"--output-dir",
default=None,
help="Override output directory from config.",
)
parser.add_argument(
"--model",
default=None,
help="Override model name from config.",
)
parser.add_argument(
"--concurrencies",
type=_parse_concurrencies,
default=None,
help="Override concurrency levels (comma-separated, e.g. '1,2,4,8').",
)
parser.add_argument(
"--osl",
type=int,
default=None,
help="Override output sequence length.",
)
parser.add_argument(
"--request-count",
type=int,
default=None,
help="Override request count per concurrency level.",
)
parser.add_argument(
"--skip-plots",
action="store_true",
default=None,
help="Skip plot generation.",
)
return parser.parse_args(argv)
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
import yaml
@dataclass
class BenchmarkConfig:
"""A single benchmark configuration: a workflow script + arguments."""
label: str
workflow: str
extra_args: List[str] = field(default_factory=list)
@dataclass
class SweepConfig:
"""Top-level sweep configuration loaded from YAML with optional CLI overrides."""
model: str = "Qwen/Qwen3-VL-30B-A3B-Instruct-FP8"
concurrencies: List[int] = field(default_factory=lambda: [1, 2, 4, 8, 16, 32])
osl: int = 150
request_count: int = 1000
warmup_count: int = 5
port: int = 8000
timeout: int = 600
input_files: List[str] = field(default_factory=list)
configs: List[BenchmarkConfig] = field(default_factory=list)
output_dir: str = "benchmarks/results/multimodal_default"
skip_plots: bool = False
restart_server_every_benchmark: bool = True
env: Dict[str, str] = field(default_factory=dict)
def validate(self, repo_root: Optional[Path] = None) -> None:
"""Validate that all referenced files and scripts exist."""
if not self.input_files:
raise ValueError("At least one input_file is required.")
if not self.configs:
raise ValueError("At least one benchmark config is required.")
for f in self.input_files:
if not Path(f).is_file():
raise FileNotFoundError(f"Input file not found: {f}")
for cfg in self.configs:
script = Path(cfg.workflow)
if repo_root and not script.is_absolute():
script = repo_root / script
if not script.is_file():
raise FileNotFoundError(
f"Workflow script not found: {script} (config '{cfg.label}')"
)
if not self.concurrencies:
raise ValueError("At least one concurrency level is required.")
def _parse_benchmark_config(raw: Dict[str, Any]) -> BenchmarkConfig:
return BenchmarkConfig(
label=raw["label"],
workflow=raw["workflow"],
extra_args=[str(a) for a in raw.get("extra_args", [])],
)
def load_config(
yaml_path: str,
cli_overrides: Optional[Dict[str, Any]] = None,
) -> SweepConfig:
"""Load a SweepConfig from a YAML file, applying optional CLI overrides."""
with open(yaml_path) as f:
raw = yaml.safe_load(f)
defaults = SweepConfig()
configs = [_parse_benchmark_config(c) for c in raw.get("configs", [])]
cfg = SweepConfig(
model=raw.get("model", defaults.model),
concurrencies=raw.get("concurrencies", defaults.concurrencies),
osl=raw.get("osl", defaults.osl),
request_count=raw.get("request_count", defaults.request_count),
warmup_count=raw.get("warmup_count", defaults.warmup_count),
port=raw.get("port", defaults.port),
timeout=raw.get("timeout", defaults.timeout),
input_files=raw.get("input_files", []),
configs=configs,
output_dir=raw.get("output_dir", defaults.output_dir),
skip_plots=raw.get("skip_plots", False),
restart_server_every_benchmark=raw.get(
"restart_server_every_benchmark",
defaults.restart_server_every_benchmark,
),
env=raw.get("env", {}),
)
if cli_overrides:
for key, value in cli_overrides.items():
if value is None:
continue
if hasattr(cfg, key):
setattr(cfg, key, value)
return cfg
def input_file_tag(path: str) -> str:
"""Derive a short directory-safe tag from a JSONL filename."""
return Path(path).stem.replace(" ", "_")
def resolve_repo_root() -> Path:
"""Walk up from CWD looking for pyproject.toml to find the repo root."""
candidate = Path(os.getcwd()).resolve()
while candidate != candidate.parent:
if (candidate / "pyproject.toml").is_file():
return candidate
candidate = candidate.parent
return Path(os.getcwd()).resolve()
#!/bin/bash
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# 1 Encode + 1 PD worker (disaggregated E + PD)
# GPU 0: Encode (vision encoder)
# GPU 0: PD worker (prefill + decode, TP=1)
#
# Usage:
# bash trtllm_e_pd.sh
# bash trtllm_e_pd.sh --multimodal-embedding-cache-capacity-gb 10
# Environment variables with defaults
export DYNAMO_HOME=${DYNAMO_HOME:-"/workspace"}
export MODEL_PATH="/huggingface/local/llava-v1.6-mistral-7b-hf-pinned"
export SERVED_MODEL_NAME=${SERVED_MODEL_NAME:-"llava-v1.6-mistral-7b-hf"}
export PREFILL_ENGINE_ARGS=${PREFILL_ENGINE_ARGS:-"$DYNAMO_HOME/examples/backends/trtllm/engine_configs/llava-v1.6-mistral-7b-hf/prefill.yaml"}
export DECODE_ENGINE_ARGS=${DECODE_ENGINE_ARGS:-"$DYNAMO_HOME/examples/backends/trtllm/engine_configs/llava-v1.6-mistral-7b-hf/decode.yaml"}
export ENCODE_ENGINE_ARGS=${ENCODE_ENGINE_ARGS:-"$DYNAMO_HOME/examples/backends/trtllm/engine_configs/llava-v1.6-mistral-7b-hf/encode.yaml"}
export ENCODE_ENDPOINT=${ENCODE_ENDPOINT:-"dyn://dynamo.tensorrt_llm_encode.generate"}
export MODALITY=${MODALITY:-"multimodal"}
export ALLOWED_LOCAL_MEDIA_PATH=${ALLOWED_LOCAL_MEDIA_PATH:-"/tmp"}
export MAX_FILE_SIZE_MB=${MAX_FILE_SIZE_MB:-50}
export CUSTOM_TEMPLATE=${CUSTOM_TEMPLATE:-"$DYNAMO_HOME/examples/backends/trtllm/templates/llava_multimodal.jinja"}
# Extra arguments forwarded to the PD worker (e.g. --multimodal-embedding-cache-capacity-gb 10)
EXTRA_PD_ARGS=("$@")
# Setup cleanup trap
cleanup() {
echo "Cleaning up background processes..."
kill $DYNAMO_PID $ENCODE_PID $PD_PID_1 2>/dev/null || true
wait $DYNAMO_PID $ENCODE_PID $PD_PID_1 2>/dev/null || true
echo "Cleanup complete."
}
trap cleanup EXIT INT TERM
# run frontend
# dynamo.frontend accepts either --http-port flag or DYN_HTTP_PORT env var (defaults to 8000)
python3 -m dynamo.frontend &
DYNAMO_PID=$!
# run encode worker (vision encoder on GPU 0)
CUDA_VISIBLE_DEVICES=0 python3 -m dynamo.trtllm \
--model-path "$MODEL_PATH" \
--extra-engine-args "$ENCODE_ENGINE_ARGS" \
--modality "$MODALITY" \
--allowed-local-media-path "$ALLOWED_LOCAL_MEDIA_PATH" \
--max-file-size-mb "$MAX_FILE_SIZE_MB" \
--custom-jinja-template "$CUSTOM_TEMPLATE" \
--disaggregation-mode encode &
ENCODE_PID=$!
# run PD worker 1 (GPU 1)
CUDA_VISIBLE_DEVICES=1 python3 -m dynamo.trtllm \
--model-path "$MODEL_PATH" \
--extra-engine-args "$PD_ENGINE_ARGS" \
--modality "$MODALITY" \
--encode-endpoint "$ENCODE_ENDPOINT" \
--disaggregation-mode prefill_and_decode \
--custom-jinja-template "$CUSTOM_TEMPLATE" \
"${EXTRA_PD_ARGS[@]}" &
PD_PID_1=$!
wait $DYNAMO_PID
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# Full 3-way comparison: cache-off vs cache-on vs disaggregated EPD.
#
# Usage:
# python -m benchmarks.multimodal.sweep --config benchmarks/multimodal/sweep/experiments/embedding_cache/trtllm_e_pd.yaml
model: Qwen/Qwen3-VL-2B-Instruct
concurrencies: [16, 32, 64, 128, 256]
osl: 150
request_count: 1000
warmup_count: 5
port: 8000
timeout: 900
output_dir: benchmarks/results/embedding_cache/trtllm_e_pd
input_files:
- benchmarks/multimodal/jsonl/1000req_1img_200pool_400word_http.jsonl
- benchmarks/multimodal/jsonl/1000req_1img_800pool_400word_http.jsonl
configs:
- label: cache-off
workflow: benchmarks/multimodal/sweep/experiments/embedding_cache/trtllm_e_pd.sh
extra_args: [--multimodal-embedding-cache-capacity-gb, "0"]
- label: cache-on
workflow: benchmarks/multimodal/sweep/experiments/embedding_cache/trtllm_e_pd.sh
extra_args: [--multimodal-embedding-cache-capacity-gb, "10"]
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# Full 3-way comparison: cache-off vs cache-on vs disaggregated EPD.
#
# Usage:
# python -m benchmarks.multimodal.sweep --config benchmarks/multimodal/sweep/experiments/embedding_cache/full_comparison.yaml
model: Qwen/Qwen3-VL-30B-A3B-Instruct-FP8
concurrencies: [16, 32, 64, 128, 256]
osl: 150
request_count: 1000
warmup_count: 5
port: 8000
timeout: 900
output_dir: benchmarks/multimodal/sweep/results/vllm_e_pd
env:
ENABLE_ENCODER_CACHE: "0"
input_files:
- benchmarks/multimodal/jsonl/1000req_1img_200pool_400word_http.jsonl
- benchmarks/multimodal/jsonl/1000req_2img_200pool_400word_http.jsonl
- benchmarks/multimodal/jsonl/1000req_4img_200pool_400word_http.jsonl
- benchmarks/multimodal/jsonl/1000req_1img_800pool_400word_http.jsonl
- benchmarks/multimodal/jsonl/1000req_2img_1600pool_400word_http.jsonl
- benchmarks/multimodal/jsonl/1000req_4img_3200pool_400word_http.jsonl
configs:
- label: cache-off
workflow: examples/backends/vllm/launch/disagg_multimodal_e_pd.sh
extra_args: [--no-enable-prefix-caching, --multimodal-embedding-cache-capacity-gb, "0"]
- label: cache-on
workflow: examples/backends/vllm/launch/disagg_multimodal_e_pd.sh
extra_args: [--no-enable-prefix-caching, --multimodal-embedding-cache-capacity-gb, "10"]
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# Embedding cache ON vs OFF on a single vllm_serve workflow.
#
# Usage:
# python -m benchmarks.multimodal.sweep --config benchmarks/multimodal/sweep/experiments/embedding_cache/cache_on_off.yaml
model: Qwen/Qwen3-VL-30B-A3B-Instruct-FP8
concurrencies: [16, 32, 64, 128, 256]
osl: 150
request_count: 1000
warmup_count: 5
port: 8000
timeout: 900
output_dir: benchmarks/multimodal/sweep/results/vllm_serve
env:
ENABLE_ENCODER_CACHE: "0"
input_files:
- benchmarks/multimodal/jsonl/1000req_1img_200pool_400word_base64.jsonl
- benchmarks/multimodal/jsonl/1000req_2img_200pool_400word_base64.jsonl
- benchmarks/multimodal/jsonl/1000req_4img_200pool_400word_base64.jsonl
- benchmarks/multimodal/jsonl/1000req_2img_200pool_400word_http.jsonl
- benchmarks/multimodal/jsonl/1000req_4img_200pool_400word_http.jsonl
- benchmarks/multimodal/jsonl/1000req_1img_800pool_400word_http.jsonl
- benchmarks/multimodal/jsonl/1000req_2img_1600pool_400word_http.jsonl
- benchmarks/multimodal/jsonl/1000req_4img_3200pool_400word_http.jsonl
configs:
- label: cache-off
workflow: examples/backends/vllm/launch/vllm_serve_embedding_cache.sh
extra_args: [--no-enable-prefix-caching, --multimodal-embedding-cache-capacity-gb, "0"]
- label: cache-on
workflow: examples/backends/vllm/launch/vllm_serve_embedding_cache.sh
extra_args: [--no-enable-prefix-caching, --multimodal-embedding-cache-capacity-gb, "10"]
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
from pathlib import Path
from typing import List, Optional
from .config import SweepConfig, input_file_tag, resolve_repo_root
from .runner import run_aiperf_single, run_concurrency_sweep
from .server import ServerManager
def _resolve_workflow(workflow: str, repo_root: Path) -> str:
p = Path(workflow)
if p.is_absolute():
return str(p)
return str(repo_root / p)
def _print_banner(title: str, char: str = "=", width: int = 70) -> None:
print(f"\n{char * width}")
print(f" {title}")
print(f"{char * width}", flush=True)
def run_sweep(
config: SweepConfig,
repo_root: Optional[Path] = None,
) -> None:
"""Execute the full benchmark sweep: for each input file x benchmark config."""
if repo_root is None:
repo_root = resolve_repo_root()
output_base = Path(config.output_dir)
restart = config.restart_server_every_benchmark
_print_banner("Multimodal Benchmark Sweep")
print(f" Model: {config.model}")
print(f" Input files: {len(config.input_files)}")
for f in config.input_files:
print(f" {f}")
labels = [c.label for c in config.configs]
print(f" Configs: {labels}")
print(f" Concurrencies: {config.concurrencies}")
print(f" OSL: {config.osl}")
print(f" Requests: {config.request_count} per concurrency")
print(f" Restart every: {restart}")
print(f" Output: {output_base}")
print(flush=True)
server = ServerManager(port=config.port, timeout=config.timeout)
env_overrides = dict(config.env) if config.env else {}
try:
for input_file in config.input_files:
file_tag = input_file_tag(input_file)
file_output_dir = output_base / file_tag
_print_banner(f"Input: {Path(input_file).name} ({file_tag})", char="#")
for bench_cfg in config.configs:
_print_banner(f"[{file_tag}] Config: {bench_cfg.label}", char="-")
workflow_abs = _resolve_workflow(bench_cfg.workflow, repo_root)
sweep_dir = file_output_dir / bench_cfg.label
if restart:
_sweep_with_restart(
server=server,
workflow_script=workflow_abs,
config=config,
bench_cfg=bench_cfg,
env_overrides=env_overrides,
input_file=input_file,
output_dir=sweep_dir,
)
else:
server.start(
workflow_script=workflow_abs,
model=config.model,
extra_args=bench_cfg.extra_args,
env_overrides=env_overrides,
)
try:
run_concurrency_sweep(
model=config.model,
port=config.port,
concurrencies=config.concurrencies,
request_count=config.request_count,
warmup_count=config.warmup_count,
input_file=input_file,
osl=config.osl,
output_dir=sweep_dir,
)
finally:
server.stop()
if not config.skip_plots:
_generate_plots_for_file(
file_output_dir,
[c.label for c in config.configs],
)
finally:
if server.is_running:
server.stop()
_print_summary(config, output_base)
def _sweep_with_restart(
server: ServerManager,
workflow_script: str,
config: SweepConfig,
bench_cfg,
env_overrides: dict,
input_file: str,
output_dir: Path,
) -> None:
"""Run each concurrency level with a fresh server to avoid warm-cache effects."""
output_dir.mkdir(parents=True, exist_ok=True)
for c in sorted(config.concurrencies):
server.start(
workflow_script=workflow_script,
model=config.model,
extra_args=bench_cfg.extra_args,
env_overrides=env_overrides,
)
try:
run_aiperf_single(
model=config.model,
port=config.port,
concurrency=c,
request_count=config.request_count,
warmup_count=config.warmup_count,
input_file=input_file,
osl=config.osl,
artifact_dir=output_dir / f"c{c}",
)
finally:
server.stop()
print(f"Sweep complete. Results in {output_dir}", flush=True)
def _generate_plots_for_file(
file_output_dir: Path,
labels: List[str],
) -> None:
"""Generate comparison plots for one input file across all configs."""
try:
from benchmarks.utils.plot import generate_plots
plots_dir = file_output_dir / "plots"
print(f"\nGenerating plots -> {plots_dir}", flush=True)
generate_plots(
base_output_dir=file_output_dir,
output_dir=plots_dir,
benchmark_names=labels,
)
except ImportError:
print(
"WARNING: benchmarks.utils.plot not importable; skipping plots.",
flush=True,
)
except Exception as exc:
print(f"WARNING: Plot generation failed: {exc}", flush=True)
def _print_summary(config: SweepConfig, output_base: Path) -> None:
_print_banner("Sweep Complete!")
print(f" Results: {output_base}")
for input_file in config.input_files:
tag = input_file_tag(input_file)
print(f" [{tag}]:")
for cfg in config.configs:
result_dir = output_base / tag / cfg.label
print(f" {cfg.label}: {result_dir}")
if not config.skip_plots:
plots_dir = output_base / tag / "plots"
print(f" plots: {plots_dir}")
print(flush=True)
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import subprocess
from pathlib import Path
from typing import List
def _build_aiperf_cmd(
model: str,
port: int,
concurrency: int,
request_count: int,
warmup_count: int,
input_file: str,
osl: int,
artifact_dir: Path,
) -> List[str]:
return [
"aiperf",
"profile",
"-m",
model,
"-u",
f"http://localhost:{port}",
"--concurrency",
str(concurrency),
"--request-count",
str(request_count),
"--warmup-request-count",
str(warmup_count),
"--input-file",
input_file,
"--custom-dataset-type",
"single_turn",
"--extra-inputs",
f"max_tokens:{osl}",
"--extra-inputs",
f"min_tokens:{osl}",
"--extra-inputs",
"ignore_eos:true",
"--extra-inputs",
"stream:true",
"--streaming",
"--artifact-dir",
str(artifact_dir),
"--ui",
"none",
"--no-server-metrics",
]
def run_aiperf_single(
model: str,
port: int,
concurrency: int,
request_count: int,
warmup_count: int,
input_file: str,
osl: int,
artifact_dir: Path,
) -> None:
"""Run a single aiperf profile invocation."""
artifact_dir.mkdir(parents=True, exist_ok=True)
cmd = _build_aiperf_cmd(
model=model,
port=port,
concurrency=concurrency,
request_count=request_count,
warmup_count=warmup_count,
input_file=input_file,
osl=osl,
artifact_dir=artifact_dir,
)
print(f" aiperf concurrency={concurrency} -> {artifact_dir}", flush=True)
proc = subprocess.run(cmd, capture_output=True, text=True)
if proc.returncode != 0:
print(f" aiperf FAILED (exit {proc.returncode})", flush=True)
for stream_name, stream in [("stderr", proc.stderr), ("stdout", proc.stdout)]:
if stream:
for line in stream.strip().splitlines()[-15:]:
print(f" [{stream_name}] {line}", flush=True)
raise subprocess.CalledProcessError(
proc.returncode, cmd, output=proc.stdout, stderr=proc.stderr
)
print(f" aiperf concurrency={concurrency} done.", flush=True)
def run_concurrency_sweep(
model: str,
port: int,
concurrencies: List[int],
request_count: int,
warmup_count: int,
input_file: str,
osl: int,
output_dir: Path,
) -> None:
"""Run aiperf across all concurrency levels, writing results under output_dir/c{N}/."""
output_dir.mkdir(parents=True, exist_ok=True)
for c in sorted(concurrencies):
run_aiperf_single(
model=model,
port=port,
concurrency=c,
request_count=request_count,
warmup_count=warmup_count,
input_file=input_file,
osl=osl,
artifact_dir=output_dir / f"c{c}",
)
print(f"Sweep complete. Results in {output_dir}", flush=True)
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
import os
import signal
import subprocess
import time
from pathlib import Path
from typing import List, Optional
class ServerManager:
"""Manages the lifecycle of a serving backend launched via a bash script.
Uses ``setsid`` so the server gets its own process group, allowing clean
shutdown without killing the orchestrator.
"""
def __init__(self, port: int = 8000, timeout: int = 600) -> None:
self.port = port
self.timeout = timeout
self._process: Optional[subprocess.Popen] = None
@property
def is_running(self) -> bool:
return self._process is not None and self._process.poll() is None
def start(
self,
workflow_script: str,
model: str,
extra_args: Optional[List[str]] = None,
env_overrides: Optional[dict] = None,
) -> None:
"""Launch the workflow script and block until the model is served."""
if self.is_running:
raise RuntimeError("Server is already running. Call stop() first.")
script = Path(workflow_script)
if not script.is_file():
raise FileNotFoundError(f"Workflow script not found: {script}")
model_flag = "--model-path" if "trtllm" in str(script) else "--model"
cmd = ["bash", str(script), model_flag, model]
if extra_args:
cmd.extend(extra_args)
env = os.environ.copy()
if env_overrides:
env.update(env_overrides)
print(f"Launching: {' '.join(cmd)}", flush=True)
self._process = subprocess.Popen(
cmd,
start_new_session=True,
env=env,
)
self.wait_for_ready(model)
def wait_for_ready(self, model: str) -> None:
"""Poll /v1/models until the expected model name appears."""
import urllib.error
import urllib.request
url = f"http://localhost:{self.port}/v1/models"
deadline = time.monotonic() + self.timeout
print(
f"Waiting for server at {url} to list model '{model}' "
f"(timeout: {self.timeout}s)...",
flush=True,
)
while time.monotonic() < deadline:
if not self.is_running:
raise RuntimeError(
"Server process exited unexpectedly during startup "
f"(exit code {self._process.returncode})."
)
try:
req = urllib.request.Request(url)
with urllib.request.urlopen(req, timeout=5) as resp:
body = resp.read().decode()
if model in body:
print("Server is ready (model registered).", flush=True)
return
except (urllib.error.URLError, OSError, TimeoutError):
pass
time.sleep(5)
self.stop()
raise TimeoutError(f"Server did not become ready within {self.timeout}s")
def stop(self) -> None:
"""Stop the server by killing its process group."""
if self._process is None:
return
pid = self._process.pid
print(f"Stopping server (PID {pid})...", flush=True)
try:
os.killpg(pid, signal.SIGTERM)
except (ProcessLookupError, PermissionError):
try:
self._process.terminate()
except (ProcessLookupError, PermissionError):
pass
try:
self._process.wait(timeout=15)
except subprocess.TimeoutExpired:
try:
os.killpg(pid, signal.SIGKILL)
except (ProcessLookupError, PermissionError):
pass
self._process.wait(timeout=5)
print(f"Server stopped (PID {pid}).", flush=True)
self._process = None
time.sleep(5)
...@@ -13,7 +13,6 @@ ...@@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import asyncio
import logging import logging
import time import time
from io import BytesIO from io import BytesIO
...@@ -23,7 +22,6 @@ from urllib.parse import urlparse ...@@ -23,7 +22,6 @@ from urllib.parse import urlparse
from urllib.request import urlopen from urllib.request import urlopen
import torch import torch
from tensorrt_llm.inputs import default_multimodal_input_loader
from tensorrt_llm.llmapi.tokenizer import tokenizer_factory from tensorrt_llm.llmapi.tokenizer import tokenizer_factory
from dynamo.common.multimodal.image_loader import ImageLoader from dynamo.common.multimodal.image_loader import ImageLoader
...@@ -189,11 +187,6 @@ class MultimodalRequestProcessor: ...@@ -189,11 +187,6 @@ class MultimodalRequestProcessor:
"prompt": str, "prompt": str,
"prompt_token_ids": List[int] "prompt_token_ids": List[int]
} }
-----------------------------------------------------------------------------
TODO: Revert default_multimodal_input_loader calls having fixed TRT-LLM's
token IDs & MM data path in generate_async() for the embeddings case.
-----------------------------------------------------------------------------
""" """
self.previous_decoded_text = "" self.previous_decoded_text = ""
...@@ -221,50 +214,18 @@ class MultimodalRequestProcessor: ...@@ -221,50 +214,18 @@ class MultimodalRequestProcessor:
# mm_processor_kwargs must be a dict (not None) for TRT-LLM's processor # mm_processor_kwargs must be a dict (not None) for TRT-LLM's processor
processed_inputs = {"prompt_token_ids": token_ids, "mm_processor_kwargs": {}} processed_inputs = {"prompt_token_ids": token_ids, "mm_processor_kwargs": {}}
# The aforementioned fallback to default_multimodal_input_loader: # EPD Flow Case 2: Embeddings received via NIXL from encode worker
messages = request.get("extra_args", {}).get(
"messages", request.get("messages", [])
)
text_prompt, _, embedding_paths = self.extract_prompt_and_media(messages)
loader_kwargs = {}
# Two cases, both for the default_multimodal_input_loader fallback:
# 1) EPD Flow Case 2: Embeddings received via NIXL from encode worker
# The encode worker computed vision embeddings and transferred them via RDMA/NIXL # The encode worker computed vision embeddings and transferred them via RDMA/NIXL
# We need to pass these embeddings directly to TRT-LLM's generate_async # We need to pass these embeddings directly to TRT-LLM's generate_async
# 2) PD flow with no NIXL and no encoder
if embeddings is not None or embedding_paths:
if embeddings is not None: if embeddings is not None:
logging.info( logging.info(
f"Using NIXL embeddings from encoder: shape={embeddings.shape if hasattr(embeddings, 'shape') else 'N/A'}" f"Using NIXL embeddings from encoder: shape={embeddings.shape if hasattr(embeddings, 'shape') else 'N/A'}"
) )
loader_kwargs["mm_embeddings"] = [embeddings]
elif embedding_paths: # Structure embeddings in the format TRT-LLM's generate_async expects
# PD flow with no NIXL and no encoder processed_inputs["multi_modal_embeddings"] = embeddings
loader_kwargs["mm_embeddings"] = [
self.load_tensor_from_path_or_url(path) for path in embedding_paths return processed_inputs
]
logging.info(f"Using embedding paths: {embedding_paths}")
# NOTE: default_multimodal_input_loader downloads images and preprocesses them
# synchronously. Wrap in asyncio.to_thread to allow concurrent image loading
# across multiple requests, improving throughput at high concurrency.
fallback_processed_inputs = await asyncio.to_thread(
lambda: default_multimodal_input_loader(
tokenizer=self.tokenizer,
model_dir=self.model_dir,
model_type=self.model_type,
modality=self.modality,
prompts=[text_prompt],
image_data_format="pt",
device="cuda",
**loader_kwargs,
)
)
# Return the first processed input if available
if fallback_processed_inputs:
return fallback_processed_inputs[0]
return None
# PD Flow: Pre-tokenized by Rust frontend with direct media loading # PD Flow: Pre-tokenized by Rust frontend with direct media loading
# TODO: Add frontend decoding support # TODO: Add frontend decoding support
...@@ -279,6 +240,7 @@ class MultimodalRequestProcessor: ...@@ -279,6 +240,7 @@ class MultimodalRequestProcessor:
if image_items and isinstance(image_items, list): if image_items and isinstance(image_items, list):
# Separate embedding paths from regular image URLs # Separate embedding paths from regular image URLs
# Items come from Rust in format: {"Url": "..."} or {"Decoded": ...} # Items come from Rust in format: {"Url": "..."} or {"Decoded": ...}
embedding_paths = []
image_urls = [] image_urls = []
for item in image_items: for item in image_items:
...@@ -299,7 +261,9 @@ class MultimodalRequestProcessor: ...@@ -299,7 +261,9 @@ class MultimodalRequestProcessor:
continue continue
# Check if this is an embedding file based on extension # Check if this is an embedding file based on extension
if not url.endswith((".pt", ".pth", ".bin")): if url.endswith((".pt", ".pth", ".bin")):
embedding_paths.append(url)
else:
# Keep original item format for load_image_batch # Keep original item format for load_image_batch
image_urls.append( image_urls.append(
item if isinstance(item, dict) else {"Url": item} item if isinstance(item, dict) else {"Url": item}
...@@ -321,6 +285,23 @@ class MultimodalRequestProcessor: ...@@ -321,6 +285,23 @@ class MultimodalRequestProcessor:
logging.error(f"Failed to load images: {e}") logging.error(f"Failed to load images: {e}")
return None return None
# Load embedding files (.pt, .pth, .bin) for PD flow
# These are pre-computed vision encoder outputs
if embedding_paths:
try:
loaded_embeddings = [
self.load_tensor_from_path_or_url(path)
for path in embedding_paths
]
if loaded_embeddings:
processed_mm_data["embedding"] = loaded_embeddings
logging.info(
f"Loaded {len(loaded_embeddings)} embedding file(s) from paths: {embedding_paths}"
)
except Exception as e:
logging.error(f"Failed to load embeddings: {e}")
return None
# TODO: Add support for video_url, audio_url # TODO: Add support for video_url, audio_url
if processed_mm_data: if processed_mm_data:
......
...@@ -110,8 +110,6 @@ def _accumulate_embeddings( ...@@ -110,8 +110,6 @@ def _accumulate_embeddings(
) )
) )
else: else:
# Plain tensor embeddings
logger.info(f"Get embedding of shape {mm_data['image'].shape}")
# [gluo FIXME] embedding with multiple images? # [gluo FIXME] embedding with multiple images?
if multi_modal_data["image"] == []: if multi_modal_data["image"] == []:
multi_modal_data["image"] = mm_data["image"] multi_modal_data["image"] = mm_data["image"]
...@@ -250,11 +248,6 @@ async def _fetch_embeddings( ...@@ -250,11 +248,6 @@ async def _fetch_embeddings(
# ── 2. Fetch uncached from encode workers ──────────────────────── # ── 2. Fetch uncached from encode workers ────────────────────────
pending: _PendingRelease | None = None pending: _PendingRelease | None = None
if to_fetch: if to_fetch:
if cache is not None:
logger.info(
f"[{request_id}] Cache miss for {len(to_fetch)}/{len(image_urls)} URLs, "
"fetching from encode workers"
)
miss_urls = [url for _, url, _ in to_fetch] miss_urls = [url for _, url, _ in to_fetch]
groups, pending = await _fetch_from_encode_workers( groups, pending = await _fetch_from_encode_workers(
encode_worker_client, encode_worker_client,
...@@ -274,8 +267,6 @@ async def _fetch_embeddings( ...@@ -274,8 +267,6 @@ async def _fetch_embeddings(
), ),
) )
results[idx] = group results[idx] = group
else:
logger.info(f"[{request_id}] All {len(image_urls)} URLs served from cache")
return [r for r in results if r is not None], pending return [r for r in results if r is not None], pending
......
...@@ -17,7 +17,6 @@ enable_attention_dp: false ...@@ -17,7 +17,6 @@ enable_attention_dp: false
max_num_tokens: 2048 max_num_tokens: 2048
max_batch_size: 8 max_batch_size: 8
max_seq_len: 8192 max_seq_len: 8192
trust_remote_code: true
backend: pytorch backend: pytorch
enable_chunked_prefill: true enable_chunked_prefill: true
......
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
tensor_parallel_size: 1
enable_attention_dp: false
max_num_tokens: 2048
max_batch_size: 8
max_seq_len: 16384
trust_remote_code: true
backend: pytorch
enable_chunked_prefill: true
kv_cache_config:
free_gpu_memory_fraction: 0.95
enable_block_reuse: false
cache_transceiver_config:
backend: DEFAULT
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
tensor_parallel_size: 1
enable_attention_dp: false
max_num_tokens: 1024
max_batch_size: 4
trust_remote_code: true
backend: pytorch
enable_chunked_prefill: true
# Overlap scheduler not currently supported in prefill only workers.
disable_overlap_scheduler: true
# Note: Encode workers use MultimodalEncoder (vision encoder + projector only),
# which ignores most engine_args. No kv_cache_config or cache_transceiver_config
# is needed since MultimodalEncoder doesn't allocate KV cache or transfer buffers.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment