"ssh:/git@developer.sourcefind.cn:2222/OpenDAS/dynamo.git" did not exist on "50f1e0e17aebadbe5de9e337c6aaa5262f8559f2"
Unverified Commit b39aa54f authored by jthomson04's avatar jthomson04 Committed by GitHub
Browse files

perf: Fix prefill router round robin (#5313)


Signed-off-by: jthomson04 <jwillthomson19@gmail.com>
parent 2e381b3e
...@@ -78,6 +78,14 @@ impl InnerPrefillRouter { ...@@ -78,6 +78,14 @@ impl InnerPrefillRouter {
InnerPrefillRouter::KvRouter(_) => None, InnerPrefillRouter::KvRouter(_) => None,
} }
} }
/// Peek next worker without incrementing state (for non-KV modes only)
fn peek_next_worker(&self) -> Option<u64> {
match self {
InnerPrefillRouter::SimpleRouter(router) => router.peek_next_worker(),
InnerPrefillRouter::KvRouter(_) => None,
}
}
} }
/// PrefillRouter is a forward-only operator that sits between Migration and the decode router. /// PrefillRouter is a forward-only operator that sits between Migration and the decode router.
...@@ -267,7 +275,9 @@ impl PrefillRouter { ...@@ -267,7 +275,9 @@ impl PrefillRouter {
} }
} else { } else {
// Non-KV mode: use PushRouter's stateful selection // Non-KV mode: use PushRouter's stateful selection
let worker_id = prefill_router.select_next_worker()?; // We use peek_next_worker instead of select_next_worker to avoid double-incrementing the counter
// if we fall back to the original path.
let worker_id = prefill_router.peek_next_worker()?;
(worker_id, 0) (worker_id, 0)
}; };
...@@ -486,6 +496,14 @@ impl ...@@ -486,6 +496,14 @@ impl
.await .await
{ {
// Bootstrap optimization path: spawn prefill in background // Bootstrap optimization path: spawn prefill in background
// We successfully used the peeked worker, so we must now advance the router state
// to ensure the next request gets a different worker.
if !self.router_mode.is_kv_routing()
&& let Some(router) = self.prefill_router.get()
{
router.select_next_worker();
}
let routing = prefill_req.routing_mut(); let routing = prefill_req.routing_mut();
routing.prefill_worker_id = Some(worker_id); routing.prefill_worker_id = Some(worker_id);
routing.dp_rank = Some(dp_rank); routing.dp_rank = Some(dp_rank);
......
...@@ -226,6 +226,36 @@ where ...@@ -226,6 +226,36 @@ where
} }
} }
/// Reports which worker the configured routing mode would pick next, without incrementing
/// the round-robin counter.
///
/// Returns `None` when no instances are currently available.
///
/// * `RoundRobin`: reads the counter as-is and maps it onto the available instances.
/// * `Random`: draws a fresh random instance. Nothing is remembered, so a later
///   `select_next_worker()` call is not tied to the value returned here.
///
/// # Panics
///
/// Panics if the router is configured with any mode other than `RoundRobin` or `Random`.
pub fn peek_next_worker(&self) -> Option<u64> {
    let available = self.client.instance_ids_avail();
    let total = available.len();
    if total == 0 {
        return None;
    }

    let slot = match self.router_mode {
        // Read-only load: the counter itself is left unchanged.
        RouterMode::RoundRobin => self.round_robin_counter.load(Ordering::Relaxed) as usize,
        // Stateless mode: every peek is an independent draw.
        RouterMode::Random => rand::rng().random::<u64>() as usize,
        _ => panic!(
            "peek_next_worker should not be called for {:?} routing mode",
            self.router_mode
        ),
    };

    // `total` is non-zero here, so the modulo always yields a valid index.
    Some(available[slot % total])
}
/* /*
pub async fn r#static(&self, request: SingleIn<T>) -> anyhow::Result<ManyOut<U>> { pub async fn r#static(&self, request: SingleIn<T>) -> anyhow::Result<ManyOut<U>> {
let subject = self.client.endpoint.subject(); let subject = self.client.endpoint.subject();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment