Unverified commit 8f9c9998, authored by jthomson04, committed by GitHub
Browse files

fix(runtime): handle instance disappearance in push router transport resolution (#8007)


Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
parent cab6323c
......@@ -281,6 +281,13 @@ impl Client {
});
}
/// Test-only hook that swaps the stored `instance_avail` list for `ids`.
///
/// Tests use it to make `instance_ids_avail()` disagree with `instances()`,
/// which is how a race between instance selection and dispatch is simulated.
#[cfg(test)]
pub(crate) fn override_instance_avail(&self, ids: Vec<u64>) {
    // Wrap the replacement list first, then publish it in a single store.
    let replacement = Arc::new(ids);
    self.instance_avail.store(replacement);
}
async fn get_or_create_dynamic_instance_source(
endpoint: &Endpoint,
) -> Result<Arc<tokio::sync::watch::Receiver<Vec<Instance>>>> {
......
......@@ -490,7 +490,7 @@ where
async fn generate_with_fault_detection(
&self,
instance_id: u64,
mut instance_id: u64,
request: SingleIn<T>,
) -> anyhow::Result<ManyOut<U>> {
let route_start = Instant::now();
......@@ -531,43 +531,77 @@ where
}
}
// Get the address based on discovered transport type
// Get the address based on discovered transport type.
// If the selected instance disappeared between selection and dispatch
// (e.g. deregistered during scale-down), fall back to another available
// instance rather than returning a spurious 500.
let (address, _transport_kind) = {
use crate::component::TransportType;
// Get the instance and use its actual transport type
let instances = self.client.instances();
let instance = instances
.iter()
.find(|i| i.instance_id == instance_id)
.ok_or_else(|| {
anyhow::anyhow!("Instance {} not found in available instances", instance_id)
})?;
match &instance.transport {
TransportType::Http(http_endpoint) => {
tracing::debug!(
instance_id = instance_id,
http_endpoint = %http_endpoint,
"Using HTTP transport for instance"
);
(http_endpoint.clone(), "transport.http.request")
}
TransportType::Tcp(tcp_endpoint) => {
tracing::debug!(
instance_id = instance_id,
tcp_endpoint = %tcp_endpoint,
"Using TCP transport for instance"
);
(tcp_endpoint.clone(), "transport.tcp.request")
}
TransportType::Nats(subject) => {
tracing::debug!(
instance_id = instance_id,
subject = %subject,
"Using NATS transport for instance"
);
(subject.clone(), "transport.nats.request")
let resolve_transport = |id: u64| {
let instances = self.client.instances();
instances
.iter()
.find(|i| i.instance_id == id)
.map(|instance| match &instance.transport {
TransportType::Http(http_endpoint) => {
tracing::debug!(
instance_id = id,
http_endpoint = %http_endpoint,
"Using HTTP transport for instance"
);
(http_endpoint.clone(), "transport.http.request")
}
TransportType::Tcp(tcp_endpoint) => {
tracing::debug!(
instance_id = id,
tcp_endpoint = %tcp_endpoint,
"Using TCP transport for instance"
);
(tcp_endpoint.clone(), "transport.tcp.request")
}
TransportType::Nats(subject) => {
tracing::debug!(
instance_id = id,
subject = %subject,
"Using NATS transport for instance"
);
(subject.clone(), "transport.nats.request")
}
})
};
if let Some(result) = resolve_transport(instance_id) {
result
} else {
// Instance vanished — pick a different one from the current
// availability list and retry the lookup once.
let avail = self.client.instance_ids_avail();
let fallback_id = avail.iter().copied().find(|&id| id != instance_id);
match fallback_id {
Some(id) => {
tracing::warn!(
original_instance = instance_id,
fallback_instance = id,
"Instance disappeared during routing, reselecting"
);
instance_id = id;
resolve_transport(id).ok_or_else(|| {
anyhow::anyhow!(
"Fallback instance {} also not found for endpoint {}",
id,
self.client.endpoint.id()
)
})?
}
None => {
return Err(anyhow::anyhow!(
"Instance {} not found and no other instances available \
for endpoint {}",
instance_id,
self.client.endpoint.id()
));
}
}
}
};
......@@ -864,4 +898,104 @@ mod tests {
rt.shutdown();
}
/// Regression test for the stale-selection race: the availability list names
/// an instance that is not backed by a registered instance, and routing must
/// not surface a "not found" error while another instance can still be
/// resolved.
#[tokio::test]
async fn transport_resolution_falls_back_when_selected_instance_disappears() {
    let rt = Runtime::from_current().unwrap();
    let drt = DistributedRuntime::new(rt.clone(), DistributedConfig::process_local())
        .await
        .unwrap();

    let namespace = drt
        .namespace("test_transport_fallback".to_string())
        .unwrap();
    let component = namespace.component("test_component".to_string()).unwrap();
    let endpoint = component.endpoint("test_endpoint".to_string());
    let client = endpoint.client().await.unwrap();

    // Register a single genuine instance and wait until the client sees it.
    endpoint.register_endpoint_instance().await.unwrap();
    client.wait_for_instances().await.unwrap();
    let live_id = client.instance_ids()[0];

    // Put an id with no registered instance behind it into the availability
    // list, ahead of the genuine one. This mimics an instance that
    // deregistered after being selected but before its transport was looked up.
    let ghost_id = live_id + 1000;
    client.override_instance_avail(vec![ghost_id, live_id]);

    // Route through a round-robin router using the regular `generate` path.
    // Whichever id gets selected, transport resolution is expected to end up
    // on the genuine instance.
    let router =
        PushRouter::<u64, TestResponse>::from_client(client.clone(), RouterMode::RoundRobin)
            .await
            .unwrap();
    let result = router.generate(SingleIn::new(42u64)).await;

    // No worker is serving the endpoint, so a network-level failure is
    // acceptable. Only a "not found" error is rejected here, because it would
    // mean the fallback did not take effect.
    if let Some(err) = result.as_ref().err() {
        let msg = err.to_string();
        assert!(
            !msg.contains("not found"),
            "Transport resolution should have fallen back, but got: {msg}"
        );
    }

    rt.shutdown();
}
/// With a lone stale id in the availability list there is nothing to fall
/// back to, so routing must fail with an error that says both that the
/// instance was not found and that no other instances are available.
#[tokio::test]
async fn transport_resolution_errors_when_no_instances_available() {
    let rt = Runtime::from_current().unwrap();
    let drt = DistributedRuntime::new(rt.clone(), DistributedConfig::process_local())
        .await
        .unwrap();

    let namespace = drt
        .namespace("test_transport_no_fallback".to_string())
        .unwrap();
    let component = namespace.component("test_component".to_string()).unwrap();
    let endpoint = component.endpoint("test_endpoint".to_string());
    let client = endpoint.client().await.unwrap();

    // An instance is registered before the router is built (the original test
    // notes the router needs this for transport setup).
    endpoint.register_endpoint_instance().await.unwrap();
    client.wait_for_instances().await.unwrap();

    let router =
        PushRouter::<u64, TestResponse>::from_client(client.clone(), RouterMode::RoundRobin)
            .await
            .unwrap();

    // Replace the availability list with one id that has no registered
    // instance behind it, leaving no alternative id to reselect.
    let stale_id = 99999;
    client.override_instance_avail(vec![stale_id]);

    let result = router.generate(SingleIn::new(42u64)).await;
    assert!(result.is_err());

    let msg = result.unwrap_err().to_string();
    let reports_missing = msg.contains("not found");
    let reports_no_fallback = msg.contains("no other instances available");
    assert!(
        reports_missing && reports_no_fallback,
        "Expected clear error about missing instance with no fallback, got: {msg}"
    );

    rt.shutdown();
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment