Unverified Commit 824b456f authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

feat: improve mooncake_bench sweep logic and throughput accounting (#6631)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
Co-authored-by: default avatarClaude Opus 4.6 (1M context) <noreply@anthropic.com>
parent ab01462a
...@@ -18,8 +18,8 @@ use std::sync::Arc; ...@@ -18,8 +18,8 @@ use std::sync::Arc;
use uuid::Uuid; use uuid::Uuid;
use dynamo_kv_router::protocols::{KvCacheEvent, KvCacheEventData}; use dynamo_kv_router::protocols::{KvCacheEvent, KvCacheEventData};
use dynamo_mocker::common::protocols::{DirectRequest, KvCacheEventSink, MockEngineArgs}; use dynamo_mocker::Scheduler;
use dynamo_mocker::scheduler::Scheduler; use dynamo_mocker::protocols::{DirectRequest, KvCacheEventSink, MockEngineArgs};
use indicatif::{ProgressBar, ProgressStyle}; use indicatif::{ProgressBar, ProgressStyle};
use std::sync::Mutex; use std::sync::Mutex;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
...@@ -107,6 +107,14 @@ impl IndexerArgs { ...@@ -107,6 +107,14 @@ impl IndexerArgs {
} }
} }
fn supports_remove(name: &str) -> bool {
!matches!(name, "naive-nested-map" | "inverted-index")
}
fn is_multi_threaded(name: &str) -> bool {
matches!(name, "nested-map" | "concurrent-radix-tree")
}
/// Construct an indexer from a short name string, using `args.num_event_workers`. /// Construct an indexer from a short name string, using `args.num_event_workers`.
fn from_name( fn from_name(
name: &str, name: &str,
...@@ -151,7 +159,7 @@ struct Args { ...@@ -151,7 +159,7 @@ struct Args {
/// Number of GPU blocks available in the mock engine's KV cache. /// Number of GPU blocks available in the mock engine's KV cache.
/// Smaller values force more evictions and produce more remove events. /// Smaller values force more evictions and produce more remove events.
#[clap(long, default_value = "1048576")] #[clap(long, default_value = "16384")]
num_gpu_blocks: usize, num_gpu_blocks: usize,
/// Number of tokens per KV cache block. /// Number of tokens per KV cache block.
...@@ -653,6 +661,7 @@ fn prepare_worker_traces( ...@@ -653,6 +661,7 @@ fn prepare_worker_traces(
} }
/// Results from a single benchmark run. /// Results from a single benchmark run.
#[derive(Serialize)]
struct BenchmarkResults { struct BenchmarkResults {
offered_ops_throughput: f32, offered_ops_throughput: f32,
ops_throughput: f32, ops_throughput: f32,
...@@ -661,6 +670,13 @@ struct BenchmarkResults { ...@@ -661,6 +670,13 @@ struct BenchmarkResults {
latency_p99_us: f32, latency_p99_us: f32,
} }
#[derive(Serialize)]
struct SweepStepResult {
duration_ms: u64,
#[serde(flatten)]
results: BenchmarkResults,
}
/// Run the benchmark: replay each worker's merged trace against the indexer, /// Run the benchmark: replay each worker's merged trace against the indexer,
/// measuring find_matches latency and event processing throughput. /// measuring find_matches latency and event processing throughput.
/// ///
...@@ -673,6 +689,7 @@ async fn run_benchmark( ...@@ -673,6 +689,7 @@ async fn run_benchmark(
events: Vec<Vec<(KvCacheEvent, Instant)>>, events: Vec<Vec<(KvCacheEvent, Instant)>>,
args: &Args, args: &Args,
benchmark_duration_ms: u64, benchmark_duration_ms: u64,
count_events: bool,
) -> anyhow::Result<BenchmarkResults> { ) -> anyhow::Result<BenchmarkResults> {
let worker_traces = prepare_worker_traces( let worker_traces = prepare_worker_traces(
traces, traces,
...@@ -825,9 +842,11 @@ async fn run_benchmark( ...@@ -825,9 +842,11 @@ async fn run_benchmark(
.sum::<usize>() .sum::<usize>()
* args.inference_worker_duplication_factor; * args.inference_worker_duplication_factor;
let total_blocks = total_request_blocks + total_event_blocks; let counted_events = if count_events { total_events } else { 0 };
let counted_event_blocks = if count_events { total_event_blocks } else { 0 };
let total_ops = total_requests + total_events; let total_blocks = total_request_blocks + counted_event_blocks;
let total_ops = total_requests + counted_events;
let offered_ops_throughput = total_ops as f32 / benchmark_duration_ms as f32 * 1000.0; let offered_ops_throughput = total_ops as f32 / benchmark_duration_ms as f32 * 1000.0;
let ops_throughput = total_ops as f32 / total_duration.as_millis() as f32 * 1000.0; let ops_throughput = total_ops as f32 / total_duration.as_millis() as f32 * 1000.0;
let offered_block_throughput = total_blocks as f32 / benchmark_duration_ms as f32 * 1000.0; let offered_block_throughput = total_blocks as f32 / benchmark_duration_ms as f32 * 1000.0;
...@@ -1058,12 +1077,13 @@ async fn main() -> anyhow::Result<()> { ...@@ -1058,12 +1077,13 @@ async fn main() -> anyhow::Result<()> {
let log_min = (args.sweep_min_ms as f64).ln(); let log_min = (args.sweep_min_ms as f64).ln();
let log_max = (args.sweep_max_ms as f64).ln(); let log_max = (args.sweep_max_ms as f64).ln();
let n = args.sweep_steps; let n = args.sweep_steps;
let durations: Vec<u64> = (0..n) let durations_low_to_high: Vec<u64> = (0..n)
.map(|i| { .map(|i| {
let t = i as f64 / (n - 1) as f64; let t = i as f64 / (n - 1) as f64;
(log_max * (1.0 - t) + log_min * t).exp().round() as u64 (log_max * (1.0 - t) + log_min * t).exp().round() as u64
}) })
.collect(); .collect();
let durations_high_to_low: Vec<u64> = durations_low_to_high.iter().copied().rev().collect();
let mut all_results: Vec<(&str, Vec<(u64, BenchmarkResults)>)> = Vec::new(); let mut all_results: Vec<(&str, Vec<(u64, BenchmarkResults)>)> = Vec::new();
...@@ -1072,20 +1092,57 @@ async fn main() -> anyhow::Result<()> { ...@@ -1072,20 +1092,57 @@ async fn main() -> anyhow::Result<()> {
println!("Benchmarking indexer: {}", name); println!("Benchmarking indexer: {}", name);
println!("{}", "=".repeat(60)); println!("{}", "=".repeat(60));
let multi_threaded = IndexerArgs::is_multi_threaded(name);
let durations = if multi_threaded {
&durations_high_to_low
} else {
&durations_low_to_high
};
let mut results: Vec<(u64, BenchmarkResults)> = Vec::new(); let mut results: Vec<(u64, BenchmarkResults)> = Vec::new();
let mut consecutive_keeping_up = 0u32;
for &dur_ms in &durations { for &dur_ms in durations {
println!("\n=== Sweep: benchmark_duration_ms = {} ===", dur_ms); println!("\n=== Sweep: benchmark_duration_ms = {} ===", dur_ms);
let indexer = if args.compare.is_empty() { let indexer = if args.compare.is_empty() {
args.get_indexer().build(&args) args.get_indexer().build(&args)
} else { } else {
IndexerArgs::from_name(name, &args)? IndexerArgs::from_name(name, &args)?
}; };
let result = let count_events = IndexerArgs::supports_remove(name);
run_benchmark(indexer, traces.clone(), events.clone(), &args, dur_ms).await?; let result = run_benchmark(
results.push((dur_ms, result)); indexer,
traces.clone(),
events.clone(),
&args,
dur_ms,
count_events,
)
.await?;
if multi_threaded {
if result.block_throughput >= result.offered_block_throughput * 0.95 {
consecutive_keeping_up += 1;
} else {
consecutive_keeping_up = 0;
}
results.push((dur_ms, result));
if consecutive_keeping_up >= 5 {
println!("Early stop: achieved >= 95% offered for 5 consecutive steps");
break;
}
} else {
let saturated = result.offered_block_throughput > result.block_throughput * 5.0;
results.push((dur_ms, result));
if saturated {
println!("Early stop: offered throughput >5x achieved throughput");
break;
}
}
} }
results.sort_by_key(|(dur, _)| std::cmp::Reverse(*dur));
println!("\n=== Sweep Summary: {} ===", name); println!("\n=== Sweep Summary: {} ===", name);
println!( println!(
"{:>12} {:>14} {:>14} {:>14} {:>14} {:>10}", "{:>12} {:>14} {:>14} {:>14} {:>14} {:>10}",
...@@ -1107,6 +1164,32 @@ async fn main() -> anyhow::Result<()> { ...@@ -1107,6 +1164,32 @@ async fn main() -> anyhow::Result<()> {
} }
plot_sweep(&all_results, &args.sweep_output)?; plot_sweep(&all_results, &args.sweep_output)?;
let json_path = args
.sweep_output
.replace(".png", ".json")
.replace(".svg", ".json");
let json_map: std::collections::BTreeMap<&str, Vec<SweepStepResult>> = all_results
.iter()
.map(|(name, results)| {
let steps = results
.iter()
.map(|(dur, r)| SweepStepResult {
duration_ms: *dur,
results: BenchmarkResults {
offered_ops_throughput: r.offered_ops_throughput,
ops_throughput: r.ops_throughput,
offered_block_throughput: r.offered_block_throughput,
block_throughput: r.block_throughput,
latency_p99_us: r.latency_p99_us,
},
})
.collect();
(*name, steps)
})
.collect();
std::fs::write(&json_path, serde_json::to_string_pretty(&json_map)?)?;
println!("Sweep results saved to {}", json_path);
} else { } else {
for name in &indexer_names { for name in &indexer_names {
println!("\nBenchmarking indexer: {}", name); println!("\nBenchmarking indexer: {}", name);
...@@ -1115,12 +1198,14 @@ async fn main() -> anyhow::Result<()> { ...@@ -1115,12 +1198,14 @@ async fn main() -> anyhow::Result<()> {
} else { } else {
IndexerArgs::from_name(name, &args)? IndexerArgs::from_name(name, &args)?
}; };
let count_events = IndexerArgs::supports_remove(name);
run_benchmark( run_benchmark(
indexer, indexer,
traces.clone(), traces.clone(),
events.clone(), events.clone(),
&args, &args,
args.benchmark_duration_ms, args.benchmark_duration_ms,
count_events,
) )
.await?; .await?;
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment