"deploy/vscode:/vscode.git/clone" did not exist on "49eb397a5e6908e4eb58c0f111f8f2ac7357d4ce"
Unverified Commit c2ec3359 authored by Thomas Montfort's avatar Thomas Montfort Committed by GitHub
Browse files

fix(health): use local endpoint registry for canary health checks (#8294)


Signed-off-by: default avatarThomas Montfort <tmontfort@nvidia.com>
Signed-off-by: default avatarThomas Montfort <tmontfort@nvidia.com>
Signed-off-by: default avatartmontfort <tmontfort@nvidia.com>
Co-authored-by: default avatarClaude Opus 4.6 (1M context) <noreply@anthropic.com>
parent 2cc6d1e2
...@@ -144,6 +144,21 @@ impl EndpointConfigBuilder { ...@@ -144,6 +144,21 @@ impl EndpointConfigBuilder {
// Register health check target in SystemHealth if provided // Register health check target in SystemHealth if provided
if let Some(health_check_payload) = &health_check_payload { if let Some(health_check_payload) = &health_check_payload {
if system_health.lock().health_check_enabled()
&& endpoint
.drt()
.local_endpoint_registry()
.get(&endpoint.name)
.is_none()
{
anyhow::bail!(
"Endpoint '{}' has a health_check_payload and canary is enabled, \
but no local engine is registered. Call .register_local_engine() \
before .start() so the canary health check can function.",
endpoint.name
);
}
// Build transport based on request plane mode // Build transport based on request plane mode
let transport = build_transport_type(&endpoint, &endpoint_id, connection_id).await?; let transport = build_transport_type(&endpoint, &endpoint_id, connection_id).await?;
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use crate::component::{Client, Component, Endpoint, Instance}; use crate::DistributedRuntime;
use crate::config::HealthStatus; use crate::config::HealthStatus;
use crate::pipeline::PushRouter; use crate::engine::AsyncEngine;
use crate::pipeline::{AsyncEngine, Context, ManyOut, SingleIn}; use crate::pipeline::SingleIn;
use crate::protocols::annotated::Annotated;
use crate::protocols::maybe_error::MaybeError; use crate::protocols::maybe_error::MaybeError;
use crate::{DistributedRuntime, SystemHealth};
use futures::StreamExt; use futures::StreamExt;
use parking_lot::Mutex; use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::Duration;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio::time::{MissedTickBehavior, interval};
use tracing::{debug, error, info, warn}; use tracing::{debug, error, info, warn};
/// Configuration for health check behavior /// Configuration for health check behavior
...@@ -37,17 +33,10 @@ impl Default for HealthCheckConfig { ...@@ -37,17 +33,10 @@ impl Default for HealthCheckConfig {
} }
} }
// Type alias for the router cache to improve readability
// Maps endpoint subject -> router and payload
type RouterCache =
Arc<Mutex<HashMap<String, Arc<PushRouter<serde_json::Value, Annotated<serde_json::Value>>>>>>;
/// Health check manager that monitors endpoint health /// Health check manager that monitors endpoint health
pub struct HealthCheckManager { pub struct HealthCheckManager {
drt: DistributedRuntime, drt: DistributedRuntime,
config: HealthCheckConfig, config: HealthCheckConfig,
/// Cache of PushRouters and payloads for each endpoint
router_cache: RouterCache,
/// Track per-endpoint health check tasks /// Track per-endpoint health check tasks
/// Maps: endpoint_subject -> task_handle /// Maps: endpoint_subject -> task_handle
endpoint_tasks: Arc<Mutex<HashMap<String, JoinHandle<()>>>>, endpoint_tasks: Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
...@@ -58,45 +47,10 @@ impl HealthCheckManager { ...@@ -58,45 +47,10 @@ impl HealthCheckManager {
Self { Self {
drt, drt,
config, config,
router_cache: Arc::new(Mutex::new(HashMap::new())),
endpoint_tasks: Arc::new(Mutex::new(HashMap::new())), endpoint_tasks: Arc::new(Mutex::new(HashMap::new())),
} }
} }
/// Get or create a PushRouter for an endpoint
async fn get_or_create_router(
&self,
cache_key: &str,
endpoint: Endpoint,
) -> anyhow::Result<Arc<PushRouter<serde_json::Value, Annotated<serde_json::Value>>>> {
let cache_key = cache_key.to_string();
// Check cache first
{
let cache = self.router_cache.lock();
if let Some(router) = cache.get(&cache_key) {
return Ok(router.clone());
}
}
// Create a client that discovers instances dynamically for this endpoint
let client = Client::new(endpoint).await?;
// Create PushRouter - it will use direct routing when we call direct()
let router: Arc<PushRouter<serde_json::Value, Annotated<serde_json::Value>>> = Arc::new(
PushRouter::from_client(
client,
crate::pipeline::RouterMode::RoundRobin, // Default mode, we'll use direct() explicitly
)
.await?,
);
// Cache it
self.router_cache.lock().insert(cache_key, router.clone());
Ok(router)
}
/// Start the health check manager by spawning per-endpoint monitoring tasks /// Start the health check manager by spawning per-endpoint monitoring tasks
pub async fn start(self: Arc<Self>) -> anyhow::Result<()> { pub async fn start(self: Arc<Self>) -> anyhow::Result<()> {
// Get all registered endpoints at startup // Get all registered endpoints at startup
...@@ -237,87 +191,43 @@ impl HealthCheckManager { ...@@ -237,87 +191,43 @@ impl HealthCheckManager {
Ok(()) Ok(())
} }
/// Send a health check request through AsyncEngine /// Send a health check request via the local endpoint registry (in-process).
async fn send_health_check_request( async fn send_health_check_request(
&self, &self,
endpoint_subject: &str, endpoint_subject: &str,
payload: &serde_json::Value, payload: &serde_json::Value,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let target = self
.drt
.system_health()
.lock()
.get_health_check_target(endpoint_subject)
.ok_or_else(|| {
anyhow::anyhow!("No health check target found for {}", endpoint_subject)
})?;
debug!( debug!(
"Sending health check to {} (instance_id: {})", "Sending health check to {} via local registry",
endpoint_subject, target.instance.instance_id endpoint_subject
); );
// Create the Endpoint directly from the Instance info let engine = self
let namespace = self.drt.namespace(&target.instance.namespace)?; .drt
let component = namespace.component(&target.instance.component)?; .local_endpoint_registry()
let endpoint = component.endpoint(&target.instance.endpoint); .get(endpoint_subject)
.ok_or_else(|| {
// Get or create router for this endpoint anyhow::anyhow!(
let router = self "Endpoint '{}' not found in local registry, engine may still be initializing",
.get_or_create_router(endpoint_subject, endpoint)
.await?;
// Wait for watch stream to discover instances before checking
// This ensures the router's client has populated its instance list
// from etcd before we attempt to send the health check request.
// Without this, the first health check can fail due to a race condition
// where the watch stream hasn't completed its initial discovery yet.
match tokio::time::timeout(
Duration::from_secs(10), // 10 second timeout for discovery
router.client.wait_for_instances(),
)
.await
{
Ok(Ok(instances)) => {
debug!(
"Health check for {}: watch stream ready, found {} instance(s)",
endpoint_subject,
instances.len()
);
}
Ok(Err(e)) => {
return Err(anyhow::anyhow!(
"Failed to discover instances for {} during health check: {}",
endpoint_subject,
e
));
}
Err(_) => {
return Err(anyhow::anyhow!(
"Timeout waiting for instance discovery for {} during health check",
endpoint_subject endpoint_subject
)); )
} })?;
}
// Create the request context
let request: SingleIn<serde_json::Value> = Context::new(payload.clone());
// Clone what we need for the spawned task // Clone what we need for the spawned task
let system_health = self.drt.system_health().clone(); let system_health = self.drt.system_health().clone();
let endpoint_subject_owned = endpoint_subject.to_string(); let endpoint_subject_owned = endpoint_subject.to_string();
let instance_id = target.instance.instance_id; let payload = payload.clone();
let timeout = self.config.request_timeout; let timeout = self.config.request_timeout;
// Spawn task to send health check and wait for response // Spawn task to send health check and wait for response
tokio::spawn(async move { tokio::spawn(async move {
let result = tokio::time::timeout(timeout, async { let result = tokio::time::timeout(timeout, async {
// Call direct() on the PushRouter to target specific instance let request = SingleIn::new(payload);
match router.direct(request, instance_id).await { match engine.generate(request).await {
Ok(mut response_stream) => { Ok(mut response_stream) => {
// Get the first response to verify endpoint is alive // Get the first response to verify endpoint is alive.
// Check for errors
let is_healthy = if let Some(response) = response_stream.next().await { let is_healthy = if let Some(response) = response_stream.next().await {
// Check if response indicates an error
if let Some(error) = response.err() { if let Some(error) = response.err() {
warn!( warn!(
"Health check error response from {}: {:?}", "Health check error response from {}: {:?}",
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment