Unverified Commit a1e1954a authored by Yan Ru Pei, committed by GitHub
Browse files

fix(replay): harden router scaling invariants (#8236)


Signed-off-by: PeaBrane <yanrpei@gmail.com>
parent 3da6f4d4
......@@ -570,14 +570,25 @@ impl AggRuntime {
/// work in the engine.
pub(in crate::replay) fn apply_scaling(&mut self, target_workers: usize) -> anyhow::Result<()> {
let (added, newly_marked) = self.engine.apply_target_count(target_workers);
if let Some(router) = self.router.as_mut() {
#[cfg(test)]
if let Some(new_len) = added.iter().max().map(|id| id + 1) {
self.worker_active_requests.resize(new_len, Vec::new());
}
let admissions = if let Some(router) = self.router.as_mut() {
for id in added {
router.add_worker(id)?;
}
for id in newly_marked {
router.remove_worker(id)?;
}
}
let admissions = router.on_topology_changed(self.now_ms)?.admissions;
self.record_router_pending();
admissions
} else {
Vec::new()
};
self.dispatch_router_admissions(admissions)?;
self.record_in_flight_peak();
Ok(())
}
......@@ -683,7 +694,7 @@ mod tests {
use crate::common::protocols::{EngineType, SglangArgs};
use crate::loadgen::{SessionTrace, Trace, TurnTrace};
use crate::replay::normalize_trace_requests;
use dynamo_kv_router::config::RouterQueuePolicy;
use dynamo_kv_router::config::{KvRouterConfig, RouterQueuePolicy};
fn replay_args(enable_prefix_caching: bool, enable_chunked_prefill: bool) -> MockEngineArgs {
MockEngineArgs::builder()
......@@ -725,6 +736,13 @@ mod tests {
.unwrap()
}
/// Router configuration used by the scaling tests: every field keeps its
/// default value except the queue threshold, which is set to `0.5`.
fn planner_router_config() -> KvRouterConfig {
    let router_queue_threshold = Some(0.5);
    KvRouterConfig {
        router_queue_threshold,
        ..Default::default()
    }
}
fn sglang_replay_args() -> MockEngineArgs {
MockEngineArgs::builder()
.engine_type(EngineType::Sglang)
......@@ -1019,6 +1037,59 @@ mod tests {
);
}
/// After `apply_scaling(2)` the request that was pending in the router must be
/// gone from the pending list and recorded as assigned to worker index 1.
#[test]
fn test_apply_scaling_drains_router_pending_immediately() {
    // Both requests share every field except their uuid and token payload.
    let make_request = |id: u128, tokens| DirectRequest {
        tokens,
        max_output_tokens: 8,
        uuid: Some(Uuid::from_u128(id)),
        dp_rank: 0,
        arrival_timestamp_ms: Some(0.0),
    };
    let queued_uuid = Uuid::from_u128(2);

    let args = queueing_router_args(RouterQueuePolicy::Fcfs);
    let router_config = Some(planner_router_config());
    let trace = normalize_trace_requests(
        vec![
            make_request(1, vec![11; 64]),
            make_request(2, vec![22; 64]),
        ],
        1.0,
    )
    .unwrap();
    let mut runtime = AggRuntime::new(
        &args,
        router_config,
        None,
        trace,
        1,
        ReplayMode::Trace,
        ReplayRouterMode::KvRouter,
    )
    .unwrap();

    // After the first timestamp the second request is still pending in the router.
    let advanced = runtime.advance_one_timestamp().unwrap();
    assert!(advanced);
    let pending_before = runtime.debug_snapshot().router_pending_request_ids;
    assert_eq!(pending_before, vec![queued_uuid]);

    runtime.apply_scaling(2).unwrap();

    // Scaling alone, with no further time advance, must empty the pending list.
    let pending_after = runtime.debug_snapshot().router_pending_request_ids;
    assert!(pending_after.is_empty());
    assert_eq!(runtime.stats.assigned_worker_by_uuid[&queued_uuid], 1);
}
#[test]
fn test_multi_worker_trace_round_robin_assigns_same_timestamp_requests_deterministically() {
let args = replay_args(false, true);
......
......@@ -30,7 +30,7 @@ use crate::loadgen::ReplayRequestHashes;
use crate::replay::ReplayPrefillLoadEstimator;
use crate::replay::router_shared::{
ReplayNoopPublisher, ReplayWorkerConfig, replay_policy, replay_router_config, replay_selector,
replay_slots, replay_workers_with_configs,
replay_slots, replay_worker_config, replay_workers_with_configs,
};
type ReplayQueueKey = <RouterSchedulingPolicy as SchedulingPolicy>::Key;
......@@ -181,6 +181,7 @@ pub(crate) struct OfflineReplayRouter {
config: KvRouterConfig,
block_size: u32,
queue_threshold: Option<f64>,
worker_config_template: ReplayWorkerConfig,
workers_with_configs: HashMap<WorkerId, ReplayWorkerConfig>,
slots: Arc<ActiveSequencesMultiWorker<ReplayNoopPublisher>>,
selector: DefaultWorkerSelector,
......@@ -200,20 +201,18 @@ impl OfflineReplayRouter {
num_workers: usize,
) -> Result<Self> {
let config = replay_router_config(args, router_config);
let worker_config_template = replay_worker_config(args);
let workers_with_configs = replay_workers_with_configs(args, num_workers);
let slots = replay_slots(args, &workers_with_configs);
let selector = replay_selector(&config);
let policy = replay_policy(&config, args);
let queue_threshold = if num_workers > 1 {
config.router_queue_threshold
} else {
None
};
let queue_threshold = config.router_queue_threshold;
Ok(Self {
config,
block_size: args.block_size as u32,
queue_threshold,
worker_config_template,
workers_with_configs,
slots,
selector,
......@@ -310,29 +309,18 @@ impl OfflineReplayRouter {
self.pending.len()
}
/// Register a new worker with the router, cloning the config from existing workers.
/// Register a new worker with the router without disturbing existing slot state.
pub(crate) fn add_worker(&mut self, worker_id: usize) -> Result<()> {
let config = self
.workers_with_configs
.values()
.next()
.ok_or_else(|| anyhow!("cannot add worker to router with no existing workers"))?
.clone();
let wid = worker_id as WorkerId;
self.workers_with_configs.insert(wid, config);
// Rebuild the slots with the full worker set
let dp_range: HashMap<u64, (u32, u32)> = self
if self
.workers_with_configs
.keys()
.map(|&id| (id, (0u32, 1u32)))
.collect();
self.slots.update_workers(&dp_range);
// Enable queueing if we now have more than one worker
if self.workers_with_configs.len() > 1 && self.queue_threshold.is_none() {
self.queue_threshold = self.config.router_queue_threshold;
.insert(wid, self.worker_config_template.clone())
.is_some()
{
return Err(anyhow!("router worker {worker_id} already exists"));
}
let dp_range = HashMap::from([(wid, (0u32, 1u32))]);
self.slots.register_external_workers(&dp_range);
Ok(())
}
......@@ -352,6 +340,20 @@ impl OfflineReplayRouter {
Ok(())
}
/// Re-evaluate the pending queue after workers were added or removed.
///
/// Returns default (empty) effects when no workers are registered. Otherwise
/// drains the pending queue at the decayed timestamp derived from `now_ms`
/// and reports every drained `(uuid, worker_idx)` pair as a
/// [`WorkerAdmission`].
///
/// # Errors
///
/// Propagates any error returned by `drain_pending`.
pub(crate) fn on_topology_changed(&mut self, now_ms: f64) -> Result<RouterEffects> {
    // Nothing can be admitted while the router has no workers.
    if self.workers_with_configs.is_empty() {
        return Ok(RouterEffects::default());
    }

    let decay_now = self.decay_now(now_ms);
    let drained = self.drain_pending(decay_now)?;
    let admissions = drained
        .into_iter()
        .map(|(uuid, worker_idx)| WorkerAdmission { uuid, worker_idx })
        .collect();
    Ok(RouterEffects { admissions })
}
#[cfg(test)]
pub(crate) fn debug_snapshot(&self, now_ms: f64) -> OfflineRouterSnapshot {
let decay_now = self.decay_now(now_ms);
......@@ -598,7 +600,7 @@ mod tests {
use dynamo_kv_router::config::{KvRouterConfig, RouterPrefillLoadModel};
use uuid::Uuid;
use super::OfflineReplayRouter;
use super::{OfflineReplayRouter, WorkerAdmission};
use crate::common::protocols::{DirectRequest, MockEngineArgs};
use crate::replay::ReplayPrefillLoadEstimator;
......@@ -625,6 +627,14 @@ mod tests {
.unwrap()
}
/// Engine arguments for the queueing tests: block size 64 and a batched-token
/// budget of 64, with every other builder field left unset.
fn queueing_args() -> MockEngineArgs {
    // The builder chain is kept as one expression; only the final result is named.
    let built = MockEngineArgs::builder()
        .block_size(64)
        .max_num_batched_tokens(Some(64))
        .build();
    built.unwrap()
}
fn router_config() -> KvRouterConfig {
KvRouterConfig {
router_track_prefill_tokens: true,
......@@ -633,6 +643,13 @@ mod tests {
}
}
/// Default router configuration with the queue threshold overridden to `0.5`.
fn queueing_router_config() -> KvRouterConfig {
    let defaults = KvRouterConfig::default();
    KvRouterConfig {
        router_queue_threshold: Some(0.5),
        ..defaults
    }
}
fn estimator(duration: Duration) -> ReplayPrefillLoadEstimator {
Arc::new(FixedPrefillLoadEstimator { duration })
}
......@@ -674,4 +691,145 @@ mod tests {
vec![(0, 0)]
);
}
/// A router built with a single worker must still apply the configured queue
/// threshold: the first request is admitted, the second stays pending.
#[test]
fn test_single_worker_router_honors_queue_threshold() {
    let args = queueing_args();
    let config = Some(queueing_router_config());
    let mut router = OfflineReplayRouter::new(&args, config, None, 1).unwrap();

    // First arrival is admitted straight away.
    let first_req = request(1, 7);
    let first = router.on_request_arrival(&first_req, None, 0.0).unwrap();
    assert_eq!(first.admissions.len(), 1);

    // Second arrival produces no admission and is counted as pending.
    let second_req = request(2, 8);
    let second = router.on_request_arrival(&second_req, None, 0.0).unwrap();
    assert!(second.admissions.is_empty());
    assert_eq!(router.pending_count(), 1);

    let pending_uuids: Vec<_> = router
        .debug_snapshot(0.0)
        .pending
        .into_iter()
        .map(|entry| entry.uuid)
        .collect();
    assert_eq!(pending_uuids, vec![Uuid::from_u128(2)]);
}
/// A router created with two workers and scaled down to one must queue the
/// same requests as a router that was created with one worker.
#[test]
fn test_scaled_down_router_matches_fresh_single_worker_queueing() {
    // Feeds the same two arrivals at t=0 into whichever router is passed in.
    let feed_arrivals = |router: &mut OfflineReplayRouter| {
        router
            .on_request_arrival(&request(1, 7), None, 0.0)
            .unwrap();
        router
            .on_request_arrival(&request(2, 8), None, 0.0)
            .unwrap();
    };
    // Collects the uuids currently reported as pending by the debug snapshot.
    let pending_uuids = |router: &OfflineReplayRouter| {
        router
            .debug_snapshot(0.0)
            .pending
            .into_iter()
            .map(|entry| entry.uuid)
            .collect::<Vec<_>>()
    };

    let mut fresh =
        OfflineReplayRouter::new(&queueing_args(), Some(queueing_router_config()), None, 1)
            .unwrap();
    feed_arrivals(&mut fresh);

    let mut scaled =
        OfflineReplayRouter::new(&queueing_args(), Some(queueing_router_config()), None, 2)
            .unwrap();
    scaled.remove_worker(1).unwrap();
    feed_arrivals(&mut scaled);

    assert_eq!(scaled.pending_count(), fresh.pending_count());
    assert_eq!(pending_uuids(&scaled), pending_uuids(&fresh));
}
/// Removing the only worker and then adding a new one must leave the router
/// able to admit requests onto the new worker.
#[test]
fn test_router_can_scale_from_zero_workers() {
    let args = queueing_args();
    let mut router =
        OfflineReplayRouter::new(&args, Some(queueing_router_config()), None, 1).unwrap();

    // Drop to zero workers, then register worker 3.
    router.remove_worker(0).unwrap();
    router.add_worker(3).unwrap();

    let effects = router
        .on_request_arrival(&request(1, 7), None, 0.0)
        .unwrap();

    let expected = vec![WorkerAdmission {
        uuid: Uuid::from_u128(1),
        worker_idx: 3,
    }];
    assert_eq!(effects.admissions, expected);
}
/// Adding a worker after another was removed must not change the active-token
/// entries already reported for the existing workers, including the removed one.
#[test]
fn test_add_worker_preserves_draining_worker_state() {
    let args = queueing_args();
    let mut router =
        OfflineReplayRouter::new(&args, Some(queueing_router_config()), None, 2).unwrap();

    let first_req = request(1, 7);
    router.on_request_arrival(&first_req, None, 0.0).unwrap();
    let second_req = request(2, 8);
    router.on_request_arrival(&second_req, None, 0.0).unwrap();

    // Both workers report 64 active tokens before the topology changes.
    let before = router.debug_snapshot(0.0).active_tokens_by_worker;
    assert_eq!(before, vec![(0, 64), (1, 64)]);

    router.remove_worker(1).unwrap();
    router.add_worker(2).unwrap();

    // Worker 1 is still listed with its 64 tokens; worker 2 starts at 0.
    let after = router.debug_snapshot(0.0).active_tokens_by_worker;
    assert_eq!(after, vec![(0, 64), (1, 64), (2, 0)]);
}
/// After a worker is added, `on_topology_changed` must admit the pending
/// request onto it and leave the pending queue empty.
#[test]
fn test_topology_change_drains_pending_after_scale_up() {
    let args = queueing_args();
    let mut router =
        OfflineReplayRouter::new(&args, Some(queueing_router_config()), None, 1).unwrap();

    let first_req = request(1, 7);
    router.on_request_arrival(&first_req, None, 0.0).unwrap();
    let second_req = request(2, 8);
    router.on_request_arrival(&second_req, None, 0.0).unwrap();
    // With one worker, exactly one request is left pending.
    assert_eq!(router.pending_count(), 1);

    router.add_worker(1).unwrap();
    let effects = router.on_topology_changed(0.0).unwrap();

    let expected = vec![WorkerAdmission {
        uuid: Uuid::from_u128(2),
        worker_idx: 1,
    }];
    assert_eq!(effects.admissions, expected);
    assert_eq!(router.pending_count(), 0);

    let active = router.debug_snapshot(0.0).active_tokens_by_worker;
    assert_eq!(active, vec![(0, 64), (1, 64)]);
}
}
......@@ -791,23 +791,32 @@ impl DisaggRuntime {
target_decode: usize,
) -> Result<()> {
let (added, newly_marked) = self.prefill_engine.apply_target_count(target_prefill);
if let Some(router) = self.prefill_router.as_mut() {
let prefill_admissions = if let Some(router) = self.prefill_router.as_mut() {
for id in added {
router.add_worker(id)?;
}
for id in newly_marked {
router.remove_worker(id)?;
}
}
router.on_topology_changed(self.now_ms)?.admissions
} else {
Vec::new()
};
let (added, newly_marked) = self.decode_engine.apply_target_count(target_decode);
if let Some(router) = self.decode_router.as_mut() {
let decode_admissions = if let Some(router) = self.decode_router.as_mut() {
for id in added {
router.add_worker(id)?;
}
for id in newly_marked {
router.remove_worker(id)?;
}
}
router.on_topology_changed(self.now_ms)?.admissions
} else {
Vec::new()
};
self.record_router_pending();
self.dispatch_prefill_admissions(prefill_admissions)?;
self.dispatch_decode_admissions(decode_admissions)?;
Ok(())
}
......@@ -872,6 +881,8 @@ fn derive_decode_router_config(
#[cfg(test)]
mod tests {
use std::collections::VecDeque;
use super::super::entrypoints::{
run_concurrency_collect, run_concurrency_workload_collect, run_trace_collect,
run_trace_workload_collect,
......@@ -940,6 +951,30 @@ mod tests {
config
}
/// Engine arguments shared by the disaggregated scaling tests; only the
/// worker type differs between the prefill and decode sides.
fn scaling_test_args(worker_type: WorkerType) -> MockEngineArgs {
    // The builder chain is kept as one expression; only the final result is named.
    let built = MockEngineArgs::builder()
        .block_size(64)
        .num_gpu_blocks(512)
        .max_num_batched_tokens(Some(64))
        .max_num_seqs(Some(8))
        .enable_prefix_caching(true)
        .enable_chunked_prefill(true)
        .speedup_ratio(1.0)
        .decode_speedup_ratio(1.0)
        .worker_type(worker_type)
        .build();
    built.unwrap()
}
/// Disaggregated replay configuration that starts with one prefill worker and
/// one decode worker, both built from `scaling_test_args`.
fn scaling_test_disagg_config() -> OfflineDisaggReplayConfig {
    let prefill_args = scaling_test_args(WorkerType::Prefill);
    let decode_args = scaling_test_args(WorkerType::Decode);
    OfflineDisaggReplayConfig {
        prefill_args,
        decode_args,
        num_prefill_workers: 1,
        num_decode_workers: 1,
    }
}
fn router_config() -> KvRouterConfig {
KvRouterConfig {
router_queue_threshold: Some(1.25),
......@@ -947,6 +982,13 @@ mod tests {
}
}
/// Default router configuration with the queue threshold overridden to `0.5`,
/// used by the disaggregated scaling test.
fn planner_router_config() -> KvRouterConfig {
    let threshold = Some(0.5);
    KvRouterConfig {
        router_queue_threshold: threshold,
        ..Default::default()
    }
}
fn request(
uuid: u128,
prompt_tokens: usize,
......@@ -1235,6 +1277,34 @@ mod tests {
assert!(delayed_stats.handoff_ms[&uuid] >= 120.0);
}
/// Scaling prefill workers from one to two must move the queued request into
/// the running-prefill phase and record its assignment to prefill worker 1.
#[test]
fn test_apply_scaling_drains_prefill_router_pending_immediately() {
    let queued_uuid = Uuid::from_u128(2);
    let config = scaling_test_disagg_config();
    let router_config = Some(planner_router_config());
    let requests = VecDeque::from(vec![request(1, 64, 8, 0.0), request(2, 64, 8, 0.0)]);
    let mut runtime = DisaggRuntime::new(
        &config,
        router_config,
        None,
        requests,
        ReplayMode::Trace,
        ReplayRouterMode::KvRouter,
    )
    .unwrap();

    // At t=0 the second request is still queued for prefill.
    runtime.advance_to(0.0).unwrap();
    let phase_before = runtime.state(queued_uuid).unwrap().phase;
    assert_eq!(phase_before, DisaggPhase::QueuedPrefill);

    // Add one prefill worker; the decode worker count stays at one.
    runtime.apply_scaling(2, 1).unwrap();

    let phase_after = runtime.state(queued_uuid).unwrap().phase;
    assert_eq!(phase_after, DisaggPhase::RunningPrefill);
    assert_eq!(runtime.stats.prefill_assignments[&queued_uuid], 1);
}
#[test]
fn test_trace_workload_follow_up_turn_arrives_after_completion_plus_delay() {
let (collector, _) = run_trace_workload_collect(
......
......@@ -63,7 +63,7 @@ pub(super) type ReplayScheduler = LocalScheduler<
DefaultWorkerSelector,
>;
fn replay_worker_config(args: &MockEngineArgs) -> ReplayWorkerConfig {
pub(in crate::replay) fn replay_worker_config(args: &MockEngineArgs) -> ReplayWorkerConfig {
ReplayWorkerConfig {
max_num_batched_tokens: args
.max_num_batched_tokens
......
......@@ -75,6 +75,9 @@ ROUTER_AIC_CONFIG = {
"aic_tp_size": 1,
"aic_model_path": "Qwen/Qwen3-32B",
}
# Human-readable reason passed to `pytest.mark.skip` on `test_mocker_router`.
ROUND_ROBIN_MOCKER_SKIP_REASON = (
"Flaky on CI: tcp nondurable round-robin mocker router path timed out"
)
def _require_router_aic() -> dict[str, Any]:
......@@ -780,6 +783,7 @@ def _launch_disagg_workers(
indirect=["durable_kv_events"],
)
@pytest.mark.parametrize("request_plane", ["tcp"], indirect=True)
@pytest.mark.skip(reason=ROUND_ROBIN_MOCKER_SKIP_REASON)
def test_mocker_router(
request,
runtime_services_dynamic_ports,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment