Unverified Commit 88478b4b authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

feat(replay): add agentic trace replay format (#8627)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent c8c99dd1
...@@ -18,6 +18,7 @@ except ImportError: ...@@ -18,6 +18,7 @@ except ImportError:
from dynamo.profiler.utils import replay_optimize from dynamo.profiler.utils import replay_optimize
from dynamo.profiler.utils.replay_optimize import ( from dynamo.profiler.utils.replay_optimize import (
DenseAggReplayState, DenseAggReplayState,
DenseReplayState,
EngineSpec, EngineSpec,
HardwareSpec, HardwareSpec,
ReplayObjective, ReplayObjective,
...@@ -124,6 +125,22 @@ def _trace_workload(trace_file: Path, speedup: float = 100.0) -> WorkloadSpec: ...@@ -124,6 +125,22 @@ def _trace_workload(trace_file: Path, speedup: float = 100.0) -> WorkloadSpec:
return WorkloadSpec(traceFile=str(trace_file), arrivalSpeedupRatio=speedup) return WorkloadSpec(traceFile=str(trace_file), arrivalSpeedupRatio=speedup)
def _applied_compute_agentic_trace_workload(
trace_file: Path,
*,
concurrency: int = 8,
shared_prefix_ratio: float = 0.5,
num_prefix_groups: int = 1,
) -> WorkloadSpec:
return WorkloadSpec(
traceFile=str(trace_file),
traceFormat="applied_compute_agentic",
traceReplayConcurrency=concurrency,
traceSharedPrefixRatio=shared_prefix_ratio,
traceNumPrefixGroups=num_prefix_groups,
)
def _sla(**bounds: float) -> SLASpec: def _sla(**bounds: float) -> SLASpec:
"""Build an SLASpec from keyword bounds. """Build an SLASpec from keyword bounds.
...@@ -141,6 +158,63 @@ def _sla(**bounds: float) -> SLASpec: ...@@ -141,6 +158,63 @@ def _sla(**bounds: float) -> SLASpec:
return SLASpec(**{translate.get(k, k): v for k, v in bounds.items()}) return SLASpec(**{translate.get(k, k): v for k, v in bounds.items()})
def test_applied_compute_agentic_trace_workload_requires_trace_replay_concurrency(
tmp_path,
) -> None:
trace_path = _write_trace(tmp_path)
with pytest.raises(ValueError, match="traceReplayConcurrency"):
WorkloadSpec(
traceFile=str(trace_path),
traceFormat="applied_compute_agentic",
)
def test_applied_compute_agentic_trace_workload_rejects_shared_prefix_ratio_without_groups(
tmp_path,
) -> None:
trace_path = _write_trace(tmp_path)
with pytest.raises(ValueError, match="traceNumPrefixGroups"):
WorkloadSpec(
traceFile=str(trace_path),
traceFormat="applied_compute_agentic",
traceReplayConcurrency=8,
traceSharedPrefixRatio=0.5,
traceNumPrefixGroups=0,
)
def test_run_replay_for_state_passes_applied_compute_agentic_trace_knobs(
tmp_path, monkeypatch
) -> None:
trace_path = _write_trace(tmp_path)
workload = _applied_compute_agentic_trace_workload(trace_path, concurrency=9)
captured: dict[str, Any] = {}
def fake_run_trace_replay(trace_file, **kwargs):
captured["trace_file"] = trace_file
captured["kwargs"] = kwargs
return {"output_throughput_tok_s": 1.0}
monkeypatch.setattr(
"dynamo.profiler.utils.replay_optimize.evaluate.run_trace_replay",
fake_run_trace_replay,
)
replay_optimize.evaluate._run_replay_for_state(
state=DenseReplayState(1, 1, 1, 1, 0.5),
workload=workload,
prefill_engine_args=_base_prefill_args(),
decode_engine_args=_base_decode_args(),
router_config=KvRouterConfig(),
)
assert Path(captured["trace_file"]) == trace_path
assert captured["kwargs"]["replay_concurrency"] == 9
assert captured["kwargs"]["trace_format"] == "applied_compute_agentic"
assert captured["kwargs"]["trace_shared_prefix_ratio"] == 0.5
assert captured["kwargs"]["trace_num_prefix_groups"] == 1
def _disagg_spec( def _disagg_spec(
*, *,
workload: WorkloadSpec | None = None, workload: WorkloadSpec | None = None,
......
...@@ -53,9 +53,13 @@ def _run_replay_for_state( ...@@ -53,9 +53,13 @@ def _run_replay_for_state(
router_config=router_config, router_config=router_config,
num_prefill_workers=state.prefill_workers, num_prefill_workers=state.prefill_workers,
num_decode_workers=state.decode_workers, num_decode_workers=state.decode_workers,
replay_concurrency=workload.traceReplayConcurrency,
replay_mode="offline", replay_mode="offline",
router_mode=state.router_mode, router_mode=state.router_mode,
arrival_speedup_ratio=workload.arrivalSpeedupRatio, arrival_speedup_ratio=workload.arrivalSpeedupRatio,
trace_format=workload.traceFormat,
trace_shared_prefix_ratio=workload.traceSharedPrefixRatio,
trace_num_prefix_groups=workload.traceNumPrefixGroups,
) )
return run_synthetic_trace_replay( return run_synthetic_trace_replay(
...@@ -91,9 +95,13 @@ def _run_agg_replay_for_state( ...@@ -91,9 +95,13 @@ def _run_agg_replay_for_state(
extra_engine_args=engine_args, extra_engine_args=engine_args,
router_config=router_config, router_config=router_config,
num_workers=state.workers, num_workers=state.workers,
replay_concurrency=workload.traceReplayConcurrency,
replay_mode="offline", replay_mode="offline",
router_mode=state.router_mode, router_mode=state.router_mode,
arrival_speedup_ratio=workload.arrivalSpeedupRatio, arrival_speedup_ratio=workload.arrivalSpeedupRatio,
trace_format=workload.traceFormat,
trace_shared_prefix_ratio=workload.traceSharedPrefixRatio,
trace_num_prefix_groups=workload.traceNumPrefixGroups,
) )
return run_synthetic_trace_replay( return run_synthetic_trace_replay(
......
...@@ -40,12 +40,20 @@ RESULT_COLUMNS: Sequence[str] = ( ...@@ -40,12 +40,20 @@ RESULT_COLUMNS: Sequence[str] = (
def _build_workload( def _build_workload(
*, *,
trace_file: str | None, trace_file: str | None,
trace_format: str,
arrival_speedup_ratio: float, arrival_speedup_ratio: float,
trace_replay_concurrency: int | None,
trace_shared_prefix_ratio: float,
trace_num_prefix_groups: int,
) -> WorkloadSpec: ) -> WorkloadSpec:
if trace_file is not None: if trace_file is not None:
return WorkloadSpec( return WorkloadSpec(
traceFile=trace_file, traceFile=trace_file,
traceFormat=trace_format,
arrivalSpeedupRatio=arrival_speedup_ratio, arrivalSpeedupRatio=arrival_speedup_ratio,
traceReplayConcurrency=trace_replay_concurrency,
traceSharedPrefixRatio=trace_shared_prefix_ratio,
traceNumPrefixGroups=trace_num_prefix_groups,
) )
return WorkloadSpec( return WorkloadSpec(
...@@ -70,7 +78,11 @@ def _engine_args(worker_type: str) -> MockEngineArgs: ...@@ -70,7 +78,11 @@ def _engine_args(worker_type: str) -> MockEngineArgs:
def run_example( def run_example(
*, *,
trace_file: str | None = None, trace_file: str | None = None,
trace_format: str = "mooncake",
arrival_speedup_ratio: float = 1.0, arrival_speedup_ratio: float = 1.0,
trace_replay_concurrency: int | None = None,
trace_shared_prefix_ratio: float = 0.0,
trace_num_prefix_groups: int = 0,
max_parallel_evals: int = 1, max_parallel_evals: int = 1,
) -> None: ) -> None:
spec = ReplayOptimizeSpec( spec = ReplayOptimizeSpec(
...@@ -83,7 +95,11 @@ def run_example( ...@@ -83,7 +95,11 @@ def run_example(
hardware=HardwareSpec(gpuSku=GPU_SKU, totalGpus=TOTAL_GPUS), hardware=HardwareSpec(gpuSku=GPU_SKU, totalGpus=TOTAL_GPUS),
workload=_build_workload( workload=_build_workload(
trace_file=trace_file, trace_file=trace_file,
trace_format=trace_format,
arrival_speedup_ratio=arrival_speedup_ratio, arrival_speedup_ratio=arrival_speedup_ratio,
trace_replay_concurrency=trace_replay_concurrency,
trace_shared_prefix_ratio=trace_shared_prefix_ratio,
trace_num_prefix_groups=trace_num_prefix_groups,
), ),
sla=SLASpec(ttft=50000.0, itl=100.0, e2eLatency=60000.0), sla=SLASpec(ttft=50000.0, itl=100.0, e2eLatency=60000.0),
router=RouterSpec( router=RouterSpec(
...@@ -109,7 +125,13 @@ def main() -> None: ...@@ -109,7 +125,13 @@ def main() -> None:
) )
parser.add_argument( parser.add_argument(
"--trace-file", "--trace-file",
help="Optional Mooncake-style JSONL trace. If omitted, runs the synthetic workload.", help="Optional replay trace JSONL file. If omitted, runs the synthetic workload.",
)
parser.add_argument(
"--trace-format",
choices=("mooncake", "applied_compute_agentic"),
default="mooncake",
help="Trace-file format to use with --trace-file.",
) )
parser.add_argument( parser.add_argument(
"--arrival-speedup-ratio", "--arrival-speedup-ratio",
...@@ -117,6 +139,23 @@ def main() -> None: ...@@ -117,6 +139,23 @@ def main() -> None:
default=1.0, default=1.0,
help="Arrival speedup ratio to use with --trace-file.", help="Arrival speedup ratio to use with --trace-file.",
) )
parser.add_argument(
"--trace-replay-concurrency",
type=int,
help="Optional replay concurrency cap for trace workloads; required for --trace-format=applied_compute_agentic.",
)
parser.add_argument(
"--trace-shared-prefix-ratio",
type=float,
default=0.0,
help="Fraction of initial prompt blocks shared across sessions for applied_compute_agentic trace replay.",
)
parser.add_argument(
"--trace-num-prefix-groups",
type=int,
default=0,
help="Number of shared-prefix groups for applied_compute_agentic trace replay.",
)
parser.add_argument( parser.add_argument(
"--max-parallel-evals", "--max-parallel-evals",
type=int, type=int,
...@@ -126,7 +165,11 @@ def main() -> None: ...@@ -126,7 +165,11 @@ def main() -> None:
args = parser.parse_args() args = parser.parse_args()
run_example( run_example(
trace_file=args.trace_file, trace_file=args.trace_file,
trace_format=args.trace_format,
arrival_speedup_ratio=args.arrival_speedup_ratio, arrival_speedup_ratio=args.arrival_speedup_ratio,
trace_replay_concurrency=args.trace_replay_concurrency,
trace_shared_prefix_ratio=args.trace_shared_prefix_ratio,
trace_num_prefix_groups=args.trace_num_prefix_groups,
max_parallel_evals=args.max_parallel_evals, max_parallel_evals=args.max_parallel_evals,
) )
......
...@@ -166,7 +166,11 @@ class WorkloadSpec(BaseModel): ...@@ -166,7 +166,11 @@ class WorkloadSpec(BaseModel):
# Replay trace-source extensions (mutually exclusive with synthetic fields) # Replay trace-source extensions (mutually exclusive with synthetic fields)
traceFile: str | None = None traceFile: str | None = None
traceFormat: str = "mooncake"
arrivalSpeedupRatio: float = 1.0 arrivalSpeedupRatio: float = 1.0
traceReplayConcurrency: int | None = None
traceSharedPrefixRatio: float = 0.0
traceNumPrefixGroups: int = 0
@model_validator(mode="after") @model_validator(mode="after")
def _validate_source(self) -> "WorkloadSpec": def _validate_source(self) -> "WorkloadSpec":
...@@ -181,6 +185,31 @@ class WorkloadSpec(BaseModel): ...@@ -181,6 +185,31 @@ class WorkloadSpec(BaseModel):
"trace workload (traceFile set) must not also set synthetic " "trace workload (traceFile set) must not also set synthetic "
f"fields: {mixed}" f"fields: {mixed}"
) )
if self.traceFormat not in {"mooncake", "applied_compute_agentic"}:
raise ValueError(
"traceFormat must be either 'mooncake' or 'applied_compute_agentic', got "
f"{self.traceFormat!r}"
)
if (
self.traceReplayConcurrency is not None
and self.traceReplayConcurrency < 1
):
raise ValueError("traceReplayConcurrency must be at least 1")
if not 0.0 <= self.traceSharedPrefixRatio <= 1.0:
raise ValueError("traceSharedPrefixRatio must be between 0.0 and 1.0")
if self.traceNumPrefixGroups < 0:
raise ValueError("traceNumPrefixGroups must be non-negative")
if self.traceSharedPrefixRatio > 0.0 and self.traceNumPrefixGroups == 0:
raise ValueError(
"traceSharedPrefixRatio > 0 requires traceNumPrefixGroups >= 1"
)
if (
self.traceFormat == "applied_compute_agentic"
and self.traceReplayConcurrency is None
):
raise ValueError(
"traceFormat='applied_compute_agentic' requires traceReplayConcurrency"
)
return self return self
missing = [ missing = [
......
...@@ -12,7 +12,9 @@ pre-warms the KV cache with the predicted next-turn prefix after each assistant ...@@ -12,7 +12,9 @@ pre-warms the KV cache with the predicted next-turn prefix after each assistant
response, cutting TTFT on subsequent turns. response, cutting TTFT on subsequent turns.
`offline_replay_bench` runs the Rust-native replay loop directly for profiling `offline_replay_bench` runs the Rust-native replay loop directly for profiling
and throughput measurements without going through the Python wrapper. and throughput measurements without going through the Python wrapper. It uses
the mocker's internal polynomial perf model so the results stay focused on
replay overhead instead of external timing backends.
## Quick start ## Quick start
...@@ -114,3 +116,6 @@ cargo bench --package dynamo-bench --bench offline_replay_bench -- \ ...@@ -114,3 +116,6 @@ cargo bench --package dynamo-bench --bench offline_replay_bench -- \
--trace-block-size 512 \ --trace-block-size 512 \
--block-size 64 --block-size 64
``` ```
Use `--speedup-ratio` and `--decode-speedup-ratio` if you want a simple scaling
knob while keeping the same internal polynomial model.
...@@ -3,8 +3,9 @@ ...@@ -3,8 +3,9 @@
//! Rust-native offline replay benchmark entrypoint. //! Rust-native offline replay benchmark entrypoint.
//! //!
//! Useful for profiling replay itself without the Python CLI wrapper. This keeps //! Useful for profiling replay itself without the Python CLI wrapper. This
//! the default mocker perf model unless CLI overrides are provided. //! bench intentionally uses the mocker's internal polynomial perf model so the
//! measurements stay focused on replay and router overhead.
//! //!
//! Run with: cargo bench --package dynamo-bench --bench offline_replay_bench -- --help //! Run with: cargo bench --package dynamo-bench --bench offline_replay_bench -- --help
...@@ -80,10 +81,6 @@ struct Args { ...@@ -80,10 +81,6 @@ struct Args {
#[arg(long)] #[arg(long)]
decode_speedup_ratio: Option<f64>, decode_speedup_ratio: Option<f64>,
/// Explicit planner profile NPZ to use for perf-model timing
#[arg(long)]
planner_profile_data: Option<PathBuf>,
/// Optional path to write the full replay report as pretty JSON /// Optional path to write the full replay report as pretty JSON
#[arg(long)] #[arg(long)]
report_json: Option<PathBuf>, report_json: Option<PathBuf>,
...@@ -112,9 +109,6 @@ fn build_engine_args(args: &Args) -> Result<MockEngineArgs> { ...@@ -112,9 +109,6 @@ fn build_engine_args(args: &Args) -> Result<MockEngineArgs> {
if let Some(decode_speedup_ratio) = args.decode_speedup_ratio { if let Some(decode_speedup_ratio) = args.decode_speedup_ratio {
builder = builder.decode_speedup_ratio(decode_speedup_ratio); builder = builder.decode_speedup_ratio(decode_speedup_ratio);
} }
if let Some(planner_profile_data) = args.planner_profile_data.as_ref() {
builder = builder.planner_profile_data(Some(planner_profile_data.clone()));
}
builder builder
.build() .build()
.context("failed to build replay engine args")? .context("failed to build replay engine args")?
......
...@@ -526,7 +526,7 @@ impl MockEngineArgs { ...@@ -526,7 +526,7 @@ impl MockEngineArgs {
} }
#[pyfunction] #[pyfunction]
#[pyo3(signature = (trace_file, extra_engine_args=None, prefill_engine_args=None, decode_engine_args=None, router_config=None, aic_perf_config=None, num_workers=1, num_prefill_workers=1, num_decode_workers=1, replay_concurrency=None, replay_mode="offline", router_mode="round_robin", arrival_speedup_ratio=1.0, trace_block_size=512))] #[pyo3(signature = (trace_file, extra_engine_args=None, prefill_engine_args=None, decode_engine_args=None, router_config=None, aic_perf_config=None, num_workers=1, num_prefill_workers=1, num_decode_workers=1, replay_concurrency=None, replay_mode="offline", router_mode="round_robin", arrival_speedup_ratio=1.0, trace_block_size=512, trace_format="mooncake", trace_shared_prefix_ratio=0.0, trace_num_prefix_groups=0))]
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub fn run_mocker_trace_replay( pub fn run_mocker_trace_replay(
py: Python<'_>, py: Python<'_>,
...@@ -544,6 +544,9 @@ pub fn run_mocker_trace_replay( ...@@ -544,6 +544,9 @@ pub fn run_mocker_trace_replay(
router_mode: &str, router_mode: &str,
arrival_speedup_ratio: f64, arrival_speedup_ratio: f64,
trace_block_size: usize, trace_block_size: usize,
trace_format: &str,
trace_shared_prefix_ratio: f64,
trace_num_prefix_groups: usize,
) -> PyResult<PyObject> { ) -> PyResult<PyObject> {
let args_selection = load_replay_args_selection( let args_selection = load_replay_args_selection(
py, py,
...@@ -555,6 +558,7 @@ pub fn run_mocker_trace_replay( ...@@ -555,6 +558,7 @@ pub fn run_mocker_trace_replay(
num_decode_workers, num_decode_workers,
)?; )?;
let router_mode = parse_replay_router_mode(router_mode)?; let router_mode = parse_replay_router_mode(router_mode)?;
let trace_format = parse_trace_file_format(trace_format)?;
let prefill_load_estimator = load_replay_prefill_load_estimator( let prefill_load_estimator = load_replay_prefill_load_estimator(
py, py,
router_mode, router_mode,
...@@ -565,12 +569,19 @@ pub fn run_mocker_trace_replay( ...@@ -565,12 +569,19 @@ pub fn run_mocker_trace_replay(
let replay_mode = replay_mode.to_owned(); let replay_mode = replay_mode.to_owned();
let report = py.allow_threads(move || { let report = py.allow_threads(move || {
let replay_concurrency = parse_replay_concurrency(replay_concurrency)?; let replay_concurrency = parse_replay_concurrency(replay_concurrency)?;
if trace_format == dynamo_mocker::loadgen::TraceFileFormat::AppliedComputeAgentic
&& replay_concurrency.is_none()
{
anyhow::bail!(
"trace_format='applied_compute_agentic' requires replay_concurrency because source traces do not contain first-turn timestamps"
);
}
match args_selection { match args_selection {
ReplayArgsSelection::Aggregated(args) => { ReplayArgsSelection::Aggregated(args) => {
match (replay_mode.as_str(), replay_concurrency) { match (replay_mode.as_str(), replay_concurrency) {
("offline", Some(max_in_flight)) => { ("offline", Some(max_in_flight)) => {
dynamo_mocker::replay::simulate_concurrency_file_with_router_mode( dynamo_mocker::replay::simulate_concurrency_file_with_router_mode_and_format(
*args, *args,
router_config.clone(), router_config.clone(),
prefill_load_estimator.clone(), prefill_load_estimator.clone(),
...@@ -579,10 +590,13 @@ pub fn run_mocker_trace_replay( ...@@ -579,10 +590,13 @@ pub fn run_mocker_trace_replay(
max_in_flight, max_in_flight,
num_workers, num_workers,
router_mode, router_mode,
trace_format,
trace_shared_prefix_ratio,
trace_num_prefix_groups,
) )
} }
("offline", None) => { ("offline", None) => {
dynamo_mocker::replay::simulate_trace_file_with_router_mode( dynamo_mocker::replay::simulate_trace_file_with_router_mode_and_format(
*args, *args,
router_config.clone(), router_config.clone(),
prefill_load_estimator.clone(), prefill_load_estimator.clone(),
...@@ -591,10 +605,13 @@ pub fn run_mocker_trace_replay( ...@@ -591,10 +605,13 @@ pub fn run_mocker_trace_replay(
num_workers, num_workers,
arrival_speedup_ratio, arrival_speedup_ratio,
router_mode, router_mode,
trace_format,
trace_shared_prefix_ratio,
trace_num_prefix_groups,
) )
} }
("online", Some(max_in_flight)) => { ("online", Some(max_in_flight)) => {
dynamo_mocker::replay::simulate_concurrency_live_file_with_router_mode( dynamo_mocker::replay::simulate_concurrency_live_file_with_router_mode_and_format(
*args, *args,
router_config.clone(), router_config.clone(),
prefill_load_estimator.clone(), prefill_load_estimator.clone(),
...@@ -603,10 +620,13 @@ pub fn run_mocker_trace_replay( ...@@ -603,10 +620,13 @@ pub fn run_mocker_trace_replay(
max_in_flight, max_in_flight,
num_workers, num_workers,
router_mode, router_mode,
trace_format,
trace_shared_prefix_ratio,
trace_num_prefix_groups,
) )
} }
("online", None) => { ("online", None) => {
dynamo_mocker::replay::simulate_trace_live_file_with_router_mode( dynamo_mocker::replay::simulate_trace_live_file_with_router_mode_and_format(
*args, *args,
router_config.clone(), router_config.clone(),
prefill_load_estimator.clone(), prefill_load_estimator.clone(),
...@@ -615,6 +635,9 @@ pub fn run_mocker_trace_replay( ...@@ -615,6 +635,9 @@ pub fn run_mocker_trace_replay(
num_workers, num_workers,
arrival_speedup_ratio, arrival_speedup_ratio,
router_mode, router_mode,
trace_format,
trace_shared_prefix_ratio,
trace_num_prefix_groups,
) )
} }
(other, _) => anyhow::bail!( (other, _) => anyhow::bail!(
...@@ -626,7 +649,7 @@ pub fn run_mocker_trace_replay( ...@@ -626,7 +649,7 @@ pub fn run_mocker_trace_replay(
ReplayArgsSelection::Disagg(config) => match (replay_mode.as_str(), replay_concurrency) ReplayArgsSelection::Disagg(config) => match (replay_mode.as_str(), replay_concurrency)
{ {
("offline", Some(max_in_flight)) => { ("offline", Some(max_in_flight)) => {
dynamo_mocker::replay::simulate_concurrency_file_disagg_with_router_mode( dynamo_mocker::replay::simulate_concurrency_file_disagg_with_router_mode_and_format(
*config, *config,
router_config.clone(), router_config.clone(),
prefill_load_estimator.clone(), prefill_load_estimator.clone(),
...@@ -634,10 +657,13 @@ pub fn run_mocker_trace_replay( ...@@ -634,10 +657,13 @@ pub fn run_mocker_trace_replay(
trace_block_size, trace_block_size,
max_in_flight, max_in_flight,
router_mode, router_mode,
trace_format,
trace_shared_prefix_ratio,
trace_num_prefix_groups,
) )
} }
("offline", None) => { ("offline", None) => {
dynamo_mocker::replay::simulate_trace_file_disagg_with_router_mode( dynamo_mocker::replay::simulate_trace_file_disagg_with_router_mode_and_format(
*config, *config,
router_config.clone(), router_config.clone(),
prefill_load_estimator.clone(), prefill_load_estimator.clone(),
...@@ -645,6 +671,9 @@ pub fn run_mocker_trace_replay( ...@@ -645,6 +671,9 @@ pub fn run_mocker_trace_replay(
trace_block_size, trace_block_size,
arrival_speedup_ratio, arrival_speedup_ratio,
router_mode, router_mode,
trace_format,
trace_shared_prefix_ratio,
trace_num_prefix_groups,
) )
} }
("online", _) => anyhow::bail!("disagg replay only supports replay_mode='offline'"), ("online", _) => anyhow::bail!("disagg replay only supports replay_mode='offline'"),
...@@ -1073,6 +1102,21 @@ fn parse_replay_router_mode( ...@@ -1073,6 +1102,21 @@ fn parse_replay_router_mode(
} }
} }
fn parse_trace_file_format(
trace_format: &str,
) -> PyResult<dynamo_mocker::loadgen::TraceFileFormat> {
match trace_format {
"mooncake" => Ok(dynamo_mocker::loadgen::TraceFileFormat::Mooncake),
"applied_compute_agentic" => {
Ok(dynamo_mocker::loadgen::TraceFileFormat::AppliedComputeAgentic)
}
other => Err(PyException::new_err(format!(
"trace_format must be either 'mooncake' or 'applied_compute_agentic', got '{}'",
other
))),
}
}
fn parse_replay_concurrency(replay_concurrency: Option<isize>) -> anyhow::Result<Option<usize>> { fn parse_replay_concurrency(replay_concurrency: Option<isize>) -> anyhow::Result<Option<usize>> {
match replay_concurrency { match replay_concurrency {
Some(value) if value < 1 => anyhow::bail!("replay_concurrency must be at least 1"), Some(value) if value < 1 => anyhow::bail!("replay_concurrency must be at least 1"),
......
...@@ -1558,6 +1558,9 @@ def run_mocker_trace_replay( ...@@ -1558,6 +1558,9 @@ def run_mocker_trace_replay(
router_mode: Literal["round_robin", "kv_router"] = "round_robin", router_mode: Literal["round_robin", "kv_router"] = "round_robin",
arrival_speedup_ratio: float = 1.0, arrival_speedup_ratio: float = 1.0,
trace_block_size: int = 512, trace_block_size: int = 512,
trace_format: Literal["mooncake", "applied_compute_agentic"] = "mooncake",
trace_shared_prefix_ratio: float = 0.0,
trace_num_prefix_groups: int = 0,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""Replay a mocker trace file and return the simulation report for aggregated vLLM or SGLang configs.""" """Replay a mocker trace file and return the simulation report for aggregated vLLM or SGLang configs."""
... ...
......
...@@ -60,6 +60,9 @@ def run_mocker_trace_replay( ...@@ -60,6 +60,9 @@ def run_mocker_trace_replay(
router_mode="round_robin", router_mode="round_robin",
arrival_speedup_ratio=1.0, arrival_speedup_ratio=1.0,
trace_block_size=512, trace_block_size=512,
trace_format="mooncake",
trace_shared_prefix_ratio=0.0,
trace_num_prefix_groups=0,
): ):
return _run_mocker_trace_replay( return _run_mocker_trace_replay(
trace_file, trace_file,
...@@ -71,4 +74,7 @@ def run_mocker_trace_replay( ...@@ -71,4 +74,7 @@ def run_mocker_trace_replay(
router_mode=router_mode, router_mode=router_mode,
arrival_speedup_ratio=arrival_speedup_ratio, arrival_speedup_ratio=arrival_speedup_ratio,
trace_block_size=trace_block_size, trace_block_size=trace_block_size,
trace_format=trace_format,
trace_shared_prefix_ratio=trace_shared_prefix_ratio,
trace_num_prefix_groups=trace_num_prefix_groups,
) )
...@@ -23,6 +23,9 @@ def run_trace_replay( ...@@ -23,6 +23,9 @@ def run_trace_replay(
router_mode="round_robin", router_mode="round_robin",
arrival_speedup_ratio=1.0, arrival_speedup_ratio=1.0,
trace_block_size=512, trace_block_size=512,
trace_format="mooncake",
trace_shared_prefix_ratio=0.0,
trace_num_prefix_groups=0,
): ):
return _run_mocker_trace_replay( return _run_mocker_trace_replay(
trace_file, trace_file,
...@@ -39,6 +42,9 @@ def run_trace_replay( ...@@ -39,6 +42,9 @@ def run_trace_replay(
router_mode=router_mode, router_mode=router_mode,
arrival_speedup_ratio=arrival_speedup_ratio, arrival_speedup_ratio=arrival_speedup_ratio,
trace_block_size=trace_block_size, trace_block_size=trace_block_size,
trace_format=trace_format,
trace_shared_prefix_ratio=trace_shared_prefix_ratio,
trace_num_prefix_groups=trace_num_prefix_groups,
) )
......
...@@ -418,12 +418,30 @@ def main(argv: Sequence[str] | None = None) -> int: ...@@ -418,12 +418,30 @@ def main(argv: Sequence[str] | None = None) -> int:
default="round_robin", default="round_robin",
) )
parser.add_argument("--arrival-speedup-ratio", type=float, default=1.0) parser.add_argument("--arrival-speedup-ratio", type=float, default=1.0)
parser.add_argument(
"--trace-format",
choices=("mooncake", "applied_compute_agentic"),
default="mooncake",
help="format of trace_file when replaying from a file",
)
parser.add_argument( parser.add_argument(
"--trace-block-size", "--trace-block-size",
type=int, type=int,
default=512, default=512,
help="tokens represented by each hash_id in the trace file; only used for file replay", help="tokens represented by each hash_id in the trace file; only used for file replay",
) )
parser.add_argument(
"--trace-shared-prefix-ratio",
type=float,
default=0.0,
help="fraction of the initial prompt blocks to share across sessions for applied_compute_agentic trace replay",
)
parser.add_argument(
"--trace-num-prefix-groups",
type=int,
default=0,
help="number of cross-session shared-prefix groups for applied_compute_agentic trace replay",
)
parser.add_argument( parser.add_argument(
"--report-json", "--report-json",
help="path to save the full replay report JSON; defaults to a timestamped file in the current directory", help="path to save the full replay report JSON; defaults to a timestamped file in the current directory",
...@@ -459,6 +477,14 @@ def main(argv: Sequence[str] | None = None) -> int: ...@@ -459,6 +477,14 @@ def main(argv: Sequence[str] | None = None) -> int:
parser.error( parser.error(
"synthetic replay requires --input-tokens, --output-tokens, and --request-count" "synthetic replay requires --input-tokens, --output-tokens, and --request-count"
) )
if (
using_trace_file
and args.trace_format == "applied_compute_agentic"
and args.replay_concurrency is None
):
parser.error(
"--trace-format=applied_compute_agentic requires --replay-concurrency because the source traces do not include first-turn timestamps"
)
extra_engine_args = _load_engine_args(args.extra_engine_args) extra_engine_args = _load_engine_args(args.extra_engine_args)
prefill_engine_args = _load_engine_args(args.prefill_engine_args) prefill_engine_args = _load_engine_args(args.prefill_engine_args)
...@@ -531,6 +557,9 @@ def main(argv: Sequence[str] | None = None) -> int: ...@@ -531,6 +557,9 @@ def main(argv: Sequence[str] | None = None) -> int:
router_mode=args.router_mode, router_mode=args.router_mode,
arrival_speedup_ratio=args.arrival_speedup_ratio, arrival_speedup_ratio=args.arrival_speedup_ratio,
trace_block_size=args.trace_block_size, trace_block_size=args.trace_block_size,
trace_format=args.trace_format,
trace_shared_prefix_ratio=args.trace_shared_prefix_ratio,
trace_num_prefix_groups=args.trace_num_prefix_groups,
) )
else: else:
report = run_synthetic_trace_replay( report = run_synthetic_trace_replay(
......
...@@ -152,6 +152,33 @@ def _write_multiturn_trace(tmp_path): ...@@ -152,6 +152,33 @@ def _write_multiturn_trace(tmp_path):
return trace_path return trace_path
def _write_applied_compute_agentic_trace(tmp_path):
trace_path = tmp_path / "applied_compute_agentic_trace.jsonl"
records = [
{
"num_turns": 2,
"input_prompt_length": 64,
"assistant_response_length": [2, 2],
"tool_call_output_length": [2, 2],
"tool_call_latency": [0.001, 0.001],
"final_assistant_response_length": 2,
},
{
"num_turns": 1,
"input_prompt_length": 64,
"assistant_response_length": [2],
"tool_call_output_length": [2],
"tool_call_latency": [0.001],
"final_assistant_response_length": 2,
},
]
trace_path.write_text(
"\n".join(json.dumps(record) for record in records) + "\n",
encoding="utf-8",
)
return trace_path
def _write_cli_smoke_trace(tmp_path): def _write_cli_smoke_trace(tmp_path):
trace_path = tmp_path / "cli_smoke_trace.jsonl" trace_path = tmp_path / "cli_smoke_trace.jsonl"
records = [] records = []
......
...@@ -16,6 +16,7 @@ from .replay_utils import ( ...@@ -16,6 +16,7 @@ from .replay_utils import (
_router_config, _router_config,
_sglang_args, _sglang_args,
_vllm_args, _vllm_args,
_write_applied_compute_agentic_trace,
_write_multiturn_trace, _write_multiturn_trace,
_write_trace_and_args, _write_trace_and_args,
) )
...@@ -125,6 +126,45 @@ def test_run_trace_replay_supports_multiturn_sessions(tmp_path, replay_mode): ...@@ -125,6 +126,45 @@ def test_run_trace_replay_supports_multiturn_sessions(tmp_path, replay_mode):
) )
@pytest.mark.parametrize("replay_mode", ["offline", "online"])
def test_run_trace_replay_supports_applied_compute_agentic_format_with_concurrency(
tmp_path, replay_mode
):
trace_path = _write_applied_compute_agentic_trace(tmp_path)
report = run_trace_replay(
trace_path,
extra_engine_args=_vllm_args(),
num_workers=2,
replay_concurrency=2,
replay_mode=replay_mode,
router_mode="kv_router",
trace_format="applied_compute_agentic",
trace_shared_prefix_ratio=0.5,
trace_num_prefix_groups=1,
)
assert report["num_requests"] == 5
assert report["completed_requests"] == 5
assert report["total_input_tokens"] == 64 + 68 + 72 + 64 + 68
assert report["total_output_tokens"] == 10
def test_run_trace_replay_rejects_applied_compute_agentic_format_without_concurrency(
tmp_path,
):
trace_path = _write_applied_compute_agentic_trace(tmp_path)
with pytest.raises(Exception, match="replay_concurrency"):
run_trace_replay(
trace_path,
extra_engine_args=_vllm_args(),
num_workers=2,
replay_mode="offline",
trace_format="applied_compute_agentic",
)
@pytest.mark.parametrize("replay_mode", ["offline", "online"]) @pytest.mark.parametrize("replay_mode", ["offline", "online"])
def test_run_trace_replay_supports_distinct_trace_and_engine_block_sizes( def test_run_trace_replay_supports_distinct_trace_and_engine_block_sizes(
tmp_path, replay_mode tmp_path, replay_mode
......
...@@ -164,12 +164,8 @@ impl ActiveSequence { ...@@ -164,12 +164,8 @@ impl ActiveSequence {
/// Positional lineage hashes for all fully-tokenised blocks in the sequence. /// Positional lineage hashes for all fully-tokenised blocks in the sequence.
/// Mirrors `block_hashes()` but returns the PLH identity used by kvbm-logical. /// Mirrors `block_hashes()` but returns the PLH identity used by kvbm-logical.
pub fn positional_lineage_hashes(&self) -> Vec<PositionalLineageHash> { pub fn positional_lineage_hashes(&self) -> &[PositionalLineageHash] {
self.tokens &self.plhs
.blocks()
.iter()
.map(|block| block.positional_lineage_hash())
.collect()
} }
/// Commit a successful allocation by advancing `num_allocated_tokens`. /// Commit a successful allocation by advancing `num_allocated_tokens`.
......
...@@ -30,7 +30,6 @@ ...@@ -30,7 +30,6 @@
//! - `Lru` — simple recency-based LRU. //! - `Lru` — simple recency-based LRU.
//! - `MultiLru` — 4-tier frequency-aware LRU (requires TinyLFU tracker). //! - `MultiLru` — 4-tier frequency-aware LRU (requires TinyLFU tracker).
use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use dynamo_kv_router::protocols::{ use dynamo_kv_router::protocols::{
...@@ -42,6 +41,7 @@ use dynamo_tokens::{BlockHash, PositionalLineageHash, SequenceHash}; ...@@ -42,6 +41,7 @@ use dynamo_tokens::{BlockHash, PositionalLineageHash, SequenceHash};
use kvbm_logical::registry::BlockRegistry; use kvbm_logical::registry::BlockRegistry;
use kvbm_logical::tinylfu::TinyLFUTracker; use kvbm_logical::tinylfu::TinyLFUTracker;
use kvbm_logical::{BlockManager, ImmutableBlock, MutableBlock}; use kvbm_logical::{BlockManager, ImmutableBlock, MutableBlock};
use rustc_hash::FxHashMap;
use uuid::Uuid; use uuid::Uuid;
use crate::common::kv_cache_trace; use crate::common::kv_cache_trace;
...@@ -79,13 +79,13 @@ pub struct KvManager { ...@@ -79,13 +79,13 @@ pub struct KvManager {
/// PartialBlocks (still filling tokens) held as `MutableBlock`. /// PartialBlocks (still filling tokens) held as `MutableBlock`.
/// Dropped blocks return to kvbm-logical's reset pool. /// Dropped blocks return to kvbm-logical's reset pool.
active_partial: HashMap<Uuid, MutableBlock<G1>>, active_partial: FxHashMap<Uuid, MutableBlock<G1>>,
/// FullBlocks held as `ImmutableBlock`, keyed by `SequenceHash`. The vec /// FullBlocks held as `ImmutableBlock`, keyed by `SequenceHash`. The vec
/// length is the mocker's reference count — each `Use` pushes a clone, /// length is the mocker's reference count — each `Use` pushes a clone,
/// each `Deref` pops one. When the vec empties, the block transitions to /// each `Deref` pops one. When the vec empties, the block transitions to
/// kvbm-logical's inactive pool (RAII return on drop of the last clone). /// kvbm-logical's inactive pool (RAII return on drop of the last clone).
active_full: HashMap<SequenceHash, Vec<ImmutableBlock<G1>>>, active_full: FxHashMap<SequenceHash, Vec<ImmutableBlock<G1>>>,
/// Shadow registry of (PLH → mocker u64 seq_hash) for every block that has /// Shadow registry of (PLH → mocker u64 seq_hash) for every block that has
/// been registered in kvbm-logical. kvbm-logical's registry is keyed by /// been registered in kvbm-logical. kvbm-logical's registry is keyed by
...@@ -93,7 +93,7 @@ pub struct KvManager { ...@@ -93,7 +93,7 @@ pub struct KvManager {
/// mocker's u64 `SequenceHash` on `UniqueBlock::FullBlock`. We keep this /// mocker's u64 `SequenceHash` on `UniqueBlock::FullBlock`. We keep this
/// map so we can emit router-compatible `Removed` events when kvbm-logical /// map so we can emit router-compatible `Removed` events when kvbm-logical
/// evicts inactive blocks as a side effect of `allocate_blocks_with_evictions`. /// evicts inactive blocks as a side effect of `allocate_blocks_with_evictions`.
registered_plhs: HashMap<PositionalLineageHash, SequenceHash>, registered_plhs: FxHashMap<PositionalLineageHash, SequenceHash>,
} }
impl KvManager { impl KvManager {
...@@ -152,9 +152,9 @@ impl KvManager { ...@@ -152,9 +152,9 @@ impl KvManager {
kv_event_publishers, kv_event_publishers,
dp_rank, dp_rank,
next_event_id: 0, next_event_id: 0,
active_partial: HashMap::new(), active_partial: FxHashMap::default(),
active_full: HashMap::new(), active_full: FxHashMap::default(),
registered_plhs: HashMap::new(), registered_plhs: FxHashMap::default(),
} }
} }
...@@ -554,14 +554,10 @@ impl KvManager { ...@@ -554,14 +554,10 @@ impl KvManager {
overlap += 1; overlap += 1;
continue; continue;
} }
let Some(plh) = plhs.get(i).copied() else { let Some(plh) = plhs.get(i) else {
break; break;
}; };
let presence = self if self.registered_plhs.contains_key(plh) {
.block_manager
.block_registry()
.check_presence::<G1>(&[plh]);
if presence.first().is_some_and(|(_, present)| *present) {
overlap += 1; overlap += 1;
} else { } else {
break; break;
......
...@@ -8,7 +8,8 @@ mod types; ...@@ -8,7 +8,8 @@ mod types;
pub use driver::WorkloadDriver; pub use driver::WorkloadDriver;
pub use types::{ pub use types::{
ArrivalSpec, DelaySpec, LengthSpec, ReadyTurn, ReplayRequestHashes, RouterSequence, ArrivalSpec, DelaySpec, LengthSpec, ReadyTurn, ReplayRequestHashes, RouterSequence,
SequenceHashMode, SessionPartitionSpec, SessionTrace, SyntheticTraceSpec, Trace, TurnTrace, SequenceHashMode, SessionPartitionSpec, SessionTrace, SyntheticTraceSpec, Trace,
TraceFileFormat, TurnTrace,
}; };
#[cfg(test)] #[cfg(test)]
......
...@@ -84,6 +84,87 @@ fn test_from_mooncake_defaults_missing_input_length_from_hash_capacity() { ...@@ -84,6 +84,87 @@ fn test_from_mooncake_defaults_missing_input_length_from_hash_capacity() {
assert_eq!(trace.sessions[0].turns[0].input_length, 8); assert_eq!(trace.sessions[0].turns[0].input_length, 8);
} }
#[test]
fn test_from_applied_compute_agentic_expands_rows_into_num_turns_plus_final_request() {
let file = write_trace(&[serde_json::json!({
"num_turns": 2,
"input_prompt_length": 100,
"assistant_response_length": [10, 20],
"tool_call_output_length": [30, 40],
"tool_call_latency": [0.5, 1.25],
"final_assistant_response_length": 50,
})]);
let trace = Trace::from_applied_compute_agentic(file.path(), 64, 0.0, 0).unwrap();
assert_eq!(trace.sessions.len(), 1);
let session = &trace.sessions[0];
assert_eq!(session.first_arrival_timestamp_ms, None);
assert_eq!(session.turns.len(), 3);
assert_eq!(session.turns[0].input_length, 100);
assert_eq!(session.turns[0].max_output_tokens, 10);
assert_eq!(session.turns[0].delay_after_previous_ms, 0.0);
assert_eq!(session.turns[1].input_length, 140);
assert_eq!(session.turns[1].max_output_tokens, 20);
assert_eq!(session.turns[1].delay_after_previous_ms, 500.0);
assert_eq!(session.turns[2].input_length, 200);
assert_eq!(session.turns[2].max_output_tokens, 50);
assert_eq!(session.turns[2].delay_after_previous_ms, 1250.0);
}
#[test]
fn test_from_applied_compute_agentic_prefix_extends_hashes_across_turns() {
let file = write_trace(&[serde_json::json!({
"num_turns": 2,
"input_prompt_length": 600,
"assistant_response_length": [40, 50],
"tool_call_output_length": [40, 50],
"tool_call_latency": [0.1, 0.2],
"final_assistant_response_length": 60,
})]);
let trace = Trace::from_applied_compute_agentic(file.path(), 256, 0.0, 0).unwrap();
let turns = &trace.sessions[0].turns;
assert_eq!(turns[0].hash_ids, vec![1, 2, 3]);
assert_eq!(turns[1].hash_ids, vec![1, 2, 3]);
assert_eq!(turns[2].hash_ids, vec![1, 2, 3, 4]);
}
#[test]
fn test_from_applied_compute_agentic_can_share_initial_prefix_blocks_across_sessions() {
let file = write_trace(&[
serde_json::json!({
"num_turns": 1,
"input_prompt_length": 600,
"assistant_response_length": [10],
"tool_call_output_length": [10],
"tool_call_latency": [0.1],
"final_assistant_response_length": 10,
}),
serde_json::json!({
"num_turns": 1,
"input_prompt_length": 600,
"assistant_response_length": [20],
"tool_call_output_length": [20],
"tool_call_latency": [0.2],
"final_assistant_response_length": 20,
}),
]);
let trace = Trace::from_applied_compute_agentic(file.path(), 256, 0.5, 1).unwrap();
assert_eq!(
trace.sessions[0].turns[0].hash_ids[0],
trace.sessions[1].turns[0].hash_ids[0]
);
assert_eq!(
trace.sessions[0].turns[0].hash_ids[1],
trace.sessions[1].turns[0].hash_ids[1]
);
assert_ne!(
trace.sessions[0].turns[0].hash_ids[2],
trace.sessions[1].turns[0].hash_ids[2]
);
}
#[test] #[test]
fn test_turn_to_direct_request_repeats_hash_ids_by_block_size() { fn test_turn_to_direct_request_repeats_hash_ids_by_block_size() {
let turn = TurnTrace { let turn = TurnTrace {
......
...@@ -45,6 +45,16 @@ struct RawMooncakeRecord { ...@@ -45,6 +45,16 @@ struct RawMooncakeRecord {
delay_ms: Option<f64>, delay_ms: Option<f64>,
} }
#[derive(Debug, Deserialize)]
struct RawAppliedComputeAgenticRecord {
num_turns: usize,
input_prompt_length: usize,
assistant_response_length: Vec<usize>,
tool_call_output_length: Vec<usize>,
tool_call_latency: Vec<f64>,
final_assistant_response_length: usize,
}
impl TurnTrace { impl TurnTrace {
fn validate_block_size_and_capacity(&self, trace_block_size: usize) -> Result<()> { fn validate_block_size_and_capacity(&self, trace_block_size: usize) -> Result<()> {
if trace_block_size == 0 { if trace_block_size == 0 {
...@@ -262,6 +272,157 @@ impl Trace { ...@@ -262,6 +272,157 @@ impl Trace {
}) })
} }
pub fn from_applied_compute_agentic(
path: &Path,
trace_block_size: usize,
shared_prefix_ratio: f64,
num_prefix_groups: usize,
) -> Result<Self> {
if trace_block_size == 0 {
bail!("trace_block_size must be greater than 0");
}
if !(0.0..=1.0).contains(&shared_prefix_ratio) {
bail!(
"shared_prefix_ratio must be between 0.0 and 1.0, got {}",
shared_prefix_ratio
);
}
let file = File::open(path)
.with_context(|| format!("failed to open trace file {}", path.display()))?;
let reader = BufReader::new(file);
let mut sessions = Vec::new();
let mut next_unique_hash = 1_u64;
for (line_idx, line) in reader.lines().enumerate() {
let line = line.with_context(|| {
format!(
"failed to read line {} from {}",
line_idx + 1,
path.display()
)
})?;
if line.trim().is_empty() {
continue;
}
let raw: RawAppliedComputeAgenticRecord =
serde_json::from_str(&line).with_context(|| {
format!(
"failed to parse line {} from {} as JSON",
line_idx + 1,
path.display()
)
})?;
for (name, values) in [
(
"assistant_response_length",
raw.assistant_response_length.len(),
),
("tool_call_output_length", raw.tool_call_output_length.len()),
("tool_call_latency", raw.tool_call_latency.len()),
] {
if values != raw.num_turns {
bail!(
"trace line {} field {} length {} does not match num_turns {}",
line_idx + 1,
name,
values,
raw.num_turns
);
}
}
if raw.input_prompt_length == 0 {
bail!(
"trace line {} input_prompt_length must be positive",
line_idx + 1
);
}
let group_id = if shared_prefix_ratio > 0.0 && num_prefix_groups > 0 {
Some(line_idx % num_prefix_groups)
} else {
None
};
let mut current_input_length = raw.input_prompt_length;
let mut hash_ids = Vec::new();
let shared_initial_blocks = ((current_input_length.div_ceil(trace_block_size) as f64)
* shared_prefix_ratio)
.round() as usize;
extend_applied_compute_agentic_hash_ids(
&mut hash_ids,
current_input_length,
trace_block_size,
shared_initial_blocks,
group_id,
&mut next_unique_hash,
)?;
let mut turns = Vec::with_capacity(raw.num_turns + 1);
let mut next_turn_delay_ms = 0.0;
for turn_idx in 0..raw.num_turns {
let tool_call_latency = raw.tool_call_latency[turn_idx];
if !tool_call_latency.is_finite() || tool_call_latency < 0.0 {
bail!(
"trace line {} tool_call_latency[{}] must be a finite non-negative number",
line_idx + 1,
turn_idx
);
}
turns.push(TurnTrace {
input_length: current_input_length,
max_output_tokens: raw.assistant_response_length[turn_idx],
hash_ids: hash_ids.clone(),
delay_after_previous_ms: next_turn_delay_ms,
});
current_input_length = current_input_length
.checked_add(raw.assistant_response_length[turn_idx])
.and_then(|value| value.checked_add(raw.tool_call_output_length[turn_idx]))
.ok_or_else(|| {
anyhow!(
"trace line {} cumulative input length overflow",
line_idx + 1
)
})?;
extend_applied_compute_agentic_hash_ids(
&mut hash_ids,
current_input_length,
trace_block_size,
shared_initial_blocks,
group_id,
&mut next_unique_hash,
)?;
next_turn_delay_ms = tool_call_latency * 1000.0;
}
turns.push(TurnTrace {
input_length: current_input_length,
max_output_tokens: raw.final_assistant_response_length,
hash_ids,
delay_after_previous_ms: next_turn_delay_ms,
});
sessions.push(SessionTrace {
session_id: format!("applied_compute_agentic_session_{}", line_idx + 1),
first_arrival_timestamp_ms: None,
turns,
});
}
if sessions.is_empty() {
bail!("trace file {} did not contain any requests", path.display());
}
Ok(Self {
block_size: trace_block_size,
sessions,
})
}
pub fn synthetic(spec: SyntheticTraceSpec) -> Result<Self> { pub fn synthetic(spec: SyntheticTraceSpec) -> Result<Self> {
if spec.block_size == 0 { if spec.block_size == 0 {
bail!("block_size must be greater than 0"); bail!("block_size must be greater than 0");
...@@ -726,6 +887,31 @@ impl Trace { ...@@ -726,6 +887,31 @@ impl Trace {
} }
} }
fn extend_applied_compute_agentic_hash_ids(
hash_ids: &mut Vec<u64>,
input_length: usize,
trace_block_size: usize,
shared_initial_blocks: usize,
group_id: Option<usize>,
next_unique_hash: &mut u64,
) -> Result<()> {
let target_blocks = input_length.div_ceil(trace_block_size);
while hash_ids.len() < target_blocks {
let block_idx = hash_ids.len();
if block_idx < shared_initial_blocks
&& let Some(group_id) = group_id
{
hash_ids.push(0xA63E_0000_0000_0000 | ((group_id as u64) << 32) | block_idx as u64);
continue;
}
hash_ids.push(*next_unique_hash);
*next_unique_hash = next_unique_hash
.checked_add(1)
.ok_or_else(|| anyhow!("synthetic hash id overflow"))?;
}
Ok(())
}
fn arrival_spec_mean_gap_ms(spec: &ArrivalSpec) -> Result<f64> { fn arrival_spec_mean_gap_ms(spec: &ArrivalSpec) -> Result<f64> {
match spec { match spec {
ArrivalSpec::Burst => Ok(0.0), ArrivalSpec::Burst => Ok(0.0),
......
...@@ -14,6 +14,12 @@ pub struct Trace { ...@@ -14,6 +14,12 @@ pub struct Trace {
pub sessions: Vec<SessionTrace>, pub sessions: Vec<SessionTrace>,
} }
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TraceFileFormat {
Mooncake,
AppliedComputeAgentic,
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct SessionTrace { pub struct SessionTrace {
pub session_id: String, pub session_id: String,
......
...@@ -18,7 +18,25 @@ use super::{ ...@@ -18,7 +18,25 @@ use super::{
TraceSimulationReport, TraceSimulationReport,
}; };
use crate::common::protocols::{DirectRequest, MockEngineArgs}; use crate::common::protocols::{DirectRequest, MockEngineArgs};
use crate::loadgen::Trace; use crate::loadgen::{Trace, TraceFileFormat};
fn load_trace_from_file(
trace_path: &Path,
trace_block_size: usize,
trace_format: TraceFileFormat,
trace_shared_prefix_ratio: f64,
trace_num_prefix_groups: usize,
) -> Result<Trace> {
match trace_format {
TraceFileFormat::Mooncake => Trace::from_mooncake(trace_path, trace_block_size),
TraceFileFormat::AppliedComputeAgentic => Trace::from_applied_compute_agentic(
trace_path,
trace_block_size,
trace_shared_prefix_ratio,
trace_num_prefix_groups,
),
}
}
pub fn generate_trace_worker_artifacts_offline( pub fn generate_trace_worker_artifacts_offline(
args: MockEngineArgs, args: MockEngineArgs,
...@@ -57,12 +75,52 @@ pub fn simulate_trace_file_with_router_mode( ...@@ -57,12 +75,52 @@ pub fn simulate_trace_file_with_router_mode(
num_workers: usize, num_workers: usize,
arrival_speedup_ratio: f64, arrival_speedup_ratio: f64,
router_mode: ReplayRouterMode, router_mode: ReplayRouterMode,
) -> Result<TraceSimulationReport> {
simulate_trace_file_with_router_mode_and_format(
args,
router_config,
prefill_load_estimator,
trace_path,
trace_block_size,
num_workers,
arrival_speedup_ratio,
router_mode,
TraceFileFormat::Mooncake,
0.0,
0,
)
}
#[allow(clippy::too_many_arguments)]
pub fn simulate_trace_file_with_router_mode_and_format(
args: MockEngineArgs,
router_config: Option<KvRouterConfig>,
prefill_load_estimator: Option<ReplayPrefillLoadEstimator>,
trace_path: &Path,
trace_block_size: usize,
num_workers: usize,
arrival_speedup_ratio: f64,
router_mode: ReplayRouterMode,
trace_format: TraceFileFormat,
trace_shared_prefix_ratio: f64,
trace_num_prefix_groups: usize,
) -> Result<TraceSimulationReport> { ) -> Result<TraceSimulationReport> {
let args = args.normalized()?; let args = args.normalized()?;
validate_offline_replay_args(&args, num_workers, router_mode)?; validate_offline_replay_args(&args, num_workers, router_mode)?;
let trace = Trace::from_mooncake(trace_path, trace_block_size)? if trace_format == TraceFileFormat::AppliedComputeAgentic {
.normalize_session_starts()? bail!(
.speed_up_timing(arrival_speedup_ratio)?; "applied_compute_agentic trace format requires replay_concurrency because source traces do not contain first-turn timestamps"
);
}
let trace = load_trace_from_file(
trace_path,
trace_block_size,
trace_format,
trace_shared_prefix_ratio,
trace_num_prefix_groups,
)?
.normalize_session_starts()?
.speed_up_timing(arrival_speedup_ratio)?;
let started_at = Instant::now(); let started_at = Instant::now();
let report = crate::replay::offline::simulate_trace_workload( let report = crate::replay::offline::simulate_trace_workload(
args, args,
...@@ -83,12 +141,50 @@ pub fn simulate_trace_file_disagg_with_router_mode( ...@@ -83,12 +141,50 @@ pub fn simulate_trace_file_disagg_with_router_mode(
trace_block_size: usize, trace_block_size: usize,
arrival_speedup_ratio: f64, arrival_speedup_ratio: f64,
router_mode: ReplayRouterMode, router_mode: ReplayRouterMode,
) -> Result<TraceSimulationReport> {
simulate_trace_file_disagg_with_router_mode_and_format(
config,
router_config,
prefill_load_estimator,
trace_path,
trace_block_size,
arrival_speedup_ratio,
router_mode,
TraceFileFormat::Mooncake,
0.0,
0,
)
}
#[allow(clippy::too_many_arguments)]
pub fn simulate_trace_file_disagg_with_router_mode_and_format(
config: OfflineDisaggReplayConfig,
router_config: Option<KvRouterConfig>,
prefill_load_estimator: Option<ReplayPrefillLoadEstimator>,
trace_path: &Path,
trace_block_size: usize,
arrival_speedup_ratio: f64,
router_mode: ReplayRouterMode,
trace_format: TraceFileFormat,
trace_shared_prefix_ratio: f64,
trace_num_prefix_groups: usize,
) -> Result<TraceSimulationReport> { ) -> Result<TraceSimulationReport> {
let config = config.normalized()?; let config = config.normalized()?;
validate_offline_disagg_replay_args(&config, router_mode)?; validate_offline_disagg_replay_args(&config, router_mode)?;
let trace = Trace::from_mooncake(trace_path, trace_block_size)? if trace_format == TraceFileFormat::AppliedComputeAgentic {
.normalize_session_starts()? bail!(
.speed_up_timing(arrival_speedup_ratio)?; "applied_compute_agentic trace format requires replay_concurrency because source traces do not contain first-turn timestamps"
);
}
let trace = load_trace_from_file(
trace_path,
trace_block_size,
trace_format,
trace_shared_prefix_ratio,
trace_num_prefix_groups,
)?
.normalize_session_starts()?
.speed_up_timing(arrival_speedup_ratio)?;
let started_at = Instant::now(); let started_at = Instant::now();
let report = crate::replay::offline::simulate_trace_workload_disagg( let report = crate::replay::offline::simulate_trace_workload_disagg(
config, config,
...@@ -129,12 +225,52 @@ pub fn simulate_trace_live_file_with_router_mode( ...@@ -129,12 +225,52 @@ pub fn simulate_trace_live_file_with_router_mode(
num_workers: usize, num_workers: usize,
arrival_speedup_ratio: f64, arrival_speedup_ratio: f64,
router_mode: ReplayRouterMode, router_mode: ReplayRouterMode,
) -> Result<TraceSimulationReport> {
simulate_trace_live_file_with_router_mode_and_format(
args,
router_config,
prefill_load_estimator,
trace_path,
trace_block_size,
num_workers,
arrival_speedup_ratio,
router_mode,
TraceFileFormat::Mooncake,
0.0,
0,
)
}
#[allow(clippy::too_many_arguments)]
pub fn simulate_trace_live_file_with_router_mode_and_format(
args: MockEngineArgs,
router_config: Option<KvRouterConfig>,
prefill_load_estimator: Option<ReplayPrefillLoadEstimator>,
trace_path: &Path,
trace_block_size: usize,
num_workers: usize,
arrival_speedup_ratio: f64,
router_mode: ReplayRouterMode,
trace_format: TraceFileFormat,
trace_shared_prefix_ratio: f64,
trace_num_prefix_groups: usize,
) -> Result<TraceSimulationReport> { ) -> Result<TraceSimulationReport> {
let args = args.normalized()?; let args = args.normalized()?;
validate_online_replay_args(&args, num_workers)?; validate_online_replay_args(&args, num_workers)?;
let trace = Trace::from_mooncake(trace_path, trace_block_size)? if trace_format == TraceFileFormat::AppliedComputeAgentic {
.normalize_session_starts()? bail!(
.speed_up_timing(arrival_speedup_ratio)?; "applied_compute_agentic trace format requires replay_concurrency because source traces do not contain first-turn timestamps"
);
}
let trace = load_trace_from_file(
trace_path,
trace_block_size,
trace_format,
trace_shared_prefix_ratio,
trace_num_prefix_groups,
)?
.normalize_session_starts()?
.speed_up_timing(arrival_speedup_ratio)?;
online::simulate_trace_workload( online::simulate_trace_workload(
args, args,
router_config, router_config,
...@@ -288,10 +424,45 @@ pub fn simulate_concurrency_file_with_router_mode( ...@@ -288,10 +424,45 @@ pub fn simulate_concurrency_file_with_router_mode(
max_in_flight: usize, max_in_flight: usize,
num_workers: usize, num_workers: usize,
router_mode: ReplayRouterMode, router_mode: ReplayRouterMode,
) -> Result<TraceSimulationReport> {
simulate_concurrency_file_with_router_mode_and_format(
args,
router_config,
prefill_load_estimator,
trace_path,
trace_block_size,
max_in_flight,
num_workers,
router_mode,
TraceFileFormat::Mooncake,
0.0,
0,
)
}
#[allow(clippy::too_many_arguments)]
pub fn simulate_concurrency_file_with_router_mode_and_format(
args: MockEngineArgs,
router_config: Option<KvRouterConfig>,
prefill_load_estimator: Option<ReplayPrefillLoadEstimator>,
trace_path: &Path,
trace_block_size: usize,
max_in_flight: usize,
num_workers: usize,
router_mode: ReplayRouterMode,
trace_format: TraceFileFormat,
trace_shared_prefix_ratio: f64,
trace_num_prefix_groups: usize,
) -> Result<TraceSimulationReport> { ) -> Result<TraceSimulationReport> {
let args = args.normalized()?; let args = args.normalized()?;
validate_offline_concurrency_args(&args, num_workers, max_in_flight, router_mode)?; validate_offline_concurrency_args(&args, num_workers, max_in_flight, router_mode)?;
let trace = Trace::from_mooncake(trace_path, trace_block_size)?; let trace = load_trace_from_file(
trace_path,
trace_block_size,
trace_format,
trace_shared_prefix_ratio,
trace_num_prefix_groups,
)?;
let started_at = Instant::now(); let started_at = Instant::now();
let report = simulate_concurrency_workload_with_router_mode( let report = simulate_concurrency_workload_with_router_mode(
args, args,
...@@ -313,10 +484,43 @@ pub fn simulate_concurrency_file_disagg_with_router_mode( ...@@ -313,10 +484,43 @@ pub fn simulate_concurrency_file_disagg_with_router_mode(
trace_block_size: usize, trace_block_size: usize,
max_in_flight: usize, max_in_flight: usize,
router_mode: ReplayRouterMode, router_mode: ReplayRouterMode,
) -> Result<TraceSimulationReport> {
simulate_concurrency_file_disagg_with_router_mode_and_format(
config,
router_config,
prefill_load_estimator,
trace_path,
trace_block_size,
max_in_flight,
router_mode,
TraceFileFormat::Mooncake,
0.0,
0,
)
}
#[allow(clippy::too_many_arguments)]
pub fn simulate_concurrency_file_disagg_with_router_mode_and_format(
config: OfflineDisaggReplayConfig,
router_config: Option<KvRouterConfig>,
prefill_load_estimator: Option<ReplayPrefillLoadEstimator>,
trace_path: &Path,
trace_block_size: usize,
max_in_flight: usize,
router_mode: ReplayRouterMode,
trace_format: TraceFileFormat,
trace_shared_prefix_ratio: f64,
trace_num_prefix_groups: usize,
) -> Result<TraceSimulationReport> { ) -> Result<TraceSimulationReport> {
let config = config.normalized()?; let config = config.normalized()?;
validate_offline_disagg_concurrency_args(&config, max_in_flight, router_mode)?; validate_offline_disagg_concurrency_args(&config, max_in_flight, router_mode)?;
let trace = Trace::from_mooncake(trace_path, trace_block_size)?; let trace = load_trace_from_file(
trace_path,
trace_block_size,
trace_format,
trace_shared_prefix_ratio,
trace_num_prefix_groups,
)?;
let started_at = Instant::now(); let started_at = Instant::now();
let report = simulate_concurrency_workload_disagg_with_router_mode( let report = simulate_concurrency_workload_disagg_with_router_mode(
config, config,
...@@ -358,10 +562,45 @@ pub fn simulate_concurrency_live_file_with_router_mode( ...@@ -358,10 +562,45 @@ pub fn simulate_concurrency_live_file_with_router_mode(
max_in_flight: usize, max_in_flight: usize,
num_workers: usize, num_workers: usize,
router_mode: ReplayRouterMode, router_mode: ReplayRouterMode,
) -> Result<TraceSimulationReport> {
simulate_concurrency_live_file_with_router_mode_and_format(
args,
router_config,
prefill_load_estimator,
trace_path,
trace_block_size,
max_in_flight,
num_workers,
router_mode,
TraceFileFormat::Mooncake,
0.0,
0,
)
}
#[allow(clippy::too_many_arguments)]
pub fn simulate_concurrency_live_file_with_router_mode_and_format(
args: MockEngineArgs,
router_config: Option<KvRouterConfig>,
prefill_load_estimator: Option<ReplayPrefillLoadEstimator>,
trace_path: &Path,
trace_block_size: usize,
max_in_flight: usize,
num_workers: usize,
router_mode: ReplayRouterMode,
trace_format: TraceFileFormat,
trace_shared_prefix_ratio: f64,
trace_num_prefix_groups: usize,
) -> Result<TraceSimulationReport> { ) -> Result<TraceSimulationReport> {
let args = args.normalized()?; let args = args.normalized()?;
validate_online_concurrency_args(&args, num_workers, max_in_flight)?; validate_online_concurrency_args(&args, num_workers, max_in_flight)?;
let trace = Trace::from_mooncake(trace_path, trace_block_size)?; let trace = load_trace_from_file(
trace_path,
trace_block_size,
trace_format,
trace_shared_prefix_ratio,
trace_num_prefix_groups,
)?;
online::simulate_concurrency_workload( online::simulate_concurrency_workload(
args, args,
router_config, router_config,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment