Unverified commit 7f8e88fb, authored by MatejKosec, committed by GitHub
Browse files

feat(frontend): ResponseTimeout error type with request-plane worker quarantine (#8011)


Signed-off-by: Matej Kosec <mkosec@nvidia.com>
parent a2bc6d5c
......@@ -36,25 +36,21 @@ use std::time::Duration;
use crate::http::service::metrics::{CancellationLabels, ErrorType, InflightGuard, Metrics};
/// Environment variable name for configuring the backend stream inactivity timeout.
///
/// When set to a positive integer, `monitor_for_disconnects` will kill the engine context
/// and drop the inflight guard if no SSE event is received from the backend within this
/// many seconds. This acts as a circuit breaker for zombie workers that hold a live TCP
/// connection but never produce output, which would otherwise permanently inflate the
/// `dynamo_frontend_inflight_requests` gauge.
///
/// Set to `0` or leave unset to disable the timeout (default: disabled).
pub const BACKEND_STREAM_TIMEOUT_ENV: &str = "DYN_HTTP_BACKEND_STREAM_TIMEOUT_SECS";
use dynamo_runtime::config::environment_names::llm::DYN_HTTP_BACKEND_STREAM_TIMEOUT_SECS as BACKEND_STREAM_TIMEOUT_ENV;
/// Read the backend stream inactivity timeout from the environment.
/// Returns `None` if unset or zero (timeout disabled).
///
/// The HTTP-layer timeout uses a 2x multiplier over the configured value so that
/// the request-plane timeout in `push_router` (which uses the raw value) always
/// fires first and triggers `report_instance_down()` for worker quarantine.
/// This layer is strictly a safety net for gauge cleanup.
pub fn backend_stream_timeout() -> Option<Duration> {
std::env::var(BACKEND_STREAM_TIMEOUT_ENV)
.ok()
.and_then(|s| s.parse::<u64>().ok())
.filter(|&secs| secs > 0)
.map(Duration::from_secs)
.map(|secs| Duration::from_secs(secs.saturating_mul(2)))
}
#[derive(Clone, Copy)]
......@@ -269,14 +265,14 @@ pub fn monitor_for_disconnects(
None => std::future::pending::<()>().await,
}
} => {
inflight_guard.mark_error(ErrorType::Cancelled);
inflight_guard.mark_error(ErrorType::ResponseTimeout);
stream_handle.disarm();
tracing::warn!(
request_id = %inflight_guard.request_id(),
model = %inflight_guard.model(),
endpoint = %inflight_guard.endpoint(),
request_type = %inflight_guard.request_type(),
error_type = "cancelled",
error_type = "response_timeout",
elapsed_ms = %inflight_guard.elapsed_ms(),
timeout_secs = ?inactivity_timeout.map(|d| d.as_secs()),
"backend stream inactivity timeout; killing engine context to release inflight gauge"
......@@ -292,7 +288,7 @@ pub fn monitor_for_disconnects(
#[cfg(test)]
mod tests {
use super::*;
use crate::http::service::metrics::Endpoint;
use crate::http::service::metrics::{Endpoint, ErrorType, RequestType, Status};
use futures::StreamExt;
use serial_test::serial;
......@@ -378,15 +374,16 @@ mod tests {
#[serial]
async fn test_backend_inactivity_timeout_releases_inflight_gauge() {
let model = "zombie-model";
// Config value "1" → HTTP-layer timeout is 2s (2x safety-net multiplier)
let (metrics, guard, context, handle) = setup_test(model, "req-zombie", "1");
assert_eq!(metrics.get_inflight_count(model), 1);
let monitored = monitor_for_disconnects(hanging_stream(), context, guard, handle);
tokio::pin!(monitored);
tokio::time::advance(Duration::from_secs(2)).await;
tokio::time::advance(Duration::from_secs(3)).await;
let completed = tokio::time::timeout(Duration::from_secs(1), async move {
let completed = tokio::time::timeout(Duration::from_secs(2), async move {
while monitored.next().await.is_some() {}
})
.await;
......@@ -399,6 +396,30 @@ mod tests {
0,
"inflight gauge leaked"
);
// Verify the error was categorized as ResponseTimeout, not Cancelled
assert_eq!(
metrics.get_request_counter(
model,
&Endpoint::ChatCompletions,
&RequestType::Stream,
&Status::Error,
&ErrorType::ResponseTimeout,
),
1,
"inactivity timeout should be recorded as ResponseTimeout"
);
assert_eq!(
metrics.get_request_counter(
model,
&Endpoint::ChatCompletions,
&RequestType::Stream,
&Status::Error,
&ErrorType::Cancelled,
),
0,
"inactivity timeout should NOT be recorded as Cancelled"
);
}
/// Inactivity timeout resets on each token; only fires after a true gap.
......@@ -407,7 +428,8 @@ mod tests {
async fn test_inactivity_timeout_resets_on_each_token() {
let model = "reset-model";
// Phase 1: tokens arrive every 2s with a 5s timeout — stream completes normally.
// Phase 1: tokens arrive every 2s with a 5s config (10s HTTP timeout after 2x multiplier)
// — stream completes normally because each token resets the timer.
let (metrics, guard_1, ctx_1, handle_1) = setup_test(model, "phase1", "5");
assert_eq!(metrics.get_inflight_count(model), 1);
......@@ -449,7 +471,8 @@ mod tests {
let monitored_2 = monitor_for_disconnects(hanging_stream(), ctx_2, guard_2, handle_2);
tokio::pin!(monitored_2);
tokio::time::advance(Duration::from_secs(6)).await;
// Config "5" → HTTP timeout 10s (2x multiplier). Advance past it.
tokio::time::advance(Duration::from_secs(11)).await;
let phase2 = tokio::time::timeout(Duration::from_secs(10), async {
while monitored_2.next().await.is_some() {}
......
......@@ -366,6 +366,8 @@ pub enum ErrorType {
Overload,
/// Request cancelled by client or timeout
Cancelled,
/// Backend accepted the request but stopped responding (response inactivity timeout)
ResponseTimeout,
/// Internal server error (500 and other unexpected errors)
Internal,
/// Feature not implemented (501)
......@@ -1090,6 +1092,7 @@ impl Drop for InflightGuard {
Status::Error => {
let detail = match self.error_type {
ErrorType::Cancelled => "cancelled before completion",
ErrorType::ResponseTimeout => "backend stream inactivity timeout",
ErrorType::Internal => "internal server error during processing",
ErrorType::Validation => "invalid request parameters",
ErrorType::NotFound => "model or resource not found",
......@@ -1188,6 +1191,7 @@ impl ErrorType {
ErrorType::NotFound => frontend_service::error_type::NOT_FOUND,
ErrorType::Overload => frontend_service::error_type::OVERLOAD,
ErrorType::Cancelled => frontend_service::error_type::CANCELLED,
ErrorType::ResponseTimeout => frontend_service::error_type::RESPONSE_TIMEOUT,
ErrorType::Internal => frontend_service::error_type::INTERNAL,
ErrorType::NotImplemented => frontend_service::error_type::NOT_IMPLEMENTED,
}
......@@ -2215,6 +2219,7 @@ mod tests {
assert_eq!(ErrorType::NotFound.as_str(), "not_found");
assert_eq!(ErrorType::Overload.as_str(), "overload");
assert_eq!(ErrorType::Cancelled.as_str(), "cancelled");
assert_eq!(ErrorType::ResponseTimeout.as_str(), "response_timeout");
assert_eq!(ErrorType::Internal.as_str(), "internal");
assert_eq!(ErrorType::NotImplemented.as_str(), "not_implemented");
}
......@@ -2324,6 +2329,7 @@ mod tests {
ErrorType::NotFound,
ErrorType::Overload,
ErrorType::Cancelled,
ErrorType::ResponseTimeout,
ErrorType::Internal,
ErrorType::NotImplemented,
];
......
......@@ -292,6 +292,16 @@ pub mod llm {
pub const DYN_ENABLE_STREAMING_REASONING_DISPATCH: &str =
"DYN_ENABLE_STREAMING_REASONING_DISPATCH";
/// Backend stream inactivity timeout in seconds.
///
/// When set to a positive integer, the frontend will kill the engine context
/// and drop the inflight guard if no SSE event is received from the backend
/// within this many seconds. Acts as a circuit breaker for zombie workers
/// that hold a live TCP connection but never produce output.
///
/// Set to `0` or leave unset to disable the timeout (default: disabled).
pub const DYN_HTTP_BACKEND_STREAM_TIMEOUT_SECS: &str = "DYN_HTTP_BACKEND_STREAM_TIMEOUT_SECS";
/// Metrics configuration
pub mod metrics {
/// Custom metrics prefix (overrides default "dynamo_frontend")
......@@ -477,6 +487,7 @@ mod tests {
kvbm::leader::DYN_KVBM_LEADER_ZMQ_ACK_PORT,
// LLM
llm::DYN_HTTP_BODY_LIMIT_MB,
llm::DYN_HTTP_BACKEND_STREAM_TIMEOUT_SECS,
llm::DYN_LORA_ENABLED,
llm::DYN_LORA_PATH,
llm::DYN_ENABLE_ANTHROPIC_API,
......
......@@ -51,6 +51,8 @@ pub enum ErrorType {
Disconnected,
/// A connection or request timed out.
ConnectionTimeout,
/// The backend accepted the request but stopped responding (stream inactivity timeout).
ResponseTimeout,
/// The request was cancelled (e.g., client disconnected).
Cancelled,
/// The system does not have enough resources to handle the request.
......@@ -67,6 +69,7 @@ impl fmt::Display for ErrorType {
ErrorType::CannotConnect => write!(f, "CannotConnect"),
ErrorType::Disconnected => write!(f, "Disconnected"),
ErrorType::ConnectionTimeout => write!(f, "ConnectionTimeout"),
ErrorType::ResponseTimeout => write!(f, "ResponseTimeout"),
ErrorType::Cancelled => write!(f, "Cancelled"),
ErrorType::ResourceExhausted => write!(f, "ResourceExhausted"),
ErrorType::Backend(sub) => write!(f, "Backend{sub}"),
......@@ -91,6 +94,8 @@ pub enum BackendError {
Disconnected,
/// A connection or request timed out.
ConnectionTimeout,
/// The backend accepted the request but stopped responding (stream inactivity timeout).
ResponseTimeout,
/// The request was cancelled (e.g., client disconnected).
Cancelled,
/// The engine process has shut down or crashed.
......@@ -107,6 +112,7 @@ impl fmt::Display for BackendError {
BackendError::CannotConnect => write!(f, "CannotConnect"),
BackendError::Disconnected => write!(f, "Disconnected"),
BackendError::ConnectionTimeout => write!(f, "ConnectionTimeout"),
BackendError::ResponseTimeout => write!(f, "ResponseTimeout"),
BackendError::Cancelled => write!(f, "Cancelled"),
BackendError::EngineShutdown => write!(f, "EngineShutdown"),
BackendError::StreamIncomplete => write!(f, "StreamIncomplete"),
......@@ -461,6 +467,7 @@ mod tests {
ErrorType::ConnectionTimeout.to_string(),
"ConnectionTimeout"
);
assert_eq!(ErrorType::ResponseTimeout.to_string(), "ResponseTimeout");
assert_eq!(ErrorType::Cancelled.to_string(), "Cancelled");
assert_eq!(
ErrorType::Backend(BackendError::Unknown).to_string(),
......@@ -494,5 +501,9 @@ mod tests {
ErrorType::Backend(BackendError::StreamIncomplete).to_string(),
"BackendStreamIncomplete"
);
assert_eq!(
ErrorType::Backend(BackendError::ResponseTimeout).to_string(),
"BackendResponseTimeout"
);
}
}
......@@ -323,6 +323,9 @@ pub mod frontend_service {
/// Request cancelled by client or timeout
pub const CANCELLED: &str = "cancelled";
/// Backend accepted the request but stopped responding (response inactivity timeout)
pub const RESPONSE_TIMEOUT: &str = "response_timeout";
/// Internal server error (500 and other unexpected errors)
pub const INTERNAL: &str = "internal";
......
......@@ -3,17 +3,6 @@
use super::{AsyncEngineContextProvider, ResponseStream};
use crate::error::{BackendError, DynamoError, ErrorType, match_error_chain};
/// Check if an error chain indicates the worker should be reported as down.
///
/// Passes `err` to `match_error_chain` together with the list of error types
/// that count as worker-down signals and returns its verdict.
fn is_inhibited(err: &(dyn std::error::Error + 'static)) -> bool {
    // Error types treated as worker-down signals. `ResponseTimeout` is listed
    // so that a backend which accepted the request but stopped responding is
    // handled like the other connectivity failures.
    const INHIBITED: &[ErrorType] = &[
        ErrorType::CannotConnect,
        ErrorType::Disconnected,
        ErrorType::ConnectionTimeout,
        ErrorType::ResponseTimeout,
        ErrorType::Backend(BackendError::EngineShutdown),
    ];
    // NOTE(review): the empty third argument is presumably an exclusion list;
    // confirm against `match_error_chain`.
    match_error_chain(err, INHIBITED, &[])
}
use crate::{
component::{Client, Endpoint, RoutingOccupancyState, get_or_create_routing_occupancy_state},
dynamo_nvtx_range,
......@@ -43,6 +32,30 @@ use std::{
use tokio_stream::StreamExt;
use tracing::Instrument;
/// Decide whether an error chain means the worker should be reported as down.
///
/// Delegates to `match_error_chain`, supplying the error types that are
/// treated as worker-down signals, and returns its result unchanged.
fn is_inhibited(err: &(dyn std::error::Error + 'static)) -> bool {
    // Error types that count as evidence the worker is unreachable or has
    // stopped responding.
    const WORKER_DOWN_ERRORS: &[ErrorType] = &[
        ErrorType::CannotConnect,
        ErrorType::Disconnected,
        ErrorType::ConnectionTimeout,
        ErrorType::ResponseTimeout,
        ErrorType::Backend(BackendError::EngineShutdown),
    ];

    // NOTE(review): the empty third argument is presumably an exclusion list;
    // confirm against `match_error_chain`.
    match_error_chain(err, WORKER_DOWN_ERRORS, &[])
}
/// Look up the backend response inactivity timeout in the environment.
///
/// Reads `DYN_HTTP_BACKEND_STREAM_TIMEOUT_SECS`, the same variable used by the
/// HTTP-layer safety net in `disconnect.rs`, and interprets it as a whole
/// number of seconds without any multiplier.
///
/// Returns `None` when the variable is unset, is not a valid `u64`, or is `0`;
/// otherwise returns the configured number of seconds as a `Duration`.
fn response_inactivity_timeout() -> Option<std::time::Duration> {
    use crate::config::environment_names::llm::DYN_HTTP_BACKEND_STREAM_TIMEOUT_SECS;

    // An unset (or non-Unicode) variable means the timeout is disabled.
    let raw = std::env::var(DYN_HTTP_BACKEND_STREAM_TIMEOUT_SECS).ok()?;

    match raw.parse::<u64>() {
        // Only strictly positive values enable the timeout.
        Ok(secs) if secs > 0 => Some(std::time::Duration::from_secs(secs)),
        // Zero and unparsable values both disable it.
        _ => None,
    }
}
struct OccupancyPermit {
state: Arc<RoutingOccupancyState>,
instance_id: u64,
......@@ -128,6 +141,10 @@ where
/// where transient failures are expected.
fault_detection_enabled: bool,
/// Cached response inactivity timeout. Read once at construction from
/// [`environment_names::llm::DYN_HTTP_BACKEND_STREAM_TIMEOUT_SECS`](crate::config::environment_names::llm::DYN_HTTP_BACKEND_STREAM_TIMEOUT_SECS) to avoid a syscall per request.
response_timeout: Option<std::time::Duration>,
/// Shared request occupancy state for tracked routing modes.
occupancy_state: Option<Arc<RoutingOccupancyState>>,
......@@ -235,6 +252,7 @@ where
round_robin_counter: Arc::new(AtomicU64::new(0)),
busy_threshold: None,
fault_detection_enabled: false,
response_timeout: response_inactivity_timeout(),
occupancy_state,
_phantom: PhantomData,
})
......@@ -270,6 +288,7 @@ where
round_robin_counter: Arc::new(AtomicU64::new(0)),
busy_threshold,
fault_detection_enabled: true,
response_timeout: response_inactivity_timeout(),
occupancy_state,
_phantom: PhantomData,
};
......@@ -625,6 +644,7 @@ where
}
let engine_ctx = stream.context();
let client = self.client.clone();
let client_for_timeout = self.client.clone();
let stream = stream.map(move |res| {
// Check if the error is migratable (indicates worker/connection failure)
if let Some(err) = res.err()
......@@ -637,7 +657,47 @@ where
}
res
});
Ok(ResponseStream::new(Box::pin(stream), engine_ctx))
// Request-plane inactivity timeout: emit a ResponseTimeout error item
// when the backend stops producing output. This triggers is_inhibited()
// → report_instance_down() to quarantine the worker.
let stream: Pin<Box<dyn Stream<Item = U> + Send>> = if let Some(timeout) =
self.response_timeout
{
Box::pin(async_stream::stream! {
let mut inner = Box::pin(stream);
loop {
tokio::select! {
biased;
item = inner.next() => {
match item {
Some(item) => yield item,
None => break,
}
}
_ = tokio::time::sleep(timeout) => {
tracing::warn!(
instance_id,
timeout_secs = timeout.as_secs(),
"backend response inactivity timeout — quarantining worker"
);
client_for_timeout.report_instance_down(instance_id);
yield U::from_err(
crate::error::DynamoError::builder()
.error_type(crate::error::ErrorType::ResponseTimeout)
.message("backend response inactivity timeout")
.build()
);
break;
}
}
}
})
} else {
Box::pin(stream)
};
Ok(ResponseStream::new(stream, engine_ctx))
}
Err(err) => {
if self.fault_detection_enabled && is_inhibited(err.as_ref()) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment