Unverified Commit 1dc0975b authored by Thomas Montfort's avatar Thomas Montfort Committed by GitHub
Browse files

fix(runtime): reset canary health check timer on request arrival and per streaming chunk (#8467)


Signed-off-by: default avatarThomas Montfort <tmontfort@nvidia.com>
Co-authored-by: default avatarClaude Opus 4.6 (1M context) <noreply@anthropic.com>
parent cf92be04
......@@ -119,9 +119,14 @@ impl HealthCheckManager {
}
_ = notifier.notified() => {
// Activity detected - reset timer for this endpoint only
// Activity detected - reset timer for this endpoint only.
// A notification means push_handler successfully streamed
// a non-error response chunk, proving the engine is healthy.
debug!("Activity detected for {}, resetting health check timer", endpoint_subject);
// Loop continues, timer resets
manager.drt.system_health().lock().set_endpoint_health_status(
&endpoint_subject,
crate::config::HealthStatus::Ready,
);
}
}
}
......@@ -345,6 +350,349 @@ pub async fn get_health_check_status(
}))
}
// ============================================================
// Full pipeline tests: push_handler → notify → HealthCheckManager
// These tests use the real HealthCheckManager (spawn_endpoint_health_check_task)
// and the real push_handler pipeline (TwoPartCodec + TCP + engine.generate()).
// ============================================================
#[cfg(all(test, feature = "integration"))]
mod push_handler_notify_tests {
use super::*;
use crate::component::{Instance, TransportType};
use crate::config::HealthStatus;
use crate::distributed::distributed_test_utils::create_test_drt_async;
use crate::engine::{AsyncEngine, AsyncEngineContextProvider};
use crate::local_endpoint_registry::LocalAsyncEngine;
use crate::pipeline::network::codec::{TwoPartCodec, TwoPartMessage};
use crate::pipeline::network::tcp::server::{ServerOptions, TcpStreamServer};
use crate::pipeline::network::{
ConnectionInfo, Ingress, PushWorkHandler, ResponseService, StreamOptions,
};
use crate::pipeline::{ManyOut, ResponseStream, SingleIn};
use crate::protocols::annotated::Annotated;
use async_trait::async_trait;
use bytes::Bytes;
use futures::stream;
use std::sync::Arc;
use std::time::Duration;
type TestRequest = serde_json::Value;
type TestResponse = Annotated<serde_json::Value>;
/// A mock engine that streams a configurable sequence of success/error chunks.
/// Used both as the push_handler pipeline engine and registered in
/// the local endpoint registry for health check requests.
struct MockStreamingEngine {
num_chunks: usize,
/// If set, chunks at these indices will be error responses.
error_indices: Vec<usize>,
}
impl MockStreamingEngine {
fn success(num_chunks: usize) -> Arc<Self> {
Arc::new(Self {
num_chunks,
error_indices: vec![],
})
}
fn all_errors(num_chunks: usize) -> Arc<Self> {
Arc::new(Self {
num_chunks,
error_indices: (0..num_chunks).collect(),
})
}
fn with_error_at(num_chunks: usize, error_indices: Vec<usize>) -> Arc<Self> {
Arc::new(Self {
num_chunks,
error_indices,
})
}
}
#[async_trait]
impl AsyncEngine<SingleIn<TestRequest>, ManyOut<TestResponse>, anyhow::Error>
for MockStreamingEngine
{
async fn generate(
&self,
input: SingleIn<TestRequest>,
) -> anyhow::Result<ManyOut<TestResponse>> {
let (_data, ctx) = input.into_parts();
let chunks: Vec<TestResponse> = (0..self.num_chunks)
.map(|i| {
if self.error_indices.contains(&i) {
Annotated::from_error(format!("mock error at chunk {i}"))
} else {
Annotated::from_data(serde_json::json!({"token": i}))
}
})
.collect();
Ok(ResponseStream::new(
Box::pin(stream::iter(chunks)),
ctx.context(),
))
}
}
/// Encodes a request as a TwoPartCodec payload with the given connection info.
fn encode_request(
request_id: &str,
connection_info: ConnectionInfo,
request_body: &serde_json::Value,
) -> Bytes {
let control = serde_json::json!({
"id": request_id,
"request_type": "single_in",
"response_type": "many_out",
"connection_info": connection_info,
});
let header = serde_json::to_vec(&control).unwrap();
let data = serde_json::to_vec(request_body).unwrap();
let msg = TwoPartMessage::new(Bytes::from(header), Bytes::from(data));
TwoPartCodec::default().encode_message(msg).unwrap()
}
/// Sets up a TCP server and registers a response stream for push_handler
/// to connect back to.
async fn setup_tcp_receiver(request_id: &str) -> (Arc<TcpStreamServer>, ConnectionInfo) {
let options = ServerOptions::builder().port(0).build().unwrap();
let server = TcpStreamServer::new(options).await.unwrap();
let context = crate::pipeline::Context::with_id((), request_id.to_string());
let stream_options = StreamOptions::builder()
.context(context.context())
.enable_request_stream(false)
.enable_response_stream(true)
.build()
.unwrap();
let pending = server.register(stream_options).await;
let connection_info = pending
.recv_stream
.as_ref()
.unwrap()
.connection_info
.clone();
(server, connection_info)
}
/// Registers an endpoint in the DRT with the given engine in local registry.
/// Returns the notifier that the real HealthCheckManager will listen on.
fn register_endpoint(
drt: &crate::DistributedRuntime,
endpoint_name: &str,
local_engine: LocalAsyncEngine,
) -> Arc<tokio::sync::Notify> {
let payload = serde_json::json!({
"prompt": "health",
"_health_check": true
});
drt.system_health().lock().register_health_check_target(
endpoint_name,
Instance {
component: "test_component".to_string(),
endpoint: endpoint_name.to_string(),
namespace: "test_namespace".to_string(),
instance_id: 0,
transport: TransportType::Nats(endpoint_name.to_string()),
device_type: None,
},
payload,
);
drt.local_endpoint_registry()
.register(endpoint_name.to_string(), local_engine);
drt.system_health()
.lock()
.get_endpoint_health_check_notifier(endpoint_name)
.expect("Notifier should exist for registered endpoint")
}
/// Helper: send a request through the ingress pipeline.
async fn send_request(ingress: &Ingress<SingleIn<TestRequest>, ManyOut<TestResponse>>) {
let request_id = uuid::Uuid::new_v4().to_string();
let (_server, connection_info) = setup_tcp_receiver(&request_id).await;
let payload = encode_request(
&request_id,
connection_info,
&serde_json::json!({"prompt": "test"}),
);
let result = ingress.handle_payload(payload, Some(request_id)).await;
assert!(result.is_ok(), "handle_payload should succeed");
}
/// Helper: assert endpoint health status.
fn assert_status(
drt: &crate::DistributedRuntime,
endpoint_name: &str,
expected: HealthStatus,
msg: &str,
) {
let status = drt
.system_health()
.lock()
.get_endpoint_health_status(endpoint_name);
assert_eq!(status, Some(expected), "{msg}");
}
/// Helper: create ingress pipeline with given engine and notifier.
fn create_ingress(
engine: Arc<MockStreamingEngine>,
notifier: Arc<tokio::sync::Notify>,
) -> Arc<Ingress<SingleIn<TestRequest>, ManyOut<TestResponse>>> {
let ingress =
Ingress::<SingleIn<TestRequest>, ManyOut<TestResponse>>::for_engine(engine).unwrap();
ingress
.set_endpoint_health_check_notifier(notifier)
.unwrap();
ingress
}
/// Helper: start HealthCheckManager with given canary wait.
async fn start_manager(drt: &crate::DistributedRuntime, canary_wait_ms: u64) {
let config = HealthCheckConfig {
canary_wait_time: Duration::from_millis(canary_wait_ms),
request_timeout: Duration::from_secs(1),
};
let manager = Arc::new(HealthCheckManager::new(drt.clone(), config));
manager.start().await.unwrap();
}
// =================================================================
// Test 1: Successful streaming → notification → Ready
// Canary engine returns errors, so Ready can only come from notify.
// =================================================================
#[tokio::test]
async fn test_successful_streaming_sets_ready() {
let drt = create_test_drt_async().await;
let endpoint = "test.successful_streaming";
let notifier = register_endpoint(&drt, endpoint, MockStreamingEngine::all_errors(1));
assert_status(&drt, endpoint, HealthStatus::NotReady, "initial");
let ingress = create_ingress(MockStreamingEngine::success(5), notifier);
start_manager(&drt, 500).await;
send_request(&ingress).await;
tokio::time::sleep(Duration::from_millis(200)).await;
// Ready can only come from notification (canary engine errors)
assert_status(
&drt,
endpoint,
HealthStatus::Ready,
"successful streaming should set Ready via notification path",
);
}
// =================================================================
// Test 2: Idle engine → canary fires → successful health check → Ready
// =================================================================
#[tokio::test]
async fn test_canary_fires_on_idle_engine() {
let drt = create_test_drt_async().await;
let endpoint = "test.canary_idle";
let _notifier = register_endpoint(&drt, endpoint, MockStreamingEngine::success(1));
assert_status(&drt, endpoint, HealthStatus::NotReady, "initial");
start_manager(&drt, 50).await;
tokio::time::sleep(Duration::from_millis(300)).await;
// No requests sent — canary fired and succeeded
assert_status(
&drt,
endpoint,
HealthStatus::Ready,
"canary should fire and set Ready on idle engine",
);
}
// =================================================================
// Test 3: Error streaming → no notification → canary errors → NotReady
// =================================================================
#[tokio::test]
async fn test_error_streaming_stays_not_ready() {
let drt = create_test_drt_async().await;
let endpoint = "test.error_streaming";
let notifier = register_endpoint(&drt, endpoint, MockStreamingEngine::all_errors(1));
assert_status(&drt, endpoint, HealthStatus::NotReady, "initial");
// Pipeline streams only errors — no notifications sent
let ingress = create_ingress(MockStreamingEngine::all_errors(3), notifier);
start_manager(&drt, 50).await;
send_request(&ingress).await;
// Wait for canary to fire (50ms wait + margin)
tokio::time::sleep(Duration::from_millis(300)).await;
// Error streaming didn't notify, canary fired but engine also errored
assert_status(
&drt,
endpoint,
HealthStatus::NotReady,
"error streaming should not notify, canary also errors — stays NotReady",
);
}
// =================================================================
// Test 4: Idle engine → canary fires → failing health check → NotReady
// =================================================================
#[tokio::test]
async fn test_idle_engine_with_failing_canary() {
let drt = create_test_drt_async().await;
let endpoint = "test.canary_fails";
let _notifier = register_endpoint(&drt, endpoint, MockStreamingEngine::all_errors(1));
assert_status(&drt, endpoint, HealthStatus::NotReady, "initial");
start_manager(&drt, 50).await;
tokio::time::sleep(Duration::from_millis(300)).await;
// No requests sent, canary fired but engine returned error
assert_status(
&drt,
endpoint,
HealthStatus::NotReady,
"canary fired but engine errored, status stays NotReady",
);
}
// =================================================================
// Test 5: Mixed streaming (success + trailing error) → Ready
// Successful chunks notify before the error, so status becomes Ready.
// Canary engine errors, proving Ready came from notification path.
// =================================================================
#[tokio::test]
async fn test_mixed_streaming_sets_ready() {
let drt = create_test_drt_async().await;
let endpoint = "test.mixed_streaming";
let notifier = register_endpoint(&drt, endpoint, MockStreamingEngine::all_errors(1));
assert_status(&drt, endpoint, HealthStatus::NotReady, "initial");
// 5 chunks: 4 success + error at index 4
let ingress = create_ingress(MockStreamingEngine::with_error_at(5, vec![4]), notifier);
start_manager(&drt, 500).await;
send_request(&ingress).await;
tokio::time::sleep(Duration::from_millis(200)).await;
// Successful chunks notified before the error chunk
assert_status(
&drt,
endpoint,
HealthStatus::Ready,
"successful chunks should set Ready despite trailing error",
);
}
}
// ===============================
// Integration Tests (require DRT)
// ===============================
......@@ -391,6 +739,7 @@ mod integration_tests {
namespace: "test_namespace".to_string(),
instance_id: 12345,
transport: crate::component::TransportType::Nats(endpoint.to_string()),
device_type: None,
},
payload.clone(),
);
......@@ -426,6 +775,7 @@ mod integration_tests {
namespace: "test_namespace".to_string(),
instance_id: i,
transport: crate::component::TransportType::Nats(endpoint.clone()),
device_type: None,
},
payload,
);
......@@ -469,6 +819,7 @@ mod integration_tests {
namespace: "test_namespace".to_string(),
instance_id: 999,
transport: crate::component::TransportType::Nats(endpoint.to_string()),
device_type: None,
},
payload.clone(),
);
......
......@@ -311,8 +311,13 @@ where
// TODO: Detect end-of-stream using Server-Sent Events (SSE)
let mut send_complete_final = true;
let mut saw_error_response = false;
while let Some(resp) = stream.next().await {
tracing::trace!("Sending response: {:?}", resp);
let is_error = resp.err().is_some();
if is_error {
saw_error_response = true;
}
let resp_wrapper = NetworkStreamWrapper {
data: Some(resp),
complete_final: false,
......@@ -347,6 +352,12 @@ where
.inc();
}
break;
} else if !is_error {
// Only notify on non-error chunks — error responses don't prove
// the engine is healthy and should not reset the canary timer.
if let Some(notifier) = self.endpoint_health_check_notifier.get() {
notifier.notify_one();
}
}
}
if send_complete_final {
......@@ -370,9 +381,11 @@ where
.inc();
}
}
// Notify the health check manager that the stream has finished.
// This resets the timer, delaying the next canary health check.
if let Some(notifier) = self.endpoint_health_check_notifier.get() {
// Only notify on stream completion if no error responses were seen
if let (false, Some(notifier)) = (
saw_error_response,
self.endpoint_health_check_notifier.get(),
) {
notifier.notify_one();
}
}
......
......@@ -1147,6 +1147,7 @@ mod integration_tests {
namespace: "test_namespace".to_string(),
instance_id: 1,
transport: crate::component::TransportType::Nats(endpoint.to_string()),
device_type: None,
},
health_check_payload.clone(),
);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment