Unverified commit 280df2a1, authored by Yan Ru Pei, committed by GitHub

test(kv-router): add active sequences trace replay check (#8581)


Signed-off-by: PeaBrane <yanrpei@gmail.com>
parent 88478b4b
......@@ -2313,7 +2313,6 @@ name = "dynamo-bench"
version = "1.1.0"
dependencies = [
"anyhow",
"async-trait",
"clap 4.6.0",
"dynamo-kv-router",
"dynamo-mocker",
......@@ -2326,8 +2325,10 @@ dependencies = [
"reqwest 0.12.24",
"serde",
"serde_json",
"tempfile",
"tokio",
"tokio-util",
"tracing",
"tracing-subscriber",
"uuid",
]
......
......@@ -56,5 +56,6 @@ For basic model registration without KV routing, use `--router-mode round-robin`
- **[Disaggregated Serving](router-disaggregated-serving.md)**: Prefill and decode routing setups
- **[Router Operations](router-operations.md)**: Replicas, persistence, and recovery
- **[Router Examples](router-examples.md)**: Python API usage, K8s examples, and custom routing patterns
- **[Router Testing](router-testing.md)**: Test layers from Rust unit tests to fixture-backed replay and full process E2E
- **[Standalone Indexer](standalone-indexer.md)**: Run the KV indexer as a separate service for independent scaling
- **[Router Design](../../design-docs/router-design.md)**: Architecture details, algorithms, and event transport modes
......@@ -217,5 +217,6 @@ You can also run the KV router as a standalone service (without the Dynamo front
- **[Disaggregated Serving](router-disaggregated-serving.md)**: Prefill and decode routing setups
- **[Router Operations](router-operations.md)**: Replicas, remote indexers, persistence, and recovery
- **[Router Examples](router-examples.md)**: Python API usage, K8s examples, and custom routing patterns
- **[Router Testing](router-testing.md)**: Recommended test layers for non-trivial router changes
- **[Standalone Indexer](standalone-indexer.md)**: Run the KV indexer as a separate service
- **[KV Event Replay — Dynamo vs vLLM](kv-event-replay-comparison.md)**: Gap detection and replay behavior
---
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
title: Router Testing
subtitle: Test layers for router changes
---
## Overview
The router has three useful test layers. When you add a non-trivial or potentially breaking feature, do not stop at the smallest local test by default. Consider extending the relevant layer or layers below so the change is covered at the same level where it can fail.
## 1. Rust Unit and Integration Tests
Use Rust tests in `lib/kv-router` and `lib/llm` for local correctness:
- cost-model math
- indexer behavior
- event application
- recovery and persistence logic
- sequence-tracking invariants
- remote-indexer query behavior
These tests should be the first line of defense for narrowly scoped logic. They are the right place to pin exact edge cases and regressions close to the implementation.
Examples:
- `lib/kv-router/src/indexer/tests.rs`
- `lib/kv-router/src/sequences/*`
- `lib/llm/src/kv_router/indexer/worker_query.rs`
Typical commands:
```bash
cargo test -p dynamo-kv-router
cargo test -p dynamo-llm --no-default-features
```
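To illustrate what pinning an edge case close to the implementation can look like, here is a minimal sketch. The `rescale_timestamps` helper is hypothetical and only loosely modeled on the timestamp rescaling used by the active-sequences bench; it is not an API of `dynamo-kv-router` or `dynamo-llm`.

```rust
/// Hypothetical helper (an assumption for illustration, not a crate API):
/// linearly rescale non-decreasing timestamps in microseconds so that the
/// last one lands at `target_ms`.
pub fn rescale_timestamps(timestamps_us: &[u64], target_ms: u64) -> Vec<u64> {
    // Clamp the divisor to at least 1 so an all-zero trace cannot divide by zero.
    let max_ts = timestamps_us.last().copied().unwrap_or(1).max(1);
    let target_us = target_ms * 1000;
    timestamps_us
        .iter()
        .map(|&ts| ts * target_us / max_ts)
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn empty_trace_stays_empty() {
        assert!(rescale_timestamps(&[], 100).is_empty());
    }

    #[test]
    fn all_zero_trace_does_not_divide_by_zero() {
        assert_eq!(rescale_timestamps(&[0, 0], 100), vec![0, 0]);
    }

    #[test]
    fn last_timestamp_lands_on_target() {
        assert_eq!(rescale_timestamps(&[0, 500, 1000], 2), vec![0, 1000, 2000]);
    }
}
```

Each test names one edge case, so a regression points directly at the behavior that changed.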
## 2. Bench-Backed E2E Invariant Tests
Use the fixture-backed tests in `lib/bench/tests` when you want a realistic replay path without launching the full router stack. These tests share the same replay machinery as the Mooncake and active-sequences benches, but run in the Rust test profile and assert invariants instead of reporting benchmark numbers.
Current coverage uses the checked-in 1000-line Mooncake trace fixture:
- [active_sequences_trace.rs](../../../lib/bench/tests/active_sequences_trace.rs)
- [mooncake_trace.rs](../../../lib/bench/tests/mooncake_trace.rs)
- [mooncake_trace_1000.jsonl](../../../lib/bench/testdata/mooncake_trace_1000.jsonl)
These tests are useful for catching regressions such as:
- state not draining at the end of replay
- unexpected `WARN` or `ERROR` logs on hot paths
- duplicate-store or similar warning metrics
- indexer-specific replay behavior differences across implementations
Typical command:
```bash
cargo test --package dynamo-bench --all-targets
```
Use this layer when a feature changes router behavior over time, depends on realistic event orderings, or should hold across multiple indexer implementations.
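The kind of invariant this layer asserts can be sketched as follows. The `Event`, `TimedEvent`, and `check_invariants` names are simplified stand-ins chosen for illustration; the real fixture-backed tests replay through the bench machinery and may check different or additional properties.

```rust
use std::collections::HashSet;

/// Simplified stand-in for a sequence lifecycle event. Assumption: the real
/// trace entries also carry block hashes and lengths, omitted here.
#[derive(Clone, Debug, PartialEq)]
pub enum Event {
    Add { request_id: String },
    PrefillComplete { request_id: String },
    Free { request_id: String },
}

/// An event plus the replay timestamp (microseconds) at which it fires.
pub struct TimedEvent {
    pub event: Event,
    pub timestamp_us: u64,
}

/// Walk one worker's trace and report the first violated invariant:
/// timestamps never go backwards, every request is added once before it is
/// used, and no request is still active when the trace ends.
pub fn check_invariants(trace: &[TimedEvent]) -> Result<(), String> {
    let mut active: HashSet<&str> = HashSet::new();
    let mut last_ts = 0u64;
    for (i, timed) in trace.iter().enumerate() {
        if timed.timestamp_us < last_ts {
            return Err(format!("entry {i}: timestamp went backwards"));
        }
        last_ts = timed.timestamp_us;
        match &timed.event {
            Event::Add { request_id } => {
                if !active.insert(request_id.as_str()) {
                    return Err(format!("entry {i}: duplicate add for {request_id}"));
                }
            }
            Event::PrefillComplete { request_id } => {
                if !active.contains(request_id.as_str()) {
                    return Err(format!("entry {i}: prefill for unknown {request_id}"));
                }
            }
            Event::Free { request_id } => {
                if !active.remove(request_id.as_str()) {
                    return Err(format!("entry {i}: free for unknown {request_id}"));
                }
            }
        }
    }
    if active.is_empty() {
        Ok(())
    } else {
        Err(format!("{} request(s) never freed", active.len()))
    }
}
```

Returning a descriptive error instead of panicking inside the helper lets a test report which entry broke the invariant, which is useful when the input is a 1000-line trace.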
## 3. Full Router E2E Process Tests
Use the Python tests in `tests/router` when you need the full request plane and event plane in play. These tests launch router and mocker or backend processes and exercise cross-process behavior that bench-backed replay cannot cover.
Current entry points include:
- [test_router_e2e_with_mockers.py](../../../tests/router/test_router_e2e_with_mockers.py)
- [test_router_e2e_with_vllm.py](../../../tests/router/test_router_e2e_with_vllm.py)
- [test_router_e2e_with_trtllm.py](../../../tests/router/test_router_e2e_with_trtllm.py)
- [test_router_e2e_with_sglang.py](../../../tests/router/test_router_e2e_with_sglang.py)
Use this layer for changes involving:
- process boundaries
- request routing through the Dynamo frontend or router service
- worker registration and discovery
- event-plane transport and delivery
- backend integration behavior
- startup, recovery, or lifecycle flows
Typical command:
```bash
.venv/bin/python -m pytest tests/router/test_router_e2e_with_mockers.py
```
## Recommended Usage
When a router change is non-trivial or potentially breaking, consider the following default progression:
- Add or update Rust unit tests for the local logic.
- Add or update a bench-backed invariant test if the change affects replay ordering, indexer behavior, cache-event handling, or state-drain assumptions.
- Add or update a full `tests/router` E2E test if the change depends on real processes, transport, registration, or backend interaction.
Not every change needs all three layers. But if a change can break behavior outside a single module boundary, it usually deserves more than a unit test.
......@@ -21,11 +21,6 @@ name = "offline_replay_bench"
path = "offline_replay_bench.rs"
harness = false
[[bench]]
name = "kv_indexer_bench"
path = "kv_router/kv_indexer_bench.rs"
harness = false
[[bench]]
name = "mooncake_bench"
path = "kv_router/mooncake_bench.rs"
......@@ -49,12 +44,13 @@ tokio = { workspace = true }
dynamo-mocker = { workspace = true }
[dev-dependencies]
async-trait = { workspace = true }
-dynamo-kv-router = { workspace = true, features = ["bench", "shard-metrics"] }
+dynamo-kv-router = { workspace = true, features = ["bench", "metrics", "shard-metrics"] }
dynamo-tokens = { workspace = true }
minstant = "0.1.7"
plotters = { version = "0.3", default-features = false, features = ["svg_backend", "line_series", "point_series", "full_palette"] }
tokio = { workspace = true, features = ["rt", "macros", "time"] }
tokio-util = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
tempfile = { workspace = true }
uuid = { workspace = true }
......@@ -5,15 +5,11 @@
mod common;
use common::*;
#[path = "active_sequences_shared.rs"]
mod active_sequences_shared;
use active_sequences_shared::{generate_sequence_events, run_benchmark};
use clap::Parser;
use common::NoopSequencePublisher;
use dynamo_kv_router::protocols::{PrefillLoadHint, WorkerWithDpRank};
use dynamo_kv_router::{ActiveSequencesMultiWorker, SequenceRequest};
use dynamo_mocker::loadgen::Trace;
use dynamo_tokens::SequenceHash;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::time::{Duration, Instant};
#[derive(Parser, Debug)]
#[clap(
......@@ -29,483 +25,15 @@ struct Args {
sweep_output: String,
}
/// Pre-computed metadata for a request, stored before submission so the
/// output signal can look it up by UUID.
struct RequestMetadata {
block_hashes: Vec<SequenceHash>,
isl: usize,
output_length: u64,
}
/// A single timestamped entry in a worker's sequence trace.
#[derive(Clone)]
enum SequenceTraceEntry {
Add {
request_id: String,
block_hashes: Vec<SequenceHash>,
isl: usize,
output_length: u64,
},
PrefillComplete {
request_id: String,
},
Free {
request_id: String,
},
}
/// A timestamped sequence trace entry for benchmark replay.
#[derive(Clone)]
struct SequenceTrace {
entry: SequenceTraceEntry,
timestamp_us: u64,
}
/// Run requests through the mocker to produce sequence lifecycle events
/// (add / prefill_complete / free) with realistic timing.
///
/// For each worker we:
/// 1. Create a Scheduler with an output_tx channel (no KvCacheEventSink needed)
/// 2. Pre-compute block hashes for each request
/// 3. Drain OutputSignal: first signal per UUID → Add + PrefillComplete,
/// completed=true → Free
/// 4. Collect timestamps for later replay
async fn generate_sequence_events(
traces: &[Trace],
num_gpu_blocks: usize,
block_size: u32,
trace_simulation_duration_ms: u64,
) -> anyhow::Result<Vec<Vec<SequenceTrace>>> {
println!("Generating sequence events...");
let artifacts = generate_replay_artifacts(
traces,
num_gpu_blocks,
block_size,
trace_simulation_duration_ms,
)
.await?;
let mut all_traces = Vec::with_capacity(artifacts.len());
for artifact in artifacts {
let metadata = artifact
.requests
.iter()
.map(|request| {
(
request.uuid,
RequestMetadata {
block_hashes: request.replay_hashes.sequence_hashes.clone(),
isl: request.input_length,
output_length: request.output_length as u64,
},
)
})
.collect::<HashMap<_, _>>();
let mut entries = Vec::new();
let mut seen = HashMap::new();
for timed_signal in artifact.output_signals {
let signal = timed_signal.signal;
let request_id = signal.uuid.to_string();
if let std::collections::hash_map::Entry::Vacant(entry) = seen.entry(signal.uuid) {
entry.insert(());
if let Some(meta) = metadata.get(&signal.uuid) {
entries.push(SequenceTrace {
entry: SequenceTraceEntry::Add {
request_id: request_id.clone(),
block_hashes: meta.block_hashes.clone(),
isl: meta.isl,
output_length: meta.output_length,
},
timestamp_us: timed_signal.timestamp_us,
});
entries.push(SequenceTrace {
entry: SequenceTraceEntry::PrefillComplete {
request_id: request_id.clone(),
},
timestamp_us: timed_signal.timestamp_us,
});
}
}
if signal.completed {
entries.push(SequenceTrace {
entry: SequenceTraceEntry::Free { request_id },
timestamp_us: timed_signal.timestamp_us,
});
}
}
all_traces.push(entries);
}
let total_adds = all_traces
.iter()
.flatten()
.filter(|e| matches!(e.entry, SequenceTraceEntry::Add { .. }))
.count();
let total_frees = all_traces
.iter()
.flatten()
.filter(|e| matches!(e.entry, SequenceTraceEntry::Free { .. }))
.count();
println!("Add events: {}, Free events: {}", total_adds, total_frees);
Ok(all_traces)
}
/// Rescale sequence trace timestamps into the benchmark duration.
fn rescale_traces(
traces: &[Vec<SequenceTrace>],
benchmark_duration_ms: u64,
) -> Vec<Vec<SequenceTrace>> {
traces
.iter()
.map(|worker_trace| {
if worker_trace.is_empty() {
return Vec::new();
}
let max_ts = worker_trace
.last()
.map(|e| e.timestamp_us)
.unwrap_or(1)
.max(1);
let target_us = benchmark_duration_ms * 1000;
worker_trace
.iter()
.map(|entry| SequenceTrace {
entry: entry.entry.clone(),
timestamp_us: entry.timestamp_us * target_us / max_ts,
})
.collect()
})
.collect()
}
/// Run the benchmark: replay sequence trace entries against a shared
/// ActiveSequencesMultiWorker, measuring potential_blocks_and_tokens /
/// add_request / mark_prefill_completed / free latency.
async fn run_benchmark(
traces: &[Vec<SequenceTrace>],
block_size: u32,
benchmark_duration_ms: u64,
inference_worker_duplication_factor: usize,
) -> anyhow::Result<BenchmarkResults> {
let scaled = rescale_traces(traces, benchmark_duration_ms);
let num_trace_workers = scaled.len();
// Total bench workers = trace workers × duplication factor.
// Each gets a unique WorkerWithDpRank in the shared multi-worker.
let total_workers = num_trace_workers * inference_worker_duplication_factor;
let dp_range: HashMap<u64, (u32, u32)> =
(0..total_workers as u64).map(|id| (id, (0, 1))).collect();
let multi = Arc::new(ActiveSequencesMultiWorker::new(
NoopSequencePublisher,
block_size as usize,
dp_range,
false,
0,
"bench",
));
let total_entries: u64 = scaled.iter().map(|t| t.len() as u64).sum::<u64>()
* inference_worker_duplication_factor as u64;
// Count blocks before consuming traces
let total_blocks: usize = scaled
.iter()
.flat_map(|t| t.iter())
.map(|entry| match &entry.entry {
SequenceTraceEntry::Add { block_hashes, .. } => block_hashes.len(),
_ => 0,
})
.sum::<usize>()
* inference_worker_duplication_factor;
let progress = make_progress_bar(Some(total_entries));
let mut tasks = Vec::new();
for replica in 0..inference_worker_duplication_factor {
for (trace_idx, worker_trace) in scaled.iter().enumerate() {
let worker_id = (replica * num_trace_workers + trace_idx) as u64;
let worker = WorkerWithDpRank::from_worker_id(worker_id);
// Make request IDs unique per worker so the shared map has no conflicts
let trace = make_unique_trace(worker_trace, worker_id);
let progress = progress.clone();
let multi = Arc::clone(&multi);
tasks.push(tokio::spawn(async move {
let capacity = trace.len();
let mut latencies: Vec<u64> = Vec::with_capacity(capacity);
let mut target = Instant::now();
let mut iter = trace.into_iter().peekable();
let mut local_count: u64 = 0;
while let Some(entry) = iter.next() {
let entry_ts = entry.timestamp_us;
let start = minstant::Instant::now();
apply_entry(&multi, worker, entry.entry).await;
latencies.push(start.elapsed().as_nanos() as u64);
local_count += 1;
// Process all entries at the same timestamp
while iter.peek().is_some_and(|e| e.timestamp_us == entry_ts) {
let e = iter.next().unwrap();
let start = minstant::Instant::now();
apply_entry(&multi, worker, e.entry).await;
latencies.push(start.elapsed().as_nanos() as u64);
local_count += 1;
}
if let Some(next) = iter.peek() {
target += Duration::from_micros(next.timestamp_us - entry_ts);
}
if target > Instant::now() {
tokio::time::sleep_until(target).await;
}
if local_count > 100 {
progress.inc(local_count);
local_count = 0;
}
}
progress.inc(local_count);
Ok::<_, anyhow::Error>(latencies)
}));
}
}
let mut all_latencies = Vec::new();
for task in tasks {
all_latencies.extend(task.await??);
}
// Keep the post-run drain check out of the measured benchmark interval.
let total_duration = progress.elapsed();
multi.assert_completely_drained(Instant::now());
if total_duration > Duration::from_millis(benchmark_duration_ms * 11 / 10) {
eprintln!(
"WARNING: Benchmarker could not keep up. Rerun with a larger --benchmark-duration-ms."
);
}
let total_ops = all_latencies.len();
let offered_ops_throughput = total_ops as f32 / benchmark_duration_ms as f32 * 1000.0;
let ops_throughput = total_ops as f32 / total_duration.as_millis() as f32 * 1000.0;
let offered_block_throughput = total_blocks as f32 / benchmark_duration_ms as f32 * 1000.0;
let block_throughput = total_blocks as f32 / total_duration.as_millis() as f32 * 1000.0;
all_latencies.sort_unstable();
let latency_p99_us = if all_latencies.is_empty() {
0.0
} else {
all_latencies[all_latencies.len() * 99 / 100] as f32 / 1000.0
};
println!(
"Ops Throughput: offered={} ops/s achieved={} ops/s (potential_blocks_and_tokens + add + prefill_complete + free)",
offered_ops_throughput, ops_throughput
);
println!(
"Block Throughput: offered={} block ops/s achieved={} block ops/s",
offered_block_throughput, block_throughput
);
println!("Latency p99: {}us", latency_p99_us);
Ok(BenchmarkResults {
offered_ops_throughput,
ops_throughput,
offered_block_throughput,
block_throughput,
latency_p99_us,
})
}
/// Make request IDs unique by prefixing with the worker ID, so the shared
/// request_to_worker map has no conflicts when traces are duplicated.
fn make_unique_trace(trace: &[SequenceTrace], worker_id: u64) -> Vec<SequenceTrace> {
trace
.iter()
.map(|entry| {
let new_entry = match &entry.entry {
SequenceTraceEntry::Add {
request_id,
block_hashes,
isl,
output_length,
} => SequenceTraceEntry::Add {
request_id: format!("{worker_id}:{request_id}"),
block_hashes: block_hashes.clone(),
isl: *isl,
output_length: *output_length,
},
SequenceTraceEntry::PrefillComplete { request_id } => {
SequenceTraceEntry::PrefillComplete {
request_id: format!("{worker_id}:{request_id}"),
}
}
SequenceTraceEntry::Free { request_id } => SequenceTraceEntry::Free {
request_id: format!("{worker_id}:{request_id}"),
},
};
SequenceTrace {
entry: new_entry,
timestamp_us: entry.timestamp_us,
}
})
.collect()
}
async fn apply_entry(
multi: &ActiveSequencesMultiWorker<NoopSequencePublisher>,
worker: WorkerWithDpRank,
entry: SequenceTraceEntry,
) {
let decay_now = tokio::time::Instant::now();
match entry {
SequenceTraceEntry::Add {
request_id,
block_hashes,
isl,
output_length,
} => {
let _ = multi.potential_blocks_and_tokens(Some(&block_hashes), isl, HashMap::new());
let _ = multi.add_request(
SequenceRequest {
request_id,
token_sequence: Some(block_hashes),
track_prefill_tokens: true,
expected_output_tokens: Some(output_length as u32),
prefill_load_hint: Some(PrefillLoadHint {
initial_effective_prefill_tokens: isl,
expected_prefill_duration: None,
}),
worker,
lora_name: None,
},
decay_now,
);
}
SequenceTraceEntry::PrefillComplete { request_id } => {
let _ = multi.mark_prefill_completed(&request_id, decay_now);
}
SequenceTraceEntry::Free { request_id } => {
let _ = multi.free(&request_id, decay_now);
}
}
}
async fn run_tests() -> anyhow::Result<()> {
use std::fs::File;
use std::io::Write;
let path = std::env::temp_dir().join(format!(
"active_seq_bench_test_{}.jsonl",
std::process::id()
));
{
let mut f = File::create(&path)?;
writeln!(
f,
"{}",
serde_json::json!({
"session_id": "session-a",
"timestamp": 0,
"input_length": 4,
"hash_ids": [0u64, 1, 2, 3],
"output_length": 10u64,
})
)?;
writeln!(
f,
"{}",
serde_json::json!({
"session_id": "session-a",
"delay": 5.0,
"input_length": 4,
"hash_ids": [4u64, 5, 6, 7],
"output_length": 10u64,
})
)?;
}
let traces = process_mooncake_trace(path.to_str().unwrap(), 512, 1, 1, 1, 42)?;
std::fs::remove_file(&path).ok();
println!(
"Loaded {} workers, {} total requests",
traces.len(),
traces
.iter()
.map(|trace| trace
.sessions
.iter()
.map(|session| session.turns.len())
.sum::<usize>())
.sum::<usize>()
);
let seq_traces = generate_sequence_events(&traces, 1048576, 512, 100).await?;
let total_adds = seq_traces
.iter()
.flatten()
.filter(|e| matches!(e.entry, SequenceTraceEntry::Add { .. }))
.count();
let total_frees = seq_traces
.iter()
.flatten()
.filter(|e| matches!(e.entry, SequenceTraceEntry::Free { .. }))
.count();
assert!(total_adds > 0, "expected at least one Add event");
assert!(total_frees > 0, "expected at least one Free event");
assert_eq!(total_adds, total_frees, "adds and frees should match");
for trace in &seq_traces {
assert!(
trace
.windows(2)
.all(|window| window[1].timestamp_us >= window[0].timestamp_us)
);
}
let first_free_us = seq_traces[0]
.iter()
.find_map(|entry| match entry.entry {
SequenceTraceEntry::Free { .. } => Some(entry.timestamp_us),
_ => None,
})
.unwrap();
let second_add_us = seq_traces[0]
.iter()
.filter_map(|entry| match entry.entry {
SequenceTraceEntry::Add { .. } => Some(entry.timestamp_us),
_ => None,
})
.nth(1)
.unwrap();
assert!(second_add_us >= first_free_us);
println!("All tests passed.");
Ok(())
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args = Args::parse();
init_sequence_logging(args.common.sequence_logs);
if args.common.test {
-return run_tests().await;
+anyhow::bail!(
+"active_sequences_bench no longer supports --test; run `cargo test --package dynamo-bench --test active_sequences_trace` instead"
+);
}
let path = match args.common.mooncake_trace_path.as_deref() {
......@@ -542,14 +70,19 @@ async fn main() -> anyhow::Result<()> {
let mut results: Vec<(u64, BenchmarkResults)> = Vec::new();
for &dur_ms in &durations {
println!("\n=== Sweep: benchmark_duration_ms = {} ===", dur_ms);
-let result = run_benchmark(
+let run = run_benchmark(
&seq_traces,
args.common.block_size,
dur_ms,
args.common.inference_worker_duplication_factor,
)
.await?;
-results.push((dur_ms, result));
+if !run.kept_up {
+eprintln!(
+"WARNING: Benchmarker could not keep up. Rerun with a larger --benchmark-duration-ms."
+);
+}
+results.push((dur_ms, run.results));
}
print_sweep_summary("active-sequences", &results);
......@@ -557,13 +90,18 @@ async fn main() -> anyhow::Result<()> {
let all_results = vec![("active-sequences", results)];
plot_sweep(&all_results, &args.sweep_output)?;
} else {
-run_benchmark(
+let run = run_benchmark(
&seq_traces,
args.common.block_size,
args.common.benchmark_duration_ms,
args.common.inference_worker_duplication_factor,
)
.await?;
+if !run.kept_up {
+eprintln!(
+"WARNING: Benchmarker could not keep up. Rerun with a larger --benchmark-duration-ms."
+);
+}
}
Ok(())
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::collections::HashMap;
use std::sync::Arc;
use dynamo_kv_router::protocols::{PrefillLoadHint, WorkerWithDpRank};
use dynamo_kv_router::{ActiveSequencesMultiWorker, SequenceRequest};
use dynamo_mocker::loadgen::Trace;
use dynamo_tokens::SequenceHash;
use tokio::time::{Duration, Instant};
use crate::common::{
BenchmarkRun, NoopSequencePublisher, compute_benchmark_run, generate_replay_artifacts,
make_progress_bar, rescale_trace_timestamps,
};
/// A single timestamped entry in a worker's sequence trace.
#[derive(Clone)]
pub enum SequenceTraceEntry {
Add {
request_id: String,
block_hashes: Vec<SequenceHash>,
isl: usize,
output_length: u64,
},
PrefillComplete {
request_id: String,
},
Free {
request_id: String,
},
}
/// A timestamped sequence trace entry for benchmark replay.
#[derive(Clone)]
pub struct SequenceTrace {
pub entry: SequenceTraceEntry,
pub timestamp_us: u64,
}
/// Pre-computed metadata for a request, stored before submission so the
/// output signal can look it up by UUID.
struct RequestMetadata {
block_hashes: Vec<SequenceHash>,
isl: usize,
output_length: u64,
}
/// Run requests through the mocker to produce sequence lifecycle events
/// (add / prefill_complete / free) with realistic timing.
pub async fn generate_sequence_events(
traces: &[Trace],
num_gpu_blocks: usize,
block_size: u32,
trace_simulation_duration_ms: Option<u64>,
) -> anyhow::Result<Vec<Vec<SequenceTrace>>> {
println!("Generating sequence events...");
let artifacts = generate_replay_artifacts(
traces,
num_gpu_blocks,
block_size,
trace_simulation_duration_ms,
)
.await?;
let mut all_traces = Vec::with_capacity(artifacts.len());
for artifact in artifacts {
let metadata = artifact
.requests
.iter()
.map(|request| {
(
request.uuid,
RequestMetadata {
block_hashes: request.replay_hashes.sequence_hashes.clone(),
isl: request.input_length,
output_length: request.output_length as u64,
},
)
})
.collect::<HashMap<_, _>>();
let mut entries = Vec::new();
let mut seen = HashMap::new();
for timed_signal in artifact.output_signals {
let signal = timed_signal.signal;
let request_id = signal.uuid.to_string();
if let std::collections::hash_map::Entry::Vacant(entry) = seen.entry(signal.uuid) {
entry.insert(());
if let Some(meta) = metadata.get(&signal.uuid) {
entries.push(SequenceTrace {
entry: SequenceTraceEntry::Add {
request_id: request_id.clone(),
block_hashes: meta.block_hashes.clone(),
isl: meta.isl,
output_length: meta.output_length,
},
timestamp_us: timed_signal.timestamp_us,
});
entries.push(SequenceTrace {
entry: SequenceTraceEntry::PrefillComplete {
request_id: request_id.clone(),
},
timestamp_us: timed_signal.timestamp_us,
});
}
}
if signal.completed {
entries.push(SequenceTrace {
entry: SequenceTraceEntry::Free { request_id },
timestamp_us: timed_signal.timestamp_us,
});
}
}
all_traces.push(entries);
}
let total_adds = all_traces
.iter()
.flatten()
.filter(|e| matches!(e.entry, SequenceTraceEntry::Add { .. }))
.count();
let total_frees = all_traces
.iter()
.flatten()
.filter(|e| matches!(e.entry, SequenceTraceEntry::Free { .. }))
.count();
println!("Add events: {}, Free events: {}", total_adds, total_frees);
Ok(all_traces)
}
/// Run the benchmark: replay sequence trace entries against a shared
/// ActiveSequencesMultiWorker, measuring potential_blocks_and_tokens /
/// add_request / mark_prefill_completed / free latency.
pub async fn run_benchmark(
traces: &[Vec<SequenceTrace>],
block_size: u32,
benchmark_duration_ms: u64,
inference_worker_duplication_factor: usize,
) -> anyhow::Result<BenchmarkRun> {
let scaled = rescale_trace_timestamps(
traces,
benchmark_duration_ms,
|entry| entry.timestamp_us,
|entry, timestamp_us| SequenceTrace {
entry: entry.entry.clone(),
timestamp_us,
},
);
let num_trace_workers = scaled.len();
let total_workers = num_trace_workers * inference_worker_duplication_factor;
let dp_range: HashMap<u64, (u32, u32)> =
(0..total_workers as u64).map(|id| (id, (0, 1))).collect();
let multi = Arc::new(ActiveSequencesMultiWorker::new(
NoopSequencePublisher,
block_size as usize,
dp_range,
false,
0,
"bench",
));
let total_entries: u64 = scaled.iter().map(|t| t.len() as u64).sum::<u64>()
* inference_worker_duplication_factor as u64;
let total_blocks: usize = scaled
.iter()
.flat_map(|t| t.iter())
.map(|entry| match &entry.entry {
SequenceTraceEntry::Add { block_hashes, .. } => block_hashes.len(),
_ => 0,
})
.sum::<usize>()
* inference_worker_duplication_factor;
let progress = make_progress_bar(Some(total_entries));
let mut tasks = Vec::new();
for replica in 0..inference_worker_duplication_factor {
for (trace_idx, worker_trace) in scaled.iter().enumerate() {
let worker_id = (replica * num_trace_workers + trace_idx) as u64;
let worker = WorkerWithDpRank::from_worker_id(worker_id);
let trace = make_unique_trace(worker_trace, worker_id);
let progress = progress.clone();
let multi = Arc::clone(&multi);
tasks.push(tokio::spawn(async move {
let capacity = trace.len();
let mut latencies: Vec<u64> = Vec::with_capacity(capacity);
let mut target = Instant::now();
let mut iter = trace.into_iter().peekable();
let mut local_count: u64 = 0;
while let Some(entry) = iter.next() {
let entry_ts = entry.timestamp_us;
let start = minstant::Instant::now();
apply_entry(&multi, worker, entry.entry).await;
latencies.push(start.elapsed().as_nanos() as u64);
local_count += 1;
while iter.peek().is_some_and(|e| e.timestamp_us == entry_ts) {
let e = iter.next().unwrap();
let start = minstant::Instant::now();
apply_entry(&multi, worker, e.entry).await;
latencies.push(start.elapsed().as_nanos() as u64);
local_count += 1;
}
if let Some(next) = iter.peek() {
target += Duration::from_micros(next.timestamp_us - entry_ts);
}
if target > Instant::now() {
tokio::time::sleep_until(target).await;
}
if local_count > 100 {
progress.inc(local_count);
local_count = 0;
}
}
progress.inc(local_count);
Ok::<_, anyhow::Error>(latencies)
}));
}
}
let mut all_latencies = Vec::new();
for task in tasks {
all_latencies.extend(task.await??);
}
let total_duration = progress.elapsed();
multi.assert_completely_drained(Instant::now());
let run = compute_benchmark_run(
all_latencies.len(),
total_blocks,
benchmark_duration_ms,
total_duration,
all_latencies,
);
println!(
"Ops Throughput: offered={} ops/s achieved={} ops/s (potential_blocks_and_tokens + add + prefill_complete + free)",
run.results.offered_ops_throughput, run.results.ops_throughput
);
println!(
"Block Throughput: offered={} block ops/s achieved={} block ops/s",
run.results.offered_block_throughput, run.results.block_throughput
);
println!("Latency p99: {}us", run.results.latency_p99_us);
Ok(run)
}
fn make_unique_trace(trace: &[SequenceTrace], worker_id: u64) -> Vec<SequenceTrace> {
trace
.iter()
.map(|entry| {
let new_entry = match &entry.entry {
SequenceTraceEntry::Add {
request_id,
block_hashes,
isl,
output_length,
} => SequenceTraceEntry::Add {
request_id: format!("{worker_id}:{request_id}"),
block_hashes: block_hashes.clone(),
isl: *isl,
output_length: *output_length,
},
SequenceTraceEntry::PrefillComplete { request_id } => {
SequenceTraceEntry::PrefillComplete {
request_id: format!("{worker_id}:{request_id}"),
}
}
SequenceTraceEntry::Free { request_id } => SequenceTraceEntry::Free {
request_id: format!("{worker_id}:{request_id}"),
},
};
SequenceTrace {
entry: new_entry,
timestamp_us: entry.timestamp_us,
}
})
.collect()
}
async fn apply_entry(
multi: &ActiveSequencesMultiWorker<NoopSequencePublisher>,
worker: WorkerWithDpRank,
entry: SequenceTraceEntry,
) {
let decay_now = tokio::time::Instant::now();
match entry {
SequenceTraceEntry::Add {
request_id,
block_hashes,
isl,
output_length,
} => {
let _ = multi.potential_blocks_and_tokens(Some(&block_hashes), isl, HashMap::new());
let _ = multi.add_request(
SequenceRequest {
request_id,
token_sequence: Some(block_hashes),
track_prefill_tokens: true,
expected_output_tokens: Some(output_length as u32),
prefill_load_hint: Some(PrefillLoadHint {
initial_effective_prefill_tokens: isl,
expected_prefill_duration: None,
}),
worker,
lora_name: None,
},
decay_now,
);
}
SequenceTraceEntry::PrefillComplete { request_id } => {
let _ = multi.mark_prefill_completed(&request_id, decay_now);
}
SequenceTraceEntry::Free { request_id } => {
let _ = multi.free(&request_id, decay_now);
}
}
}
......@@ -5,39 +5,35 @@
use std::time::Duration;
#[path = "shared.rs"]
mod shared;
use dynamo_kv_router::LocalBlockHash;
use dynamo_kv_router::protocols::{
ExternalSequenceBlockHash, KvCacheEvent, KvCacheEventData, KvCacheRemoveData, KvCacheStoreData,
KvCacheStoredBlockData, RouterEvent, WorkerId, XXH3_SEED, compute_seq_hash_for_block,
};
pub use dynamo_kv_router::test_utils::{NoopSequencePublisher, SimpleWorkerConfig};
use dynamo_mocker::common::protocols::MockEngineArgs;
use dynamo_mocker::loadgen::{
ArrivalSpec, DelaySpec, LengthSpec, ReplayRequestHashes, RouterSequence, SequenceHashMode,
SessionPartitionSpec, SyntheticTraceSpec, Trace,
};
pub use dynamo_mocker::replay::{
ReplayTimedKvEvent as TimedKvEvent, ReplayTimedOutputSignal as TimedOutputSignal,
ReplayTimedRequest as TimedReplayRequest, ReplayWorkerArtifacts as WorkerReplayArtifacts,
};
use dynamo_kv_router::protocols::XXH3_SEED;
use dynamo_mocker::loadgen::{ReplayRequestHashes, Trace};
use dynamo_tokens::compute_hash_v2;
use indicatif::{ProgressBar, ProgressStyle};
use plotters::prelude::*;
use rand::prelude::*;
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::io::{BufRead, BufReader};
use tokio::task::JoinHandle;
use tracing_subscriber::EnvFilter;
use uuid::Uuid;
pub use shared::{
BenchmarkResults, BenchmarkRun, NoopSequencePublisher, WorkerReplayArtifacts,
compute_benchmark_run, default_mock_engine_args, generate_replay_artifacts, make_progress_bar,
process_mooncake_trace, rescale_trace_timestamps,
};
/// Shared CLI arguments for trace-based benchmarks.
#[derive(clap::Args, Debug)]
pub struct CommonArgs {
/// Path to a JSONL mooncake trace file.
pub mooncake_trace_path: Option<String>,
-/// Run built-in self-tests instead of the benchmark.
+/// Deprecated compatibility flag. Use `cargo test --package dynamo-bench --test ...`
+/// for the fixture-backed integration tests instead.
#[clap(long)]
pub test: bool,
......@@ -49,9 +45,10 @@ pub struct CommonArgs {
#[clap(long, default_value = "128")]
pub block_size: u32,
-/// Wall-clock duration (ms) over which the trace is replayed during event generation.
-#[clap(long, default_value = "30000")]
-pub trace_simulation_duration_ms: u64,
+/// Optional wall-clock duration (ms) used to rescale the trace during event generation.
+/// Omit to preserve the original Mooncake timestamps.
+#[clap(long)]
+pub trace_simulation_duration_ms: Option<u64>,
/// Wall-clock duration (ms) over which the benchmark replays operations.
#[clap(long, default_value = "60000")]
......@@ -313,171 +310,6 @@ pub fn local_block_hash_from_id(hash_id: u64, block_size: u32) -> LocalBlockHash
LocalBlockHash(compute_hash_v2(bytes, XXH3_SEED))
}
/// Create a styled progress bar, optionally with a known total length.
pub fn make_progress_bar(total: Option<u64>) -> ProgressBar {
let progress = match total {
Some(total) => ProgressBar::new(total),
None => ProgressBar::no_length(),
};
progress.set_style(
ProgressStyle::with_template(
"[{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos}/{len} ({eta}) {msg}",
)
.unwrap()
.progress_chars("#>-"),
);
progress
}
/// Results from a single benchmark run.
#[derive(Serialize)]
pub struct BenchmarkResults {
pub offered_ops_throughput: f32,
pub ops_throughput: f32,
pub offered_block_throughput: f32,
pub block_throughput: f32,
pub latency_p99_us: f32,
}
/// Load, transform, and partition the mooncake trace into per-worker request lists.
pub fn process_mooncake_trace(
path: &str,
block_size: u32,
trace_length_factor: usize,
trace_duplication_factor: usize,
num_workers: usize,
seed: u64,
) -> anyhow::Result<Vec<Trace>> {
let trace = Trace::from_mooncake(std::path::Path::new(path), block_size as usize)?
.expand_hash_prefix_depth(trace_length_factor)
.duplicate_hash_space(trace_duplication_factor);
Ok(trace.partition_by_session(SessionPartitionSpec::Random {
num_partitions: num_workers,
seed,
}))
}
/// Build default MockEngineArgs suitable for event generation.
pub fn default_mock_engine_args(
num_gpu_blocks: usize,
block_size: usize,
) -> anyhow::Result<MockEngineArgs> {
Ok(MockEngineArgs::builder()
.num_gpu_blocks(num_gpu_blocks)
.block_size(block_size)
.speedup_ratio(10.0)
.enable_prefix_caching(true)
.max_num_batched_tokens(None)
.max_num_seqs(None)
.build()?)
}
fn replay_worker_trace(
trace: Trace,
sched_args: MockEngineArgs,
trace_simulation_duration_ms: u64,
progress: ProgressBar,
) -> anyhow::Result<WorkerReplayArtifacts> {
let total_turns = trace
.sessions
.iter()
.map(|session| session.turns.len())
.sum::<usize>();
let artifacts = dynamo_mocker::replay::generate_trace_worker_artifacts_offline(
sched_args,
trace.rescale_ready_span(trace_simulation_duration_ms)?,
)?;
progress.inc(total_turns as u64);
Ok(artifacts)
}
pub async fn generate_replay_artifacts(
traces: &[Trace],
num_gpu_blocks: usize,
block_size: u32,
trace_simulation_duration_ms: u64,
) -> anyhow::Result<Vec<WorkerReplayArtifacts>> {
println!("Generating events...");
let sched_args = default_mock_engine_args(num_gpu_blocks, block_size as usize)?;
let progress = make_progress_bar(Some(
traces
.iter()
.map(|trace| {
trace
.sessions
.iter()
.map(|session| session.turns.len() as u64)
.sum::<u64>()
})
.sum::<u64>(),
));
let mut tasks: Vec<JoinHandle<anyhow::Result<WorkerReplayArtifacts>>> = Vec::new();
for trace in traces.iter().cloned() {
let sched_args = sched_args.clone();
let progress = progress.clone();
tasks.push(tokio::task::spawn_blocking(move || {
replay_worker_trace(trace, sched_args, trace_simulation_duration_ms, progress)
}));
}
let mut artifacts = Vec::new();
for task in tasks {
artifacts.push(task.await??);
}
for worker_events in artifacts.iter().map(|artifact| &artifact.kv_events) {
for i in 1..worker_events.len() {
assert!(worker_events[i].timestamp_us >= worker_events[i - 1].timestamp_us);
}
}
println!(
"Generated {} events. Processing...",
artifacts
.iter()
.map(|artifact| artifact.kv_events.len())
.sum::<usize>()
);
let mut num_stored_events = 0;
let mut num_removed_events = 0;
for event in artifacts
.iter()
.flat_map(|artifact| artifact.kv_events.iter())
{
match event.event.data {
KvCacheEventData::Stored(_) => num_stored_events += 1,
KvCacheEventData::Removed(_) => num_removed_events += 1,
_ => (),
}
}
println!("Store events: {}", num_stored_events);
println!("Remove events: {}", num_removed_events);
Ok(artifacts)
}
pub async fn generate_kv_events(
traces: &[Trace],
num_gpu_blocks: usize,
block_size: u32,
trace_simulation_duration_ms: u64,
) -> anyhow::Result<Vec<Vec<TimedKvEvent>>> {
Ok(generate_replay_artifacts(
traces,
num_gpu_blocks,
block_size,
trace_simulation_duration_ms,
)
.await?
.into_iter()
.map(|artifact| artifact.kv_events)
.collect())
}
pub fn plot_sweep(
all_results: &[(&str, Vec<(u64, BenchmarkResults)>)],
output_path: &str,
......@@ -611,153 +443,6 @@ pub fn print_sweep_summary(name: &str, results: &[(u64, BenchmarkResults)]) {
}
}
// ---------------------------------------------------------------------------
// Sequence data generation (moved from src/bench_utils.rs)
// ---------------------------------------------------------------------------
/// Pre-generated sequence data for benchmarking.
#[derive(Clone)]
pub struct SequenceData {
pub worker_id: WorkerId,
pub local_hashes: Vec<LocalBlockHash>,
pub external_hashes: Vec<ExternalSequenceBlockHash>,
}
impl From<RouterSequence> for SequenceData {
fn from(sequence: RouterSequence) -> Self {
Self {
worker_id: sequence.worker_id,
local_hashes: sequence.local_hashes,
external_hashes: sequence.external_hashes,
}
}
}
impl SequenceData {
/// Create a new sequence with synthetic hashes based on sequence ID.
pub fn new(seq_id: u64, worker_id: WorkerId, depth: usize) -> Self {
let local_hashes: Vec<LocalBlockHash> = (0..depth)
.map(|block_idx| LocalBlockHash((seq_id << 32) | (block_idx as u64)))
.collect();
let external_hashes: Vec<ExternalSequenceBlockHash> = (0..depth)
.map(|block_idx| ExternalSequenceBlockHash((seq_id << 32) | (block_idx as u64)))
.collect();
Self {
worker_id,
local_hashes,
external_hashes,
}
}
/// Create a sequence from local hashes, computing external hashes using cumulative hash.
pub fn from_local_hashes(worker_id: WorkerId, local_hashes: Vec<LocalBlockHash>) -> Self {
let seq_hashes = compute_seq_hash_for_block(&local_hashes);
let external_hashes = seq_hashes
.into_iter()
.map(ExternalSequenceBlockHash)
.collect();
Self {
worker_id,
local_hashes,
external_hashes,
}
}
/// Convert to a store event.
pub fn to_store_event(&self, event_id: u64) -> RouterEvent {
RouterEvent::new(
self.worker_id,
KvCacheEvent {
event_id,
data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: None,
start_position: None,
blocks: self
.local_hashes
.iter()
.zip(self.external_hashes.iter())
.map(|(local, ext)| KvCacheStoredBlockData {
tokens_hash: *local,
block_hash: *ext,
mm_extra_info: None,
})
.collect(),
}),
dp_rank: 0,
},
)
}
/// Convert to a remove event.
pub fn to_remove_event(&self, event_id: u64) -> RouterEvent {
RouterEvent::new(
self.worker_id,
KvCacheEvent {
event_id,
data: KvCacheEventData::Removed(KvCacheRemoveData {
block_hashes: self.external_hashes.clone(),
}),
dp_rank: 0,
},
)
}
}
/// Generate sequences with shared prefix prompts.
pub fn generate_sequences(
num_sequences: usize,
depth: usize,
num_workers: usize,
prefix_ratio: f64,
num_prefix_groups: usize,
seed: u64,
use_cumulative_hash: bool,
) -> Vec<SequenceData> {
let trace = Trace::synthetic(SyntheticTraceSpec {
block_size: 1,
num_sessions: num_sequences,
turns_per_session: 1,
input_tokens: LengthSpec {
mean: depth,
stddev: 0.0,
},
output_tokens: LengthSpec {
mean: 1,
stddev: 0.0,
},
shared_prefix_ratio: prefix_ratio,
num_prefix_groups,
first_turn_arrivals: ArrivalSpec::Burst,
inter_turn_delays: DelaySpec::None,
seed,
})
.expect("sequence generation spec must be valid");
let hash_mode = if use_cumulative_hash {
SequenceHashMode::Cumulative
} else {
SequenceHashMode::Raw
};
trace
.partition_by_session(SessionPartitionSpec::RoundRobin {
num_partitions: num_workers,
})
.into_iter()
.enumerate()
.flat_map(|(worker_idx, partition)| {
partition
.to_router_sequences(worker_idx as WorkerId, hash_mode)
.expect("synthetic trace conversion must succeed")
.into_iter()
.map(SequenceData::from)
.collect::<Vec<_>>()
})
.collect()
}
/// Compute median of durations.
pub fn median(durations: &[Duration]) -> Duration {
if durations.is_empty() {
......@@ -767,60 +452,3 @@ pub fn median(durations: &[Duration]) -> Duration {
sorted.sort();
sorted[sorted.len() / 2]
}
#[cfg(test)]
mod tests {
use super::*;
fn multiturn_trace() -> Trace {
Trace {
block_size: 2,
sessions: vec![dynamo_mocker::loadgen::SessionTrace {
session_id: "session-a".to_string(),
first_arrival_timestamp_ms: Some(0.0),
turns: vec![
dynamo_mocker::loadgen::TurnTrace {
input_length: 4,
max_output_tokens: 2,
hash_ids: vec![1, 2],
delay_after_previous_ms: 0.0,
},
dynamo_mocker::loadgen::TurnTrace {
input_length: 4,
max_output_tokens: 2,
hash_ids: vec![3, 4],
delay_after_previous_ms: 5.0,
},
],
}],
}
}
#[tokio::test]
async fn test_replay_worker_trace_releases_follow_up_turn_after_completion_delay() {
let artifacts = replay_worker_trace(
multiturn_trace(),
default_mock_engine_args(1024, 2).unwrap(),
5,
make_progress_bar(Some(2)),
)
.await
.unwrap();
assert_eq!(artifacts.requests.len(), 2);
let first_uuid = artifacts.requests[0].uuid;
let first_completion_ms = artifacts
.output_signals
.iter()
.find(|signal| signal.signal.uuid == first_uuid && signal.signal.completed)
.unwrap()
.timestamp_us as f64
/ 1000.0;
assert!(
artifacts.requests[1].scheduled_ready_at_ms + 0.1 >= first_completion_ms + 5.0,
"expected follow-up turn to wait for completion plus delay, got ready_at={} completion_at={}",
artifacts.requests[1].scheduled_ready_at_ms,
first_completion_ms
);
}
}
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use dynamo_kv_router::protocols::KvCacheEventData;
#[allow(unused_imports)]
pub use dynamo_kv_router::test_utils::NoopSequencePublisher;
use dynamo_mocker::common::protocols::MockEngineArgs;
use dynamo_mocker::loadgen::{SessionPartitionSpec, Trace};
pub use dynamo_mocker::replay::ReplayWorkerArtifacts as WorkerReplayArtifacts;
use indicatif::{ProgressBar, ProgressStyle};
use serde::Serialize;
use std::time::Duration;
/// Create a styled progress bar, optionally with a known total length.
pub fn make_progress_bar(total: Option<u64>) -> ProgressBar {
let progress = match total {
Some(total) => ProgressBar::new(total),
None => ProgressBar::no_length(),
};
progress.set_style(
ProgressStyle::with_template(
"[{elapsed_precise}] [{wide_bar:.cyan/blue}] {pos}/{len} ({eta}) {msg}",
)
.unwrap()
.progress_chars("#>-"),
);
progress
}
/// Results from a single benchmark run.
#[derive(Clone, Copy, Serialize)]
pub struct BenchmarkResults {
pub offered_ops_throughput: f32,
pub ops_throughput: f32,
pub offered_block_throughput: f32,
pub block_throughput: f32,
pub latency_p99_us: f32,
}
/// Benchmark results plus a flag recording whether the replay kept up with the offered load.
#[derive(Clone, Copy)]
pub struct BenchmarkRun {
pub results: BenchmarkResults,
pub kept_up: bool,
}
/// Load, transform, and partition the mooncake trace into per-worker request lists.
pub fn process_mooncake_trace(
path: &str,
block_size: u32,
trace_length_factor: usize,
trace_duplication_factor: usize,
num_workers: usize,
seed: u64,
) -> anyhow::Result<Vec<Trace>> {
let trace = Trace::from_mooncake(std::path::Path::new(path), block_size as usize)?
.expand_hash_prefix_depth(trace_length_factor)
.duplicate_hash_space(trace_duplication_factor);
Ok(trace.partition_by_session(SessionPartitionSpec::Random {
num_partitions: num_workers,
seed,
}))
}
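/// Rescale the trace's ready span to the given duration (ms) when one is provided;
/// otherwise return the trace unchanged so the original timestamps are preserved.
pub fn maybe_rescale_ready_span(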
trace: Trace,
trace_simulation_duration_ms: Option<u64>,
) -> anyhow::Result<Trace> {
match trace_simulation_duration_ms {
Some(duration_ms) => trace.rescale_ready_span(duration_ms),
None => Ok(trace),
}
}
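/// Linearly rescale each worker's timestamps (in microseconds) so that the last entry
/// lands at `benchmark_duration_ms`. Each worker trace must already be sorted by
/// timestamp, because the last entry is taken as the maximum.
pub fn rescale_trace_timestamps<T, GetTimestamp, WithTimestamp>(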
traces: &[Vec<T>],
benchmark_duration_ms: u64,
timestamp_of: GetTimestamp,
with_timestamp: WithTimestamp,
) -> Vec<Vec<T>>
where
GetTimestamp: Fn(&T) -> u64 + Copy,
WithTimestamp: Fn(&T, u64) -> T + Copy,
{
let target_us = u128::from(benchmark_duration_ms) * 1000;
traces
.iter()
.map(|worker_trace| {
if worker_trace.is_empty() {
return Vec::new();
}
let max_timestamp_us = worker_trace.last().map(timestamp_of).unwrap_or(1).max(1);
worker_trace
.iter()
.map(|entry| {
let scaled_timestamp =
u128::from(timestamp_of(entry)) * target_us / u128::from(max_timestamp_us);
with_timestamp(entry, scaled_timestamp.min(u128::from(u64::MAX)) as u64)
})
.collect()
})
.collect()
}
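To make the rescaling arithmetic concrete, the following is a minimal standalone sketch that applies the same formula to plain `u64` timestamps. The helper name `rescale_timestamps_us` is hypothetical and is not part of the crate; it only mirrors the per-worker mapping above under the same assumption that the input is sorted.

```rust
/// Hypothetical helper (not part of dynamo-bench): linearly rescale sorted
/// microsecond timestamps so that the last one lands at `target_ms`.
fn rescale_timestamps_us(timestamps_us: &[u64], target_ms: u64) -> Vec<u64> {
    // Widen to u128 so that `timestamp * target` cannot overflow.
    let target_us = u128::from(target_ms) * 1000;
    // The input is assumed sorted, so the last entry is the maximum.
    // Clamp to 1 to avoid dividing by zero when every timestamp is 0.
    let max_us = timestamps_us.last().copied().unwrap_or(1).max(1);
    timestamps_us
        .iter()
        .map(|&ts| {
            let scaled = u128::from(ts) * target_us / u128::from(max_us);
            // Saturate instead of wrapping if the result exceeds u64.
            scaled.min(u128::from(u64::MAX)) as u64
        })
        .collect()
}
```

For example, rescaling `[0, 500, 2000]` to a 4 ms window maps the last timestamp to 4000 us and scales the others proportionally, so 500 us becomes 1000 us. Integer division truncates, so scaled timestamps round down.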
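/// Summarize a run: offered vs. achieved throughput, p99 latency in microseconds
/// (from nanosecond samples), and whether the run finished within 110% of
/// `benchmark_duration_ms`.
pub fn compute_benchmark_run(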
total_ops: usize,
total_blocks: usize,
benchmark_duration_ms: u64,
total_duration: Duration,
mut latencies_ns: Vec<u64>,
) -> BenchmarkRun {
let kept_up = total_duration <= Duration::from_millis(benchmark_duration_ms * 11 / 10);
let benchmark_duration_secs = (benchmark_duration_ms as f32 / 1000.0).max(1e-6);
let total_duration_secs = total_duration.as_secs_f32().max(1e-6);
let offered_ops_throughput = total_ops as f32 / benchmark_duration_secs;
let ops_throughput = total_ops as f32 / total_duration_secs;
let offered_block_throughput = total_blocks as f32 / benchmark_duration_secs;
let block_throughput = total_blocks as f32 / total_duration_secs;
latencies_ns.sort_unstable();
let latency_p99_us = if latencies_ns.is_empty() {
0.0
} else {
let p99_idx = latencies_ns.len().saturating_sub(1) * 99 / 100;
latencies_ns[p99_idx] as f32 / 1000.0
};
BenchmarkRun {
results: BenchmarkResults {
offered_ops_throughput,
ops_throughput,
offered_block_throughput,
block_throughput,
latency_p99_us,
},
kept_up,
}
}
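The two less obvious pieces of `compute_benchmark_run` are the p99 index and the 10% tolerance behind `kept_up`. The sketch below isolates them; `p99_us` and `kept_up` are hypothetical helper names introduced here for illustration, not functions in the crate.

```rust
use std::time::Duration;

/// Hypothetical helper: p99 latency in microseconds from nanosecond samples,
/// using the same index arithmetic as `compute_benchmark_run`.
fn p99_us(mut latencies_ns: Vec<u64>) -> f32 {
    latencies_ns.sort_unstable();
    if latencies_ns.is_empty() {
        return 0.0;
    }
    // Index of the 99th percentile in the sorted samples (integer division).
    let idx = latencies_ns.len().saturating_sub(1) * 99 / 100;
    latencies_ns[idx] as f32 / 1000.0
}

/// Hypothetical helper: true when the run finished within 110% of the target.
fn kept_up(benchmark_duration_ms: u64, total_duration: Duration) -> bool {
    total_duration <= Duration::from_millis(benchmark_duration_ms * 11 / 10)
}
```

With 100 samples the index is `99 * 99 / 100 = 98`, so the second-largest sample is reported. With a 4000 ms target, a run of up to 4400 ms still counts as having kept up.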
/// Build default MockEngineArgs suitable for event generation.
pub fn default_mock_engine_args(
num_gpu_blocks: usize,
block_size: usize,
) -> anyhow::Result<MockEngineArgs> {
Ok(MockEngineArgs::builder()
.num_gpu_blocks(num_gpu_blocks)
.block_size(block_size)
.speedup_ratio(10.0)
.enable_prefix_caching(true)
.max_num_batched_tokens(None)
.max_num_seqs(None)
.build()?)
}
fn replay_worker_trace(
trace: Trace,
sched_args: MockEngineArgs,
trace_simulation_duration_ms: Option<u64>,
progress: ProgressBar,
) -> anyhow::Result<WorkerReplayArtifacts> {
let total_turns = trace
.sessions
.iter()
.map(|session| session.turns.len())
.sum::<usize>();
let artifacts = dynamo_mocker::replay::generate_trace_worker_artifacts_offline(
sched_args,
maybe_rescale_ready_span(trace, trace_simulation_duration_ms)?,
)?;
progress.inc(total_turns as u64);
Ok(artifacts)
}
pub async fn generate_replay_artifacts(
traces: &[Trace],
num_gpu_blocks: usize,
block_size: u32,
trace_simulation_duration_ms: Option<u64>,
) -> anyhow::Result<Vec<WorkerReplayArtifacts>> {
println!("Generating events...");
let sched_args = default_mock_engine_args(num_gpu_blocks, block_size as usize)?;
let progress = make_progress_bar(Some(
traces
.iter()
.map(|trace| {
trace
.sessions
.iter()
.map(|session| session.turns.len() as u64)
.sum::<u64>()
})
.sum::<u64>(),
));
let mut tasks = Vec::new();
for trace in traces.iter().cloned() {
let sched_args = sched_args.clone();
let progress = progress.clone();
tasks.push(tokio::task::spawn_blocking(move || {
replay_worker_trace(trace, sched_args, trace_simulation_duration_ms, progress)
}));
}
let mut artifacts = Vec::new();
for task in tasks {
artifacts.push(task.await??);
}
for (worker_idx, worker_events) in artifacts
.iter()
.enumerate()
.map(|(worker_idx, artifact)| (worker_idx, &artifact.kv_events))
{
for i in 1..worker_events.len() {
assert!(
worker_events[i].timestamp_us >= worker_events[i - 1].timestamp_us,
"worker {worker_idx} non-monotonic kv_events at idx {i}: prev={}, curr={}",
worker_events[i - 1].timestamp_us,
worker_events[i].timestamp_us
);
}
}
println!(
"Generated {} events. Processing...",
artifacts
.iter()
.map(|artifact| artifact.kv_events.len())
.sum::<usize>()
);
let mut num_stored_events = 0;
let mut num_removed_events = 0;
for event in artifacts
.iter()
.flat_map(|artifact| artifact.kv_events.iter())
{
match event.event.data {
KvCacheEventData::Stored(_) => num_stored_events += 1,
KvCacheEventData::Removed(_) => num_removed_events += 1,
_ => (),
}
}
println!("Store events: {}", num_stored_events);
println!("Remove events: {}", num_removed_events);
Ok(artifacts)
}
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use dynamo_kv_router::LocalBlockHash;
use dynamo_kv_router::indexer::{KvIndexer, KvIndexerInterface, KvIndexerMetrics};
use dynamo_kv_router::protocols::{KvCacheEvent, KvCacheEventData, RouterEvent};
use dynamo_kv_router::{
BranchShardedIndexer, ConcurrentRadixTree, ConcurrentRadixTreeCompressed, PositionalIndexer,
ThreadPoolIndexer,
};
use tokio::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;
use crate::common::{
BenchmarkRun, WorkerReplayArtifacts, compute_benchmark_run, make_progress_bar,
rescale_trace_timestamps,
};
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MooncakeIndexerKind {
RadixTree,
NestedMap,
ConcurrentRadixTree,
ConcurrentRadixTreeCompressed,
BranchShardedCrtc,
}
#[derive(Clone, Debug)]
pub struct MooncakeIndexerConfig {
pub kind: MooncakeIndexerKind,
pub jump_size: usize,
pub num_event_workers: usize,
pub num_shards: usize,
pub num_event_workers_per_shard: usize,
pub prefix_depth: usize,
}
#[allow(dead_code)]
impl MooncakeIndexerConfig {
pub fn radix_tree() -> Self {
Self {
kind: MooncakeIndexerKind::RadixTree,
jump_size: 8,
num_event_workers: 16,
num_shards: 2,
num_event_workers_per_shard: 4,
prefix_depth: 2,
}
}
pub fn nested_map(jump_size: usize, num_event_workers: usize) -> Self {
Self {
kind: MooncakeIndexerKind::NestedMap,
jump_size,
num_event_workers,
..Self::radix_tree()
}
}
pub fn concurrent_radix_tree(num_event_workers: usize) -> Self {
Self {
kind: MooncakeIndexerKind::ConcurrentRadixTree,
num_event_workers,
..Self::radix_tree()
}
}
pub fn concurrent_radix_tree_compressed(num_event_workers: usize) -> Self {
Self {
kind: MooncakeIndexerKind::ConcurrentRadixTreeCompressed,
num_event_workers,
..Self::radix_tree()
}
}
pub fn branch_sharded_crtc(
num_shards: usize,
num_event_workers_per_shard: usize,
prefix_depth: usize,
) -> Self {
Self {
kind: MooncakeIndexerKind::BranchShardedCrtc,
num_shards,
num_event_workers_per_shard,
prefix_depth,
..Self::radix_tree()
}
}
pub fn short_name(&self) -> &'static str {
match self.kind {
MooncakeIndexerKind::RadixTree => "radix-tree",
MooncakeIndexerKind::NestedMap => "nested-map",
MooncakeIndexerKind::ConcurrentRadixTree => "concurrent-radix-tree",
MooncakeIndexerKind::ConcurrentRadixTreeCompressed => {
"concurrent-radix-tree-compressed"
}
MooncakeIndexerKind::BranchShardedCrtc => "branch-sharded-crtc",
}
}
pub fn is_multi_threaded(&self) -> bool {
matches!(
self.kind,
MooncakeIndexerKind::NestedMap
| MooncakeIndexerKind::ConcurrentRadixTree
| MooncakeIndexerKind::ConcurrentRadixTreeCompressed
| MooncakeIndexerKind::BranchShardedCrtc
)
}
pub fn supports_remove(&self) -> bool {
true
}
pub fn from_short_name(name: &str, num_event_workers: usize) -> anyhow::Result<Self> {
let config = match name {
"radix-tree" => Self::radix_tree(),
"nested-map" => Self::nested_map(8, num_event_workers),
"concurrent-radix-tree" => Self::concurrent_radix_tree(num_event_workers),
"concurrent-radix-tree-compressed" => {
Self::concurrent_radix_tree_compressed(num_event_workers)
}
"branch-sharded-crtc" => Self::branch_sharded_crtc(2, num_event_workers, 2),
_ => anyhow::bail!(
"Unknown indexer '{}'. Valid names: radix-tree, nested-map, concurrent-radix-tree, concurrent-radix-tree-compressed, branch-sharded-crtc",
name
),
};
Ok(config)
}
pub fn build(
&self,
block_size: u32,
metrics: Arc<KvIndexerMetrics>,
) -> Arc<dyn KvIndexerInterface + Send + Sync> {
match self.kind {
MooncakeIndexerKind::RadixTree => Arc::new(KvIndexer::new(
CancellationToken::new(),
block_size,
metrics,
)),
MooncakeIndexerKind::NestedMap => Arc::new(ThreadPoolIndexer::new_with_metrics(
PositionalIndexer::new(self.jump_size),
self.num_event_workers,
block_size,
Some(metrics),
)),
MooncakeIndexerKind::ConcurrentRadixTree => {
Arc::new(ThreadPoolIndexer::new_with_metrics(
ConcurrentRadixTree::new(),
self.num_event_workers,
block_size,
Some(metrics),
))
}
MooncakeIndexerKind::ConcurrentRadixTreeCompressed => {
Arc::new(ThreadPoolIndexer::new_with_metrics(
ConcurrentRadixTreeCompressed::new(),
self.num_event_workers,
block_size,
Some(metrics),
))
}
MooncakeIndexerKind::BranchShardedCrtc => {
let shards = (0..self.num_shards)
.map(|_| {
ThreadPoolIndexer::new_with_metrics(
ConcurrentRadixTreeCompressed::new(),
self.num_event_workers_per_shard,
block_size,
Some(Arc::clone(&metrics)),
)
})
.collect();
Arc::new(BranchShardedIndexer::new_with_options(
shards,
self.prefix_depth,
block_size,
))
}
}
}
}
#[derive(Clone, Copy, Debug)]
pub struct MooncakeBenchmarkConfig {
pub benchmark_duration_ms: u64,
pub inference_worker_duplication_factor: usize,
pub count_events: bool,
pub find_matches_concurrency: usize,
}
/// A single entry in a worker's merged benchmark timeline.
#[derive(Clone)]
enum WorkerTraceEntry {
Request(Vec<LocalBlockHash>),
Event(KvCacheEvent),
}
/// A timestamped entry in a worker's benchmark trace, used to replay requests
/// and events at the correct relative timing.
#[derive(Clone)]
struct WorkerTrace {
entry: WorkerTraceEntry,
timestamp_us: u64,
}
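/// Merge each worker's requests and KV events into one timeline sorted by timestamp,
/// then rescale the timeline to `benchmark_duration_ms`.
fn prepare_worker_traces(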
artifacts: Vec<WorkerReplayArtifacts>,
benchmark_duration_ms: u64,
) -> Vec<Vec<WorkerTrace>> {
let traces = artifacts
.into_iter()
.map(|artifact| {
let mut merged = artifact
.requests
.into_iter()
.map(|request| WorkerTrace {
timestamp_us: request.timestamp_us,
entry: WorkerTraceEntry::Request(request.replay_hashes.local_block_hashes),
})
.chain(artifact.kv_events.into_iter().map(|event| WorkerTrace {
timestamp_us: event.timestamp_us,
entry: WorkerTraceEntry::Event(event.event),
}))
.collect::<Vec<_>>();
merged.sort_by_key(|entry| entry.timestamp_us);
merged
})
.collect::<Vec<_>>();
rescale_trace_timestamps(
&traces,
benchmark_duration_ms,
|entry| entry.timestamp_us,
|entry, timestamp_us| WorkerTrace {
entry: entry.entry.clone(),
timestamp_us,
},
)
}
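One detail of the merge step is worth spelling out: `sort_by_key` is a stable sort, so when a request and an event share a timestamp, the request stays first because requests are chained before events. The sketch below shows this with simplified stand-in types; `Entry` and `merge_timeline` are hypothetical names used only for illustration.

```rust
/// Hypothetical stand-in for the request/event entries of a worker timeline.
#[derive(Debug, PartialEq)]
enum Entry {
    Request(&'static str),
    Event(&'static str),
}

/// Hypothetical helper: chain requests before events, then stable-sort by timestamp.
fn merge_timeline(
    requests: Vec<(u64, &'static str)>,
    events: Vec<(u64, &'static str)>,
) -> Vec<(u64, Entry)> {
    let mut merged: Vec<(u64, Entry)> = requests
        .into_iter()
        .map(|(ts, name)| (ts, Entry::Request(name)))
        .chain(events.into_iter().map(|(ts, name)| (ts, Entry::Event(name))))
        .collect();
    // Stable sort: entries with equal timestamps keep their chained order.
    merged.sort_by_key(|entry| entry.0);
    merged
}
```

Merging requests at 10 us and 30 us with events at 10 us and 20 us yields the request at 10 us first, then the event at 10 us, then the event at 20 us, and finally the request at 30 us.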
pub async fn run_benchmark(
indexer: Arc<dyn KvIndexerInterface + Send + Sync>,
artifacts: Vec<WorkerReplayArtifacts>,
config: MooncakeBenchmarkConfig,
) -> anyhow::Result<BenchmarkRun> {
let worker_traces = prepare_worker_traces(artifacts, config.benchmark_duration_ms);
let worker_traces = worker_traces.into_iter().map(Arc::new).collect::<Vec<_>>();
let progress = make_progress_bar(Some(
worker_traces
.iter()
.map(|trace| trace.len() as u64)
.sum::<u64>()
* config.inference_worker_duplication_factor as u64,
));
let mut tasks = Vec::new();
for replica in 0..config.inference_worker_duplication_factor {
for (worker_id, worker_trace) in worker_traces.iter().enumerate() {
let indexer = Arc::clone(&indexer);
let trace = Arc::clone(worker_trace);
let progress = progress.clone();
let worker_id = worker_id + replica * worker_traces.len();
tasks.push(tokio::spawn(async move {
let mut request_latencies = Vec::with_capacity(trace.len());
let submit = |entry: WorkerTrace| async {
match entry.entry {
WorkerTraceEntry::Request(request) => {
let start = minstant::Instant::now();
indexer.find_matches(request).await?;
Ok::<Option<u64>, anyhow::Error>(
Some(start.elapsed().as_nanos() as u64),
)
}
WorkerTraceEntry::Event(event) => {
indexer
.apply_event(RouterEvent::new(worker_id as u64, event))
.await;
Ok(None)
}
}
};
let mut target = Instant::now();
let mut trace = trace.iter().peekable();
let mut local_count = 0;
while let Some(entry) = trace.next() {
let mut processed = 1;
let entry_timestamp_us = entry.timestamp_us;
if let Some(latency) = submit(entry.clone()).await? {
request_latencies.push(latency);
}
while let Some(next) = trace.peek() {
if next.timestamp_us == entry_timestamp_us {
if let Some(latency) = submit(trace.next().unwrap().clone()).await? {
request_latencies.push(latency);
}
processed += 1;
} else {
break;
}
}
if let Some(next) = trace.peek() {
target += Duration::from_micros(next.timestamp_us - entry_timestamp_us);
}
if target > Instant::now() {
tokio::time::sleep_until(target).await;
}
local_count += processed;
if local_count > 100 {
progress.inc(local_count);
local_count = 0;
}
}
progress.inc(local_count);
Ok::<_, anyhow::Error>(request_latencies)
}));
}
}
let fm_stop = Arc::new(AtomicBool::new(false));
let mut fm_tasks = Vec::new();
if config.find_matches_concurrency > 0 {
let seq_pool: Arc<Vec<Vec<LocalBlockHash>>> = Arc::new(
worker_traces
.iter()
.flat_map(|trace| trace.iter())
.filter_map(|entry| match &entry.entry {
WorkerTraceEntry::Request(hashes) => Some(hashes.clone()),
WorkerTraceEntry::Event(_) => None,
})
.collect(),
);
if !seq_pool.is_empty() {
for task_id in 0..config.find_matches_concurrency {
let indexer = Arc::clone(&indexer);
let pool = Arc::clone(&seq_pool);
let stop = Arc::clone(&fm_stop);
fm_tasks.push(tokio::spawn(async move {
let mut latencies = Vec::new();
let mut idx = task_id % pool.len();
while !stop.load(Ordering::Relaxed) {
let seq = pool[idx].clone();
let start = minstant::Instant::now();
let _ = indexer.find_matches(seq).await;
latencies.push(start.elapsed().as_nanos() as u64);
idx = (idx + 1) % pool.len();
}
latencies
}));
}
}
}
let mut latencies = Vec::new();
for task in tasks {
latencies.extend(task.await??);
}
fm_stop.store(true, Ordering::Relaxed);
for task in fm_tasks {
if let Ok(fm_latencies) = task.await {
latencies.extend(fm_latencies);
}
}
let total_duration = progress.elapsed();
let total_events = worker_traces
.iter()
.map(|trace| {
trace
.iter()
.filter(|entry| matches!(entry.entry, WorkerTraceEntry::Event(_)))
.count()
})
.sum::<usize>()
* config.inference_worker_duplication_factor;
let total_requests = worker_traces.iter().map(|trace| trace.len()).sum::<usize>()
* config.inference_worker_duplication_factor
- total_events;
let total_request_blocks = worker_traces
.iter()
.flat_map(|trace| trace.iter())
.filter_map(|entry| match &entry.entry {
WorkerTraceEntry::Request(hashes) => Some(hashes.len()),
WorkerTraceEntry::Event(_) => None,
})
.sum::<usize>()
* config.inference_worker_duplication_factor;
let total_event_blocks = worker_traces
.iter()
.flat_map(|trace| trace.iter())
.filter_map(|entry| match &entry.entry {
WorkerTraceEntry::Event(event) => match &event.data {
KvCacheEventData::Stored(store) => Some(store.blocks.len()),
_ => Some(0),
},
WorkerTraceEntry::Request(_) => None,
})
.sum::<usize>()
* config.inference_worker_duplication_factor;
let counted_events = if config.count_events { total_events } else { 0 };
let counted_event_blocks = if config.count_events {
total_event_blocks
} else {
0
};
let run = compute_benchmark_run(
total_requests + counted_events,
total_request_blocks + counted_event_blocks,
config.benchmark_duration_ms,
total_duration,
latencies,
);
println!(
"Offered Ops Throughput: {} ops/s | Achieved: {} ops/s (requests + events)",
run.results.offered_ops_throughput as u64, run.results.ops_throughput as u64,
);
println!(
"Offered Block Throughput: {} block ops/s | Achieved: {} block ops/s",
run.results.offered_block_throughput as u64, run.results.block_throughput as u64,
);
println!("Latency p99: {}us", run.results.latency_p99_us);
Ok(run)
}
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
mod support;
#[path = "../kv_router/common/shared.rs"]
mod common;
#[path = "../kv_router/active_sequences_shared.rs"]
mod active_sequences_shared;
use active_sequences_shared::{generate_sequence_events, run_benchmark};
use common::process_mooncake_trace;
const BLOCK_SIZE: u32 = 128;
const NUM_GPU_BLOCKS: usize = 16384;
const TRACE_SIMULATION_DURATION_MS: Option<u64> = None;
const BENCHMARK_DURATION_MS: u64 = 4000;
const NUM_UNIQUE_INFERENCE_WORKERS: usize = 10;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn active_sequences_trace_replays_without_warnings_or_leaks() -> anyhow::Result<()> {
let warning_count = support::warning_counter(&["dynamo_kv_router::sequences", "dynamo_mocker"]);
support::reset_warning_count(&warning_count);
let fixture = support::fixture_path("mooncake_trace_1000.jsonl")?;
let traces =
process_mooncake_trace(&fixture, BLOCK_SIZE, 1, 1, NUM_UNIQUE_INFERENCE_WORKERS, 42)?;
let sequence_traces = generate_sequence_events(
&traces,
NUM_GPU_BLOCKS,
BLOCK_SIZE,
TRACE_SIMULATION_DURATION_MS,
)
.await?;
let run = run_benchmark(&sequence_traces, BLOCK_SIZE, BENCHMARK_DURATION_MS, 1).await?;
assert!(
run.kept_up,
"benchmark replay fell behind in test profile; increase BENCHMARK_DURATION_MS if this becomes too tight"
);
assert!(
run.results.ops_throughput > 0.0,
"benchmark replay should record positive throughput"
);
assert_eq!(
warning_count.load(std::sync::atomic::Ordering::Relaxed),
0,
"sequence replay emitted warn/error logs from dynamo_kv_router::sequences or dynamo_mocker"
);
Ok(())
}
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
mod support;
#[path = "../kv_router/common/shared.rs"]
mod common;
#[path = "../kv_router/mooncake_shared.rs"]
mod mooncake_shared;
use std::collections::HashSet;
use std::io::Write;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::time::Duration;
use common::{generate_replay_artifacts, process_mooncake_trace};
use dynamo_kv_router::indexer::KvIndexerMetrics;
use dynamo_mocker::loadgen::{SessionTrace, Trace, TurnTrace};
use mooncake_shared::{MooncakeBenchmarkConfig, MooncakeIndexerConfig, run_benchmark};
use tempfile::NamedTempFile;
const BLOCK_SIZE: u32 = 128;
const NUM_GPU_BLOCKS: usize = 16384;
const NUM_UNIQUE_INFERENCE_WORKERS: usize = 10;
const BENCHMARK_DURATION_MS: u64 = 2000;
const NUM_EVENT_WORKERS: usize = 4;
#[test]
fn process_mooncake_trace_expands_and_duplicates_hash_space() -> anyhow::Result<()> {
let mut file = NamedTempFile::new()?;
for (i, (hash_ids, output_length)) in [(&[0u64, 1, 2] as &[u64], 10u64), (&[0, 1, 3, 4], 10)]
.iter()
.enumerate()
{
writeln!(
file,
"{}",
serde_json::json!({
"timestamp": i as u64,
"input_length": hash_ids.len(),
"hash_ids": hash_ids,
"output_length": output_length,
})
)?;
}
let traces = process_mooncake_trace(
file.path().to_str().expect("temp path should be UTF-8"),
512,
2,
2,
2,
42,
)?;
let mut all_hashes: Vec<Vec<u64>> = traces
.into_iter()
.flat_map(|worker| worker.sessions.into_iter())
.flat_map(|session| session.turns.into_iter().map(|turn| turn.hash_ids))
.collect();
all_hashes.sort();
let mut expected = vec![
vec![0, 1, 2, 3, 4, 5],
vec![10, 11, 12, 13, 14, 15],
vec![0, 1, 2, 3, 6, 7, 8, 9],
vec![10, 11, 12, 13, 16, 17, 18, 19],
];
expected.sort();
assert_eq!(all_hashes, expected, "hash_ids mismatch");
let copy0: Vec<&Vec<u64>> = all_hashes.iter().filter(|hashes| hashes[0] == 0).collect();
let copy1: Vec<&Vec<u64>> = all_hashes.iter().filter(|hashes| hashes[0] == 10).collect();
assert_eq!(copy0.len(), 2);
assert_eq!(copy1.len(), 2);
assert_eq!(copy0[0][..4], copy0[1][..4], "copy 0 shared prefix broken");
assert_eq!(copy1[0][..4], copy1[1][..4], "copy 1 shared prefix broken");
let set0: HashSet<u64> = copy0
.iter()
.flat_map(|hashes| hashes.iter().copied())
.collect();
let set1: HashSet<u64> = copy1
.iter()
.flat_map(|hashes| hashes.iter().copied())
.collect();
assert!(set0.is_disjoint(&set1), "copies are not hash-disjoint");
Ok(())
}
#[tokio::test(flavor = "current_thread")]
async fn generate_replay_artifacts_waits_for_completion_delay() -> anyhow::Result<()> {
let trace = Trace {
block_size: 2,
sessions: vec![SessionTrace {
session_id: "session-a".to_string(),
first_arrival_timestamp_ms: Some(0.0),
turns: vec![
TurnTrace {
input_length: 4,
max_output_tokens: 2,
hash_ids: vec![1, 2],
delay_after_previous_ms: 0.0,
},
TurnTrace {
input_length: 4,
max_output_tokens: 2,
hash_ids: vec![3, 4],
delay_after_previous_ms: 5.0,
},
],
}],
};
let artifacts = generate_replay_artifacts(&[trace], 1024, 2, None).await?;
assert_eq!(artifacts.len(), 1);
assert_eq!(artifacts[0].requests.len(), 2);
let first_uuid = artifacts[0].requests[0].uuid;
let first_completion_ms = artifacts[0]
.output_signals
.iter()
.find(|signal| signal.signal.uuid == first_uuid && signal.signal.completed)
.expect("first request must complete")
.timestamp_us as f64
/ 1000.0;
assert!(
artifacts[0].requests[1].scheduled_ready_at_ms + 0.1 >= first_completion_ms + 5.0,
"expected second request to wait for completion plus delay"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn mooncake_trace_replays_without_warnings_across_indexer_variants() -> anyhow::Result<()> {
let warning_count = support::warning_counter(&["dynamo_kv_router::indexer", "dynamo_mocker"]);
let fixture = support::fixture_path("mooncake_trace_1000.jsonl")?;
let traces =
process_mooncake_trace(&fixture, BLOCK_SIZE, 1, 1, NUM_UNIQUE_INFERENCE_WORKERS, 42)?;
let artifacts = generate_replay_artifacts(&traces, NUM_GPU_BLOCKS, BLOCK_SIZE, None).await?;
let variants = [
MooncakeIndexerConfig::radix_tree(),
MooncakeIndexerConfig::nested_map(8, NUM_EVENT_WORKERS),
MooncakeIndexerConfig::concurrent_radix_tree(NUM_EVENT_WORKERS),
MooncakeIndexerConfig::concurrent_radix_tree_compressed(NUM_EVENT_WORKERS),
];
for config in variants {
support::reset_warning_count(&warning_count);
let metrics = Arc::new(KvIndexerMetrics::new_unregistered());
let run = {
let indexer = config.build(BLOCK_SIZE, Arc::clone(&metrics));
run_benchmark(
indexer,
artifacts.clone(),
MooncakeBenchmarkConfig {
benchmark_duration_ms: BENCHMARK_DURATION_MS,
inference_worker_duplication_factor: 1,
count_events: config.supports_remove(),
find_matches_concurrency: 0,
},
)
.await?
};
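// Brief pause before checking the warning counter and metrics, so that events
// emitted shortly after the run returns are still counted.
tokio::time::sleep(Duration::from_millis(50)).await;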
assert!(
run.kept_up,
"{} replay fell behind in test profile; increase BENCHMARK_DURATION_MS if this becomes too tight",
config.short_name()
);
assert!(
run.results.ops_throughput > 0.0,
"{} replay should record positive throughput",
config.short_name()
);
assert_eq!(
warning_count.load(Ordering::Relaxed),
0,
"{} emitted warn/error logs from dynamo_kv_router::indexer or dynamo_mocker",
config.short_name()
);
assert_eq!(
support::duplicate_store_warning_count(metrics.as_ref()),
0,
"{} recorded duplicate-store warning metrics",
config.short_name()
);
}
Ok(())
}
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};
use anyhow::Context;
use dynamo_kv_router::indexer::{KvIndexerMetrics, METRIC_WARNING_DUPLICATE_STORE};
use tracing::{Event, Level, Subscriber};
use tracing_subscriber::Registry;
use tracing_subscriber::layer::{Context as LayerContext, Layer};
use tracing_subscriber::prelude::*;
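/// Tracing layer that counts WARN and ERROR events whose target starts with
/// one of `target_prefixes`.
struct WarningCounterLayer {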
count: Arc<AtomicUsize>,
target_prefixes: &'static [&'static str],
}
impl<S> Layer<S> for WarningCounterLayer
where
S: Subscriber,
{
fn on_event(&self, event: &Event<'_>, _ctx: LayerContext<'_, S>) {
let metadata = event.metadata();
let target = metadata.target();
if matches!(*metadata.level(), Level::WARN | Level::ERROR)
&& self
.target_prefixes
.iter()
.any(|prefix| target.starts_with(prefix))
{
self.count.fetch_add(1, Ordering::Relaxed);
}
}
}
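/// Installs the warning-counting subscriber as the global default (once per
/// process) and returns the shared counter.
///
/// Only the `target_prefixes` passed to the first call take effect; later calls
/// return the same counter and ignore their argument.
pub fn warning_counter(target_prefixes: &'static [&'static str]) -> Arc<AtomicUsize> {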
static COUNTER: OnceLock<Arc<AtomicUsize>> = OnceLock::new();
COUNTER
.get_or_init(|| {
let count = Arc::new(AtomicUsize::new(0));
let subscriber = Registry::default().with(WarningCounterLayer {
count: Arc::clone(&count),
target_prefixes,
});
tracing::subscriber::set_global_default(subscriber)
.expect("global warning counter subscriber should initialize once");
count
})
.clone()
}
/// Resets the shared warning counter to zero, e.g. between indexer variants.
pub fn reset_warning_count(counter: &Arc<AtomicUsize>) {
counter.store(0, Ordering::Relaxed);
}
/// Returns the path of `file_name` inside this crate's `testdata` directory.
pub fn fixture_path(file_name: &str) -> anyhow::Result<String> {
PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("testdata")
.join(file_name)
.to_str()
.map(str::to_owned)
.context("fixture path is not valid UTF-8")
}
/// Reads the current value of the duplicate-store warning metric.
#[allow(dead_code)]
pub fn duplicate_store_warning_count(metrics: &KvIndexerMetrics) -> u64 {
metrics
.kv_cache_event_warnings
.get_metric_with_label_values(&[METRIC_WARNING_DUPLICATE_STORE])
.expect("duplicate_store warning metric should exist")
.get()
}
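
A note on `warning_counter` above: because the counter is stored in a `OnceLock`, the prefixes supplied by the first caller are the ones that stay in effect for the whole test process. The following is a minimal standard-library sketch of that behaviour, not code from this commit. `PrefixCounter`, `record`, and `global_counter` are hypothetical names that stand in for the tracing layer and its accessor.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};

/// Hypothetical stand-in for the tracing layer: counts messages whose
/// target starts with one of the configured prefixes.
struct PrefixCounter {
    count: Arc<AtomicUsize>,
    target_prefixes: &'static [&'static str],
}

impl PrefixCounter {
    /// Increments the counter only when `target` matches a configured prefix.
    fn record(&self, target: &str) {
        if self
            .target_prefixes
            .iter()
            .any(|prefix| target.starts_with(prefix))
        {
            self.count.fetch_add(1, Ordering::Relaxed);
        }
    }
}

/// Returns the process-wide counter. `get_or_init` runs its closure at most
/// once, so only the first call's `target_prefixes` are stored.
fn global_counter(target_prefixes: &'static [&'static str]) -> &'static PrefixCounter {
    static GLOBAL: OnceLock<PrefixCounter> = OnceLock::new();
    GLOBAL.get_or_init(|| PrefixCounter {
        count: Arc::new(AtomicUsize::new(0)),
        target_prefixes,
    })
}
```

Under these assumptions, calling `global_counter(&["dynamo_kv_router::indexer"])` and then `global_counter(&["dynamo_mocker"])` yields the same instance, and a target such as `dynamo_mocker::scheduler` is not counted. In the test above this is harmless because `warning_counter` is called with both prefixes in a single call.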