Unverified Commit 241b254d authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

chore(profiler): reshape replay_optimize into DGDR-aligned Pydantic specs (#8585)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent d401c1a4
...@@ -18,10 +18,13 @@ except ImportError: ...@@ -18,10 +18,13 @@ except ImportError:
from dynamo.profiler.utils import replay_optimize from dynamo.profiler.utils import replay_optimize
from dynamo.profiler.utils.replay_optimize import ( from dynamo.profiler.utils.replay_optimize import (
DenseAggReplayState, DenseAggReplayState,
ReplayConstraints, EngineSpec,
HardwareSpec,
ReplayObjective, ReplayObjective,
SyntheticReplayWorkload, ReplayOptimizeSpec,
TraceReplayWorkload, RouterSpec,
SLASpec,
WorkloadSpec,
compare_agg_and_disagg_with_replay, compare_agg_and_disagg_with_replay,
optimize_dense_agg_with_replay, optimize_dense_agg_with_replay,
optimize_dense_disagg_with_replay, optimize_dense_disagg_with_replay,
...@@ -100,6 +103,106 @@ def _write_trace(tmp_path: Path) -> Path: ...@@ -100,6 +103,106 @@ def _write_trace(tmp_path: Path) -> Path:
return trace_path return trace_path
def _synthetic_workload(
*,
isl: int = 64,
osl: int = 32,
request_count: int = 8,
concurrency: float = 4,
**extras: Any,
) -> WorkloadSpec:
return WorkloadSpec(
isl=isl,
osl=osl,
requestCount=request_count,
concurrency=concurrency,
**extras,
)
def _trace_workload(trace_file: Path, speedup: float = 100.0) -> WorkloadSpec:
return WorkloadSpec(traceFile=str(trace_file), arrivalSpeedupRatio=speedup)
def _sla(**bounds: float) -> SLASpec:
"""Build an SLASpec from keyword bounds.
Accepts the legacy `mean_e2e_latency_ms`, `mean_ttft_ms`, `mean_tpot_ms`,
etc. names for test readability; maps to camelCase DGDR field names.
"""
translate = {
"mean_ttft_ms": "ttft",
"mean_tpot_ms": "itl",
"mean_e2e_latency_ms": "e2eLatency",
"p95_ttft_ms": "p95Ttft",
"p95_tpot_ms": "p95Itl",
"p95_e2e_latency_ms": "p95E2eLatency",
}
return SLASpec(**{translate.get(k, k): v for k, v in bounds.items()})
def _disagg_spec(
*,
workload: WorkloadSpec | None = None,
total_gpus: int = 4,
sla: SLASpec | None = None,
objective: ReplayObjective = ReplayObjective.THROUGHPUT,
router_mode: str = "kv_router",
overlap_weights: list[float] | None = None,
base_router_config: KvRouterConfig | None = None,
max_parallel_evals: int = 1,
) -> ReplayOptimizeSpec:
return ReplayOptimizeSpec(
engine=EngineSpec(
model=_AIC_MODEL,
backend="vllm",
basePrefillEngineArgs=_base_prefill_args(),
baseDecodeEngineArgs=_base_decode_args(),
),
hardware=HardwareSpec(gpuSku=_AIC_SYSTEM, totalGpus=total_gpus),
workload=workload if workload is not None else _synthetic_workload(),
sla=sla if sla is not None else SLASpec(),
router=RouterSpec(
mode=router_mode,
overlapWeights=overlap_weights,
baseRouterConfig=base_router_config,
),
objective=objective,
maxParallelEvals=max_parallel_evals,
)
def _agg_spec(
*,
workload: WorkloadSpec | None = None,
total_gpus: int = 4,
sla: SLASpec | None = None,
router_mode: str = "kv_router",
overlap_weights: list[float] | None = None,
base_router_config: KvRouterConfig | None = None,
max_parallel_evals: int = 1,
) -> ReplayOptimizeSpec:
return ReplayOptimizeSpec(
engine=EngineSpec(
model=_AIC_MODEL,
backend="vllm",
baseEngineArgs=_base_agg_args(),
),
hardware=HardwareSpec(gpuSku=_AIC_SYSTEM, totalGpus=total_gpus),
workload=workload if workload is not None else _synthetic_workload(),
sla=sla if sla is not None else SLASpec(),
router=RouterSpec(
mode=router_mode,
overlapWeights=overlap_weights,
baseRouterConfig=base_router_config,
),
maxParallelEvals=max_parallel_evals,
)
# ---- internal-helper tests (unchanged from Phase 1 interface) ----
def test_enumerate_dense_tp_candidates_filters_to_tp_only(monkeypatch) -> None: def test_enumerate_dense_tp_candidates_filters_to_tp_only(monkeypatch) -> None:
common = SimpleNamespace(BackendName=SimpleNamespace(vllm="vllm")) common = SimpleNamespace(BackendName=SimpleNamespace(vllm="vllm"))
task = SimpleNamespace( task = SimpleNamespace(
...@@ -220,6 +323,9 @@ def test_iter_agg_worker_states_collapses_round_robin_overlap() -> None: ...@@ -220,6 +323,9 @@ def test_iter_agg_worker_states_collapses_round_robin_overlap() -> None:
assert set(state.overlap_score_weight for state in states) == {0.0} assert set(state.overlap_score_weight for state in states) == {0.0}
# ---- public-API tests (reshaped to ReplayOptimizeSpec) ----
def test_optimizer_finds_coordinate_optimum_and_reuses_cache(monkeypatch) -> None: def test_optimizer_finds_coordinate_optimum_and_reuses_cache(monkeypatch) -> None:
call_counter: Counter = Counter() call_counter: Counter = Counter()
target_state = replay_optimize.DenseReplayState(2, 4, 2, 1, 2.0) target_state = replay_optimize.DenseReplayState(2, 4, 2, 1, 2.0)
...@@ -253,21 +359,11 @@ def test_optimizer_finds_coordinate_optimum_and_reuses_cache(monkeypatch) -> Non ...@@ -253,21 +359,11 @@ def test_optimizer_finds_coordinate_optimum_and_reuses_cache(monkeypatch) -> Non
monkeypatch.setattr(replay_optimize.evaluate, "_run_replay_for_state", fake_run) monkeypatch.setattr(replay_optimize.evaluate, "_run_replay_for_state", fake_run)
result = optimize_dense_disagg_with_replay( result = optimize_dense_disagg_with_replay(
model=_AIC_MODEL, _disagg_spec(
backend="vllm", total_gpus=8,
system=_AIC_SYSTEM, sla=_sla(mean_e2e_latency_ms=500.0),
workload=SyntheticReplayWorkload( overlap_weights=[0.0, 1.0, 2.0],
isl=64, )
osl=32,
request_count=8,
replay_concurrency=4,
),
base_prefill_engine_args=_base_prefill_args(),
base_decode_engine_args=_base_decode_args(),
max_total_gpus=8,
constraints={"mean_e2e_latency_ms": 500.0},
overlap_score_weights=[0.0, 1.0, 2.0],
max_parallel_evals=1,
) )
assert result.best_feasible is not None assert result.best_feasible is not None
...@@ -312,21 +408,12 @@ def test_agg_optimizer_finds_coordinate_optimum_and_reuses_cache(monkeypatch) -> ...@@ -312,21 +408,12 @@ def test_agg_optimizer_finds_coordinate_optimum_and_reuses_cache(monkeypatch) ->
monkeypatch.setattr(replay_optimize.evaluate, "_run_agg_replay_for_state", fake_run) monkeypatch.setattr(replay_optimize.evaluate, "_run_agg_replay_for_state", fake_run)
result = optimize_dense_agg_with_replay( result = optimize_dense_agg_with_replay(
model=_AIC_MODEL, _agg_spec(
backend="vllm", total_gpus=8,
system=_AIC_SYSTEM, sla=_sla(mean_e2e_latency_ms=500.0),
workload=SyntheticReplayWorkload( router_mode="both",
isl=64, overlap_weights=[0.0, 1.0, 2.0],
osl=32, )
request_count=8,
replay_concurrency=4,
),
base_engine_args=_base_agg_args(),
max_total_gpus=8,
constraints={"mean_e2e_latency_ms": 500.0},
router_mode="both",
overlap_score_weights=[0.0, 1.0, 2.0],
max_parallel_evals=1,
) )
assert result.best_feasible is not None assert result.best_feasible is not None
...@@ -371,21 +458,11 @@ def test_optimizer_uses_violation_penalty_when_no_state_is_feasible( ...@@ -371,21 +458,11 @@ def test_optimizer_uses_violation_penalty_when_no_state_is_feasible(
monkeypatch.setattr(replay_optimize.evaluate, "_run_replay_for_state", fake_run) monkeypatch.setattr(replay_optimize.evaluate, "_run_replay_for_state", fake_run)
result = optimize_dense_disagg_with_replay( result = optimize_dense_disagg_with_replay(
model=_AIC_MODEL, _disagg_spec(
backend="vllm", total_gpus=6,
system=_AIC_SYSTEM, sla=_sla(mean_e2e_latency_ms=50.0),
workload=SyntheticReplayWorkload( overlap_weights=[0.0, 1.0],
isl=64, )
osl=32,
request_count=8,
replay_concurrency=4,
),
base_prefill_engine_args=_base_prefill_args(),
base_decode_engine_args=_base_decode_args(),
max_total_gpus=6,
constraints={"mean_e2e_latency_ms": 50.0},
overlap_score_weights=[0.0, 1.0],
max_parallel_evals=1,
) )
assert result.best_feasible is None assert result.best_feasible is None
...@@ -429,21 +506,12 @@ def test_agg_optimizer_uses_violation_penalty_when_no_state_is_feasible( ...@@ -429,21 +506,12 @@ def test_agg_optimizer_uses_violation_penalty_when_no_state_is_feasible(
monkeypatch.setattr(replay_optimize.evaluate, "_run_agg_replay_for_state", fake_run) monkeypatch.setattr(replay_optimize.evaluate, "_run_agg_replay_for_state", fake_run)
result = optimize_dense_agg_with_replay( result = optimize_dense_agg_with_replay(
model=_AIC_MODEL, _agg_spec(
backend="vllm", total_gpus=8,
system=_AIC_SYSTEM, sla=_sla(mean_e2e_latency_ms=50.0),
workload=SyntheticReplayWorkload( router_mode="both",
isl=64, overlap_weights=[0.0, 1.0],
osl=32, )
request_count=8,
replay_concurrency=4,
),
base_engine_args=_base_agg_args(),
max_total_gpus=8,
constraints={"mean_e2e_latency_ms": 50.0},
router_mode="both",
overlap_score_weights=[0.0, 1.0],
max_parallel_evals=1,
) )
assert result.best_feasible is None assert result.best_feasible is None
...@@ -479,26 +547,17 @@ def test_optimizer_supports_round_robin_router_mode(monkeypatch) -> None: ...@@ -479,26 +547,17 @@ def test_optimizer_supports_round_robin_router_mode(monkeypatch) -> None:
monkeypatch.setattr(replay_optimize.evaluate, "_run_replay_for_state", fake_run) monkeypatch.setattr(replay_optimize.evaluate, "_run_replay_for_state", fake_run)
result = optimize_dense_disagg_with_replay( result = optimize_dense_disagg_with_replay(
model=_AIC_MODEL, _disagg_spec(
backend="vllm", total_gpus=4,
system=_AIC_SYSTEM, sla=_sla(mean_e2e_latency_ms=500.0),
workload=SyntheticReplayWorkload( router_mode="round_robin",
isl=64, overlap_weights=[0.0, 1.0, 2.0],
osl=32, )
request_count=8,
replay_concurrency=4,
),
base_prefill_engine_args=_base_prefill_args(),
base_decode_engine_args=_base_decode_args(),
max_total_gpus=4,
constraints={"mean_e2e_latency_ms": 500.0},
router_mode="round_robin",
overlap_score_weights=[0.0, 1.0, 2.0],
max_parallel_evals=1,
) )
assert result.best_feasible is not None assert result.best_feasible is not None
assert set(seen_router_modes) == {"round_robin"} assert set(seen_router_modes) == {"round_robin"}
# Guardrail #5: round_robin auto-collapses overlap weights to (0.0,)
assert set(seen_weights) == {0.0} assert set(seen_weights) == {0.0}
...@@ -533,22 +592,12 @@ def test_disagg_optimizer_supports_latency_objective(monkeypatch) -> None: ...@@ -533,22 +592,12 @@ def test_disagg_optimizer_supports_latency_objective(monkeypatch) -> None:
monkeypatch.setattr(replay_optimize.evaluate, "_run_replay_for_state", fake_run) monkeypatch.setattr(replay_optimize.evaluate, "_run_replay_for_state", fake_run)
result = optimize_dense_disagg_with_replay( result = optimize_dense_disagg_with_replay(
model=_AIC_MODEL, _disagg_spec(
backend="vllm", total_gpus=4,
system=_AIC_SYSTEM, sla=_sla(mean_e2e_latency_ms=500.0),
workload=SyntheticReplayWorkload( objective=ReplayObjective.MEAN_E2E_LATENCY,
isl=64, overlap_weights=[0.0],
osl=32, )
request_count=8,
replay_concurrency=4,
),
base_prefill_engine_args=_base_prefill_args(),
base_decode_engine_args=_base_decode_args(),
max_total_gpus=4,
constraints={"mean_e2e_latency_ms": 500.0},
objective="mean_e2e_latency",
overlap_score_weights=[0.0],
max_parallel_evals=1,
) )
assert result.best_feasible is not None assert result.best_feasible is not None
...@@ -561,22 +610,21 @@ def test_disagg_optimizer_supports_latency_objective(monkeypatch) -> None: ...@@ -561,22 +610,21 @@ def test_disagg_optimizer_supports_latency_objective(monkeypatch) -> None:
def test_disagg_optimizer_rejects_invalid_objective() -> None: def test_disagg_optimizer_rejects_invalid_objective() -> None:
# Invalid objective strings raise at spec construction (Pydantic validates
# the enum field) and at direct ReplayObjective construction.
with pytest.raises(ValueError, match="not a valid ReplayObjective"): with pytest.raises(ValueError, match="not a valid ReplayObjective"):
optimize_dense_disagg_with_replay( ReplayObjective("bad_objective")
model=_AIC_MODEL, with pytest.raises(ValueError):
backend="vllm", ReplayOptimizeSpec(
system=_AIC_SYSTEM, engine=EngineSpec(
workload=SyntheticReplayWorkload( model=_AIC_MODEL,
isl=64, backend="vllm",
osl=32, basePrefillEngineArgs=_base_prefill_args(),
request_count=8, baseDecodeEngineArgs=_base_decode_args(),
replay_concurrency=4,
), ),
base_prefill_engine_args=_base_prefill_args(), hardware=HardwareSpec(gpuSku=_AIC_SYSTEM, totalGpus=4),
base_decode_engine_args=_base_decode_args(), workload=_synthetic_workload(),
max_total_gpus=4,
objective="bad_objective", objective="bad_objective",
max_parallel_evals=1,
) )
...@@ -606,22 +654,12 @@ def test_disagg_optimizer_supports_router_mode_search(monkeypatch) -> None: ...@@ -606,22 +654,12 @@ def test_disagg_optimizer_supports_router_mode_search(monkeypatch) -> None:
monkeypatch.setattr(replay_optimize.evaluate, "_run_replay_for_state", fake_run) monkeypatch.setattr(replay_optimize.evaluate, "_run_replay_for_state", fake_run)
result = optimize_dense_disagg_with_replay( result = optimize_dense_disagg_with_replay(
model=_AIC_MODEL, _disagg_spec(
backend="vllm", total_gpus=4,
system=_AIC_SYSTEM, sla=_sla(mean_e2e_latency_ms=500.0),
workload=SyntheticReplayWorkload( router_mode="both",
isl=64, overlap_weights=[0.0, 1.0, 2.0],
osl=32, )
request_count=8,
replay_concurrency=4,
),
base_prefill_engine_args=_base_prefill_args(),
base_decode_engine_args=_base_decode_args(),
max_total_gpus=4,
constraints={"mean_e2e_latency_ms": 500.0},
router_mode="both",
overlap_score_weights=[0.0, 1.0, 2.0],
max_parallel_evals=1,
) )
assert result.best_feasible is not None assert result.best_feasible is not None
...@@ -658,21 +696,12 @@ def test_agg_optimizer_supports_router_mode_search(monkeypatch) -> None: ...@@ -658,21 +696,12 @@ def test_agg_optimizer_supports_router_mode_search(monkeypatch) -> None:
monkeypatch.setattr(replay_optimize.evaluate, "_run_agg_replay_for_state", fake_run) monkeypatch.setattr(replay_optimize.evaluate, "_run_agg_replay_for_state", fake_run)
result = optimize_dense_agg_with_replay( result = optimize_dense_agg_with_replay(
model=_AIC_MODEL, _agg_spec(
backend="vllm", total_gpus=4,
system=_AIC_SYSTEM, sla=_sla(mean_e2e_latency_ms=500.0),
workload=SyntheticReplayWorkload( router_mode="both",
isl=64, overlap_weights=[0.0, 1.0, 2.0],
osl=32, )
request_count=8,
replay_concurrency=4,
),
base_engine_args=_base_agg_args(),
max_total_gpus=4,
constraints={"mean_e2e_latency_ms": 500.0},
router_mode="both",
overlap_score_weights=[0.0, 1.0, 2.0],
max_parallel_evals=1,
) )
assert result.best_feasible is not None assert result.best_feasible is not None
...@@ -721,31 +750,34 @@ def test_compare_agg_and_disagg_with_replay_picks_expected_mode(monkeypatch) -> ...@@ -721,31 +750,34 @@ def test_compare_agg_and_disagg_with_replay_picks_expected_mode(monkeypatch) ->
) )
monkeypatch.setattr( monkeypatch.setattr(
replay_optimize.bench, "optimize_dense_agg_with_replay", lambda **_: agg_result replay_optimize.bench,
"optimize_dense_agg_with_replay",
lambda *_, **__: agg_result,
) )
monkeypatch.setattr( monkeypatch.setattr(
replay_optimize.bench, replay_optimize.bench,
"optimize_dense_disagg_with_replay", "optimize_dense_disagg_with_replay",
lambda **_: disagg_result, lambda *_, **__: disagg_result,
) )
comparison = compare_agg_and_disagg_with_replay( # Build a spec that populates both agg and disagg engine args so the
model=_AIC_MODEL, # comparison function accepts it (though the patched optimize_* bypass
backend="vllm", # the actual engine-args assertions).
system=_AIC_SYSTEM, spec = ReplayOptimizeSpec(
workload=SyntheticReplayWorkload( engine=EngineSpec(
isl=64, model=_AIC_MODEL,
osl=32, backend="vllm",
request_count=8, baseEngineArgs=_base_agg_args(),
replay_concurrency=4, basePrefillEngineArgs=_base_prefill_args(),
baseDecodeEngineArgs=_base_decode_args(),
), ),
base_engine_args=_base_agg_args(), hardware=HardwareSpec(gpuSku=_AIC_SYSTEM, totalGpus=8),
base_prefill_engine_args=_base_prefill_args(), workload=_synthetic_workload(),
base_decode_engine_args=_base_decode_args(), sla=_sla(mean_e2e_latency_ms=500.0),
max_total_gpus=8,
constraints={"mean_e2e_latency_ms": 500.0},
) )
comparison = compare_agg_and_disagg_with_replay(spec)
assert comparison["chosen_mode"] == "agg" assert comparison["chosen_mode"] == "agg"
assert comparison["chosen_best"] == agg_result.best_feasible assert comparison["chosen_best"] == agg_result.best_feasible
...@@ -761,6 +793,18 @@ def test_evaluate_state_prefers_normalized_metrics_over_report_payload() -> None ...@@ -761,6 +793,18 @@ def test_evaluate_state_prefers_normalized_metrics_over_report_payload() -> None
) )
cache: dict[replay_optimize.DenseReplayState, dict[str, Any]] = {} cache: dict[replay_optimize.DenseReplayState, dict[str, Any]] = {}
spec = ReplayOptimizeSpec(
engine=EngineSpec(
model="meta-llama/Llama-3.1-8B-Instruct",
backend="vllm",
basePrefillEngineArgs=_base_prefill_args(),
baseDecodeEngineArgs=_base_decode_args(),
),
hardware=HardwareSpec(gpuSku="h100_sxm", totalGpus=4),
workload=_synthetic_workload(isl=128, request_count=16),
sla=_sla(mean_e2e_latency_ms=1000.0),
)
with patch( with patch(
"dynamo.profiler.utils.replay_optimize.evaluate._run_replay_for_state", "dynamo.profiler.utils.replay_optimize.evaluate._run_replay_for_state",
return_value={ return_value={
...@@ -773,20 +817,7 @@ def test_evaluate_state_prefers_normalized_metrics_over_report_payload() -> None ...@@ -773,20 +817,7 @@ def test_evaluate_state_prefers_normalized_metrics_over_report_payload() -> None
): ):
record = replay_optimize.evaluate._evaluate_state( record = replay_optimize.evaluate._evaluate_state(
state=state, state=state,
workload=SyntheticReplayWorkload( spec=spec,
isl=128,
osl=32,
request_count=16,
replay_concurrency=4,
),
base_prefill_engine_args=_base_prefill_args(),
base_decode_engine_args=_base_decode_args(),
base_router_config=None,
model="meta-llama/Llama-3.1-8B-Instruct",
backend="vllm",
system="h100_sxm",
objective=ReplayObjective.THROUGHPUT,
constraints=ReplayConstraints(mean_e2e_latency_ms=1000.0),
cache=cache, cache=cache,
) )
...@@ -805,6 +836,17 @@ def test_evaluate_agg_state_prefers_normalized_metrics_over_report_payload() -> ...@@ -805,6 +836,17 @@ def test_evaluate_agg_state_prefers_normalized_metrics_over_report_payload() ->
) )
cache: dict[DenseAggReplayState, dict[str, Any]] = {} cache: dict[DenseAggReplayState, dict[str, Any]] = {}
spec = ReplayOptimizeSpec(
engine=EngineSpec(
model="meta-llama/Llama-3.1-8B-Instruct",
backend="vllm",
baseEngineArgs=_base_agg_args(),
),
hardware=HardwareSpec(gpuSku="h100_sxm", totalGpus=4),
workload=_synthetic_workload(isl=128, request_count=16),
sla=_sla(mean_e2e_latency_ms=1000.0),
)
with patch( with patch(
"dynamo.profiler.utils.replay_optimize.evaluate._run_agg_replay_for_state", "dynamo.profiler.utils.replay_optimize.evaluate._run_agg_replay_for_state",
return_value={ return_value={
...@@ -817,19 +859,7 @@ def test_evaluate_agg_state_prefers_normalized_metrics_over_report_payload() -> ...@@ -817,19 +859,7 @@ def test_evaluate_agg_state_prefers_normalized_metrics_over_report_payload() ->
): ):
record = replay_optimize.evaluate._evaluate_agg_state( record = replay_optimize.evaluate._evaluate_agg_state(
state=state, state=state,
workload=SyntheticReplayWorkload( spec=spec,
isl=128,
osl=32,
request_count=16,
replay_concurrency=4,
),
base_engine_args=_base_agg_args(),
base_router_config=None,
model="meta-llama/Llama-3.1-8B-Instruct",
backend="vllm",
system="h100_sxm",
objective=ReplayObjective.THROUGHPUT,
constraints=ReplayConstraints(mean_e2e_latency_ms=1000.0),
cache=cache, cache=cache,
) )
...@@ -859,25 +889,17 @@ def test_agg_optimizer_synthetic_replay_smoke(monkeypatch) -> None: ...@@ -859,25 +889,17 @@ def test_agg_optimizer_synthetic_replay_smoke(monkeypatch) -> None:
) )
result = optimize_dense_agg_with_replay( result = optimize_dense_agg_with_replay(
model=_AIC_MODEL, _agg_spec(
backend="vllm", workload=_synthetic_workload(isl=128, request_count=8),
system=_AIC_SYSTEM, total_gpus=4,
workload=SyntheticReplayWorkload( sla=_sla(
isl=128, mean_ttft_ms=100000.0,
osl=32, mean_tpot_ms=100000.0,
request_count=8, mean_e2e_latency_ms=100000.0,
replay_concurrency=4, ),
), router_mode="both",
base_engine_args=_base_agg_args(), overlap_weights=[0.0, 1.0],
max_total_gpus=4, )
constraints={
"mean_ttft_ms": 100000.0,
"mean_tpot_ms": 100000.0,
"mean_e2e_latency_ms": 100000.0,
},
router_mode="both",
overlap_score_weights=[0.0, 1.0],
max_parallel_evals=1,
) )
assert not result.evaluated_df.empty assert not result.evaluated_df.empty
...@@ -894,23 +916,17 @@ def test_agg_optimizer_timed_trace_smoke(tmp_path, monkeypatch) -> None: ...@@ -894,23 +916,17 @@ def test_agg_optimizer_timed_trace_smoke(tmp_path, monkeypatch) -> None:
) )
result = optimize_dense_agg_with_replay( result = optimize_dense_agg_with_replay(
model=_AIC_MODEL, _agg_spec(
backend="vllm", workload=_trace_workload(_write_trace(tmp_path)),
system=_AIC_SYSTEM, total_gpus=4,
workload=TraceReplayWorkload( sla=_sla(
trace_file=_write_trace(tmp_path), mean_ttft_ms=100000.0,
arrival_speedup_ratio=100.0, mean_tpot_ms=100000.0,
), mean_e2e_latency_ms=100000.0,
base_engine_args=_base_agg_args(), ),
max_total_gpus=4, router_mode="both",
constraints={ overlap_weights=[0.0, 1.0],
"mean_ttft_ms": 100000.0, )
"mean_tpot_ms": 100000.0,
"mean_e2e_latency_ms": 100000.0,
},
router_mode="both",
overlap_score_weights=[0.0, 1.0],
max_parallel_evals=1,
) )
assert not result.evaluated_df.empty assert not result.evaluated_df.empty
...@@ -927,25 +943,16 @@ def test_optimizer_synthetic_replay_smoke(tmp_path, monkeypatch) -> None: ...@@ -927,25 +943,16 @@ def test_optimizer_synthetic_replay_smoke(tmp_path, monkeypatch) -> None:
) )
result = optimize_dense_disagg_with_replay( result = optimize_dense_disagg_with_replay(
model=_AIC_MODEL, _disagg_spec(
backend="vllm", workload=_synthetic_workload(isl=128, request_count=8),
system=_AIC_SYSTEM, total_gpus=4,
workload=SyntheticReplayWorkload( sla=_sla(
isl=128, mean_ttft_ms=100000.0,
osl=32, mean_tpot_ms=100000.0,
request_count=8, mean_e2e_latency_ms=100000.0,
replay_concurrency=4, ),
), overlap_weights=[0.0, 1.0],
base_prefill_engine_args=_base_prefill_args(), )
base_decode_engine_args=_base_decode_args(),
max_total_gpus=4,
constraints={
"mean_ttft_ms": 100000.0,
"mean_tpot_ms": 100000.0,
"mean_e2e_latency_ms": 100000.0,
},
overlap_score_weights=[0.0, 1.0],
max_parallel_evals=1,
) )
assert not result.evaluated_df.empty assert not result.evaluated_df.empty
...@@ -962,24 +969,16 @@ def test_optimizer_timed_trace_smoke(tmp_path, monkeypatch) -> None: ...@@ -962,24 +969,16 @@ def test_optimizer_timed_trace_smoke(tmp_path, monkeypatch) -> None:
) )
result = optimize_dense_disagg_with_replay( result = optimize_dense_disagg_with_replay(
model=_AIC_MODEL, _disagg_spec(
backend="vllm", workload=_trace_workload(_write_trace(tmp_path)),
system=_AIC_SYSTEM, total_gpus=4,
workload=TraceReplayWorkload( sla=_sla(
trace_file=_write_trace(tmp_path), mean_ttft_ms=100000.0,
arrival_speedup_ratio=100.0, mean_tpot_ms=100000.0,
), mean_e2e_latency_ms=100000.0,
base_prefill_engine_args=_base_prefill_args(), ),
base_decode_engine_args=_base_decode_args(), overlap_weights=[0.0, 1.0],
max_total_gpus=4, )
constraints={
"mean_ttft_ms": 100000.0,
"mean_tpot_ms": 100000.0,
"mean_e2e_latency_ms": 100000.0,
},
overlap_score_weights=[0.0, 1.0],
max_parallel_evals=1,
) )
assert not result.evaluated_df.empty assert not result.evaluated_df.empty
assert result.best_feasible is not None
...@@ -12,7 +12,7 @@ This experiment searches over disaggregated replay states to answer a concrete q ...@@ -12,7 +12,7 @@ This experiment searches over disaggregated replay states to answer a concrete q
- given a fixed GPU budget - given a fixed GPU budget
- for a workload with real prefix overlap - for a workload with real prefix overlap
- and latency constraints that still permit meaningful throughput - and latency SLAs that still permit meaningful throughput
which `(prefill_tp, decode_tp, prefill_workers, decode_workers, overlap_score_weight)` combination which `(prefill_tp, decode_tp, prefill_workers, decode_workers, overlap_score_weight)` combination
produces the best offline replay result? produces the best offline replay result?
...@@ -20,6 +20,26 @@ produces the best offline replay result? ...@@ -20,6 +20,26 @@ produces the best offline replay result?
This is a heuristic search over replay states, not an exact optimizer over all feasible This is a heuristic search over replay states, not an exact optimizer over all feasible
configurations. configurations.
## Spec Shape (DGDR-aligned)
The public API takes a single [`ReplayOptimizeSpec`](specs.py) composed of:
- `EngineSpec` — model, backend, engine args (prefill + decode for disagg; single `baseEngineArgs`
for agg)
- `HardwareSpec` — GPU SKU + total GPU budget
- `WorkloadSpec` — synthetic workload knobs (isl/osl/concurrency/...) **or** a trace source
(`traceFile`/`arrivalSpeedupRatio`), discriminated by whether `traceFile` is set
- `SLASpec` — latency bounds (`ttft`, `itl`, `e2eLatency`, plus p95 variants); each is independent
and optional
- `RouterSpec` — router mode, overlap-score-weight sweep, KV-router base config
- `objective``ReplayObjective.THROUGHPUT` (default), `MEAN_TTFT`, or `MEAN_E2E_LATENCY`
- `maxParallelEvals` — how many replay evaluations to run concurrently
Field names use lowerCamelCase throughout to match the operator's auto-generated
[`dgdr_v1beta1_types.py`](../dgdr_v1beta1_types.py) so the eventual upstream merge into the Go
`DynamoGraphDeploymentRequestSpec` is mechanical. Method names (`.violation_penalty()`,
`.summarize()`, `.aic_task_kwargs()`) stay snake_case to match Pydantic convention.
## Prerequisites ## Prerequisites
Run from the repository root. Run from the repository root.
...@@ -72,7 +92,8 @@ export PYTHONPATH=lib/bindings/python/src:components/src ...@@ -72,7 +92,8 @@ export PYTHONPATH=lib/bindings/python/src:components/src
``` ```
If the replay search uses multiple worker processes, prefer a real script file over a heredoc. This If the replay search uses multiple worker processes, prefer a real script file over a heredoc. This
matters on macOS because `ProcessPoolExecutor` child workers need a stable module path. matters on macOS because `ProcessPoolExecutor` child workers need a stable module path, and the
driver module must guard its entry behind `if __name__ == "__main__":`.
For KV-router replay logs, this filter keeps the run readable without hiding useful `info` output: For KV-router replay logs, this filter keeps the run readable without hiding useful `info` output:
...@@ -84,12 +105,12 @@ export DYN_LOG='info,dynamo_kv_router::scheduling::selector=warn' ...@@ -84,12 +105,12 @@ export DYN_LOG='info,dynamo_kv_router::scheduling::selector=warn'
This sweep uses: This sweep uses:
- model: `Qwen/Qwen3-32B` - `EngineSpec.model`: `Qwen/Qwen3-32B`
- backend: `vllm` - `EngineSpec.backend`: `vllm`
- system: `h200_sxm` - `HardwareSpec.gpuSku`: `h200_sxm`
- router mode: `kv_router` - `HardwareSpec.totalGpus`: `16`
- workload type: `SyntheticReplayWorkload` - `RouterSpec.mode`: `kv_router`
- GPU budget: `16` - Synthetic `WorkloadSpec`
The GPU budget here is a simulated search constraint used by offline replay when it enumerates The GPU budget here is a simulated search constraint used by offline replay when it enumerates
candidate TP and worker configurations. You do not need 16 real GPUs locally to run this search. candidate TP and worker configurations. You do not need 16 real GPUs locally to run this search.
...@@ -99,10 +120,10 @@ matter: ...@@ -99,10 +120,10 @@ matter:
- `isl=32768` - `isl=32768`
- `osl=256` - `osl=256`
- `request_count=5000` - `requestCount=5000`
- `replay_concurrency=200` - `concurrency=200`
- `shared_prefix_ratio=0.5` - `sharedPrefixRatio=0.5`
- `num_prefix_groups=50` - `numPrefixGroups=50`
The base engine args stay conservative: The base engine args stay conservative:
...@@ -119,23 +140,37 @@ This setup does not force scheduler-specific bottlenecks such as: ...@@ -119,23 +140,37 @@ This setup does not force scheduler-specific bottlenecks such as:
Only add those when the experiment is specifically about scheduler limits. Only add those when the experiment is specifically about scheduler limits.
## Search Strategy
`replay_optimize` runs a coordinate descent over three dimensions per round, iterating until the incumbent stops moving or `DEFAULT_SEARCH_ROUNDS` is reached:
```mermaid
flowchart LR
A["TP search<br/>choose TP shape<br/>(prefill_tp, decode_tp)<br/>under GPU budget"] --> B["Worker search<br/>choose worker split<br/>(prefill_workers, decode_workers)<br/>for the chosen TP"]
B --> C["Router search<br/>choose routing mode<br/>and overlap_score_weight"]
C --> A
```
Each step calls [`evaluate._evaluate_states`](evaluate.py), which replays the candidate state through `run_synthetic_trace_replay` or `run_trace_replay` (see [Mocker Trace Replay](../../../../../../docs/benchmarks/mocker-trace-replay.md) for the underlying harness) and ranks the resulting records with `scoring._pick_best_record`. The ranking key is `spec.objective` (throughput, mean_ttft, or mean_e2e_latency) subject to `spec.sla` bounds and `spec.hardware.totalGpus` as a feasibility gate.
The descent is budget-focused: each step prunes to near-budget-edge states so the sweep ends up at a TP/worker shape that actually consumes the available GPU budget, rather than at a throughput-per-GPU pareto point. Aggregated replay (`optimize_dense_agg_with_replay`) collapses dimensions 1 and 2 into `(tp, workers)` but is otherwise identical; see [`search.py`](search.py) for both entrypoints.
## Driver Script ## Driver Script
The canonical starting point now lives in [example.py](example.py). Keeping it as a real module is The canonical starting point lives in [example.py](example.py). Keeping it as a real module is
better than carrying a large inline snippet in the README, and it also satisfies the macOS better than carrying a large inline snippet in the README, and it also satisfies the macOS
`ProcessPoolExecutor` requirement for a stable module path. `ProcessPoolExecutor` requirement for a stable module path.
Treat [example.py](example.py) as a starting point, not a frozen harness. Modify it as needed for Treat [example.py](example.py) as a starting point, not a frozen harness. Modify it as needed for
your search: your search:
- change the workload shape - change the `WorkloadSpec` shape (or switch to a trace source with `traceFile=...`)
- swap `SyntheticReplayWorkload` for `TraceReplayWorkload` - add SLA bounds on `SLASpec` (`ttft`, `itl`, `e2eLatency`, or their p95 variants)
- change constraints - change `RouterSpec.overlapWeights`
- change `overlap_score_weights`
- print different columns from `result.evaluated_df` or `result.feasible_df` - print different columns from `result.evaluated_df` or `result.feasible_df`
- persist the tables to CSV or parquet if you want downstream analysis - persist the tables to CSV or parquet if you want downstream analysis
If you need to understand which knobs are available, see [models.py](models.py), [search.py](search.py), If you need to understand which knobs are available, see [specs.py](specs.py), [search.py](search.py),
and [evaluate.py](evaluate.py). and [evaluate.py](evaluate.py).
The default path in [example.py](example.py) is the synthetic disaggregated sweep documented in The default path in [example.py](example.py) is the synthetic disaggregated sweep documented in
...@@ -146,8 +181,8 @@ used for the Mooncake-style replay path below without rewriting the harness from ...@@ -146,8 +181,8 @@ used for the Mooncake-style replay path below without rewriting the harness from
The returned object is a `DenseReplayOptimizationResult` with: The returned object is a `DenseReplayOptimizationResult` with:
- `best_feasible`: best visited state that satisfies all constraints - `best_feasible`: best visited state that satisfies all SLAs and the GPU budget
- `best_infeasible`: best visited state that misses at least one constraint - `best_infeasible`: best visited state that misses at least one SLA bound or the budget
- `evaluated_df`: all visited states - `evaluated_df`: all visited states
- `feasible_df`: only the feasible visited states - `feasible_df`: only the feasible visited states
...@@ -162,45 +197,41 @@ Useful columns to inspect: ...@@ -162,45 +197,41 @@ Useful columns to inspect:
- cache behavior: `prefix_cache_reused_ratio` - cache behavior: `prefix_cache_reused_ratio`
- latency: `mean_ttft_ms`, `mean_tpot_ms`, `mean_e2e_latency_ms` - latency: `mean_ttft_ms`, `mean_tpot_ms`, `mean_e2e_latency_ms`
In local testing, this setup produced a non-trivial mean-E2E winner around: Note that the report DataFrame still uses the Rust replay runner's key names
(`mean_ttft_ms`, `mean_tpot_ms`, `mean_e2e_latency_ms`) even though the input `SLASpec` uses DGDR's
- `prefill_tp=2` camelCase names (`ttft`, `itl`, `e2eLatency`). `SLASpec` carries an internal translation map;
- `decode_tp=1` renaming the Rust output keys is a follow-up in the Rust replay runner.
- `prefill_workers=2`
- `decode_workers=4`
- `overlap_score_weight=0.5`
Ballpark metrics for that point were: In local testing, this setup produced a non-trivial mean-E2E winner around:
- `prefix_cache_reused_ratio ~= 0.5` - `prefill_tp=4`, `decode_tp=1`, `prefill_workers=3`, `decode_workers=4`, `overlap_score_weight=0.5`
- `output_throughput_tok_s ~= 4500` - `output_throughput_tok_s ~= 970`, `prefix_cache_reused_ratio ~= 0.5`,
- `mean_ttft_ms ~= 4500` `mean_ttft_ms ~= 42800`, `mean_tpot_ms ~= 35`, `mean_e2e_latency_ms ~= 51900`
- `mean_tpot_ms ~= 26`
- `mean_e2e_latency_ms ~= 11150`
Treat those as sanity-check ranges, not fixed assertions. Treat those as sanity-check ranges, not fixed assertions. See the regression anchor table in the
`run-replay-bench` skill for the current reference frontier.
## Tuning This Sweep ## Tuning This Sweep
To broaden or shift the search, vary one axis at a time: To broaden or shift the search, vary one axis at a time:
- `max_total_gpus` - `HardwareSpec.totalGpus`
- `overlap_score_weights` - `RouterSpec.overlapWeights`
- `shared_prefix_ratio` - `WorkloadSpec.sharedPrefixRatio`
- `num_prefix_groups` - `WorkloadSpec.numPrefixGroups`
- base prefill/decode engine args - base prefill/decode engine args
If you want to compare routing strategies directly, use `router_mode="both"` instead of the default If you want to compare routing strategies directly, use `RouterSpec(mode="both")` instead of the
KV-router-only search. default KV-router-only search.
## Real Traffic Replay ## Real Traffic Replay
`replay_optimize` is wired up for trace-driven replay. In `replay_optimize` is wired up for trace-driven replay. `WorkloadSpec` takes a `traceFile` (plus
[evaluate.py](evaluate.py), `TraceReplayWorkload` goes through `run_trace_replay(...)`, while optional `arrivalSpeedupRatio`) discriminator; when set, [evaluate.py](evaluate.py) routes through
`SyntheticReplayWorkload` goes through `run_synthetic_trace_replay(...)`. `run_trace_replay(...)` instead of `run_synthetic_trace_replay(...)`.
Use a separate trace-driven experiment when you want to evaluate the same search structure against a Use a separate trace-driven experiment when you want to evaluate the same search structure against
real Mooncake-style workload instead of the synthetic shared-prefix workload above. a real Mooncake-style workload instead of the synthetic shared-prefix workload above.
### Download a Mooncake Trace ### Download a Mooncake Trace
...@@ -222,53 +253,44 @@ wget -O /tmp/toolagent_trace.jsonl \ ...@@ -222,53 +253,44 @@ wget -O /tmp/toolagent_trace.jsonl \
If you use [example.py](example.py), pass `--trace-file /tmp/toolagent_trace.jsonl` and optionally If you use [example.py](example.py), pass `--trace-file /tmp/toolagent_trace.jsonl` and optionally
`--arrival-speedup-ratio 0.8`. `--arrival-speedup-ratio 0.8`.
If you want to edit the driver directly, replace: If you want to edit the driver directly, replace the synthetic `WorkloadSpec`:
```python ```python
workload=SyntheticReplayWorkload( workload=WorkloadSpec(
isl=32768, isl=32768,
osl=256, osl=256,
request_count=5000, requestCount=5000,
replay_concurrency=200, concurrency=200,
shared_prefix_ratio=0.5, sharedPrefixRatio=0.5,
num_prefix_groups=50, numPrefixGroups=50,
), ),
``` ```
with: with:
```python ```python
from dynamo.profiler.utils.replay_optimize import TraceReplayWorkload workload=WorkloadSpec(
traceFile="/tmp/toolagent_trace.jsonl",
workload=TraceReplayWorkload( arrivalSpeedupRatio=1.0,
trace_file="/tmp/toolagent_trace.jsonl",
arrival_speedup_ratio=1.0,
), ),
``` ```
If you want to replay the same trace at `0.80x` of its original arrival rate, keep the same file If you want to replay the same trace at `0.80x` of its original arrival rate, keep the same file
and set: and set `arrivalSpeedupRatio=0.8`.
```python
workload=TraceReplayWorkload(
trace_file="/tmp/toolagent_trace.jsonl",
arrival_speedup_ratio=0.8,
),
```
The main behavioral change is that the workload stops generating requests in memory and instead The main behavioral change is that the workload stops generating requests in memory and instead
replays request arrivals from the JSONL trace. In this path: replays request arrivals from the JSONL trace. In this path:
- `trace_file` points at the Mooncake-style JSONL input - `traceFile` points at the Mooncake-style JSONL input
- `arrival_speedup_ratio` compresses or stretches the trace arrival process - `arrivalSpeedupRatio` compresses or stretches the trace arrival process
- synthetic-only knobs such as `isl`, `osl`, `request_count`, `replay_concurrency`, - synthetic-only knobs such as `isl`, `osl`, `requestCount`, `concurrency`,
`shared_prefix_ratio`, and `num_prefix_groups` no longer apply at the workload level `sharedPrefixRatio`, `numPrefixGroups` are ignored by the trace replay path
Important notes for the public toolagent trace: Important notes for the public toolagent trace:
- the dataset uses Mooncake-style `hash_ids` with `512` tokens per block - the dataset uses Mooncake-style `hash_ids` with `512` tokens per block
- the underlying `run_trace_replay(...)` API defaults `trace_block_size` to `512` - the underlying `run_trace_replay(...)` API defaults `trace_block_size` to `512`
- the current `TraceReplayWorkload` wrapper does not expose a separate `trace_block_size` field - `WorkloadSpec` does not yet expose a separate `traceBlockSize` field
- the prefix-data-generator tools in - the prefix-data-generator tools in
[Prefix Data Generator](../../../../../../benchmarks/prefix_data_generator/README.md) [Prefix Data Generator](../../../../../../benchmarks/prefix_data_generator/README.md)
are useful if you want to inspect the trace first or synthesize a larger derivative trace before are useful if you want to inspect the trace first or synthesize a larger derivative trace before
......
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
from __future__ import annotations from __future__ import annotations
from . import aic, bench, engine_args, evaluate, scoring, search from . import aic, bench, engine_args, evaluate, scoring, search, specs
from .aic import _enumerate_dense_tp_candidates, _load_aiconfigurator_modules from .aic import _enumerate_dense_tp_candidates, _load_aiconfigurator_modules
from .bench import compare_agg_and_disagg_with_replay, compare_aic_and_replay_disagg from .bench import compare_agg_and_disagg_with_replay, compare_aic_and_replay_disagg
from .engine_args import ( from .engine_args import (
...@@ -11,15 +11,7 @@ from .engine_args import ( ...@@ -11,15 +11,7 @@ from .engine_args import (
_build_candidate_engine_args, _build_candidate_engine_args,
_build_router_config, _build_router_config,
) )
from .models import ( from .models import DenseAggReplayState, DenseReplayOptimizationResult, DenseReplayState
DenseAggReplayState,
DenseReplayOptimizationResult,
DenseReplayState,
ReplayConstraints,
ReplayObjective,
SyntheticReplayWorkload,
TraceReplayWorkload,
)
from .scoring import _pick_best_record from .scoring import _pick_best_record
from .search import ( from .search import (
_iter_agg_tp_states_with_max_workers, _iter_agg_tp_states_with_max_workers,
...@@ -29,6 +21,16 @@ from .search import ( ...@@ -29,6 +21,16 @@ from .search import (
optimize_dense_agg_with_replay, optimize_dense_agg_with_replay,
optimize_dense_disagg_with_replay, optimize_dense_disagg_with_replay,
) )
from .specs import (
EngineSpec,
HardwareSpec,
ReplayObjective,
ReplayOptimizeSpec,
RouterMode,
RouterSpec,
SLASpec,
WorkloadSpec,
)
__all__ = [ __all__ = [
"_build_agg_candidate_engine_args", "_build_agg_candidate_engine_args",
...@@ -46,10 +48,14 @@ __all__ = [ ...@@ -46,10 +48,14 @@ __all__ = [
"DenseAggReplayState", "DenseAggReplayState",
"DenseReplayOptimizationResult", "DenseReplayOptimizationResult",
"DenseReplayState", "DenseReplayState",
"ReplayConstraints", "EngineSpec",
"HardwareSpec",
"ReplayObjective", "ReplayObjective",
"SyntheticReplayWorkload", "ReplayOptimizeSpec",
"TraceReplayWorkload", "RouterMode",
"RouterSpec",
"SLASpec",
"WorkloadSpec",
"aic", "aic",
"bench", "bench",
"engine_args", "engine_args",
...@@ -58,4 +64,5 @@ __all__ = [ ...@@ -58,4 +64,5 @@ __all__ = [
"optimize_dense_disagg_with_replay", "optimize_dense_disagg_with_replay",
"scoring", "scoring",
"search", "search",
"specs",
] ]
...@@ -3,65 +3,49 @@ ...@@ -3,65 +3,49 @@
from __future__ import annotations from __future__ import annotations
from collections.abc import Mapping
from typing import Any from typing import Any
import pandas as pd import pandas as pd
from aiconfigurator.sdk.task import TaskConfig, TaskRunner from aiconfigurator.sdk.task import TaskConfig, TaskRunner
from dynamo.llm import MockEngineArgs
from .models import ReplayConstraints, SyntheticReplayWorkload, TraceReplayWorkload
from .scoring import _pick_best_record from .scoring import _pick_best_record
from .search import optimize_dense_agg_with_replay, optimize_dense_disagg_with_replay from .search import optimize_dense_agg_with_replay, optimize_dense_disagg_with_replay
from .specs import ReplayOptimizeSpec
def compare_aic_and_replay_disagg( def compare_aic_and_replay_disagg(
*, spec: ReplayOptimizeSpec,
model: str,
backend: str,
system: str,
isl: int,
osl: int,
max_total_gpus: int,
replay_request_count: int,
replay_concurrency: int,
base_prefill_engine_args: MockEngineArgs,
base_decode_engine_args: MockEngineArgs,
constraints: Mapping[str, float] | None = None,
max_parallel_evals: int = 1,
) -> dict[str, Any]: ) -> dict[str, Any]:
aic_constraints = ReplayConstraints.from_mapping(constraints, max_total_gpus) """Run AIC pareto + replay optimization side-by-side for a disagg config.
Uses SLA bounds from `spec.sla` as AIC's latency targets; round_robin router
mode is forced for the replay run (AIC itself has no router sweep).
"""
if spec.workload.isTraceBased:
raise ValueError("compare_aic_and_replay_disagg requires a synthetic workload")
if spec.workload.requestCount is None or spec.workload.concurrency is None:
raise ValueError(
"compare_aic_and_replay_disagg requires synthetic WorkloadSpec with "
"requestCount and concurrency"
)
aic_task = TaskConfig( aic_task = TaskConfig(
serving_mode="disagg", serving_mode="disagg",
model_path=model, model_path=spec.engine.model,
system_name=system, system_name=str(spec.hardware.gpuSku),
backend_name=backend, backend_name=spec.engine.backend.value,
total_gpus=max_total_gpus, total_gpus=spec.hardware.totalGpus,
isl=isl, isl=spec.workload.isl,
osl=osl, osl=spec.workload.osl,
**aic_constraints.aic_task_kwargs(), **spec.sla.aic_task_kwargs(),
) )
aic_result = TaskRunner().run(aic_task) aic_result = TaskRunner().run(aic_task)
aic_df = aic_result.get("pareto_df", pd.DataFrame()) aic_df = aic_result.get("pareto_df", pd.DataFrame())
replay_result = optimize_dense_disagg_with_replay( replay_spec = spec.model_copy(
model=model, update={"router": spec.router.model_copy(update={"mode": "round_robin"})}
backend=backend,
system=system,
workload=SyntheticReplayWorkload(
isl=isl,
osl=osl,
request_count=replay_request_count,
replay_concurrency=replay_concurrency,
),
base_prefill_engine_args=base_prefill_engine_args,
base_decode_engine_args=base_decode_engine_args,
max_total_gpus=max_total_gpus,
constraints=constraints,
router_mode="round_robin",
max_parallel_evals=max_parallel_evals,
) )
replay_result = optimize_dense_disagg_with_replay(replay_spec)
aic_best = None aic_best = None
if not aic_df.empty: if not aic_df.empty:
...@@ -108,45 +92,15 @@ def compare_aic_and_replay_disagg( ...@@ -108,45 +92,15 @@ def compare_aic_and_replay_disagg(
def compare_agg_and_disagg_with_replay( def compare_agg_and_disagg_with_replay(
*, spec: ReplayOptimizeSpec,
model: str,
backend: str,
system: str,
workload: SyntheticReplayWorkload | TraceReplayWorkload,
base_engine_args: MockEngineArgs,
base_prefill_engine_args: MockEngineArgs,
base_decode_engine_args: MockEngineArgs,
max_total_gpus: int,
constraints: Mapping[str, float] | None = None,
router_mode: str = "kv_router",
overlap_score_weights: tuple[float, ...] | list[float] | None = None,
max_parallel_evals: int = 1,
) -> dict[str, Any]: ) -> dict[str, Any]:
agg_result = optimize_dense_agg_with_replay( """Run both agg and disagg replay optimizations on the same spec and pick the winner.
model=model,
backend=backend, The spec must populate `spec.engine.baseEngineArgs` (agg path) plus
system=system, `basePrefillEngineArgs` / `baseDecodeEngineArgs` (disagg path).
workload=workload, """
base_engine_args=base_engine_args, agg_result = optimize_dense_agg_with_replay(spec)
max_total_gpus=max_total_gpus, disagg_result = optimize_dense_disagg_with_replay(spec)
constraints=constraints,
router_mode=router_mode,
overlap_score_weights=overlap_score_weights,
max_parallel_evals=max_parallel_evals,
)
disagg_result = optimize_dense_disagg_with_replay(
model=model,
backend=backend,
system=system,
workload=workload,
base_prefill_engine_args=base_prefill_engine_args,
base_decode_engine_args=base_decode_engine_args,
max_total_gpus=max_total_gpus,
constraints=constraints,
router_mode=router_mode,
overlap_score_weights=overlap_score_weights,
max_parallel_evals=max_parallel_evals,
)
agg_best = agg_result.best_feasible agg_best = agg_result.best_feasible
disagg_best = disagg_result.best_feasible disagg_best = disagg_result.best_feasible
......
...@@ -13,14 +13,3 @@ AIC_BACKEND_VERSIONS = { ...@@ -13,14 +13,3 @@ AIC_BACKEND_VERSIONS = {
DEFAULT_OVERLAP_SCORE_WEIGHTS = (0.0, 0.25, 0.5, 1.0, 2.0, 4.0) DEFAULT_OVERLAP_SCORE_WEIGHTS = (0.0, 0.25, 0.5, 1.0, 2.0, 4.0)
DEFAULT_MAX_PARALLEL_EVALS = min(8, os.cpu_count() or 1) DEFAULT_MAX_PARALLEL_EVALS = min(8, os.cpu_count() or 1)
DEFAULT_SEARCH_ROUNDS = 3 DEFAULT_SEARCH_ROUNDS = 3
SUPPORTED_CONSTRAINTS = frozenset(
{
"mean_ttft_ms",
"p95_ttft_ms",
"mean_tpot_ms",
"p95_tpot_ms",
"mean_e2e_latency_ms",
"p95_e2e_latency_ms",
"max_total_gpus",
}
)
...@@ -4,8 +4,8 @@ ...@@ -4,8 +4,8 @@
"""Replay evaluation helpers for the budget-focused dense search heuristic. """Replay evaluation helpers for the budget-focused dense search heuristic.
The search in `search.py` assumes we prefer to consume the available GPU budget The search in `search.py` assumes we prefer to consume the available GPU budget
and therefore ranks visited states by raw output throughput, subject to replay and therefore ranks visited states by the selected `spec.objective`, subject to
constraints, rather than by throughput normalized per GPU. SLA and budget constraints, rather than by throughput normalized per GPU.
""" """
from __future__ import annotations from __future__ import annotations
...@@ -25,105 +25,119 @@ from .engine_args import ( ...@@ -25,105 +25,119 @@ from .engine_args import (
_build_router_config, _build_router_config,
) )
from .logging import ensure_dynamo_logging, log_state_finish, log_state_start from .logging import ensure_dynamo_logging, log_state_finish, log_state_start
from .models import ( from .models import DenseAggReplayState, DenseReplayState
DenseAggReplayState, from .specs import (
DenseReplayState, EngineSpec,
ReplayConstraints, HardwareSpec,
ReplayObjective, ReplayObjective,
SyntheticReplayWorkload, ReplayOptimizeSpec,
TraceReplayWorkload, RouterSpec,
SLASpec,
WorkloadSpec,
) )
def _run_replay_for_state( def _run_replay_for_state(
*, *,
state: DenseReplayState, state: DenseReplayState,
workload: SyntheticReplayWorkload | TraceReplayWorkload, workload: WorkloadSpec,
prefill_engine_args: MockEngineArgs, prefill_engine_args: MockEngineArgs,
decode_engine_args: MockEngineArgs, decode_engine_args: MockEngineArgs,
router_config: KvRouterConfig | None, router_config: KvRouterConfig | None,
) -> dict[str, Any]: ) -> dict[str, Any]:
if isinstance(workload, SyntheticReplayWorkload): if workload.isTraceBased:
return run_synthetic_trace_replay( return run_trace_replay(
workload.isl, Path(workload.traceFile),
workload.osl,
workload.request_count,
prefill_engine_args=prefill_engine_args, prefill_engine_args=prefill_engine_args,
decode_engine_args=decode_engine_args, decode_engine_args=decode_engine_args,
router_config=router_config, router_config=router_config,
num_prefill_workers=state.prefill_workers, num_prefill_workers=state.prefill_workers,
num_decode_workers=state.decode_workers, num_decode_workers=state.decode_workers,
replay_concurrency=workload.replay_concurrency,
replay_mode="offline", replay_mode="offline",
router_mode=state.router_mode, router_mode=state.router_mode,
arrival_interval_ms=workload.arrival_interval_ms, arrival_speedup_ratio=workload.arrivalSpeedupRatio,
turns_per_session=workload.turns_per_session,
shared_prefix_ratio=workload.shared_prefix_ratio,
num_prefix_groups=workload.num_prefix_groups,
inter_turn_delay_ms=workload.inter_turn_delay_ms,
) )
return run_trace_replay( return run_synthetic_trace_replay(
Path(workload.trace_file), workload.isl,
workload.osl,
int(workload.requestCount),
prefill_engine_args=prefill_engine_args, prefill_engine_args=prefill_engine_args,
decode_engine_args=decode_engine_args, decode_engine_args=decode_engine_args,
router_config=router_config, router_config=router_config,
num_prefill_workers=state.prefill_workers, num_prefill_workers=state.prefill_workers,
num_decode_workers=state.decode_workers, num_decode_workers=state.decode_workers,
replay_concurrency=int(workload.concurrency),
replay_mode="offline", replay_mode="offline",
router_mode=state.router_mode, router_mode=state.router_mode,
arrival_speedup_ratio=workload.arrival_speedup_ratio, arrival_interval_ms=workload.arrivalIntervalMs,
turns_per_session=workload.turnsPerSession,
shared_prefix_ratio=workload.sharedPrefixRatio,
num_prefix_groups=workload.numPrefixGroups,
inter_turn_delay_ms=workload.interTurnDelayMs,
) )
def _run_agg_replay_for_state( def _run_agg_replay_for_state(
*, *,
state: DenseAggReplayState, state: DenseAggReplayState,
workload: SyntheticReplayWorkload | TraceReplayWorkload, workload: WorkloadSpec,
engine_args: MockEngineArgs, engine_args: MockEngineArgs,
router_config: KvRouterConfig | None, router_config: KvRouterConfig | None,
) -> dict[str, Any]: ) -> dict[str, Any]:
if isinstance(workload, SyntheticReplayWorkload): if workload.isTraceBased:
return run_synthetic_trace_replay( return run_trace_replay(
workload.isl, Path(workload.traceFile),
workload.osl,
workload.request_count,
extra_engine_args=engine_args, extra_engine_args=engine_args,
router_config=router_config, router_config=router_config,
num_workers=state.workers, num_workers=state.workers,
replay_concurrency=workload.replay_concurrency,
replay_mode="offline", replay_mode="offline",
router_mode=state.router_mode, router_mode=state.router_mode,
arrival_interval_ms=workload.arrival_interval_ms, arrival_speedup_ratio=workload.arrivalSpeedupRatio,
turns_per_session=workload.turns_per_session,
shared_prefix_ratio=workload.shared_prefix_ratio,
num_prefix_groups=workload.num_prefix_groups,
inter_turn_delay_ms=workload.inter_turn_delay_ms,
) )
return run_trace_replay( return run_synthetic_trace_replay(
Path(workload.trace_file), workload.isl,
workload.osl,
int(workload.requestCount),
extra_engine_args=engine_args, extra_engine_args=engine_args,
router_config=router_config, router_config=router_config,
num_workers=state.workers, num_workers=state.workers,
replay_concurrency=int(workload.concurrency),
replay_mode="offline", replay_mode="offline",
router_mode=state.router_mode, router_mode=state.router_mode,
arrival_speedup_ratio=workload.arrival_speedup_ratio, arrival_interval_ms=workload.arrivalIntervalMs,
turns_per_session=workload.turnsPerSession,
shared_prefix_ratio=workload.sharedPrefixRatio,
num_prefix_groups=workload.numPrefixGroups,
inter_turn_delay_ms=workload.interTurnDelayMs,
) )
def _feasibility(
*,
report: Mapping[str, Any],
state: DenseReplayState | DenseAggReplayState,
sla: SLASpec,
hardware: HardwareSpec,
) -> tuple[float, bool]:
"""Split SLA violation and hardware-budget gate, combine into one penalty.
The record's `violation_penalty` field keeps its Phase 1 role (drives
infeasible-record sorting in `_pick_best_record`); over-budget adds a
constant 1.0 so it outranks small SLA misses.
"""
sla_penalty = sla.violation_penalty(report)
over_budget = state.total_gpus_used > hardware.totalGpus
penalty = sla_penalty + (1.0 if over_budget else 0.0)
feasible = sla_penalty == 0.0 and not over_budget
return penalty, feasible
def _evaluate_state( def _evaluate_state(
*, *,
state: DenseReplayState, state: DenseReplayState,
workload: SyntheticReplayWorkload | TraceReplayWorkload, spec: ReplayOptimizeSpec,
base_prefill_engine_args: MockEngineArgs,
base_decode_engine_args: MockEngineArgs,
base_router_config: KvRouterConfig | None,
model: str,
backend: str,
system: str,
objective: ReplayObjective,
constraints: ReplayConstraints,
cache: dict[DenseReplayState, dict[str, Any]], cache: dict[DenseReplayState, dict[str, Any]],
) -> dict[str, Any]: ) -> dict[str, Any]:
ensure_dynamo_logging() ensure_dynamo_logging()
...@@ -133,54 +147,57 @@ def _evaluate_state( ...@@ -133,54 +147,57 @@ def _evaluate_state(
log_state_start(state) log_state_start(state)
backend = spec.engine.backend.value
system = str(spec.hardware.gpuSku)
prefill_args = _build_candidate_engine_args( prefill_args = _build_candidate_engine_args(
base_args=base_prefill_engine_args, base_args=spec.engine.basePrefillEngineArgs,
tp_size=state.prefill_tp, tp_size=state.prefill_tp,
worker_type="prefill", worker_type="prefill",
backend=backend, backend=backend,
system=system, system=system,
model=model, model=spec.engine.model,
) )
decode_args = _build_candidate_engine_args( decode_args = _build_candidate_engine_args(
base_args=base_decode_engine_args, base_args=spec.engine.baseDecodeEngineArgs,
tp_size=state.decode_tp, tp_size=state.decode_tp,
worker_type="decode", worker_type="decode",
backend=backend, backend=backend,
system=system, system=system,
model=model, model=spec.engine.model,
) )
router_config = None router_config = None
if state.router_mode == "kv_router": if state.router_mode == "kv_router":
router_config = _build_router_config( router_config = _build_router_config(
base_router_config, state.overlap_score_weight spec.router.baseRouterConfig, state.overlap_score_weight
) )
report = _run_replay_for_state( report = _run_replay_for_state(
state=state, state=state,
workload=workload, workload=spec.workload,
prefill_engine_args=prefill_args, prefill_engine_args=prefill_args,
decode_engine_args=decode_args, decode_engine_args=decode_args,
router_config=router_config, router_config=router_config,
) )
total_gpus_used = state.total_gpus_used
throughput = float(report["output_throughput_tok_s"]) throughput = float(report["output_throughput_tok_s"])
score = objective.score(report) score = spec.objective.score(report)
penalty = constraints.violation_penalty(report, total_gpus_used) penalty, feasible = _feasibility(
feasible = penalty == 0.0 report=report, state=state, sla=spec.sla, hardware=spec.hardware
)
record = { record = {
**report, **report,
**asdict(state), **asdict(state),
"total_gpus_used": total_gpus_used, "total_gpus_used": state.total_gpus_used,
"output_throughput_tok_s": throughput, "output_throughput_tok_s": throughput,
"score": score, "score": score,
"objective": objective.value, "objective": spec.objective.value,
"feasible": feasible, "feasible": feasible,
"violation_penalty": penalty, "violation_penalty": penalty,
} }
log_state_finish( log_state_finish(
state=state, state=state,
report=report, report=report,
constraints=constraints, sla=spec.sla,
hardware=spec.hardware,
score=score, score=score,
feasible=feasible, feasible=feasible,
violation_penalty=penalty, violation_penalty=penalty,
...@@ -192,14 +209,7 @@ def _evaluate_state( ...@@ -192,14 +209,7 @@ def _evaluate_state(
def _evaluate_agg_state( def _evaluate_agg_state(
*, *,
state: DenseAggReplayState, state: DenseAggReplayState,
workload: SyntheticReplayWorkload | TraceReplayWorkload, spec: ReplayOptimizeSpec,
base_engine_args: MockEngineArgs,
base_router_config: KvRouterConfig | None,
model: str,
backend: str,
system: str,
objective: ReplayObjective,
constraints: ReplayConstraints,
cache: dict[DenseAggReplayState, dict[str, Any]], cache: dict[DenseAggReplayState, dict[str, Any]],
) -> dict[str, Any]: ) -> dict[str, Any]:
ensure_dynamo_logging() ensure_dynamo_logging()
...@@ -209,44 +219,47 @@ def _evaluate_agg_state( ...@@ -209,44 +219,47 @@ def _evaluate_agg_state(
log_state_start(state) log_state_start(state)
backend = spec.engine.backend.value
system = str(spec.hardware.gpuSku)
engine_args = _build_agg_candidate_engine_args( engine_args = _build_agg_candidate_engine_args(
base_args=base_engine_args, base_args=spec.engine.baseEngineArgs,
tp_size=state.tp, tp_size=state.tp,
backend=backend, backend=backend,
system=system, system=system,
model=model, model=spec.engine.model,
) )
router_config = None router_config = None
if state.router_mode == "kv_router": if state.router_mode == "kv_router":
router_config = _build_router_config( router_config = _build_router_config(
base_router_config, state.overlap_score_weight spec.router.baseRouterConfig, state.overlap_score_weight
) )
report = _run_agg_replay_for_state( report = _run_agg_replay_for_state(
state=state, state=state,
workload=workload, workload=spec.workload,
engine_args=engine_args, engine_args=engine_args,
router_config=router_config, router_config=router_config,
) )
total_gpus_used = state.total_gpus_used
throughput = float(report["output_throughput_tok_s"]) throughput = float(report["output_throughput_tok_s"])
score = objective.score(report) score = spec.objective.score(report)
penalty = constraints.violation_penalty(report, total_gpus_used) penalty, feasible = _feasibility(
feasible = penalty == 0.0 report=report, state=state, sla=spec.sla, hardware=spec.hardware
)
record = { record = {
**report, **report,
**asdict(state), **asdict(state),
"total_gpus_used": total_gpus_used, "total_gpus_used": state.total_gpus_used,
"output_throughput_tok_s": throughput, "output_throughput_tok_s": throughput,
"score": score, "score": score,
"objective": objective.value, "objective": spec.objective.value,
"feasible": feasible, "feasible": feasible,
"violation_penalty": penalty, "violation_penalty": penalty,
} }
log_state_finish( log_state_finish(
state=state, state=state,
report=report, report=report,
constraints=constraints, sla=spec.sla,
hardware=spec.hardware,
score=score, score=score,
feasible=feasible, feasible=feasible,
violation_penalty=penalty, violation_penalty=penalty,
...@@ -255,47 +268,102 @@ def _evaluate_agg_state( ...@@ -255,47 +268,102 @@ def _evaluate_agg_state(
return record return record
def _evaluate_state_from_json_payloads(payload: Mapping[str, Any]) -> dict[str, Any]: # ---- Cross-process payload bridge ----
return _evaluate_state( # `MockEngineArgs` and `KvRouterConfig` are Rust-bound and don't pickle through
state=payload["state"], # `ProcessPoolExecutor`; we round-trip them via their own `dump_json()` /
workload=payload["workload"], # `from_json()` methods. Everything else on `ReplayOptimizeSpec` is a Pydantic
base_prefill_engine_args=MockEngineArgs.from_json( # model and pickles natively, but we serialize the whole payload dict to keep
payload["base_prefill_engine_args_json"] # cross-process transport explicit.
def _spec_to_payload(spec: ReplayOptimizeSpec) -> dict[str, Any]:
engine = spec.engine
router = spec.router
return {
"engine_model": engine.model,
"engine_backend": engine.backend.value,
"engine_base_agg_json": (
engine.baseEngineArgs.dump_json()
if engine.baseEngineArgs is not None
else None
),
"engine_base_prefill_json": (
engine.basePrefillEngineArgs.dump_json()
if engine.basePrefillEngineArgs is not None
else None
), ),
base_decode_engine_args=MockEngineArgs.from_json( "engine_base_decode_json": (
payload["base_decode_engine_args_json"] engine.baseDecodeEngineArgs.dump_json()
if engine.baseDecodeEngineArgs is not None
else None
),
"hardware_json": spec.hardware.model_dump_json(),
"workload_json": spec.workload.model_dump_json(),
"sla_json": spec.sla.model_dump_json(),
"router_mode": router.mode,
"router_overlap_weights": (
None if router.overlapWeights is None else list(router.overlapWeights)
), ),
base_router_config=( "router_base_config_json": (
KvRouterConfig.from_json(payload["base_router_config_json"]) router.baseRouterConfig.dump_json()
if payload["base_router_config_json"] is not None if router.baseRouterConfig is not None
else None else None
), ),
model=payload["model"], "objective": spec.objective.value,
backend=payload["backend"], "max_parallel_evals": spec.maxParallelEvals,
system=payload["system"], }
objective=payload["objective"],
constraints=payload["constraints"],
def _spec_from_payload(payload: Mapping[str, Any]) -> ReplayOptimizeSpec:
return ReplayOptimizeSpec(
engine=EngineSpec(
model=payload["engine_model"],
backend=payload["engine_backend"],
baseEngineArgs=(
MockEngineArgs.from_json(payload["engine_base_agg_json"])
if payload["engine_base_agg_json"] is not None
else None
),
basePrefillEngineArgs=(
MockEngineArgs.from_json(payload["engine_base_prefill_json"])
if payload["engine_base_prefill_json"] is not None
else None
),
baseDecodeEngineArgs=(
MockEngineArgs.from_json(payload["engine_base_decode_json"])
if payload["engine_base_decode_json"] is not None
else None
),
),
hardware=HardwareSpec.model_validate_json(payload["hardware_json"]),
workload=WorkloadSpec.model_validate_json(payload["workload_json"]),
sla=SLASpec.model_validate_json(payload["sla_json"]),
router=RouterSpec(
mode=payload["router_mode"],
overlapWeights=payload["router_overlap_weights"],
baseRouterConfig=(
KvRouterConfig.from_json(payload["router_base_config_json"])
if payload["router_base_config_json"] is not None
else None
),
),
objective=ReplayObjective(payload["objective"]),
maxParallelEvals=payload["max_parallel_evals"],
)
def _evaluate_state_from_payload(payload: Mapping[str, Any]) -> dict[str, Any]:
return _evaluate_state(
state=payload["state"],
spec=_spec_from_payload(payload["spec"]),
cache={}, cache={},
) )
def _evaluate_agg_state_from_json_payloads( def _evaluate_agg_state_from_payload(payload: Mapping[str, Any]) -> dict[str, Any]:
payload: Mapping[str, Any]
) -> dict[str, Any]:
return _evaluate_agg_state( return _evaluate_agg_state(
state=payload["state"], state=payload["state"],
workload=payload["workload"], spec=_spec_from_payload(payload["spec"]),
base_engine_args=MockEngineArgs.from_json(payload["base_engine_args_json"]),
base_router_config=(
KvRouterConfig.from_json(payload["base_router_config_json"])
if payload["base_router_config_json"] is not None
else None
),
model=payload["model"],
backend=payload["backend"],
system=payload["system"],
objective=payload["objective"],
constraints=payload["constraints"],
cache={}, cache={},
) )
...@@ -303,15 +371,7 @@ def _evaluate_agg_state_from_json_payloads( ...@@ -303,15 +371,7 @@ def _evaluate_agg_state_from_json_payloads(
def _evaluate_states( def _evaluate_states(
*, *,
states: Sequence[DenseReplayState], states: Sequence[DenseReplayState],
workload: SyntheticReplayWorkload | TraceReplayWorkload, spec: ReplayOptimizeSpec,
base_prefill_engine_args: MockEngineArgs,
base_decode_engine_args: MockEngineArgs,
base_router_config: KvRouterConfig | None,
model: str,
backend: str,
system: str,
objective: ReplayObjective,
constraints: ReplayConstraints,
cache: dict[DenseReplayState, dict[str, Any]], cache: dict[DenseReplayState, dict[str, Any]],
max_parallel_evals: int, max_parallel_evals: int,
executor: Executor | None = None, executor: Executor | None = None,
...@@ -333,43 +393,13 @@ def _evaluate_states( ...@@ -333,43 +393,13 @@ def _evaluate_states(
if max_parallel_evals <= 1 or len(uncached_states) == 1 or executor is None: if max_parallel_evals <= 1 or len(uncached_states) == 1 or executor is None:
for index, state in zip(uncached_indices, uncached_states, strict=True): for index, state in zip(uncached_indices, uncached_states, strict=True):
records[index] = _evaluate_state( records[index] = _evaluate_state(state=state, spec=spec, cache=cache)
state=state,
workload=workload,
base_prefill_engine_args=base_prefill_engine_args,
base_decode_engine_args=base_decode_engine_args,
base_router_config=base_router_config,
model=model,
backend=backend,
system=system,
objective=objective,
constraints=constraints,
cache=cache,
)
return [record for record in records if record is not None] return [record for record in records if record is not None]
base_prefill_engine_args_json = base_prefill_engine_args.dump_json() spec_payload = _spec_to_payload(spec)
base_decode_engine_args_json = base_decode_engine_args.dump_json() payloads = [{"state": state, "spec": spec_payload} for state in uncached_states]
base_router_config_json = (
None if base_router_config is None else base_router_config.dump_json() future_records = list(executor.map(_evaluate_state_from_payload, payloads))
)
payloads = [
{
"state": state,
"workload": workload,
"base_prefill_engine_args_json": base_prefill_engine_args_json,
"base_decode_engine_args_json": base_decode_engine_args_json,
"base_router_config_json": base_router_config_json,
"model": model,
"backend": backend,
"system": system,
"objective": objective,
"constraints": constraints,
}
for state in uncached_states
]
future_records = list(executor.map(_evaluate_state_from_json_payloads, payloads))
for index, state, record in zip( for index, state, record in zip(
uncached_indices, uncached_indices,
...@@ -386,14 +416,7 @@ def _evaluate_states( ...@@ -386,14 +416,7 @@ def _evaluate_states(
def _evaluate_agg_states( def _evaluate_agg_states(
*, *,
states: Sequence[DenseAggReplayState], states: Sequence[DenseAggReplayState],
workload: SyntheticReplayWorkload | TraceReplayWorkload, spec: ReplayOptimizeSpec,
base_engine_args: MockEngineArgs,
base_router_config: KvRouterConfig | None,
model: str,
backend: str,
system: str,
objective: ReplayObjective,
constraints: ReplayConstraints,
cache: dict[DenseAggReplayState, dict[str, Any]], cache: dict[DenseAggReplayState, dict[str, Any]],
max_parallel_evals: int, max_parallel_evals: int,
executor: Executor | None = None, executor: Executor | None = None,
...@@ -415,42 +438,13 @@ def _evaluate_agg_states( ...@@ -415,42 +438,13 @@ def _evaluate_agg_states(
if max_parallel_evals <= 1 or len(uncached_states) == 1 or executor is None: if max_parallel_evals <= 1 or len(uncached_states) == 1 or executor is None:
for index, state in zip(uncached_indices, uncached_states, strict=True): for index, state in zip(uncached_indices, uncached_states, strict=True):
records[index] = _evaluate_agg_state( records[index] = _evaluate_agg_state(state=state, spec=spec, cache=cache)
state=state,
workload=workload,
base_engine_args=base_engine_args,
base_router_config=base_router_config,
model=model,
backend=backend,
system=system,
objective=objective,
constraints=constraints,
cache=cache,
)
return [record for record in records if record is not None] return [record for record in records if record is not None]
base_engine_args_json = base_engine_args.dump_json() spec_payload = _spec_to_payload(spec)
base_router_config_json = ( payloads = [{"state": state, "spec": spec_payload} for state in uncached_states]
None if base_router_config is None else base_router_config.dump_json()
) future_records = list(executor.map(_evaluate_agg_state_from_payload, payloads))
payloads = [
{
"state": state,
"workload": workload,
"base_engine_args_json": base_engine_args_json,
"base_router_config_json": base_router_config_json,
"model": model,
"backend": backend,
"system": system,
"objective": objective,
"constraints": constraints,
}
for state in uncached_states
]
future_records = list(
executor.map(_evaluate_agg_state_from_json_payloads, payloads)
)
for index, state, record in zip( for index, state, record in zip(
uncached_indices, uncached_indices,
......
...@@ -8,16 +8,20 @@ from collections.abc import Sequence ...@@ -8,16 +8,20 @@ from collections.abc import Sequence
from dynamo.llm import KvRouterConfig, MockEngineArgs from dynamo.llm import KvRouterConfig, MockEngineArgs
from dynamo.profiler.utils.replay_optimize import ( from dynamo.profiler.utils.replay_optimize import (
SyntheticReplayWorkload, EngineSpec,
TraceReplayWorkload, HardwareSpec,
ReplayOptimizeSpec,
RouterSpec,
SLASpec,
WorkloadSpec,
optimize_dense_disagg_with_replay, optimize_dense_disagg_with_replay,
) )
MODEL = "Qwen/Qwen3-32B" MODEL = "Qwen/Qwen3-32B"
BACKEND = "vllm" BACKEND = "vllm"
SYSTEM = "h200_sxm" GPU_SKU = "h200_sxm"
MAX_TOTAL_GPUS = 16 TOTAL_GPUS = 16
OVERLAP_SCORE_WEIGHTS = (0.0, 0.5, 1.0, 2.0) OVERLAP_WEIGHTS = [0.0, 0.5, 1.0, 2.0]
RESULT_COLUMNS: Sequence[str] = ( RESULT_COLUMNS: Sequence[str] = (
"prefill_tp", "prefill_tp",
"decode_tp", "decode_tp",
...@@ -37,24 +41,24 @@ def _build_workload( ...@@ -37,24 +41,24 @@ def _build_workload(
*, *,
trace_file: str | None, trace_file: str | None,
arrival_speedup_ratio: float, arrival_speedup_ratio: float,
) -> SyntheticReplayWorkload | TraceReplayWorkload: ) -> WorkloadSpec:
if trace_file is not None: if trace_file is not None:
return TraceReplayWorkload( return WorkloadSpec(
trace_file=trace_file, traceFile=trace_file,
arrival_speedup_ratio=arrival_speedup_ratio, arrivalSpeedupRatio=arrival_speedup_ratio,
) )
return SyntheticReplayWorkload( return WorkloadSpec(
isl=32768, isl=32768,
osl=256, osl=256,
request_count=5000, requestCount=5000,
replay_concurrency=200, concurrency=200,
shared_prefix_ratio=0.5, sharedPrefixRatio=0.5,
num_prefix_groups=50, numPrefixGroups=50,
) )
def _build_engine_args(*, worker_type: str) -> MockEngineArgs: def _engine_args(worker_type: str) -> MockEngineArgs:
return MockEngineArgs( return MockEngineArgs(
block_size=512, block_size=512,
num_gpu_blocks=20000, num_gpu_blocks=20000,
...@@ -69,27 +73,28 @@ def run_example( ...@@ -69,27 +73,28 @@ def run_example(
arrival_speedup_ratio: float = 1.0, arrival_speedup_ratio: float = 1.0,
max_parallel_evals: int = 1, max_parallel_evals: int = 1,
) -> None: ) -> None:
result = optimize_dense_disagg_with_replay( spec = ReplayOptimizeSpec(
model=MODEL, engine=EngineSpec(
backend=BACKEND, model=MODEL,
system=SYSTEM, backend=BACKEND,
basePrefillEngineArgs=_engine_args("prefill"),
baseDecodeEngineArgs=_engine_args("decode"),
),
hardware=HardwareSpec(gpuSku=GPU_SKU, totalGpus=TOTAL_GPUS),
workload=_build_workload( workload=_build_workload(
trace_file=trace_file, trace_file=trace_file,
arrival_speedup_ratio=arrival_speedup_ratio, arrival_speedup_ratio=arrival_speedup_ratio,
), ),
base_prefill_engine_args=_build_engine_args(worker_type="prefill"), sla=SLASpec(ttft=50000.0, itl=100.0, e2eLatency=60000.0),
base_decode_engine_args=_build_engine_args(worker_type="decode"), router=RouterSpec(
base_router_config=KvRouterConfig(), baseRouterConfig=KvRouterConfig(),
max_total_gpus=MAX_TOTAL_GPUS, overlapWeights=OVERLAP_WEIGHTS,
constraints={ ),
"mean_ttft_ms": 50000.0, maxParallelEvals=max_parallel_evals,
"mean_tpot_ms": 100.0,
"mean_e2e_latency_ms": 60000.0,
},
overlap_score_weights=OVERLAP_SCORE_WEIGHTS,
max_parallel_evals=max_parallel_evals,
) )
result = optimize_dense_disagg_with_replay(spec)
print("Best feasible:") print("Best feasible:")
print(result.best_feasible) print(result.best_feasible)
print() print()
......
...@@ -9,7 +9,8 @@ from typing import Any ...@@ -9,7 +9,8 @@ from typing import Any
from dynamo.runtime.logging import configure_dynamo_logging from dynamo.runtime.logging import configure_dynamo_logging
from .models import DenseAggReplayState, DenseReplayState, ReplayConstraints from .models import DenseAggReplayState, DenseReplayState
from .specs import HardwareSpec, SLASpec
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
_LOGGING_CONFIGURED = False _LOGGING_CONFIGURED = False
...@@ -27,20 +28,31 @@ def log_state_start(state: DenseReplayState | DenseAggReplayState) -> None: ...@@ -27,20 +28,31 @@ def log_state_start(state: DenseReplayState | DenseAggReplayState) -> None:
logger.info("Replay optimize evaluating %s", state.format_summary()) logger.info("Replay optimize evaluating %s", state.format_summary())
def _budget_summary(
state: DenseReplayState | DenseAggReplayState, hardware: HardwareSpec
) -> str:
used = state.total_gpus_used
budget = hardware.totalGpus
status = "satisfied" if used <= budget else "unsatisfied"
return f"totalGpus={used}<={budget} {status}"
def log_state_finish( def log_state_finish(
*, *,
state: DenseReplayState | DenseAggReplayState, state: DenseReplayState | DenseAggReplayState,
report: Mapping[str, Any], report: Mapping[str, Any],
constraints: ReplayConstraints, sla: SLASpec,
hardware: HardwareSpec,
score: float, score: float,
feasible: bool, feasible: bool,
violation_penalty: float, violation_penalty: float,
) -> None: ) -> None:
logger.info( logger.info(
"Replay optimize finished %s score=%.3f feasible=%s violation_penalty=%.6f %s", "Replay optimize finished %s score=%.3f feasible=%s violation_penalty=%.6f %s %s",
state.format_summary(), state.format_summary(),
score, score,
feasible, feasible,
violation_penalty, violation_penalty,
constraints.summarize(report, state.total_gpus_used), sla.summarize(report),
_budget_summary(state, hardware),
) )
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0 # SPDX-License-Identifier: Apache-2.0
"""Optimizer-internal dataclasses for replay_optimize search states.
User-facing configuration lives in `specs.py`. This module carries only the
hot-path state types (used as dict keys in the search cache, instantiated
once per visited candidate) and the result bundle returned from the
public API.
"""
from __future__ import annotations from __future__ import annotations
import math from dataclasses import dataclass
import os
from collections.abc import Iterator, Mapping
from dataclasses import dataclass, fields
from enum import Enum
from typing import Any from typing import Any
import pandas as pd import pandas as pd
from .constants import SUPPORTED_CONSTRAINTS
@dataclass(frozen=True)
class ReplayConstraints:
mean_ttft_ms: float | None = None
p95_ttft_ms: float | None = None
mean_tpot_ms: float | None = None
p95_tpot_ms: float | None = None
mean_e2e_latency_ms: float | None = None
p95_e2e_latency_ms: float | None = None
max_total_gpus: int | None = None
@classmethod
def from_mapping(
cls,
mapping: Mapping[str, float] | None,
max_total_gpus: int,
) -> ReplayConstraints:
raw = dict(mapping or {})
unknown = sorted(set(raw) - SUPPORTED_CONSTRAINTS)
if unknown:
raise ValueError(
"unsupported constraints: "
+ ", ".join(unknown)
+ f"; supported constraints are {sorted(SUPPORTED_CONSTRAINTS)}"
)
raw_gpus = raw.get("max_total_gpus")
if raw_gpus is not None and int(raw_gpus) != max_total_gpus:
raise ValueError(
"constraints['max_total_gpus'] must match max_total_gpus when both are provided"
)
def _bound(key: str) -> float | None:
value = raw.get(key)
return None if value is None or value <= 0 else float(value)
return cls(
mean_ttft_ms=_bound("mean_ttft_ms"),
p95_ttft_ms=_bound("p95_ttft_ms"),
mean_tpot_ms=_bound("mean_tpot_ms"),
p95_tpot_ms=_bound("p95_tpot_ms"),
mean_e2e_latency_ms=_bound("mean_e2e_latency_ms"),
p95_e2e_latency_ms=_bound("p95_e2e_latency_ms"),
max_total_gpus=int(max_total_gpus),
)
def _active(
self, report: Mapping[str, Any], total_gpus_used: int
) -> Iterator[tuple[str, float | None, float]]:
for field in fields(self):
if field.name == "max_total_gpus":
continue
bound = getattr(self, field.name)
if bound is None:
continue
value = report.get(field.name)
yield field.name, None if value is None else float(value), bound
if self.max_total_gpus is not None:
yield (
"max_total_gpus",
float(total_gpus_used),
float(self.max_total_gpus),
)
def violation_penalty(
self, report: Mapping[str, Any], total_gpus_used: int
) -> float:
penalty = 0.0
for _, metric, bound in self._active(report, total_gpus_used):
if metric is None:
penalty += math.inf
continue
penalty += max(metric / bound - 1.0, 0.0)
return penalty
def summarize(self, report: Mapping[str, Any], total_gpus_used: int) -> str:
statuses: list[str] = []
for name, metric, bound in self._active(report, total_gpus_used):
if metric is None:
statuses.append(f"{name}=missing<={bound:g} unsatisfied")
continue
state = "satisfied" if metric <= bound else "unsatisfied"
statuses.append(f"{name}={metric:.3f}<={bound:g} {state}")
return "constraints=" + ", ".join(statuses) if statuses else "constraints=none"
def aic_task_kwargs(self) -> dict[str, float | None]:
return {
"ttft": self.mean_ttft_ms,
"tpot": self.mean_tpot_ms,
"request_latency": self.mean_e2e_latency_ms,
}
class ReplayObjective(str, Enum):
THROUGHPUT = "throughput"
MEAN_TTFT = "mean_ttft"
MEAN_E2E_LATENCY = "mean_e2e_latency"
def score(self, report: Mapping[str, Any]) -> float:
if self is ReplayObjective.THROUGHPUT:
return float(report["output_throughput_tok_s"])
if self is ReplayObjective.MEAN_TTFT:
return -float(report["mean_ttft_ms"])
return -float(report["mean_e2e_latency_ms"])
@dataclass(frozen=True)
class SyntheticReplayWorkload:
isl: int
osl: int
request_count: int
replay_concurrency: int
arrival_interval_ms: float = 0.0
turns_per_session: int = 1
shared_prefix_ratio: float = 0.0
num_prefix_groups: int = 0
inter_turn_delay_ms: float = 0.0
@dataclass(frozen=True)
class TraceReplayWorkload:
trace_file: str | os.PathLike[str]
arrival_speedup_ratio: float = 1.0
@dataclass(frozen=True) @dataclass(frozen=True)
class DenseReplayState: class DenseReplayState:
......
...@@ -30,66 +30,21 @@ from collections.abc import Mapping, Sequence ...@@ -30,66 +30,21 @@ from collections.abc import Mapping, Sequence
from concurrent.futures import ProcessPoolExecutor from concurrent.futures import ProcessPoolExecutor
from typing import Literal from typing import Literal
from dynamo.llm import KvRouterConfig, MockEngineArgs
from . import aic, evaluate from . import aic, evaluate
from .constants import ( from .constants import DEFAULT_SEARCH_ROUNDS
AIC_BACKEND_VERSIONS, from .models import DenseAggReplayState, DenseReplayOptimizationResult, DenseReplayState
DEFAULT_MAX_PARALLEL_EVALS,
DEFAULT_OVERLAP_SCORE_WEIGHTS,
DEFAULT_SEARCH_ROUNDS,
)
from .models import (
DenseAggReplayState,
DenseReplayOptimizationResult,
DenseReplayState,
ReplayConstraints,
ReplayObjective,
SyntheticReplayWorkload,
TraceReplayWorkload,
)
from .scoring import _finalize_result, _pick_best_record from .scoring import _finalize_result, _pick_best_record
from .specs import ReplayOptimizeSpec, RouterMode
def _validate_backend(backend: str) -> str:
if backend not in AIC_BACKEND_VERSIONS:
raise ValueError(
f"backend must be one of {sorted(AIC_BACKEND_VERSIONS)}, got {backend!r}"
)
return backend
def _normalize_overlap_score_weights(
overlap_score_weights: Sequence[float] | None,
) -> tuple[float, ...]:
if overlap_score_weights is None:
return DEFAULT_OVERLAP_SCORE_WEIGHTS
weights = tuple(float(weight) for weight in overlap_score_weights)
if not weights:
raise ValueError("overlap_score_weights must not be empty")
return weights
def _normalize_router_mode(
router_mode: str,
) -> Literal["kv_router", "round_robin", "both"]:
if router_mode not in {"kv_router", "round_robin", "both"}:
raise ValueError(
"router_mode must be one of ['kv_router', 'round_robin', 'both'], "
f"got {router_mode!r}"
)
return router_mode
def _router_states( def _router_states(
*, *,
router_mode: Literal["kv_router", "round_robin", "both"], router_mode: RouterMode,
overlap_score_weights: Sequence[float], overlap_score_weights: Sequence[float],
) -> list[tuple[str, float]]: ) -> list[tuple[str, float]]:
if router_mode == "round_robin": if router_mode is RouterMode.ROUND_ROBIN:
return [("round_robin", 0.0)] return [("round_robin", 0.0)]
if router_mode == "kv_router": if router_mode is RouterMode.KV_ROUTER:
return [("kv_router", float(weight)) for weight in overlap_score_weights] return [("kv_router", float(weight)) for weight in overlap_score_weights]
return [("round_robin", 0.0)] + [ return [("round_robin", 0.0)] + [
("kv_router", float(weight)) for weight in overlap_score_weights ("kv_router", float(weight)) for weight in overlap_score_weights
...@@ -267,27 +222,15 @@ def _record_to_agg_state( ...@@ -267,27 +222,15 @@ def _record_to_agg_state(
def optimize_dense_disagg_with_replay( def optimize_dense_disagg_with_replay(
*, spec: ReplayOptimizeSpec,
model: str,
backend: Literal["vllm", "sglang"],
system: str,
workload: SyntheticReplayWorkload | TraceReplayWorkload,
base_prefill_engine_args: MockEngineArgs,
base_decode_engine_args: MockEngineArgs,
base_router_config: KvRouterConfig | None = None,
max_total_gpus: int,
constraints: Mapping[str, float] | None = None,
objective: Literal["throughput", "mean_e2e_latency", "mean_ttft"] = "throughput",
router_mode: Literal["kv_router", "round_robin", "both"] = "kv_router",
overlap_score_weights: Sequence[float] | None = None,
max_parallel_evals: int = DEFAULT_MAX_PARALLEL_EVALS,
) -> DenseReplayOptimizationResult: ) -> DenseReplayOptimizationResult:
"""Run a heuristic block search over dense disaggregated offline replay configs. """Run a heuristic block search over dense disaggregated offline replay configs.
This routine assumes we want to use as much of `max_total_gpus` as possible, Assumes the optimizer should consume as much of `spec.hardware.totalGpus` as
then ranks visited states by the selected `objective` subject to replay possible, then ranks visited states by `spec.objective` subject to the SLA
constraints. Supported objectives: `"throughput"` (default, maximize bounds in `spec.sla`. Supported objectives: `ReplayObjective.THROUGHPUT`
`output_throughput_tok_s`), `"mean_e2e_latency"` and `"mean_ttft"` (minimize (default, maximize `output_throughput_tok_s`),
`ReplayObjective.MEAN_E2E_LATENCY` and `ReplayObjective.MEAN_TTFT` (minimize
the corresponding report metric). The descended dimensions are: the corresponding report metric). The descended dimensions are:
1. `(prefill_tp, decode_tp)` at equal worker counts that fit the budget. 1. `(prefill_tp, decode_tp)` at equal worker counts that fit the budget.
2. `(prefill_workers, decode_workers)` on the budget edge for the incumbent TP 2. `(prefill_workers, decode_workers)` on the budget edge for the incumbent TP
...@@ -296,17 +239,24 @@ def optimize_dense_disagg_with_replay( ...@@ -296,17 +239,24 @@ def optimize_dense_disagg_with_replay(
Returned "best" records are best among visited states, not a global optimum. Returned "best" records are best among visited states, not a global optimum.
""" """
backend = _validate_backend(backend) # Guardrail #8: disagg path requires both prefill and decode engine args.
router_mode = _normalize_router_mode(router_mode) if (
typed_objective = ReplayObjective(objective) spec.engine.basePrefillEngineArgs is None
if max_total_gpus < 2: or spec.engine.baseDecodeEngineArgs is None
raise ValueError("max_total_gpus must be at least 2 for disaggregated replay") ):
raise ValueError(
typed_constraints = ReplayConstraints.from_mapping(constraints, max_total_gpus) "optimize_dense_disagg_with_replay requires both "
overlap_weights = _normalize_overlap_score_weights(overlap_score_weights) "EngineSpec.basePrefillEngineArgs and EngineSpec.baseDecodeEngineArgs"
if router_mode == "round_robin": )
overlap_weights = (0.0,) if spec.hardware.totalGpus < 2:
max_parallel_evals = max(1, int(max_parallel_evals)) raise ValueError(
"hardware.totalGpus must be at least 2 for disaggregated replay"
)
backend = spec.engine.backend.value
system = str(spec.hardware.gpuSku)
overlap_weights = spec.router.effectiveOverlapWeights
max_parallel_evals = max(1, int(spec.maxParallelEvals))
prefill_tps, decode_tps = aic._enumerate_dense_tp_candidates(backend, system) prefill_tps, decode_tps = aic._enumerate_dense_tp_candidates(backend, system)
if not prefill_tps or not decode_tps: if not prefill_tps or not decode_tps:
raise ValueError( raise ValueError(
...@@ -318,7 +268,7 @@ def optimize_dense_disagg_with_replay( ...@@ -318,7 +268,7 @@ def optimize_dense_disagg_with_replay(
prefill_tps=prefill_tps, prefill_tps=prefill_tps,
decode_tps=decode_tps, decode_tps=decode_tps,
overlap_score_weight=overlap_weights[0], overlap_score_weight=overlap_weights[0],
max_total_gpus=max_total_gpus, max_total_gpus=spec.hardware.totalGpus,
) )
executor = ( executor = (
...@@ -335,19 +285,11 @@ def optimize_dense_disagg_with_replay( ...@@ -335,19 +285,11 @@ def optimize_dense_disagg_with_replay(
decode_tps=decode_tps, decode_tps=decode_tps,
router_mode=incumbent.router_mode, router_mode=incumbent.router_mode,
overlap_score_weight=incumbent.overlap_score_weight, overlap_score_weight=incumbent.overlap_score_weight,
max_total_gpus=max_total_gpus, max_total_gpus=spec.hardware.totalGpus,
) )
tp_records = evaluate._evaluate_states( tp_records = evaluate._evaluate_states(
states=tp_states, states=tp_states,
workload=workload, spec=spec,
base_prefill_engine_args=base_prefill_engine_args,
base_decode_engine_args=base_decode_engine_args,
base_router_config=base_router_config,
model=model,
backend=backend,
system=system,
objective=typed_objective,
constraints=typed_constraints,
cache=cache, cache=cache,
max_parallel_evals=max_parallel_evals, max_parallel_evals=max_parallel_evals,
executor=executor, executor=executor,
...@@ -359,19 +301,11 @@ def optimize_dense_disagg_with_replay( ...@@ -359,19 +301,11 @@ def optimize_dense_disagg_with_replay(
decode_tp=incumbent.decode_tp, decode_tp=incumbent.decode_tp,
router_mode=incumbent.router_mode, router_mode=incumbent.router_mode,
overlap_score_weight=incumbent.overlap_score_weight, overlap_score_weight=incumbent.overlap_score_weight,
max_total_gpus=max_total_gpus, max_total_gpus=spec.hardware.totalGpus,
) )
worker_records = evaluate._evaluate_states( worker_records = evaluate._evaluate_states(
states=worker_states, states=worker_states,
workload=workload, spec=spec,
base_prefill_engine_args=base_prefill_engine_args,
base_decode_engine_args=base_decode_engine_args,
base_router_config=base_router_config,
model=model,
backend=backend,
system=system,
objective=typed_objective,
constraints=typed_constraints,
cache=cache, cache=cache,
max_parallel_evals=max_parallel_evals, max_parallel_evals=max_parallel_evals,
executor=executor, executor=executor,
...@@ -389,19 +323,11 @@ def optimize_dense_disagg_with_replay( ...@@ -389,19 +323,11 @@ def optimize_dense_disagg_with_replay(
router_mode=mode, router_mode=mode,
) )
for mode, weight in _router_states( for mode, weight in _router_states(
router_mode=router_mode, router_mode=spec.router.mode,
overlap_score_weights=overlap_weights, overlap_score_weights=overlap_weights,
) )
], ],
workload=workload, spec=spec,
base_prefill_engine_args=base_prefill_engine_args,
base_decode_engine_args=base_decode_engine_args,
base_router_config=base_router_config,
model=model,
backend=backend,
system=system,
objective=typed_objective,
constraints=typed_constraints,
cache=cache, cache=cache,
max_parallel_evals=max_parallel_evals, max_parallel_evals=max_parallel_evals,
executor=executor, executor=executor,
...@@ -418,44 +344,31 @@ def optimize_dense_disagg_with_replay( ...@@ -418,44 +344,31 @@ def optimize_dense_disagg_with_replay(
def optimize_dense_agg_with_replay( def optimize_dense_agg_with_replay(
*, spec: ReplayOptimizeSpec,
model: str,
backend: Literal["vllm", "sglang"],
system: str,
workload: SyntheticReplayWorkload | TraceReplayWorkload,
base_engine_args: MockEngineArgs,
base_router_config: KvRouterConfig | None = None,
max_total_gpus: int,
constraints: Mapping[str, float] | None = None,
objective: Literal["throughput", "mean_e2e_latency", "mean_ttft"] = "throughput",
router_mode: Literal["kv_router", "round_robin", "both"] = "kv_router",
overlap_score_weights: Sequence[float] | None = None,
max_parallel_evals: int = DEFAULT_MAX_PARALLEL_EVALS,
) -> DenseReplayOptimizationResult: ) -> DenseReplayOptimizationResult:
"""Run a heuristic block search over dense aggregated offline replay configs. """Run a heuristic block search over dense aggregated offline replay configs.
This routine assumes we want to use as much of `max_total_gpus` as possible, Assumes the optimizer should consume as much of `spec.hardware.totalGpus` as
then ranks visited states by the selected `objective` subject to replay possible, then ranks visited states by `spec.objective` subject to the SLA
constraints. Supported objectives: `"throughput"` (default, maximize bounds in `spec.sla`. The descended dimensions are:
`output_throughput_tok_s`), `"mean_e2e_latency"` and `"mean_ttft"` (minimize
the corresponding report metric). The descended dimensions are:
1. `tp` at the maximum worker count that fits the budget. 1. `tp` at the maximum worker count that fits the budget.
2. `workers` for the incumbent `tp`. 2. `workers` for the incumbent `tp`.
3. `(router_mode, overlap_score_weight)`. 3. `(router_mode, overlap_score_weight)`.
Returned "best" records are best among visited states, not a global optimum. Returned "best" records are best among visited states, not a global optimum.
""" """
backend = _validate_backend(backend) # Guardrail #8: agg path requires the single baseEngineArgs.
router_mode = _normalize_router_mode(router_mode) if spec.engine.baseEngineArgs is None:
typed_objective = ReplayObjective(objective) raise ValueError(
if max_total_gpus < 1: "optimize_dense_agg_with_replay requires EngineSpec.baseEngineArgs"
raise ValueError("max_total_gpus must be at least 1 for aggregated replay") )
if spec.hardware.totalGpus < 1:
typed_constraints = ReplayConstraints.from_mapping(constraints, max_total_gpus) raise ValueError("hardware.totalGpus must be at least 1 for aggregated replay")
overlap_weights = _normalize_overlap_score_weights(overlap_score_weights)
if router_mode == "round_robin": backend = spec.engine.backend.value
overlap_weights = (0.0,) system = str(spec.hardware.gpuSku)
max_parallel_evals = max(1, int(max_parallel_evals)) overlap_weights = spec.router.effectiveOverlapWeights
max_parallel_evals = max(1, int(spec.maxParallelEvals))
tps, _ = aic._enumerate_dense_tp_candidates(backend, system) tps, _ = aic._enumerate_dense_tp_candidates(backend, system)
if not tps: if not tps:
raise ValueError( raise ValueError(
...@@ -463,7 +376,9 @@ def optimize_dense_agg_with_replay( ...@@ -463,7 +376,9 @@ def optimize_dense_agg_with_replay(
) )
cache: dict[DenseAggReplayState, dict[str, float | int | bool | str]] = {} cache: dict[DenseAggReplayState, dict[str, float | int | bool | str]] = {}
incumbent = _select_initial_agg_state(tps=tps, max_total_gpus=max_total_gpus) incumbent = _select_initial_agg_state(
tps=tps, max_total_gpus=spec.hardware.totalGpus
)
executor = ( executor = (
ProcessPoolExecutor(max_workers=max_parallel_evals) ProcessPoolExecutor(max_workers=max_parallel_evals)
...@@ -478,18 +393,11 @@ def optimize_dense_agg_with_replay( ...@@ -478,18 +393,11 @@ def optimize_dense_agg_with_replay(
tps=tps, tps=tps,
router_mode=incumbent.router_mode, router_mode=incumbent.router_mode,
overlap_score_weight=incumbent.overlap_score_weight, overlap_score_weight=incumbent.overlap_score_weight,
max_total_gpus=max_total_gpus, max_total_gpus=spec.hardware.totalGpus,
) )
tp_records = evaluate._evaluate_agg_states( tp_records = evaluate._evaluate_agg_states(
states=tp_states, states=tp_states,
workload=workload, spec=spec,
base_engine_args=base_engine_args,
base_router_config=base_router_config,
model=model,
backend=backend,
system=system,
objective=typed_objective,
constraints=typed_constraints,
cache=cache, cache=cache,
max_parallel_evals=max_parallel_evals, max_parallel_evals=max_parallel_evals,
executor=executor, executor=executor,
...@@ -500,18 +408,11 @@ def optimize_dense_agg_with_replay( ...@@ -500,18 +408,11 @@ def optimize_dense_agg_with_replay(
tp=incumbent.tp, tp=incumbent.tp,
router_mode=incumbent.router_mode, router_mode=incumbent.router_mode,
overlap_score_weight=incumbent.overlap_score_weight, overlap_score_weight=incumbent.overlap_score_weight,
max_total_gpus=max_total_gpus, max_total_gpus=spec.hardware.totalGpus,
) )
worker_records = evaluate._evaluate_agg_states( worker_records = evaluate._evaluate_agg_states(
states=worker_states, states=worker_states,
workload=workload, spec=spec,
base_engine_args=base_engine_args,
base_router_config=base_router_config,
model=model,
backend=backend,
system=system,
objective=typed_objective,
constraints=typed_constraints,
cache=cache, cache=cache,
max_parallel_evals=max_parallel_evals, max_parallel_evals=max_parallel_evals,
executor=executor, executor=executor,
...@@ -527,7 +428,7 @@ def optimize_dense_agg_with_replay( ...@@ -527,7 +428,7 @@ def optimize_dense_agg_with_replay(
overlap_score_weight=weight, overlap_score_weight=weight,
) )
for mode, weight in _router_states( for mode, weight in _router_states(
router_mode=router_mode, router_mode=spec.router.mode,
overlap_score_weights=overlap_weights, overlap_score_weights=overlap_weights,
) )
if _supports_agg_router_mode( if _supports_agg_router_mode(
...@@ -535,14 +436,7 @@ def optimize_dense_agg_with_replay( ...@@ -535,14 +436,7 @@ def optimize_dense_agg_with_replay(
router_mode=mode, router_mode=mode,
) )
], ],
workload=workload, spec=spec,
base_engine_args=base_engine_args,
base_router_config=base_router_config,
model=model,
backend=backend,
system=system,
objective=typed_objective,
constraints=typed_constraints,
cache=cache, cache=cache,
max_parallel_evals=max_parallel_evals, max_parallel_evals=max_parallel_evals,
executor=executor, executor=executor,
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Pydantic specs for replay_optimize, shaped to mirror DGDR v1beta1.
Field names follow DGDR's lowerCamelCase convention so that the eventual
upstream-back into `deploy/operator/api/v1beta1/...` is a clean merge.
Method names follow Python / Pydantic convention (snake_case), matching how
`dgdr_v1beta1_types.py` itself defines its one internal method
(`_validate_sla_options`).
DGDR shapes we clone / extend:
- `EngineSpec` — local extension (DGDR has `model`/`backend` flat on the
outer; we need engine-args carriers)
- `HardwareSpec` — subset clone of DGDR.HardwareSpec (gpuSku + totalGpus only)
- `WorkloadSpec` — DGDR.WorkloadSpec + replay extensions, unified synthetic/
trace with a `traceFile` discriminator
- `SLASpec` — DGDR.SLASpec field names + p95 variants; we explicitly
do NOT clone DGDR's "ttft+itl XOR e2eLatency" validator
because our optimizer treats them as independent additive
penalties (all three may be set)
- `RouterSpec` — our sweep-oriented router config; analogous location to
DGDR.KVRouterSpec but different semantics (runtime flag
there vs. dev-time sweep here)
- `ReplayOptimizeSpec` — top-level bundle, analog to
DGDR.DynamoGraphDeploymentRequestSpec
"""
from __future__ import annotations
import math
from collections.abc import Iterator, Mapping
from enum import Enum
from typing import Any
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
from dynamo.llm import KvRouterConfig, MockEngineArgs
from dynamo.profiler.utils.dgdr_v1beta1_types import BackendType, GPUSKUType
from .constants import (
AIC_BACKEND_VERSIONS,
DEFAULT_MAX_PARALLEL_EVALS,
DEFAULT_OVERLAP_SCORE_WEIGHTS,
)
class RouterMode(str, Enum):
"""Router mode for the replay search.
`BOTH` triggers a combined sweep across `KV_ROUTER` and `ROUND_ROBIN`;
`round_robin` collapses `overlapWeights` to `(0.0,)` (guardrail #5).
Subclasses `str` for Pydantic coercion and wire-compatibility with the
existing `router_mode` field on `DenseReplayState` / `DenseAggReplayState`.
"""
KV_ROUTER = "kv_router"
ROUND_ROBIN = "round_robin"
BOTH = "both"
class ReplayObjective(str, Enum):
"""Optimization objective driving state ranking in the search."""
THROUGHPUT = "throughput"
MEAN_TTFT = "mean_ttft"
MEAN_E2E_LATENCY = "mean_e2e_latency"
def score(self, report: Mapping[str, Any]) -> float:
if self is ReplayObjective.THROUGHPUT:
return float(report["output_throughput_tok_s"])
if self is ReplayObjective.MEAN_TTFT:
return -float(report["mean_ttft_ms"])
return -float(report["mean_e2e_latency_ms"])
class EngineSpec(BaseModel):
"""Model + backend + engine-arg carriers.
DGDR has `model: str` and `backend: BackendType` flat on the outer spec and
no engine-args equivalent, so this spec is a replay-local extension.
Carries engine args for both agg and disagg paths; the relevant
`optimize_dense_*` entry asserts the right fields are populated
(guardrail #8).
`backend` has no default — pre-Phase-2 `optimize_dense_*` required it; keep
the explicit contract so a forgotten backend fails at spec construction
instead of silently falling through to a vLLM run.
"""
model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid")
model: str
backend: BackendType
baseEngineArgs: MockEngineArgs | None = None
basePrefillEngineArgs: MockEngineArgs | None = None
baseDecodeEngineArgs: MockEngineArgs | None = None
@field_validator("backend", mode="after")
@classmethod
def _validate_backend_supported_by_aic(cls, backend: BackendType) -> BackendType:
# Guardrail #6: DGDR's BackendType allows Auto / Trtllm which AIC
# doesn't support in replay_optimize; reject at spec-construction time
# instead of crashing mid-search.
if backend.value not in AIC_BACKEND_VERSIONS:
raise ValueError(
f"backend must be one of {sorted(AIC_BACKEND_VERSIONS)}, "
f"got {backend.value!r}"
)
return backend
class HardwareSpec(BaseModel):
"""GPU budget + SKU. Subset clone of DGDR.HardwareSpec."""
model_config = ConfigDict(extra="forbid")
# Guardrail #7: keep the union so exotic AIC systems outside the current
# GPUSKUType enumeration still work. Pydantic coerces matching strings to
# the enum; non-matching strings stay as str and reach AIC untouched.
gpuSku: GPUSKUType | str
totalGpus: int = Field(gt=0)
_SYNTHETIC_ONLY_FIELDS: tuple[str, ...] = (
"isl",
"osl",
"concurrency",
"requestRate",
"requestCount",
)
class WorkloadSpec(BaseModel):
"""Workload, unified across synthetic and trace replay.
Extends DGDR.WorkloadSpec with:
- replay-specific synthetic knobs (request_count, shared-prefix / multi-
turn controls, arrival interval)
- trace-source fields (`traceFile`, `arrivalSpeedupRatio`)
`traceFile` acts as a discriminator:
- when set, the workload is trace-based and the synthetic-only fields
(`isl`, `osl`, `concurrency`, `requestRate`, `requestCount`) must not
be populated — the validator rejects mixed mode to avoid silent data loss
- when unset, the synthetic fields `isl`, `osl`, `concurrency`, and
`requestCount` are all required
"""
model_config = ConfigDict(extra="forbid")
# DGDR base fields
isl: int | None = None
osl: int | None = None
concurrency: float | None = None
requestRate: float | None = None
# Replay synthetic extensions
requestCount: int | None = None
sharedPrefixRatio: float = 0.0
numPrefixGroups: int = 0
turnsPerSession: int = 1
interTurnDelayMs: float = 0.0
arrivalIntervalMs: float = 0.0
# Replay trace-source extensions (mutually exclusive with synthetic fields)
traceFile: str | None = None
arrivalSpeedupRatio: float = 1.0
@model_validator(mode="after")
def _validate_source(self) -> "WorkloadSpec":
if self.traceFile is not None:
mixed = [
name
for name in _SYNTHETIC_ONLY_FIELDS
if getattr(self, name) is not None
]
if mixed:
raise ValueError(
"trace workload (traceFile set) must not also set synthetic "
f"fields: {mixed}"
)
return self
missing = [
name
for name in ("isl", "osl", "concurrency", "requestCount")
if getattr(self, name) is None
]
if missing:
raise ValueError(
"synthetic workload requires "
+ ", ".join(missing)
+ "; or set traceFile for trace replay"
)
return self
@property
def isTraceBased(self) -> bool:
return self.traceFile is not None
# SLASpec field names → Rust replay-report keys. The Rust runner emits
# snake_case + `_ms` suffix (`mean_ttft_ms`, `mean_tpot_ms`, ...) while DGDR's
# convention is camelCase + no unit suffix. This table bridges the two; the
# Rust side renaming is out of scope here.
_SLA_REPORT_KEYS: dict[str, str] = {
"ttft": "mean_ttft_ms",
"itl": "mean_tpot_ms",
"e2eLatency": "mean_e2e_latency_ms",
"p95Ttft": "p95_ttft_ms",
"p95Itl": "p95_tpot_ms",
"p95E2eLatency": "p95_e2e_latency_ms",
}
class SLASpec(BaseModel):
"""Latency SLA bounds.
Guardrail #1 + #2: defaults are `None` (unconstrained); we do NOT clone
DGDR's `_validate_sla_options` — our optimizer treats ttft / itl /
e2eLatency as independent additive penalties and the example uses all
three simultaneously.
`itl` is the DGDR-native name for what the Rust runner reports as
`mean_tpot_ms` (inter-token latency == time-per-output-token).
"""
model_config = ConfigDict(extra="forbid")
ttft: float | None = None
itl: float | None = None
e2eLatency: float | None = None
p95Ttft: float | None = None
p95Itl: float | None = None
p95E2eLatency: float | None = None
def _active(
self, report: Mapping[str, Any]
) -> Iterator[tuple[str, float | None, float]]:
"""Yield `(field_name, metric_from_report_or_None, bound)` for each set bound."""
for field_name, report_key in _SLA_REPORT_KEYS.items():
bound = getattr(self, field_name)
if bound is None or bound <= 0:
continue
value = report.get(report_key)
yield field_name, None if value is None else float(value), float(bound)
def violation_penalty(self, report: Mapping[str, Any]) -> float:
"""Sum of positive (metric/bound - 1) across active SLA bounds.
Missing report key contributes `inf` (fails the feasibility gate)
rather than silently scoring as zero.
"""
penalty = 0.0
for _, metric, bound in self._active(report):
if metric is None:
penalty += math.inf
continue
penalty += max(metric / bound - 1.0, 0.0)
return penalty
def summarize(self, report: Mapping[str, Any]) -> str:
statuses: list[str] = []
for field_name, metric, bound in self._active(report):
if metric is None:
statuses.append(f"{field_name}=missing<={bound:g} unsatisfied")
continue
state = "satisfied" if metric <= bound else "unsatisfied"
statuses.append(f"{field_name}={metric:.3f}<={bound:g} {state}")
return "sla=" + ", ".join(statuses) if statuses else "sla=none"
def aic_task_kwargs(self) -> dict[str, float | None]:
"""Translate to `aiconfigurator.sdk.task.TaskConfig` kwargs.
AIC's external API still uses `tpot` and `request_latency`; we keep
those wire names untouched.
"""
return {
"ttft": self.ttft,
"tpot": self.itl,
"request_latency": self.e2eLatency,
}
class RouterSpec(BaseModel):
"""Router config for the search.
Analogous location to DGDR.KVRouterSpec but semantically different: DGDR
has a single runtime on/off flag, we have a dev-time sweep over overlap
score weights plus a mode selector.
"""
model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid")
mode: RouterMode = RouterMode.KV_ROUTER
# None → fallback to DEFAULT_OVERLAP_SCORE_WEIGHTS (guardrail #3). Empty
# list rejected (guardrail #4). Round-robin auto-collapse happens in
# `effectiveOverlapWeights` (guardrail #5).
overlapWeights: list[float] | None = None
baseRouterConfig: KvRouterConfig | None = None
@field_validator("overlapWeights", mode="after")
@classmethod
def _reject_empty_weights(cls, weights: list[float] | None) -> list[float] | None:
if weights is not None and len(weights) == 0:
raise ValueError("overlapWeights must not be empty")
return weights
@property
def effectiveOverlapWeights(self) -> tuple[float, ...]:
"""Resolve to the concrete weight sweep used by the search."""
if self.mode is RouterMode.ROUND_ROBIN:
return (0.0,)
if self.overlapWeights is None:
return DEFAULT_OVERLAP_SCORE_WEIGHTS
return tuple(float(w) for w in self.overlapWeights)
class ReplayOptimizeSpec(BaseModel):
"""Top-level spec; analog to DGDR.DynamoGraphDeploymentRequestSpec."""
model_config = ConfigDict(extra="forbid")
engine: EngineSpec
hardware: HardwareSpec
workload: WorkloadSpec
sla: SLASpec = Field(default_factory=SLASpec)
router: RouterSpec = Field(default_factory=RouterSpec)
objective: ReplayObjective = ReplayObjective.THROUGHPUT
maxParallelEvals: int = Field(default=DEFAULT_MAX_PARALLEL_EVALS, gt=0)
...@@ -19,6 +19,52 @@ Use this when you want to: ...@@ -19,6 +19,52 @@ Use this when you want to:
- compare timing and cache behavior across mocker configurations - compare timing and cache behavior across mocker configurations
- validate replay logic in CI without bringing up a distributed stack - validate replay logic in CI without bringing up a distributed stack
## Harness Overview
The replay harness wires a load driver (trace file or synthetic workload generator) into one or more mocker engine simulations and tees request/token timing into a trace collector.
```mermaid
flowchart LR
LD[Load Driver] --> H[Replay Harness]
H --> SES[Single Engine Simulation]
H --> MES[Multi Engine Simulation]
SES --> H
MES --> H
H --> TC[Trace Collector]
```
The load driver is either a Mooncake-style JSONL trace (timestamps, ISL/OSL, `hash_ids`) or a synthetic generator parameterized by `isl`/`osl`/`concurrency`. Single-engine simulation (`SES`) is the fast path for `num_workers == 1` with the vLLM engine; multi-engine simulation (`MES`) covers aggregated multi-worker replay, disaggregated prefill/decode replay, and KV-router replay. The trace collector produces the AIPerf-style summary table, the JSON report, and the per-request timing fields consumed by downstream analysis.
Each simulation composes a different set of components. SES drives the engine core directly (scheduler + forward-pass modeling). MES composes multiple engine cores with KV transfer/offloading, KV routing, and planner simulation layered on top:
```mermaid
flowchart TD
subgraph SEC[Single Engine Core]
subgraph SCH[Scheduler Modeling]
F[Fwd Pass Modeling]
end
end
KV[KV Transfer + Offloading Simulation]
KR[KV Router Simulation]
P[Planner Simulation]
SES[Single Engine Simulation]
MES[Multi Engine Simulation]
SES --> SEC
MES --> SEC
MES --> KV
MES --> KR
MES --> P
```
See [`lib/mocker/src/replay/offline/README.md`](../../lib/mocker/src/replay/offline/README.md) for offline-harness internals (logical clock, event queue, worker model) and [`docs/mocker/mocker.md`](../mocker/mocker.md) for engine-core details (scheduler, KV block manager).
## Quick Start ## Quick Start
Run offline replay through the dedicated replay CLI: Run offline replay through the dedicated replay CLI:
......
...@@ -90,7 +90,7 @@ python -m dynamo.mocker \ ...@@ -90,7 +90,7 @@ python -m dynamo.mocker \
| `--num-workers` | 1 | Workers per process | | `--num-workers` | 1 | Workers per process |
| `--reasoning` | None | JSON config for emitting reasoning token spans, with `start_thinking_token_id`, `end_thinking_token_id`, and `thinking_ratio` | | `--reasoning` | None | JSON config for emitting reasoning token spans, with `start_thinking_token_id`, `end_thinking_token_id`, and `thinking_ratio` |
| `--engine-type` | `vllm` | Engine simulation type: `vllm` or `sglang` | | `--engine-type` | `vllm` | Engine simulation type: `vllm` or `sglang` |
| `--sglang-schedule-policy` | `fifo` / `fcfs` | SGLang scheduling policy override | | `--sglang-schedule-policy` | `fifo` / `fcfs` | SGLang scheduling policy: `fifo`/`fcfs` (default) or `lpm` (longest prefix match) |
| `--sglang-page-size` | 1 | SGLang radix-cache page size in tokens. Also becomes the effective block size when `--engine-type sglang` and `--block-size` is omitted | | `--sglang-page-size` | 1 | SGLang radix-cache page size in tokens. Also becomes the effective block size when `--engine-type sglang` and `--block-size` is omitted |
| `--sglang-max-prefill-tokens` | 16384 | SGLang max prefill-token budget per batch | | `--sglang-max-prefill-tokens` | 16384 | SGLang max prefill-token budget per batch |
| `--sglang-chunked-prefill-size` | 8192 | SGLang chunked-prefill chunk size | | `--sglang-chunked-prefill-size` | 8192 | SGLang chunked-prefill chunk size |
...@@ -366,7 +366,7 @@ kubectl apply -f examples/backends/mocker/deploy/disagg.yaml ...@@ -366,7 +366,7 @@ kubectl apply -f examples/backends/mocker/deploy/disagg.yaml
## Architecture ## Architecture
The mocker is organized into several cooperating components that mirror the internal architecture of production LLM inference engines. The mocker is organized into several cooperating components that mirror the internal architecture of production LLM inference engines. The scheduler (vLLM-style and SGLang-style variants) and KV block manager live inside the engine core. Multi-engine behavior — KV transfer/offloading simulation, KV router simulation, planner simulation — is added by the replay harness on top of multiple engine cores; see [Mocker Trace Replay](../benchmarks/mocker-trace-replay.md) for the component-level diagram and for offline replay internals under [`lib/mocker/src/replay/offline/`](../../lib/mocker/src/replay/offline/README.md).
### Scheduler ### Scheduler
...@@ -388,43 +388,46 @@ When resources become constrained, the mocker simulates the engine's real recove ...@@ -388,43 +388,46 @@ When resources become constrained, the mocker simulates the engine's real recove
### KV Block Manager ### KV Block Manager
The block manager tracks KV cache blocks using reference counting and an LRU eviction policy. Blocks exist in one of two pools: The mocker's KV block manager is now built on [`kvbm-logical::BlockManager<G1>`](../../lib/kvbm-logical/), the same logical block manager the real Dynamo runtime uses. The mocker wraps it in [`lib/mocker/src/kv_manager/kvbm_backend.rs`](../../lib/mocker/src/kv_manager/kvbm_backend.rs) and translates its own `MoveBlock` protocol onto kvbm-logical's RAII lifecycle (`allocate → stage → register → drop`).
- **Active Pool** - Blocks currently in use by one or more sequences, tracked with reference counts Blocks still conceptually live in one of two pools:
- **Inactive Pool** - Blocks no longer actively referenced but kept for potential reuse (prefix caching)
When a sequence needs blocks, the manager first checks if they already exist (cache hit). If not, it allocates new blocks, potentially evicting the least-recently-used inactive blocks to make room. When a sequence completes or is preempted, its blocks are either moved to the inactive pool (for potential reuse) or freed entirely. - **Active** — blocks currently held by at least one sequence. Partial (still-filling) blocks are held as `MutableBlock<G1>`; full blocks are held as `ImmutableBlock<G1>` clones (the clone vec length is the mocker's refcount, one per `Use`).
- **Inactive** — blocks no longer referenced by any sequence but kept for prefix-cache reuse. Handled entirely by kvbm-logical's inactive pool; the mocker never tracks them manually.
The following diagram illustrates the block lifecycle, based on vLLM's block manager design: The lifecycle is RAII: dropping the last `ImmutableBlock` clone transitions the block from active to inactive (kvbm-logical's `reset` pool), with no explicit `deref`/`evict` bookkeeping on the mocker side. When a sequence completes or is preempted, the mocker simply drops its handles; kvbm-logical recovers the capacity.
```mermaid ```mermaid
stateDiagram-v2 stateDiagram-v2
[*] --> Active : alloc [*] --> Active : allocate + stage + register
Active --> Inactive : deref Active --> Inactive : last handle dropped (RAII)
Inactive --> Active : cache hit (reuse) Inactive --> Active : match_blocks(PLH) reuse
Inactive --> Freed : evict Inactive --> Freed : evicted by backend
Active --> Freed : destroy (preemption) Active --> Freed : explicit Removed (Destroy)
Freed --> [*] Freed --> [*]
state Active { state Active {
[*] --> Tracked : ref_count tracked [*] --> Partial : MutableBlock<G1>
} Partial --> Full : promote (PLH / SequenceHash)
state Inactive { [*] --> Full : ImmutableBlock<G1> clones
[*] --> Ordered : LRU order
} }
``` ```
### Evictor Three `Use` outcomes are tracked for KV-event emission: `ActiveHit` (bump refcount on an already-pinned block), `InactiveHit` (reactivate via `match_blocks(plh)`), and `NewStore` (fresh allocation). Only `NewStore` emits a `Stored` KV event — the router radix tree already knows about the other two and only forgets on explicit `Removed`.
### Eviction Backends
The LRU evictor maintains blocks ordered by a monotonic counter, enabling O(log n) eviction of the lowest-priority block. Each `insert` assigns the next counter value, so blocks inserted later have higher counters and survive longer. The kvbm-logical inactive pool selects eviction victims via one of three backends, exposed as `MockerEvictionBackend` in [`lib/mocker/src/common/protocols.rs`](../../lib/mocker/src/common/protocols.rs):
This produces a **depth-aware eviction policy**: when a sequence completes, `free_signal` releases its blocks in reverse order (tail first). Deeper suffix blocks therefore receive lower counters and are evicted before shallower prefix blocks. This keeps shared prefixes cached longer, improving cache hit rates across requests with common prefixes. - **`Lineage`** (default) — parent-chain aware: evicts leaf blocks first, preserving shared prefix chains. Subsumes the preemption-priority behavior the old hand-rolled `LRUEvictor::push_front` used to provide.
- **`Lru`** — plain recency-based LRU.
- **`MultiLru`** — 4-tier frequency-aware LRU built on a TinyLFU tracker.
The evictor also supports front-insertion (negative counters) for marking blocks for immediate eviction, though this is not currently used in the scheduler. All three give the same "suffix blocks evicted before shared prefixes" outcome that the old evictor was designed to produce; `Lineage` does it structurally (via the block parent chain) rather than via monotonic counters.
### Sequence Tracking ### Sequence Tracking
Each active request is tracked as a sequence, managing its token blocks and generation state. As tokens are generated, the sequence tracks which blocks are partial (still being filled) versus full (complete and hashable for prefix caching). When a partial block fills up, it gets "promoted" to a full block with a content-based hash, enabling future cache hits from requests with matching prefixes. Each active request is tracked as a sequence, managing its token blocks and generation state. As tokens are generated, the sequence tracks which blocks are partial (`MutableBlock<G1>`, still being filled) versus full (`ImmutableBlock<G1>`, complete and hashable for prefix caching). When a partial block fills up, it gets "promoted" to a full block with a content-based `SequenceHash` (or collapses onto an existing registered handle if the PLH is already present), enabling future cache hits from requests with matching prefixes.
### Performance Model ### Performance Model
......
...@@ -4,6 +4,8 @@ This directory contains the in-process offline replay harness used by `dynamo_mo ...@@ -4,6 +4,8 @@ This directory contains the in-process offline replay harness used by `dynamo_mo
The goal is to simulate trace execution without spinning up async runtimes, network planes, or real worker tasks. Instead, the harness advances a logical clock, steps mock engine cores directly, and records request/token timing into `TraceCollector` in `lib/mocker/src/replay/collector.rs`. The goal is to simulate trace execution without spinning up async runtimes, network planes, or real worker tasks. Instead, the harness advances a logical clock, steps mock engine cores directly, and records request/token timing into `TraceCollector` in `lib/mocker/src/replay/collector.rs`.
For the harness-level picture (load driver → harness → SES/MES → trace collector) and operator-facing CLI docs, see [`docs/benchmarks/mocker-trace-replay.md`](../../../../../docs/benchmarks/mocker-trace-replay.md). This README dives into the offline-specific internals: logical clock, event queue, per-worker state machine.
## Where It Sits ## Where It Sits
The public replay entrypoints live one level up in `lib/mocker/src/replay/entrypoints.rs`. They: The public replay entrypoints live one level up in `lib/mocker/src/replay/entrypoints.rs`. They:
...@@ -34,9 +36,20 @@ Offline replay starts in `lib/mocker/src/replay/offline/mod.rs`. ...@@ -34,9 +36,20 @@ Offline replay starts in `lib/mocker/src/replay/offline/mod.rs`.
- `lib/mocker/src/replay/offline/state.rs` - `lib/mocker/src/replay/offline/state.rs`
Per-worker wrapper around `EngineCore`, including optional KV event capture. Per-worker wrapper around `EngineCore`, including optional KV event capture.
- `lib/mocker/src/replay/offline/events.rs` - `lib/mocker/src/replay/offline/events.rs`
Priority-queue event type used by the multi-worker harness. `SimulationEvent` + `SimulationEventKind` priority-queue types used by the multi-worker harness.
- `lib/mocker/src/replay/offline/core.rs` - `lib/mocker/src/replay/offline/core.rs`
Small `ReplayWorkerCore` wrapper used by the single-worker path. Small `ReplayWorkerCore` wrapper used by the single-worker path.
- `lib/mocker/src/replay/offline/runtime_utils.rs`
Shared helpers used by `agg.rs` and `disagg.rs`: `WorkerCompletionPayload`, event scheduling, `next_timestamp`.
- `lib/mocker/src/replay/offline/progress.rs`
`ReplayProgress`, the indicatif-based progress bar used by the harnesses.
- `lib/mocker/src/replay/offline/components/`
Shared abstractions split out from the runtimes:
- `router.rs``OfflineReplayRouter` (synchronous in-process router, KV + round-robin modes) and `OfflineRouterSnapshot`.
- `engine.rs``EngineComponent`, `EngineEffects`, `EnginePassMode` wrappers around `EngineCore`.
- `admission.rs` — admission queue and trace/workload request gating.
- `types.rs``WorkerAdmission`, `RouterEffects`, `ScheduledWorkerCompletion`, `TrafficAccumulator`, `TrafficStats`, `ReplayMode`.
- `mod.rs` — re-exports.
## Single-Worker Fast Path ## Single-Worker Fast Path
...@@ -117,21 +130,25 @@ So offline replay is not a toy simulator. It reuses the real per-pass mocker sch ...@@ -117,21 +130,25 @@ So offline replay is not a toy simulator. It reuses the real per-pass mocker sch
## Completion Event Queue ## Completion Event Queue
The multi-worker and disagg harnesses use `SimulationEvent` from `lib/mocker/src/replay/offline/events.rs` as a min-time priority queue implemented with `BinaryHeap`. The multi-worker and disagg harnesses use `SimulationEvent` from `lib/mocker/src/replay/offline/events.rs` as a min-time priority queue implemented with `BinaryHeap`. The event itself is a small struct carrying the scheduled timestamp, a sequence number for tie-breaking, and a typed payload:
Right now the only scheduled event type is:
- `WorkerCompletion` ```rust
pub(crate) struct SimulationEvent {
pub(crate) at_ms: f64,
pub(crate) seq_no: u64,
pub(crate) kind: SimulationEventKind,
}
That event carries: pub(crate) enum SimulationEventKind {
WorkerCompletion { stage, worker_idx, completed_requests, output_signals, kv_events },
- worker `stage` (`aggregated`, `prefill`, or `decode`) DecodeHandoff { uuid },
- `worker_idx` WorkerReady { stage, worker_id },
- `completed_requests` }
- `output_signals` ```
- router-visible `kv_events`
Those are emitted after a worker pass is executed and then applied later when the harness clock reaches `pass.end_ms`. - `WorkerCompletion` is emitted after a worker pass is executed and applied when the harness clock reaches `pass.end_ms`. It carries the `stage` (`Aggregated`, `Prefill`, or `Decode`), `worker_idx`, `completed_requests`, `output_signals`, and router-visible `kv_events`.
- `DecodeHandoff` is used by the disaggregated harness to move a request from prefill to decode at the same logical timestamp (see below).
- `WorkerReady` marks the point at which a worker returns to the admission pool after a pass completes.
## Router Integration ## Router Integration
...@@ -140,7 +157,7 @@ Offline replay can run in: ...@@ -140,7 +157,7 @@ Offline replay can run in:
- `round_robin` - `round_robin`
- `kv_router` - `kv_router`
The router implementation for offline mode lives in `lib/mocker/src/replay/router/offline.rs`. The router implementation for offline mode lives in `lib/mocker/src/replay/offline/components/router.rs` (`OfflineReplayRouter`).
This router is synchronous and in-process: This router is synchronous and in-process:
......
...@@ -8,7 +8,6 @@ import time ...@@ -8,7 +8,6 @@ import time
import pytest import pytest
import requests import requests
from openai import APIError, OpenAI
from tests.utils.constants import FAULT_TOLERANCE_MODEL_NAME from tests.utils.constants import FAULT_TOLERANCE_MODEL_NAME
from tests.utils.managed_process import ( from tests.utils.managed_process import (
...@@ -16,6 +15,12 @@ from tests.utils.managed_process import ( ...@@ -16,6 +15,12 @@ from tests.utils.managed_process import (
) )
from tests.utils.managed_process import ManagedProcess, terminate_process_tree from tests.utils.managed_process import ManagedProcess, terminate_process_tree
openai = pytest.importorskip(
"openai", reason="openai package is required for fault tolerance migration tests"
)
APIError = openai.APIError
OpenAI = openai.OpenAI
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment