"vscode:/vscode.git/clone" did not exist on "bfc59cd26567dd9a22746de189851cb2ee901992"
Unverified Commit f51ec24d authored by Keiven C's avatar Keiven C Committed by GitHub
Browse files

fix: test_router_decisions async fix race condition (#6084)


Signed-off-by: default avatarKeiven Chang <keivenchang@users.noreply.github.com>
parent 241cc783
......@@ -7,7 +7,7 @@
//! This module provides the runtime-dependent engine wrapper.
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::{Arc, LazyLock};
use std::time::Duration;
use anyhow::Result;
......@@ -19,6 +19,7 @@ use tokio_util::sync::CancellationToken;
use uuid::Uuid;
use dynamo_runtime::DistributedRuntime;
use dynamo_runtime::config::environment_names::mocker as env_mocker;
use dynamo_runtime::protocols::annotated::Annotated;
use dynamo_runtime::{
component::Component,
......@@ -42,6 +43,9 @@ pub use dynamo_mocker::{
pub const MOCKER_COMPONENT: &str = "mocker";
static MOCKER_DIRECT_SYNC: LazyLock<bool> =
LazyLock::new(|| dynamo_runtime::config::env_is_truthy(env_mocker::DYN_MOCKER_SYNC_DIRECT));
/// Wrapper to adapt KvEventPublisher to the KvCacheEventSink trait
struct KvEventSinkAdapter(KvEventPublisher);
......@@ -131,9 +135,36 @@ impl MockVllmEngine {
Ok(())
}
pub fn direct(&self, request: DirectRequest, dp_rank: usize) {
/// Send a request to the appropriate scheduler.
///
/// Set `DYN_MOCKER_SYNC_DIRECT=1` to use the original direct path.
/// - `DYN_MOCKER_SYNC_DIRECT=1` (original, race-condition prone): 922/1000 pass
/// - `DYN_MOCKER_SYNC_DIRECT=0` (use timeout to wait for init): 1000/1000 pass
pub async fn direct(&self, request: DirectRequest, dp_rank: usize) {
// `direct()` can be called before `start_schedulers()` finishes populating
// `request_senders` under load. The original path panics immediately; the
// default path waits briefly for initialization to complete.
if *MOCKER_DIRECT_SYNC {
let senders = self.request_senders.get().expect("Not initialized");
let _ = senders[dp_rank].send(request);
return;
}
// Poll request_senders until initialized (or time out) to avoid the startup race.
let start = std::time::Instant::now();
loop {
if let Some(senders) = self.request_senders.get() {
let _ = senders[dp_rank].send(request);
return;
}
// We can parameterize the timeout to be more flexible.
// For example, on production this could be very short, but in a
// CPU-heavy test environment, this should be very high.
if start.elapsed() > Duration::from_secs(10) {
panic!("Scheduler initialization timed out after 10s");
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
}
/// Create schedulers and spawn their background tasks for distributing token notifications
......@@ -355,7 +386,7 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<LLMEngineOutput>, Error>
}
// Send the request to the appropriate scheduler based on dp_rank
self.direct(direct_request, dp_rank as usize);
self.direct(direct_request, dp_rank as usize).await;
// Create a simple channel for the stream
let (stream_tx, stream_rx) = mpsc::unbounded_channel::<LLMEngineOutput>();
......
......@@ -361,6 +361,12 @@ pub mod build {
pub mod mocker {
/// Enable structured KV cache allocation/eviction trace logs (set to "1" or "true" to enable)
pub const DYN_MOCKER_KV_CACHE_TRACE: &str = "DYN_MOCKER_KV_CACHE_TRACE";
/// Use the original direct() code path in the mocker request dispatch.
///
/// This path is race-prone during startup; prefer leaving it unset unless you are
/// explicitly trying to reproduce the original behavior.
pub const DYN_MOCKER_SYNC_DIRECT: &str = "DYN_MOCKER_SYNC_DIRECT";
}
/// Testing environment variables
......@@ -463,6 +469,7 @@ mod tests {
build::OUT_DIR,
// Mocker
mocker::DYN_MOCKER_KV_CACHE_TRACE,
mocker::DYN_MOCKER_SYNC_DIRECT,
// Testing
testing::DYN_QUEUED_UP_PROCESSING,
testing::DYN_SOAK_RUN_DURATION,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment