Unverified commit 501ef021, authored by Biswa Panda, committed by GitHub
Browse files

fix: use channel to avoid race condition from async nats registration task (#4758)

parent 3bc129dc
......@@ -275,10 +275,38 @@ impl ComponentBuilder {
/// Builds the [`Component`] and, when the request plane is NATS, blocks until the
/// component's NATS service registration has finished.
///
/// Waiting here prevents a race condition where `serve_endpoint()` tries to look up the
/// service before it has been registered in the component registry.
///
/// # Errors
///
/// Returns an error if `build_internal()` fails, if the registration task reports a
/// failure, or if the registration channel closes without delivering a result.
///
/// # Panics
///
/// NOTE(review): per the tokio documentation, `tokio::task::block_in_place` panics when
/// called from a `current_thread` runtime — confirm every caller runs on a multi-threaded
/// runtime.
pub fn build(self) -> Result<Component, anyhow::Error> {
    let component = self.build_internal()?;

    // If this component is using NATS, register the NATS service and wait for completion.
    // The registration is requested exactly once; the returned receiver is the only way to
    // observe its outcome, so it must not be dropped before the result arrives.
    let drt = component.drt();
    if drt.request_plane().is_nats() {
        let mut rx = drt.register_nats_service(component.clone());

        // Wait synchronously for the NATS service registration to complete.
        // `block_in_place()` lets us call `blocking_recv()` from an async context: it
        // temporarily moves the current task off the runtime thread so that blocking here
        // does not deadlock the runtime.
        let result = tokio::task::block_in_place(|| rx.blocking_recv());
        match result {
            Some(Ok(())) => {
                tracing::debug!(
                    component = component.service_name(),
                    "NATS service registration completed"
                );
            }
            Some(Err(e)) => {
                return Err(anyhow::anyhow!(
                    "NATS service registration failed for component '{}': {}",
                    component.service_name(),
                    e
                ));
            }
            // The sender was dropped without sending, i.e. the registration task ended
            // without reporting an outcome.
            None => {
                return Err(anyhow::anyhow!(
                    "NATS service registration channel closed unexpectedly for component '{}'",
                    component.service_name()
                ));
            }
        }
    }
    Ok(component)
}
......
......@@ -423,7 +423,16 @@ impl DistributedRuntime {
/// DEPRECATED: This method exists only for NATS request plane support.
/// Once everything uses the TCP request plane, this can be removed along with
/// the NATS service registration infrastructure.
pub fn register_nats_service(&self, component: Component) {
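///
/// Returns a receiver that yields the outcome once the background registration task
/// finishes: `Ok(())` if the service was registered (or already existed), or
/// `Err(message)` describing why registration failed.
/// The caller should use `blocking_recv()` to wait for completion; when calling from an
/// async context, wrap that call in `tokio::task::block_in_place`.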
pub fn register_nats_service(
&self,
component: Component,
) -> tokio::sync::mpsc::Receiver<Result<(), String>> {
// Create a oneshot-style channel (capacity 1) to signal completion
let (tx, rx) = tokio::sync::mpsc::channel::<Result<(), String>>(1);
let drt = self.clone();
self.runtime().secondary().spawn(async move {
let service_name = component.service_name();
......@@ -440,11 +449,16 @@ impl DistributedRuntime {
// The NATS service is per component, but it is called from `serve_endpoint`, and there
// are often multiple endpoints for a component (e.g. `clear_kv_blocks` and `generate`).
tracing::trace!("Service {service_name} already exists");
// Signal success - service already exists
let _ = tx.send(Ok(())).await;
return;
}
let Some(nats_client) = drt.nats_client.as_ref() else {
tracing::error!("Cannot create NATS service without NATS.");
let _ = tx
.send(Err("Cannot create NATS service without NATS".to_string()))
.await;
return;
};
let description = None;
......@@ -458,6 +472,7 @@ impl DistributedRuntime {
Ok(service) => service,
Err(err) => {
tracing::error!(error = %err, component = service_name, "Failed to build NATS service");
let _ = tx.send(Err(format!("Failed to build NATS service: {err}"))).await;
return;
}
};
......@@ -477,7 +492,12 @@ impl DistributedRuntime {
// are often multiple endpoints for a component (e.g. `clear_kv_blocks` and `generate`).
// TODO: Is this still true?
}
// Signal completion - service registered successfully
let _ = tx.send(Ok(())).await;
});
rx
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment