Unverified commit d4e30a59, authored by Yan Ru Pei, committed by GitHub

test(mocker): cut replay helper sleeps (#7577)


Signed-off-by: PeaBrane <yanrpei@gmail.com>
parent 1c65a588
......@@ -9,9 +9,9 @@ This guide covers the mocker's trace replay support for Mooncake-style JSONL tra
surface is available in two forms:
- `python -m dynamo.mocker --trace-file ...`, which writes a report file and prints a replay summary
- `python -m dynamo.replay ...`, which returns the replay report JSON on stdout and exposes
`offline|online`, `round_robin|kv_router`, `arrival_speedup_ratio`, and synthetic replay inputs
directly
- `python -m dynamo.replay ...`, which prints an AIPerf-style summary table, writes the full
replay report JSON to disk, and exposes `offline|online`, `round_robin|kv_router`,
`arrival_speedup_ratio`, and synthetic replay inputs directly
Unlike normal `dynamo.mocker` usage, offline replay does not launch workers, register endpoints, or
require NATS, etcd, or a frontend. Online replay does exercise the live mock-worker runtime path.
......@@ -31,7 +31,8 @@ python -m dynamo.replay /path/to/mooncake_trace.jsonl \
--num-workers 4 \
--replay-mode offline \
--router-mode round_robin \
--extra-engine-args /path/to/mocker_args.json
--extra-engine-args '{"block_size":512,"speedup_ratio":1000.0}' \
--report-json /tmp/replay-report.json
```
Run synthetic replay through the same CLI when you want fixed request shapes without a trace file:
......@@ -45,7 +46,8 @@ python -m dynamo.replay \
--num-workers 1 \
--replay-mode offline \
--replay-concurrency 100 \
--extra-engine-args /path/to/mocker_args.json
--extra-engine-args '{"block_size":512,"speedup_ratio":1000.0}' \
--report-json /tmp/replay-report.json
```
You can also run replay through the mocker CLI by passing `--trace-file`:
......@@ -62,8 +64,9 @@ This writes a JSON report next to the trace file by default:
/path/to/mooncake_trace.replay.json
```
`python -m dynamo.replay` prints the replay report JSON directly to stdout. The mocker CLI prints a
`Replay Summary` table to stdout and writes the report JSON to disk.
`python -m dynamo.replay` prints an AIPerf-style summary table to stdout and writes the full replay
report JSON to disk. The mocker CLI prints a `Replay Summary` table to stdout and writes the report
JSON to disk.
## Input Format
......@@ -96,15 +99,13 @@ The dedicated replay CLI exposes:
- either a positional `trace_file`, or all of `--input-tokens`, `--output-tokens`, and `--request-count`
- `--replay-mode offline|online`
- `--router-mode round_robin|kv_router`
- `--router-queue-policy fcfs|wspt|lcfs`
- `--num-workers`
- `--replay-concurrency`
- `--arrival-interval-ms`
- `--arrival-speedup-ratio`
- `--extra-engine-args`
- `--extra-engine-args-json`
- `--router-config`
- `--router-config-json`
- `--extra-engine-args` (JSON string)
- `--router-config` (JSON string)
- `--report-json`
Example:
......@@ -114,8 +115,9 @@ python -m dynamo.replay /path/to/mooncake_trace.jsonl \
--router-mode kv_router \
--num-workers 4 \
--arrival-speedup-ratio 10 \
--extra-engine-args-json '{"block_size":64,"speedup_ratio":1000.0}' \
--router-config-json '{"router_queue_policy":"fcfs","router_temperature":0.0}'
--extra-engine-args '{"block_size":512,"speedup_ratio":1000.0}' \
--router-config '{"router_queue_policy":"fcfs","router_temperature":0.0}' \
--report-json /tmp/replay-report.json
```
SGLang replay uses the same CLI surface. A minimal extra-engine-args file can use either
......@@ -132,9 +134,9 @@ SGLang replay uses the same CLI surface. A minimal extra-engine-args file can us
}
```
For both `--extra-engine-args-json` and `--router-config-json`, replay accepts partial JSON
objects. Unspecified fields fall back to the same defaults used by `MockEngineArgs::default()`
and `KvRouterConfig::default()`.
Both `--extra-engine-args` and `--router-config` accept partial JSON objects. Unspecified fields
fall back to the same defaults used by `MockEngineArgs::default()` and
`KvRouterConfig::default()`.
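The fallback behaviour described above can be pictured as overlaying the supplied JSON object on a full set of defaults. The following is only a rough sketch of that idea: `ILLUSTRATIVE_DEFAULTS` and `overlay_partial_json` are hypothetical names, the default values are made up for illustration, and the real merge happens inside `MockEngineArgs` and `KvRouterConfig` rather than in Python code like this.

```python
import json

# Hypothetical defaults for illustration only. The real values come from
# MockEngineArgs::default() and are not listed in this guide.
ILLUSTRATIVE_DEFAULTS = {
    "block_size": 64,
    "speedup_ratio": 1.0,
    "num_gpu_blocks": 16384,
}


def overlay_partial_json(partial_json: str, defaults: dict) -> dict:
    """Return a copy of `defaults` with top-level keys from `partial_json` applied."""
    overrides = json.loads(partial_json)
    if not isinstance(overrides, dict):
        raise ValueError("expected a JSON object")
    merged = dict(defaults)  # copy so the defaults are never mutated
    merged.update(overrides)  # shallow overlay: supplied keys win
    return merged


if __name__ == "__main__":
    merged = overlay_partial_json(
        '{"block_size":512,"speedup_ratio":1000.0}', ILLUSTRATIVE_DEFAULTS
    )
    print(merged)
```

Only `block_size` and `speedup_ratio` change here; `num_gpu_blocks` keeps its illustrative default. How nested objects such as `sglang` are merged is not covered by this sketch.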
### `python -m dynamo.mocker --trace-file`
......@@ -154,7 +156,7 @@ python -m dynamo.replay \
--arrival-interval-ms 0.5 \
--replay-mode offline \
--replay-concurrency 50 \
--extra-engine-args /path/to/mocker_args.json
--extra-engine-args '{"block_size":512,"speedup_ratio":1000.0}'
```
This is useful for parameter sweeps where Mooncake-style prefix structure is not required.
......@@ -170,7 +172,7 @@ those timestamps:
python -m dynamo.replay /path/to/mooncake_trace.jsonl \
--replay-mode offline \
--num-workers 4 \
--extra-engine-args /path/to/mocker_args.json
--extra-engine-args '{"block_size":512,"speedup_ratio":1000.0}'
```
This is the right mode when you want deterministic replay of the original arrival pattern.
......@@ -201,7 +203,7 @@ python -m dynamo.replay /path/to/mooncake_trace.jsonl \
--router-mode kv_router \
--num-workers 4 \
--arrival-speedup-ratio 10 \
--extra-engine-args /path/to/mocker_args.json
--extra-engine-args '{"block_size":512,"speedup_ratio":1000.0}'
```
### Arrival Speedup
......@@ -214,7 +216,7 @@ python -m dynamo.replay /path/to/mooncake_trace.jsonl \
--replay-mode offline \
--num-workers 4 \
--arrival-speedup-ratio 5 \
--extra-engine-args /path/to/mocker_args.json
--extra-engine-args '{"block_size":512,"speedup_ratio":1000.0}'
```
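As a rough mental model of what the ratio does to arrivals, the sketch below compresses a list of trace timestamps. It assumes the ratio divides each arrival's offset from the first request; that exact rule and the helper name `scale_arrivals` are assumptions made for illustration, not something confirmed by the replay implementation.

```python
def scale_arrivals(timestamps_ms: list[float], arrival_speedup_ratio: float) -> list[float]:
    """Compress arrival offsets (relative to the first request) by the given ratio."""
    if arrival_speedup_ratio <= 0:
        raise ValueError("arrival_speedup_ratio must be positive")
    if not timestamps_ms:
        return []
    first = timestamps_ms[0]
    # Assumption: only arrival times change; per-request compute time is untouched.
    return [(t - first) / arrival_speedup_ratio for t in timestamps_ms]


if __name__ == "__main__":
    print(scale_arrivals([1000.0, 1005.0, 1010.0], 5))
```

Under this model a ratio of 5 turns arrivals spaced 5 ms apart into arrivals spaced 1 ms apart, while worker compute speed stays governed by the engine's own `speedup_ratio`.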
### Router Modes
......@@ -224,7 +226,8 @@ Replay currently supports:
- `round_robin`
- `kv_router`
`kv_router` uses the shared local scheduler and an in-process KV indexer. In offline replay:
`kv_router` uses the shared local scheduler and an in-process KV indexer. Router policy tuning is
provided through `--router-config`, not a dedicated top-level replay flag. In offline replay:
- `kv_router` is supported only when `num_workers > 1`
- router queueing is enabled and uses simulation time rather than wall-clock time
......@@ -233,22 +236,22 @@ Replay currently supports:
- transient in-pass prefill occupancy is still approximated at the router level rather than modeled exactly
To compare queue policies manually, keep the same trace and engine args fixed and swap only
`--router-queue-policy`:
`router_queue_policy` inside `--router-config`:
```bash
python -m dynamo.replay /path/to/mooncake_trace.jsonl \
--replay-mode offline \
--router-mode kv_router \
--router-queue-policy fcfs \
--num-workers 4 \
--extra-engine-args /path/to/mocker_args.json
--extra-engine-args '{"block_size":512,"speedup_ratio":1000.0}' \
--router-config '{"router_queue_policy":"fcfs"}'
python -m dynamo.replay /path/to/mooncake_trace.jsonl \
--replay-mode offline \
--router-mode kv_router \
--router-queue-policy lcfs \
--num-workers 4 \
--extra-engine-args /path/to/mocker_args.json
--extra-engine-args '{"block_size":512,"speedup_ratio":1000.0}' \
--router-config '{"router_queue_policy":"lcfs"}'
```
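Once both runs have saved their reports, the comparison can be scripted. The sketch below assumes each run was given its own `--report-json` path (the `/tmp/fcfs.json` and `/tmp/lcfs.json` paths are hypothetical) and that the reports carry the metric keys that appear in this change's tests, such as `mean_ttft_ms` and `request_throughput_rps`. The helper names are illustrative.

```python
import json
from pathlib import Path

# Metric keys taken from the report fields exercised in the tests.
METRICS = (
    "mean_ttft_ms",
    "p99_ttft_ms",
    "mean_e2e_latency_ms",
    "output_throughput_tok_s",
    "request_throughput_rps",
)


def load_report(path: str) -> dict:
    """Read a saved replay report JSON file."""
    return json.loads(Path(path).read_text(encoding="utf-8"))


def compare_reports(baseline: dict, candidate: dict, metrics=METRICS) -> dict:
    """Return candidate minus baseline for each metric present as a number in both."""
    deltas = {}
    for key in metrics:
        a, b = baseline.get(key), candidate.get(key)
        if isinstance(a, (int, float)) and isinstance(b, (int, float)):
            deltas[key] = b - a
    return deltas


if __name__ == "__main__":
    fcfs = {"mean_ttft_ms": 10.0, "request_throughput_rps": 4.0}
    lcfs = {"mean_ttft_ms": 12.5, "request_throughput_rps": 3.5}
    for key, delta in compare_reports(fcfs, lcfs).items():
        print(f"{key}: {delta:+.2f}")
```

With real runs, `compare_reports(load_report("/tmp/fcfs.json"), load_report("/tmp/lcfs.json"))` would replace the inline dictionaries.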
`lcfs` is intentionally a worse comparison policy under saturation; use it for experiments, not as
......@@ -280,6 +283,9 @@ The report contains:
The dedicated replay CLI returns the same report schema as the Python APIs
`dynamo.replay.run_trace_replay(...)` and `dynamo.replay.run_synthetic_trace_replay(...)`.
If `--report-json` is not provided, `python -m dynamo.replay` writes a timestamped
`dynamo_replay_report_*.json` file in the current working directory.
## Replay Constraints
Shared replay constraints:
......@@ -308,7 +314,7 @@ If you violate those constraints, replay fails immediately with a validation err
- `--speedup-ratio` still affects simulated timing
- `--arrival-speedup-ratio` affects trace timestamps, not worker compute speed
- `--arrival-interval-ms` only applies to synthetic replay
- `--extra-engine-args` can be used to provide a full mocker config JSON instead of individual CLI flags
- `--extra-engine-args` and `--router-config` are JSON strings on the standalone replay CLI
- offline replay does not need planner runtime setup, router registration, or external event transport
- the replay block size should match the trace block size, because token synthesis expands `hash_ids`
using the configured block size
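
One way to sanity-check the block size before a run is to compare each record's `hash_ids` count with its `input_length`. The sketch below assumes every hash id stands for one block, so a matching block size satisfies `len(hash_ids) == ceil(input_length / block_size)`. That relationship is an assumption about Mooncake-style traces, and `mismatched_records` is a hypothetical helper.

```python
import json
import math


def mismatched_records(trace_lines: list[str], block_size: int) -> list[int]:
    """Return indices of records whose hash_ids count disagrees with block_size."""
    bad = []
    for index, line in enumerate(trace_lines):
        line = line.strip()
        if not line:
            continue  # tolerate blank lines in the JSONL input
        record = json.loads(line)
        expected_blocks = math.ceil(record["input_length"] / block_size)
        if len(record["hash_ids"]) != expected_blocks:
            bad.append(index)
    return bad


if __name__ == "__main__":
    record = json.dumps(
        {
            "timestamp": 0,
            "input_length": 6755,
            "output_length": 500,
            "hash_ids": list(range(14)),
        }
    )
    print(mismatched_records([record], 512))
    print(mismatched_records([record], 64))
```

For the first record of the Mooncake sample used in the tests (6755 input tokens, 14 hash ids), a block size of 512 is consistent under this assumption while 64 is not.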
......
......@@ -139,16 +139,17 @@ python -m dynamo.mocker \
```
For the standalone replay CLI, which exposes `offline|online`, `round_robin|kv_router`,
`arrival_speedup_ratio`, `router_queue_policy`, and the synthetic replay path directly:
`arrival_speedup_ratio`, and the synthetic replay path directly:
```bash
python -m dynamo.replay /path/to/mooncake_trace.jsonl \
--num-workers 4 \
--replay-mode offline \
--router-mode kv_router \
--router-queue-policy fcfs \
--arrival-speedup-ratio 5 \
--extra-engine-args /path/to/mocker_args.json
--extra-engine-args '{"block_size":512,"speedup_ratio":1000.0}' \
--router-config '{"router_queue_policy":"fcfs"}' \
--report-json /tmp/replay-report.json
```
The same CLI also supports synthetic replay without a trace file:
......@@ -162,11 +163,13 @@ python -m dynamo.replay \
--num-workers 1 \
--replay-mode offline \
--replay-concurrency 100 \
--extra-engine-args /path/to/mocker_args.json
--extra-engine-args '{"block_size":512,"speedup_ratio":1000.0}' \
--report-json /tmp/replay-report.json
```
The standalone replay CLI prints the replay report JSON directly to stdout. The `dynamo.mocker`
trace-file flow still writes a report file and prints a `Replay Summary` table.
The standalone replay CLI prints an AIPerf-style summary table to stdout and writes the full replay
report JSON to disk. The `dynamo.mocker` trace-file flow still writes a report file and prints a
`Replay Summary` table.
For full usage, constraints, and benchmarking guidance, see [Mocker Trace Replay](../benchmarks/mocker-trace-replay.md).
......
......@@ -4,7 +4,6 @@
from __future__ import annotations
import argparse
import json
import os
import sys
from collections.abc import Sequence
......@@ -13,6 +12,7 @@ os.environ.setdefault("DYNAMO_SKIP_PYTHON_LOG_INIT", "1")
from dynamo.llm import KvRouterConfig, MockEngineArgs
from dynamo.replay import run_synthetic_trace_replay, run_trace_replay
from dynamo.replay.reporting import format_report_table, write_report_json
def main(argv: Sequence[str] | None = None) -> int:
......@@ -37,6 +37,10 @@ def main(argv: Sequence[str] | None = None) -> int:
default="round_robin",
)
parser.add_argument("--arrival-speedup-ratio", type=float, default=1.0)
parser.add_argument(
"--report-json",
help="path to save the full replay report JSON; defaults to a timestamped file in the current directory",
)
args = parser.parse_args(list(sys.argv[1:] if argv is None else argv))
using_trace_file = args.trace_file is not None
......@@ -89,6 +93,8 @@ def main(argv: Sequence[str] | None = None) -> int:
arrival_interval_ms=args.arrival_interval_ms,
)
json.dump(report, sys.stdout, indent=2, sort_keys=True)
report_path = write_report_json(report, args.report_json)
sys.stdout.write(format_report_table(report))
sys.stdout.write("\n")
sys.stdout.write(f"Saved full report to: {report_path}\n")
return 0
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

import json
from datetime import datetime
from pathlib import Path

TITLE = "NVIDIA AIPerf | LLM Metrics"
STAT_COLUMNS = ("avg", "min", "max", "p99", "p90", "p75", "std")


def default_report_path() -> Path:
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    return Path.cwd() / f"dynamo_replay_report_{timestamp}.json"


def write_report_json(
    report: dict[str, object], output_path: str | Path | None
) -> Path:
    path = Path(output_path) if output_path is not None else default_report_path()
    if path.exists() and path.is_dir():
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        path = path / f"dynamo_replay_report_{timestamp}.json"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(
        json.dumps(report, indent=2, sort_keys=True) + "\n", encoding="utf-8"
    )
    return path


def format_report_table(report: dict[str, object]) -> str:
    rows = _build_rows(report)
    table = _render_table(rows)
    lines = [TITLE, table]
    wall_time_ms = report.get("wall_time_ms")
    if isinstance(wall_time_ms, int | float):
        lines.append(f"Wall Time (ms): {_format_value(wall_time_ms)}")
    prefix_cache_reused_ratio = report.get("prefix_cache_reused_ratio")
    if isinstance(prefix_cache_reused_ratio, int | float):
        lines.append(
            f"Prefix Cache Reused Ratio: {_format_value(prefix_cache_reused_ratio)}"
        )
    return "\n".join(lines)


def _build_rows(report: dict[str, object]) -> list[list[str]]:
    rows: list[list[str]] = []
    _append_stat_row(rows, report, "Time to First Token (ms)", "ttft_ms")
    _append_stat_row(rows, report, "Time to Second Token (ms)", "ttst_ms")
    _append_stat_row(rows, report, "Request Latency (ms)", "e2e_latency_ms")
    _append_stat_row(rows, report, "Inter Token Latency (ms)", "itl_ms")
    _append_stat_row(
        rows,
        report,
        "Output Token Throughput Per User (tokens/sec/user)",
        "output_token_throughput_per_user",
    )
    rows.append(
        [
            "Output Token Throughput (tokens/sec)",
            _format_value(report.get("output_throughput_tok_s")),
            *["N/A"] * (len(STAT_COLUMNS) - 1),
        ]
    )
    rows.append(
        [
            "Request Throughput (requests/sec)",
            _format_value(report.get("request_throughput_rps")),
            *["N/A"] * (len(STAT_COLUMNS) - 1),
        ]
    )
    rows.append(
        [
            "Request Count (requests)",
            _format_value(report.get("completed_requests", report.get("num_requests"))),
            *["N/A"] * (len(STAT_COLUMNS) - 1),
        ]
    )
    return rows


def _append_stat_row(
    rows: list[list[str]], report: dict[str, object], label: str, metric_suffix: str
) -> None:
    mean_key = f"mean_{metric_suffix}"
    if mean_key not in report:
        return
    rows.append(
        [
            label,
            _format_value(report.get(mean_key)),
            _format_value(report.get(f"min_{metric_suffix}")),
            _format_value(report.get(f"max_{metric_suffix}")),
            _format_value(report.get(f"p99_{metric_suffix}")),
            _format_value(report.get(f"p90_{metric_suffix}")),
            _format_value(report.get(f"p75_{metric_suffix}")),
            _format_value(report.get(f"std_{metric_suffix}")),
        ]
    )


def _render_table(rows: list[list[str]]) -> str:
    headers = ["Metric", *STAT_COLUMNS]
    widths = [len(header) for header in headers]
    for row in rows:
        for index, value in enumerate(row):
            widths[index] = max(widths[index], len(value))

    def render_separator(left: str, mid: str, right: str) -> str:
        return left + mid.join("━" * (width + 2) for width in widths) + right

    def render_row(row: list[str]) -> str:
        padded = []
        for index, value in enumerate(row):
            if index == 0:
                padded.append(f" {value.ljust(widths[index])} ")
                continue
            padded.append(f" {value.rjust(widths[index])} ")
        return "┃" + "┃".join(padded) + "┃"

    lines = [
        render_separator("┏", "┳", "┓"),
        render_row(headers),
        render_separator("┡", "╇", "┩"),
    ]
    lines.extend(render_row(row) for row in rows)
    lines.append(render_separator("└", "┴", "┘"))
    return "\n".join(lines)


def _format_value(value: object) -> str:
    if value is None:
        return "N/A"
    if isinstance(value, int | float):
        return f"{value:,.2f}"
    return str(value)
......@@ -11,7 +11,11 @@ import subprocess
import pytest
pytestmark = pytest.mark.pre_merge
pytestmark = [
pytest.mark.gpu_0,
pytest.mark.pre_merge,
pytest.mark.integration,
]
@pytest.fixture(scope="module")
......
......@@ -15,7 +15,12 @@
import pytest
pytestmark = pytest.mark.pre_merge
pytestmark = [
pytest.mark.gpu_0,
pytest.mark.parallel,
pytest.mark.pre_merge,
pytest.mark.unit,
]
def test_bindings_install():
......
......@@ -11,7 +11,11 @@ import subprocess
import pytest
pytestmark = pytest.mark.pre_merge
pytestmark = [
pytest.mark.gpu_0,
pytest.mark.pre_merge,
pytest.mark.integration,
]
@pytest.fixture(scope="module")
......
......@@ -5,7 +5,12 @@ import pytest
from dynamo.llm import HttpError
pytestmark = pytest.mark.pre_merge
pytestmark = [
pytest.mark.gpu_0,
pytest.mark.parallel,
pytest.mark.pre_merge,
pytest.mark.unit,
]
def test_raise_http_error():
......
......@@ -29,7 +29,11 @@ from dynamo.runtime import DistributedRuntime
MSG_CONTAINS_ERROR = "This message contains an 400error."
MSG_CONTAINS_INTERNAL_ERROR = "This message contains an internal server error."
pytestmark = pytest.mark.pre_merge
pytestmark = [
pytest.mark.gpu_0,
pytest.mark.pre_merge,
pytest.mark.integration,
]
class MockHttpEngine:
......
......@@ -17,7 +17,11 @@ except ImportError:
from dynamo.llm import KserveGrpcService, ModelRuntimeConfig, PythonAsyncEngine
pytestmark = pytest.mark.pre_merge
pytestmark = [
pytest.mark.gpu_0,
pytest.mark.pre_merge,
pytest.mark.integration,
]
async def _fetch_model_config(
......
......@@ -21,7 +21,12 @@ import pytest
from dynamo.llm import RadixTree
pytestmark = pytest.mark.pre_merge
pytestmark = [
pytest.mark.gpu_0,
pytest.mark.parallel,
pytest.mark.pre_merge,
pytest.mark.unit,
]
@pytest.mark.timeout(5) # Expected: ~1s, timeout set to 5x for safety
......
......@@ -5,6 +5,13 @@ import pytest
from dynamo.llm import lora_name_to_id
pytestmark = [
pytest.mark.gpu_0,
pytest.mark.parallel,
pytest.mark.pre_merge,
pytest.mark.unit,
]
max_int32 = 0x7FFFFFFF
......
......@@ -25,7 +25,12 @@ import pytest
from dynamo.llm import RadixTree, compute_block_hash_for_seq
pytestmark = pytest.mark.pre_merge
pytestmark = [
pytest.mark.gpu_0,
pytest.mark.parallel,
pytest.mark.pre_merge,
pytest.mark.unit,
]
# Constants for testing
DEFAULT_BLOCK_SIZE = 32
......
......@@ -13,9 +13,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
from dynamo._core import get_reasoning_parser_names, get_tool_parser_names
pytestmark = [
pytest.mark.gpu_0,
pytest.mark.parallel,
pytest.mark.pre_merge,
pytest.mark.unit,
]
def test_get_tool_parser_names():
parsers = get_tool_parser_names()
......
......@@ -2,16 +2,22 @@
# SPDX-License-Identifier: Apache-2.0
import json
import os
import subprocess
import sys
import pytest
from dynamo.llm import KvRouterConfig, MockEngineArgs
from dynamo.replay import run_synthetic_trace_replay, run_trace_replay
from dynamo.replay.main import main
from dynamo.replay.reporting import format_report_table, write_report_json
pytestmark = [
pytest.mark.gpu_0,
pytest.mark.parallel,
pytest.mark.pre_merge,
pytest.mark.unit,
]
MOONCAKE_TRACE_FIRST20 = """{"timestamp": 0, "input_length": 6755, "output_length": 500, "hash_ids": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13]}
......@@ -37,6 +43,50 @@ MOONCAKE_TRACE_FIRST20 = """{"timestamp": 0, "input_length": 6755, "output_lengt
"""
def _vllm_args_payload():
return {
"block_size": 64,
"speedup_ratio": 1000.0,
}
def _sglang_args_payload():
return {
"engine_type": "sglang",
"num_gpu_blocks": 512,
"block_size": 64,
"speedup_ratio": 1000.0,
"sglang": {
"page_size": 64,
},
}
def _router_config_payload():
return {
"router_queue_threshold": 1.25,
"router_event_threads": 1,
"router_queue_policy": "wspt",
"router_temperature": 0.0,
"overlap_score_weight": 1.0,
"use_kv_events": True,
"durable_kv_events": False,
"router_replica_sync": False,
"router_track_active_blocks": True,
"router_track_output_blocks": False,
"router_assume_kv_reuse": True,
"router_snapshot_threshold": 1000000,
"router_reset_states": False,
"router_ttl_secs": 120.0,
"router_max_tree_size": 1048576,
"router_prune_target_ratio": 0.8,
"router_enable_cache_control": False,
"skip_initial_worker_wait": False,
"min_initial_workers": 1,
"remote_indexer_component": None,
}
def _write_trace_and_args(tmp_path):
trace_path = tmp_path / "trace.jsonl"
records = [
......@@ -60,125 +110,62 @@ def _write_trace_and_args(tmp_path):
return trace_path
def _write_cli_smoke_trace(tmp_path):
trace_path = tmp_path / "cli_smoke_trace.jsonl"
records = []
for index in range(10):
records.append(
{
"timestamp": 1000.0 + index,
"input_length": 250,
"output_length": 25,
"hash_ids": [index, index + 1, index + 2, index + 3],
}
)
trace_path.write_text(
"\n".join(json.dumps(record) for record in records) + "\n",
encoding="utf-8",
)
return trace_path
def _write_vllm_args(tmp_path):
args_path = tmp_path / "args.json"
args_path.write_text(
json.dumps(
{
"block_size": 64,
"speedup_ratio": 1000.0,
}
),
json.dumps(_vllm_args_payload()),
encoding="utf-8",
)
return args_path
def _vllm_args():
return MockEngineArgs.from_json(
json.dumps(
{
"block_size": 64,
"speedup_ratio": 1000.0,
}
)
)
return MockEngineArgs.from_json(json.dumps(_vllm_args_payload()))
def _write_sglang_args(tmp_path):
args_path = tmp_path / "sglang_args.json"
args_path.write_text(
json.dumps(
{
"engine_type": "sglang",
"num_gpu_blocks": 512,
"block_size": 64,
"speedup_ratio": 1000.0,
"sglang": {
"page_size": 64,
},
}
),
json.dumps(_sglang_args_payload()),
encoding="utf-8",
)
return args_path
def _sglang_args():
return MockEngineArgs.from_json(
json.dumps(
{
"engine_type": "sglang",
"num_gpu_blocks": 512,
"block_size": 64,
"speedup_ratio": 1000.0,
"sglang": {
"page_size": 64,
},
}
)
)
return MockEngineArgs.from_json(json.dumps(_sglang_args_payload()))
def _write_router_config(tmp_path):
config_path = tmp_path / "router_config.json"
config_path.write_text(
json.dumps(
{
"router_queue_threshold": 1.25,
"router_event_threads": 1,
"router_queue_policy": "wspt",
"router_temperature": 0.0,
"overlap_score_weight": 1.0,
"use_kv_events": True,
"durable_kv_events": False,
"router_replica_sync": False,
"router_track_active_blocks": True,
"router_track_output_blocks": False,
"router_assume_kv_reuse": True,
"router_snapshot_threshold": 1000000,
"router_reset_states": False,
"router_ttl_secs": 120.0,
"router_max_tree_size": 1048576,
"router_prune_target_ratio": 0.8,
"router_enable_cache_control": False,
"skip_initial_worker_wait": False,
"min_initial_workers": 1,
"remote_indexer_component": None,
}
),
json.dumps(_router_config_payload()),
encoding="utf-8",
)
return config_path
def _router_config():
return KvRouterConfig.from_json(
json.dumps(
{
"router_queue_threshold": 1.25,
"router_event_threads": 1,
"router_queue_policy": "wspt",
"router_temperature": 0.0,
"overlap_score_weight": 1.0,
"use_kv_events": True,
"durable_kv_events": False,
"router_replica_sync": False,
"router_track_active_blocks": True,
"router_track_output_blocks": False,
"router_assume_kv_reuse": True,
"router_snapshot_threshold": 1000000,
"router_reset_states": False,
"router_ttl_secs": 120.0,
"router_max_tree_size": 1048576,
"router_prune_target_ratio": 0.8,
"router_enable_cache_control": False,
"skip_initial_worker_wait": False,
"min_initial_workers": 1,
"remote_indexer_component": None,
}
)
)
return KvRouterConfig.from_json(json.dumps(_router_config_payload()))
def _partial_router_config():
......@@ -196,6 +183,45 @@ def _assert_basic_report_counts(report, *, num_requests, input_tokens, output_to
assert report["total_output_tokens"] == num_requests * output_tokens
def _assert_basic_report_metrics(report):
assert report["request_throughput_rps"] > 0
assert report["output_throughput_tok_s"] > 0
assert report["duration_ms"] > 0
def _replay_cli_env() -> dict[str, str]:
env = os.environ.copy()
pythonpath_entries = ["lib/bindings/python/src", "components/src"]
existing_pythonpath = env.get("PYTHONPATH")
if existing_pythonpath:
pythonpath_entries.append(existing_pythonpath)
env["PYTHONPATH"] = os.pathsep.join(pythonpath_entries)
return env
def _run_replay_cli(tmp_path, *args):
return subprocess.run(
[
sys.executable,
"-m",
"dynamo.replay",
*args,
],
capture_output=True,
check=True,
cwd=str(tmp_path),
env=_replay_cli_env(),
text=True,
)
def _assert_replay_cli_outputs(completed, report_path):
assert "NVIDIA AIPerf | LLM Metrics" in completed.stdout
assert "Saved full report to:" in completed.stdout
assert '"completed_requests"' not in completed.stdout
return json.loads(report_path.read_text(encoding="utf-8"))
@pytest.mark.parametrize("engine_type", ["vllm", "sglang"])
@pytest.mark.parametrize("replay_mode", ["offline", "online"])
@pytest.mark.parametrize("router_mode", ["round_robin", "kv_router"])
......@@ -419,3 +445,167 @@ def test_run_trace_replay_accepts_partial_extra_engine_args_json(tmp_path, repla
input_tokens=64,
output_tokens=2,
)
def test_format_report_table_matches_aiperf_shape():
report = {
"mean_ttft_ms": 18.26,
"min_ttft_ms": 11.22,
"max_ttft_ms": 106.32,
"p99_ttft_ms": 68.82,
"p90_ttft_ms": 27.76,
"p75_ttft_ms": 16.62,
"std_ttft_ms": 12.07,
"mean_ttst_ms": 11.40,
"min_ttst_ms": 0.02,
"max_ttst_ms": 85.91,
"p99_ttst_ms": 34.54,
"p90_ttst_ms": 12.59,
"p75_ttst_ms": 11.65,
"std_ttst_ms": 7.01,
"mean_e2e_latency_ms": 487.30,
"min_e2e_latency_ms": 267.07,
"max_e2e_latency_ms": 769.57,
"p99_e2e_latency_ms": 715.99,
"p90_e2e_latency_ms": 580.83,
"p75_e2e_latency_ms": 536.17,
"std_e2e_latency_ms": 79.60,
"mean_itl_ms": 11.23,
"min_itl_ms": 8.80,
"max_itl_ms": 13.17,
"p99_itl_ms": 12.48,
"p90_itl_ms": 11.73,
"p75_itl_ms": 11.37,
"std_itl_ms": 0.45,
"mean_output_token_throughput_per_user": 89.23,
"min_output_token_throughput_per_user": 75.93,
"max_output_token_throughput_per_user": 113.60,
"p99_output_token_throughput_per_user": 102.28,
"p90_output_token_throughput_per_user": 90.91,
"p75_output_token_throughput_per_user": 90.29,
"std_output_token_throughput_per_user": 3.70,
"output_throughput_tok_s": 10944.03,
"request_throughput_rps": 255.54,
"completed_requests": 711,
"wall_time_ms": 4046.31,
"prefix_cache_reused_ratio": 0.3587,
}
rendered = format_report_table(report)
assert "NVIDIA AIPerf | LLM Metrics" in rendered
assert "Time to First Token (ms)" in rendered
assert "Output Token Throughput (tokens/sec)" in rendered
assert "Request Throughput (requests/sec)" in rendered
assert "Prefix Cache Reused Ratio: 0.36" in rendered
assert "10,944.03" in rendered
assert "255.54" in rendered
assert "N/A" in rendered
def test_write_report_json_creates_file(tmp_path):
report_path = write_report_json({"completed_requests": 2}, tmp_path / "report.json")
assert (
report_path.read_text(encoding="utf-8") == '{\n  "completed_requests": 2\n}\n'
)
def test_replay_cli_prints_table_and_saves_json(tmp_path, monkeypatch, capsys):
report = {
"mean_ttft_ms": 10.0,
"min_ttft_ms": 9.0,
"max_ttft_ms": 12.0,
"p99_ttft_ms": 12.0,
"p90_ttft_ms": 11.0,
"p75_ttft_ms": 10.5,
"std_ttft_ms": 1.0,
"output_throughput_tok_s": 123.0,
"request_throughput_rps": 4.0,
"completed_requests": 3,
}
def fake_run(*args, **kwargs):
return report
monkeypatch.setattr("dynamo.replay.main.run_synthetic_trace_replay", fake_run)
report_path = tmp_path / "cli_report.json"
exit_code = main(
[
"--input-tokens",
"16",
"--output-tokens",
"8",
"--request-count",
"3",
"--report-json",
str(report_path),
]
)
assert exit_code == 0
stdout = capsys.readouterr().out
assert "NVIDIA AIPerf | LLM Metrics" in stdout
assert "Saved full report to:" in stdout
assert '"completed_requests"' not in stdout
assert json.loads(report_path.read_text(encoding="utf-8")) == report
def test_replay_cli_subprocess_synthetic_smoke(tmp_path):
report_path = tmp_path / "synthetic_report.json"
completed = _run_replay_cli(
tmp_path,
"--input-tokens",
"250",
"--output-tokens",
"25",
"--request-count",
"10",
"--num-workers",
"4",
"--replay-concurrency",
"4",
"--report-json",
str(report_path),
"--extra-engine-args",
'{"block_size":64,"speedup_ratio":1000.0}',
)
report = _assert_replay_cli_outputs(completed, report_path)
_assert_basic_report_counts(
report,
num_requests=10,
input_tokens=250,
output_tokens=25,
)
_assert_basic_report_metrics(report)
def test_replay_cli_subprocess_trace_smoke(tmp_path):
trace_path = _write_cli_smoke_trace(tmp_path)
report_path = tmp_path / "trace_report.json"
completed = _run_replay_cli(
tmp_path,
str(trace_path),
"--replay-mode",
"offline",
"--router-mode",
"kv_router",
"--num-workers",
"4",
"--report-json",
str(report_path),
"--extra-engine-args",
'{"block_size":64,"speedup_ratio":1000.0}',
)
report = _assert_replay_cli_outputs(completed, report_path)
_assert_basic_report_counts(
report,
num_requests=10,
input_tokens=250,
output_tokens=25,
)
_assert_basic_report_metrics(report)
......@@ -13,6 +13,12 @@ from dynamo.runtime import DistributedRuntime
TEST_END_TO_END = os.environ.get("TEST_END_TO_END", 0)
pytestmark = [
pytest.mark.gpu_0,
pytest.mark.pre_merge,
pytest.mark.integration,
]
@pytest.mark.asyncio
async def test_register(runtime: DistributedRuntime):
......
......@@ -561,7 +561,7 @@ async fn assert_sglang_scheduler_completes_all(
let expected_tokens = num_requests * max_output_tokens;
let mut received_tokens = 0;
let timeout = tokio::time::sleep(Duration::from_secs(2));
let timeout = tokio::time::sleep(Duration::from_millis(200));
tokio::pin!(timeout);
loop {
......@@ -572,7 +572,7 @@ async fn assert_sglang_scheduler_completes_all(
if received_tokens >= expected_tokens {
break;
}
timeout.set(tokio::time::sleep(Duration::from_secs(2)));
timeout.set(tokio::time::sleep(Duration::from_millis(200)));
}
_ = &mut timeout => break,
}
......@@ -580,7 +580,6 @@ async fn assert_sglang_scheduler_completes_all(
assert_eq!(received_tokens, expected_tokens);
tokio::time::sleep(Duration::from_millis(100)).await;
let metrics = scheduler.metrics_receiver().borrow().clone();
assert!(metrics.active_decode_blocks > 0);
assert!(metrics.total_blocks > 0);
......@@ -609,7 +608,7 @@ mod router_events {
let args = MockEngineArgs::builder()
.num_gpu_blocks(500)
.block_size(64)
.speedup_ratio(10.0)
.speedup_ratio(1000.0)
.sglang(Some(SglangArgs {
schedule_policy: Some(schedule_policy.to_string()),
page_size: Some(page_size),
......
......@@ -254,7 +254,7 @@ pub(crate) async fn assert_scheduler_completes_all(
let expected_tokens = num_requests * max_output_tokens;
let mut received_tokens = 0;
let timeout = tokio::time::sleep(Duration::from_secs(2));
let timeout = tokio::time::sleep(Duration::from_millis(200));
tokio::pin!(timeout);
loop {
......@@ -265,7 +265,7 @@ pub(crate) async fn assert_scheduler_completes_all(
if received_tokens >= expected_tokens {
break;
}
timeout.set(tokio::time::sleep(Duration::from_secs(2)));
timeout.set(tokio::time::sleep(Duration::from_millis(200)));
}
_ = &mut timeout => break,
}
......@@ -276,7 +276,6 @@ pub(crate) async fn assert_scheduler_completes_all(
"Expected {expected_tokens} output signals, got {received_tokens}"
);
tokio::time::sleep(Duration::from_millis(100)).await;
let metrics = scheduler.metrics_receiver().borrow().clone();
assert_eq!(
metrics.active_decode_blocks, 0,
......
......@@ -477,7 +477,7 @@ mod live_scheduler {
let args = MockEngineArgs::builder()
.num_gpu_blocks(500)
.block_size(64)
.speedup_ratio(10.0)
.speedup_ratio(1000.0)
.enable_prefix_caching(enable_prefix_caching)
.enable_chunked_prefill(enable_chunked_prefill)
.build()
......