Unverified Commit 1da603a4 authored by atchernych's avatar atchernych Committed by GitHub
Browse files

fix: Bug 5792966 [DYN -1729] (#5385)


Signed-off-by: default avatarAnna Tchernych <atchernych@nvidia.com>
parent 4be72b6f
......@@ -13,6 +13,7 @@ use dynamo_llm::{
discovery::{KvWorkerMonitor, ModelWatcher},
kv_router::{protocols::*, publisher::KvEventPublisher},
};
use dynamo_runtime::discovery::DiscoveryQuery;
use dynamo_runtime::{DistributedRuntime, Worker};
static WK: OnceCell<Worker> = OnceCell::new();
static DRT: AsyncOnceCell<DistributedRuntime> = AsyncOnceCell::new();
......@@ -56,6 +57,40 @@ pub enum DynamoLlmResult {
ERR = 1,
}
/// Wait for the discovery daemon to sync and return at least one instance.
/// This ensures list() calls will have data available.
/// Returns the number of instances found, or 0 if timed out.
async fn wait_for_discovery_sync(drt: &DistributedRuntime, timeout_secs: u64) -> usize {
tracing::info!("Waiting for discovery to sync...");
let discovery = drt.discovery();
let timeout = std::time::Duration::from_secs(timeout_secs);
let start = std::time::Instant::now();
loop {
match discovery.list(DiscoveryQuery::AllModels).await {
Ok(instances) if !instances.is_empty() => {
tracing::info!(
"Discovery sync complete: found {} instances",
instances.len()
);
return instances.len();
}
Ok(_) => {
if start.elapsed() > timeout {
tracing::warn!("Discovery sync timed out waiting for instances");
return 0;
}
tracing::debug!("No instances yet, waiting...");
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
Err(e) => {
tracing::warn!("Discovery list error: {}, continuing...", e);
return 0;
}
}
}
}
/// # Safety
/// the namespace_c_str and component_c_str are passed as pointers to C strings
#[unsafe(no_mangle)]
......@@ -80,7 +115,20 @@ pub unsafe extern "C" fn dynamo_llm_init(
.get_or_try_init(async { DistributedRuntime::from_settings(rt.clone()).await })
.await
{
Ok(_) => Ok(()),
Ok(drt) => {
// Wait for discovery to sync before returning
// This is needed because dynamo_create_worker_selection_pipeline() is called
// immediately after, and it needs discovery.list() to return data
// the discovery daemon takes time to query K8s and returns async, so we need to wait.
let instance_count = wait_for_discovery_sync(drt, 10).await;
if instance_count == 0 {
tracing::error!(
"Discovery sync failed: no worker instances found. Is the backend running?"
);
return Err(DynamoLlmResult::ERR);
}
Ok(())
}
Err(e) => {
tracing::error!(error = ?e, "Failed to initialize distributed runtime");
Err(DynamoLlmResult::ERR)
......@@ -354,7 +402,7 @@ use std::pin::Pin;
const GENERATE_ENDPOINT: &str = "generate";
use anyhow::Context;
use dynamo_runtime::{Runtime, distributed::DistributedConfig, traits::DistributedRuntimeProvider};
use dynamo_runtime::{Runtime, traits::DistributedRuntimeProvider};
use dynamo_llm::discovery::ModelManager;
use dynamo_llm::entrypoint::build_routed_pipeline;
......@@ -1354,10 +1402,28 @@ pub async fn create_worker_selection_pipeline_chat(
)> {
use dynamo_llm::kv_router::PrefillRouter;
let runtime = Runtime::from_settings()?;
let dst_config = DistributedConfig::from_settings();
let drt_owned = DistributedRuntime::new(runtime, dst_config).await?;
let distributed_runtime: &'static DistributedRuntime = Box::leak(Box::new(drt_owned));
// Use the global DRT singleton - initialize if not already done
// Check if already initialized (by dynamo_llm_init) to avoid redundant sync wait
let needs_sync = DRT.get().is_none();
let distributed_runtime = DRT
.get_or_try_init(async {
tracing::debug!("Initializing DistributedRuntime singleton (standalone mode)");
DistributedRuntime::from_settings(Runtime::from_settings()?).await
})
.await
.map_err(|e| anyhow::anyhow!("Failed to initialize DistributedRuntime: {}", e))?;
// Only wait for discovery sync if we just initialized the DRT
// (dynamo_llm_init already does this when it initializes)
if needs_sync {
let instance_count = wait_for_discovery_sync(distributed_runtime, 10).await;
if instance_count == 0 {
return Err(anyhow::anyhow!(
"Discovery sync failed: no worker instances found. Is the backend running?"
));
}
}
let component = distributed_runtime
.namespace(namespace)?
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment