Unverified Commit 7e07495f authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

feat(mocker): add --decode-speedup-ratio for speculative decoding simulation (#7349)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent 374107c0
...@@ -23,6 +23,7 @@ The mocker engine now supports a vLLM-style CLI interface with individual argume ...@@ -23,6 +23,7 @@ The mocker engine now supports a vLLM-style CLI interface with individual argume
- `--enable-chunked-prefill` / `--no-enable-chunked-prefill`: Enable/disable chunked prefill (default: True) - `--enable-chunked-prefill` / `--no-enable-chunked-prefill`: Enable/disable chunked prefill (default: True)
- `--preemption-mode`: Preemption mode for decode eviction under memory pressure: `lifo` (default, matches vLLM v1) or `fifo` - `--preemption-mode`: Preemption mode for decode eviction under memory pressure: `lifo` (default, matches vLLM v1) or `fifo`
- `--speedup-ratio`: Speed multiplier for token generation (default: 1.0). Higher values make the simulation engines run faster. Use `0` for infinite speedup (no simulation delays) - `--speedup-ratio`: Speed multiplier for token generation (default: 1.0). Higher values make the simulation engines run faster. Use `0` for infinite speedup (no simulation delays)
- `--decode-speedup-ratio`: Additional speedup multiplier applied only to decode steps (default: 1.0). Models speculative decoding (e.g. Eagle) where decode throughput improves without affecting prefill latency. Effective decode speedup is `speedup_ratio * decode_speedup_ratio`
- `--data-parallel-size`: Number of data parallel workers to simulate (default: 1) - `--data-parallel-size`: Number of data parallel workers to simulate (default: 1)
- `--num-workers`: Number of mocker workers to launch in the same process (default: 1). All workers share the same tokio runtime and thread pool - `--num-workers`: Number of mocker workers to launch in the same process (default: 1). All workers share the same tokio runtime and thread pool
- `--stagger-delay`: Delay in seconds between launching each worker to avoid overwhelming etcd/NATS/frontend. Set to 0 to disable staggering. Use -1 for auto mode (stagger dependent on number of workers). Default: -1 (auto) - `--stagger-delay`: Delay in seconds between launching each worker to avoid overwhelming etcd/NATS/frontend. Set to 0 to disable staggering. Use -1 for auto mode (stagger dependent on number of workers). Default: -1 (auto)
......
...@@ -109,6 +109,7 @@ def create_temp_engine_args_file(args: argparse.Namespace) -> Path: ...@@ -109,6 +109,7 @@ def create_temp_engine_args_file(args: argparse.Namespace) -> Path:
"enable_chunked_prefill": getattr(args, "enable_chunked_prefill", None), "enable_chunked_prefill": getattr(args, "enable_chunked_prefill", None),
"preemption_mode": getattr(args, "preemption_mode", None), "preemption_mode": getattr(args, "preemption_mode", None),
"speedup_ratio": getattr(args, "speedup_ratio", None), "speedup_ratio": getattr(args, "speedup_ratio", None),
"decode_speedup_ratio": getattr(args, "decode_speedup_ratio", None),
"dp_size": getattr(args, "dp_size", None), "dp_size": getattr(args, "dp_size", None),
"startup_time": getattr(args, "startup_time", None), "startup_time": getattr(args, "startup_time", None),
"planner_profile_data": ( "planner_profile_data": (
...@@ -301,6 +302,14 @@ def parse_args() -> argparse.Namespace: ...@@ -301,6 +302,14 @@ def parse_args() -> argparse.Namespace:
default=None, default=None,
help="Speedup ratio for mock execution (default: 1.0). Use 0 for infinite speedup (no simulation delays).", help="Speedup ratio for mock execution (default: 1.0). Use 0 for infinite speedup (no simulation delays).",
) )
parser.add_argument(
"--decode-speedup-ratio",
type=float,
default=None,
help="Additional speedup multiplier applied only to decode steps (default: 1.0). "
"Models speculative decoding (e.g. Eagle) where decode throughput improves "
"without affecting prefill latency. Effective decode speedup is speedup_ratio * decode_speedup_ratio.",
)
parser.add_argument( parser.add_argument(
"--data-parallel-size", "--data-parallel-size",
type=int, type=int,
......
...@@ -81,6 +81,7 @@ python -m dynamo.mocker \ ...@@ -81,6 +81,7 @@ python -m dynamo.mocker \
| `--enable-chunked-prefill` | True | Enable chunked prefill | | `--enable-chunked-prefill` | True | Enable chunked prefill |
| `--watermark` | 0.01 | KV cache watermark (fraction reserved) | | `--watermark` | 0.01 | KV cache watermark (fraction reserved) |
| `--speedup-ratio` | 1.0 | Timing speedup factor | | `--speedup-ratio` | 1.0 | Timing speedup factor |
| `--decode-speedup-ratio` | 1.0 | Decode-only speedup multiplier (e.g. for Eagle speculation) |
| `--data-parallel-size` | 1 | Number of DP replicas | | `--data-parallel-size` | 1 | Number of DP replicas |
| `--startup-time` | None | Simulated startup delay (seconds) | | `--startup-time` | None | Simulated startup delay (seconds) |
| `--planner-profile-data` | None | Path to NPZ file with timing data | | `--planner-profile-data` | None | Path to NPZ file with timing data |
......
...@@ -166,6 +166,14 @@ pub struct MockEngineArgs { ...@@ -166,6 +166,14 @@ pub struct MockEngineArgs {
#[validate(range(min = 0.0))] #[validate(range(min = 0.0))]
pub speedup_ratio: f64, pub speedup_ratio: f64,
/// Additional speedup multiplier applied only to decode steps.
/// Models speculative decoding (e.g. Eagle) where decode throughput improves
/// without affecting prefill latency. The effective decode speedup is
/// `speedup_ratio * decode_speedup_ratio`.
#[builder(default = "1.0")]
#[validate(range(min = 0.0))]
pub decode_speedup_ratio: f64,
#[builder(default = "1")] #[builder(default = "1")]
#[validate(range(min = 1))] #[validate(range(min = 1))]
pub dp_size: u32, pub dp_size: u32,
...@@ -272,6 +280,7 @@ impl MockEngineArgs { ...@@ -272,6 +280,7 @@ impl MockEngineArgs {
"enable_prefix_caching", "enable_prefix_caching",
"enable_chunked_prefill", "enable_chunked_prefill",
"speedup_ratio", "speedup_ratio",
"decode_speedup_ratio",
"dp_size", "dp_size",
"startup_time", "startup_time",
"is_prefill", "is_prefill",
...@@ -348,6 +357,12 @@ impl MockEngineArgs { ...@@ -348,6 +357,12 @@ impl MockEngineArgs {
builder = builder.speedup_ratio(num); builder = builder.speedup_ratio(num);
} }
if let Some(value) = extra_args.get("decode_speedup_ratio")
&& let Some(num) = value.as_f64()
{
builder = builder.decode_speedup_ratio(num);
}
if let Some(value) = extra_args.get("dp_size") if let Some(value) = extra_args.get("dp_size")
&& let Some(num) = value.as_u64() && let Some(num) = value.as_u64()
{ {
......
...@@ -28,7 +28,6 @@ ...@@ -28,7 +28,6 @@
//! ## NOTE //! ## NOTE
//! The current prefill and decoding time simulations are not scientific at all and are WIP //! The current prefill and decoding time simulations are not scientific at all and are WIP
use crate::common::perf_model::PerfModel;
use crate::common::protocols::{ use crate::common::protocols::{
DirectRequest, KvCacheEventSink, MockEngineArgs, MoveBlock, OutputSignal, PreemptionMode, DirectRequest, KvCacheEventSink, MockEngineArgs, MoveBlock, OutputSignal, PreemptionMode,
WorkerType, WorkerType,
...@@ -228,16 +227,7 @@ impl Scheduler { ...@@ -228,16 +227,7 @@ impl Scheduler {
// 2. Simulate prefill + decode // 2. Simulate prefill + decode
simulate_prefill(&mut state, &mut kv_manager, &mut hit_rates, &args).await; simulate_prefill(&mut state, &mut kv_manager, &mut hit_rates, &args).await;
simulate_decode( simulate_decode(&mut state, &mut kv_manager, &output_tx, &args).await;
&mut state,
&mut kv_manager,
&output_tx,
&args.perf_model,
args.block_size,
args.speedup_ratio,
args.preemption_mode,
)
.await;
// 3. Send metrics once per forward pass (after all prefill and decode processing) // 3. Send metrics once per forward pass (after all prefill and decode processing)
let _ = metrics_tx.send(MockerMetrics { let _ = metrics_tx.send(MockerMetrics {
...@@ -429,15 +419,12 @@ async fn simulate_decode( ...@@ -429,15 +419,12 @@ async fn simulate_decode(
state: &mut SchedulerState, state: &mut SchedulerState,
kv_manager: &mut KvManager, kv_manager: &mut KvManager,
output_tx: &Option<mpsc::UnboundedSender<OutputSignal>>, output_tx: &Option<mpsc::UnboundedSender<OutputSignal>>,
perf_model: &PerfModel, args: &MockEngineArgs,
block_size: usize,
speedup_ratio: f64,
preemption_mode: PreemptionMode,
) -> Duration { ) -> Duration {
let start_time = Instant::now(); let start_time = Instant::now();
// Compute decode timing // Compute decode timing
let active_kv_tokens = kv_manager.num_active_blocks() * block_size; let active_kv_tokens = kv_manager.num_active_blocks() * args.block_size;
// Compute average context length across all active decode requests // Compute average context length across all active decode requests
let total_length: usize = state let total_length: usize = state
...@@ -454,7 +441,9 @@ async fn simulate_decode( ...@@ -454,7 +441,9 @@ async fn simulate_decode(
let count = state.decode.len(); let count = state.decode.len();
let context_length = if count > 0 { total_length / count } else { 0 }; let context_length = if count > 0 { total_length / count } else { 0 };
let decoding_time = perf_model.predict_decode_time(active_kv_tokens, context_length); let decoding_time = args
.perf_model
.predict_decode_time(active_kv_tokens, context_length);
let total_time = Duration::from_secs_f64(decoding_time / 1000.0); let total_time = Duration::from_secs_f64(decoding_time / 1000.0);
// Process decoding // Process decoding
...@@ -481,7 +470,7 @@ async fn simulate_decode( ...@@ -481,7 +470,7 @@ async fn simulate_decode(
} }
// Preempt one request and free its blocks // Preempt one request and free its blocks
for signal in state.preempt(preemption_mode) { for signal in state.preempt(args.preemption_mode) {
kv_manager.process(&signal); kv_manager.process(&signal);
} }
...@@ -521,8 +510,9 @@ async fn simulate_decode( ...@@ -521,8 +510,9 @@ async fn simulate_decode(
} }
} }
if speedup_ratio > 0.0 && total_time > Duration::ZERO { let effective_ratio = args.speedup_ratio * args.decode_speedup_ratio;
let sleep_duration = Duration::from_secs_f64(total_time.as_secs_f64() / speedup_ratio); if effective_ratio > 0.0 && total_time > Duration::ZERO {
let sleep_duration = Duration::from_secs_f64(total_time.as_secs_f64() / effective_ratio);
let deadline = start_time + sleep_duration; let deadline = start_time + sleep_duration;
sleep_until_precise(deadline).await; sleep_until_precise(deadline).await;
...@@ -889,16 +879,7 @@ mod tests { ...@@ -889,16 +879,7 @@ mod tests {
// ── Step 3: First simulate_decode ── // ── Step 3: First simulate_decode ──
// R1 generates 1 token, gains a partial block. // R1 generates 1 token, gains a partial block.
simulate_decode( simulate_decode(&mut state, &mut kv_manager, &output_tx, &args).await;
&mut state,
&mut kv_manager,
&output_tx,
&args.perf_model,
args.block_size,
args.speedup_ratio,
args.preemption_mode,
)
.await;
assert_eq!(state.decode.len(), 1); assert_eq!(state.decode.len(), 1);
assert_eq!(state.decode[0], r1_uuid); assert_eq!(state.decode[0], r1_uuid);
...@@ -933,16 +914,7 @@ mod tests { ...@@ -933,16 +914,7 @@ mod tests {
// ── Step 5: Second simulate_decode ── // ── Step 5: Second simulate_decode ──
// R1 generates 2nd token → complete. Frees 3 blocks (1 destroyed, 2 deactivated). // R1 generates 2nd token → complete. Frees 3 blocks (1 destroyed, 2 deactivated).
simulate_decode( simulate_decode(&mut state, &mut kv_manager, &output_tx, &args).await;
&mut state,
&mut kv_manager,
&output_tx,
&args.perf_model,
args.block_size,
args.speedup_ratio,
args.preemption_mode,
)
.await;
assert!(!state.requests.contains_key(&r1_uuid), "R1 completed"); assert!(!state.requests.contains_key(&r1_uuid), "R1 completed");
assert_eq!(state.decode.len(), 0); assert_eq!(state.decode.len(), 0);
...@@ -965,16 +937,7 @@ mod tests { ...@@ -965,16 +937,7 @@ mod tests {
// ── Steps 7+: Cycle until all requests complete ── // ── Steps 7+: Cycle until all requests complete ──
loop { loop {
simulate_prefill(&mut state, &mut kv_manager, &mut hit_rates, &args).await; simulate_prefill(&mut state, &mut kv_manager, &mut hit_rates, &args).await;
simulate_decode( simulate_decode(&mut state, &mut kv_manager, &output_tx, &args).await;
&mut state,
&mut kv_manager,
&output_tx,
&args.perf_model,
args.block_size,
args.speedup_ratio,
args.preemption_mode,
)
.await;
if state.is_empty() { if state.is_empty() {
break; break;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment