Unverified Commit 61d4674c authored by Neelay Shah's avatar Neelay Shah Committed by GitHub
Browse files

fix(health): make canary sole authority on endpoint readiness when enabled (#8165)


Co-authored-by: default avatarClaude Opus 4.6 (1M context) <noreply@anthropic.com>
parent 59d18d8e
...@@ -131,6 +131,7 @@ impl DistributedRuntime { ...@@ -131,6 +131,7 @@ impl DistributedRuntime {
let system_health = Arc::new(parking_lot::Mutex::new(SystemHealth::new( let system_health = Arc::new(parking_lot::Mutex::new(SystemHealth::new(
starting_health_status, starting_health_status,
use_endpoint_health_status, use_endpoint_health_status,
config.health_check_enabled,
health_endpoint_path, health_endpoint_path,
live_endpoint_path, live_endpoint_path,
))); )));
......
...@@ -96,10 +96,7 @@ impl SharedHttpServer { ...@@ -96,10 +96,7 @@ impl SharedHttpServer {
let subject_clone = subject.clone(); let subject_clone = subject.clone();
self.handlers.insert(subject, handler); self.handlers.insert(subject, handler);
// THEN set health status to Ready (after handler is registered and ready) system_health.lock().set_endpoint_registered(&endpoint_name);
system_health
.lock()
.set_endpoint_health_status(&endpoint_name, HealthStatus::Ready);
tracing::debug!("Registered endpoint handler for subject: {subject_clone}"); tracing::debug!("Registered endpoint handler for subject: {subject_clone}");
Ok(()) Ok(())
......
...@@ -52,7 +52,7 @@ impl PushEndpoint { ...@@ -52,7 +52,7 @@ impl PushEndpoint {
system_health system_health
.lock() .lock()
.set_endpoint_health_status(endpoint_name_local.as_str(), HealthStatus::Ready); .set_endpoint_registered(endpoint_name_local.as_str());
loop { loop {
let req = tokio::select! { let req = tokio::select! {
......
...@@ -330,10 +330,7 @@ impl SharedTcpServer { ...@@ -330,10 +330,7 @@ impl SharedTcpServer {
// Insert handler FIRST to ensure it's ready to receive requests // Insert handler FIRST to ensure it's ready to receive requests
self.handlers.insert(endpoint_path, handler); self.handlers.insert(endpoint_path, handler);
// THEN set health status to Ready (after handler is registered and ready) system_health.lock().set_endpoint_registered(&endpoint_name);
system_health
.lock()
.set_endpoint_health_status(&endpoint_name, crate::HealthStatus::Ready);
tracing::info!( tracing::info!(
"Registered endpoint '{fqn_endpoint}' with shared TCP server on {}", "Registered endpoint '{fqn_endpoint}' with shared TCP server on {}",
...@@ -717,6 +714,7 @@ mod tests { ...@@ -717,6 +714,7 @@ mod tests {
let system_health = Arc::new(Mutex::new(SystemHealth::new( let system_health = Arc::new(Mutex::new(SystemHealth::new(
crate::HealthStatus::Ready, crate::HealthStatus::Ready,
vec![], vec![],
false, // health_check_enabled
"/health".to_string(), "/health".to_string(),
"/live".to_string(), "/live".to_string(),
))); )));
......
...@@ -51,6 +51,7 @@ pub struct SystemHealth { ...@@ -51,6 +51,7 @@ pub struct SystemHealth {
new_endpoint_tx: mpsc::UnboundedSender<String>, new_endpoint_tx: mpsc::UnboundedSender<String>,
new_endpoint_rx: Arc<parking_lot::Mutex<Option<mpsc::UnboundedReceiver<String>>>>, new_endpoint_rx: Arc<parking_lot::Mutex<Option<mpsc::UnboundedReceiver<String>>>>,
use_endpoint_health_status: Vec<String>, use_endpoint_health_status: Vec<String>,
health_check_enabled: bool,
health_path: String, health_path: String,
live_path: String, live_path: String,
start_time: Instant, start_time: Instant,
...@@ -61,12 +62,19 @@ impl SystemHealth { ...@@ -61,12 +62,19 @@ impl SystemHealth {
pub fn new( pub fn new(
starting_health_status: HealthStatus, starting_health_status: HealthStatus,
use_endpoint_health_status: Vec<String>, use_endpoint_health_status: Vec<String>,
health_check_enabled: bool,
health_path: String, health_path: String,
live_path: String, live_path: String,
) -> Self { ) -> Self {
// Force NotReady when canary is enabled — canary verifies before marking Ready.
let initial_endpoint_status = if health_check_enabled {
HealthStatus::NotReady
} else {
starting_health_status.clone()
};
let mut endpoint_health = HashMap::new(); let mut endpoint_health = HashMap::new();
for endpoint in &use_endpoint_health_status { for endpoint in &use_endpoint_health_status {
endpoint_health.insert(endpoint.clone(), starting_health_status.clone()); endpoint_health.insert(endpoint.clone(), initial_endpoint_status.clone());
} }
// Create the channel for endpoint registration notifications // Create the channel for endpoint registration notifications
...@@ -80,12 +88,26 @@ impl SystemHealth { ...@@ -80,12 +88,26 @@ impl SystemHealth {
new_endpoint_tx: tx, new_endpoint_tx: tx,
new_endpoint_rx: Arc::new(parking_lot::Mutex::new(Some(rx))), new_endpoint_rx: Arc::new(parking_lot::Mutex::new(Some(rx))),
use_endpoint_health_status, use_endpoint_health_status,
health_check_enabled,
health_path, health_path,
live_path, live_path,
start_time: Instant::now(), start_time: Instant::now(),
uptime_gauge: OnceLock::new(), uptime_gauge: OnceLock::new(),
} }
} }
pub fn health_check_enabled(&self) -> bool {
self.health_check_enabled
}
/// Signal endpoint transport registration. Sets Ready when canary is disabled;
/// no-op when canary is enabled (canary will set Ready after verification).
pub fn set_endpoint_registered(&self, endpoint: &str) {
if !self.health_check_enabled {
self.set_endpoint_health_status(endpoint, HealthStatus::Ready);
}
}
pub fn set_health_status(&mut self, status: HealthStatus) { pub fn set_health_status(&mut self, status: HealthStatus) {
self.system_health = status; self.system_health = status;
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment