Unverified Commit 54dcf3ad authored by Michael Feil's avatar Michael Feil Committed by GitHub
Browse files

feat: graceful exit leading to 500 and connection errors -...


feat: graceful exit leading to 500 and connection errors - liveness-return-false if-cancelled (#4914)
Signed-off-by: michaelfeil <me@michaelfeil.eu>
parent 6b5842ee
...@@ -38,8 +38,19 @@ pub fn live_check_router( ...@@ -38,8 +38,19 @@ pub fn live_check_router(
} }
async fn live_handler( async fn live_handler(
axum::extract::State(_state): axum::extract::State<Arc<service_v2::State>>, axum::extract::State(state): axum::extract::State<Arc<service_v2::State>>,
) -> impl IntoResponse { ) -> impl IntoResponse {
// Check if the http service is being cancelled/shutdown
if state.is_cancelled() {
return (
StatusCode::SERVICE_UNAVAILABLE,
Json(json!({
"status": "shutting_down",
"message": "Service is shutting down"
})),
);
}
( (
StatusCode::OK, StatusCode::OK,
Json(json!({ Json(json!({
......
...@@ -18,6 +18,7 @@ use crate::request_template::RequestTemplate; ...@@ -18,6 +18,7 @@ use crate::request_template::RequestTemplate;
use anyhow::Result; use anyhow::Result;
use axum_server::tls_rustls::RustlsConfig; use axum_server::tls_rustls::RustlsConfig;
use derive_builder::Builder; use derive_builder::Builder;
use dynamo_runtime::config::environment_names::llm as env_llm;
use dynamo_runtime::discovery::{Discovery, KVStoreDiscovery}; use dynamo_runtime::discovery::{Discovery, KVStoreDiscovery};
use dynamo_runtime::logging::make_request_span; use dynamo_runtime::logging::make_request_span;
use dynamo_runtime::metrics::prometheus_names::name_prefix; use dynamo_runtime::metrics::prometheus_names::name_prefix;
...@@ -34,6 +35,7 @@ pub struct State { ...@@ -34,6 +35,7 @@ pub struct State {
store: kv::Manager, store: kv::Manager,
discovery_client: Arc<dyn Discovery>, discovery_client: Arc<dyn Discovery>,
flags: StateFlags, flags: StateFlags,
cancel_token: CancellationToken,
} }
#[derive(Default, Debug)] #[derive(Default, Debug)]
...@@ -73,12 +75,17 @@ impl StateFlags { ...@@ -73,12 +75,17 @@ impl StateFlags {
} }
impl State { impl State {
pub fn new(manager: Arc<ModelManager>, store: kv::Manager) -> Self { pub fn new(
manager: Arc<ModelManager>,
store: kv::Manager,
cancel_token: CancellationToken,
) -> Self {
// Initialize discovery backed by KV store // Initialize discovery backed by KV store
// Create a cancellation token for the discovery's watch streams // Create a cancellation token for the discovery's watch streams
let discovery_client = { let discovery_client = {
let cancel_token = CancellationToken::new(); let discovery_cancel_token = cancel_token.child_token();
Arc::new(KVStoreDiscovery::new(store.clone(), cancel_token)) as Arc<dyn Discovery> Arc::new(KVStoreDiscovery::new(store.clone(), discovery_cancel_token))
as Arc<dyn Discovery>
}; };
Self { Self {
...@@ -92,6 +99,7 @@ impl State { ...@@ -92,6 +99,7 @@ impl State {
embeddings_endpoints_enabled: AtomicBool::new(false), embeddings_endpoints_enabled: AtomicBool::new(false),
responses_endpoints_enabled: AtomicBool::new(false), responses_endpoints_enabled: AtomicBool::new(false),
}, },
cancel_token,
} }
} }
...@@ -116,6 +124,16 @@ impl State { ...@@ -116,6 +124,16 @@ impl State {
self.discovery_client.clone() self.discovery_client.clone()
} }
/// Check if the service is shutting down.
///
/// Returns `true` once the cancellation token stored in this `State` has been
/// cancelled. The HTTP server cancels that token when shutdown is requested, and
/// the liveness handler answers `503 Service Unavailable` from then on.
pub fn is_cancelled(&self) -> bool {
self.cancel_token.is_cancelled()
}
/// Get the cancellation token held by this `State`.
///
/// Returns a borrow of the token that was passed to `State::new`. Callers that
/// need an owned handle can clone it; the HTTP server startup code does so in
/// order to cancel the state when shutdown is requested.
pub fn cancel_token(&self) -> &CancellationToken {
&self.cancel_token
}
// TODO // TODO
pub fn sse_keep_alive(&self) -> Option<Duration> { pub fn sse_keep_alive(&self) -> Option<Duration> {
None None
...@@ -218,6 +236,8 @@ impl HttpService { ...@@ -218,6 +236,8 @@ impl HttpService {
let router = self.router.clone(); let router = self.router.clone();
let observer = cancel_token.child_token(); let observer = cancel_token.child_token();
let state_cancel = self.state.cancel_token().clone();
let addr: SocketAddr = address let addr: SocketAddr = address
.parse() .parse()
.map_err(|e| anyhow::anyhow!("Invalid address '{}': {}", address, e))?; .map_err(|e| anyhow::anyhow!("Invalid address '{}': {}", address, e))?;
...@@ -252,9 +272,11 @@ impl HttpService { ...@@ -252,9 +272,11 @@ impl HttpService {
result.map_err(|e| anyhow::anyhow!("HTTPS server error: {}", e))?; result.map_err(|e| anyhow::anyhow!("HTTPS server error: {}", e))?;
} }
_ = observer.cancelled() => { _ = observer.cancelled() => {
state_cancel.cancel();
tracing::info!("HTTPS server shutdown requested"); tracing::info!("HTTPS server shutdown requested");
handle.graceful_shutdown(Some(Duration::from_secs(5))); // accepting requests for 5 more seconds, to allow incorrectly routed requests to arrive
// TODO: Do we need to wait? handle.graceful_shutdown(Some(Duration::from_secs(get_graceful_shutdown_timeout() as u64)));
// no longer accepting requests, draining all existing connections
} }
} }
} else { } else {
...@@ -281,7 +303,15 @@ impl HttpService { ...@@ -281,7 +303,15 @@ impl HttpService {
})?; })?;
axum::serve(listener, router) axum::serve(listener, router)
.with_graceful_shutdown(observer.cancelled_owned()) .with_graceful_shutdown(async move {
observer.cancelled_owned().await;
state_cancel.cancel();
tracing::info!("HTTP server shutdown requested");
// keep accepting requests for the graceful-shutdown timeout (default 5 seconds), to allow incorrectly routed requests to arrive
tokio::time::sleep(Duration::from_secs(get_graceful_shutdown_timeout() as u64))
.await;
// no longer accepting requests, draining all existing connections
})
.await .await
.inspect_err(|_| cancel_token.cancel())?; .inspect_err(|_| cancel_token.cancel())?;
} }
...@@ -304,6 +334,13 @@ impl HttpService { ...@@ -304,6 +334,13 @@ impl HttpService {
} }
} }
/// Reads the graceful-shutdown timeout, in seconds, from the environment.
///
/// The value comes from the variable named by
/// `env_llm::DYN_HTTP_GRACEFUL_SHUTDOWN_TIMEOUT_SECS`. When the variable cannot be
/// read, or its value does not parse as a `usize`, the default of 5 is returned.
fn get_graceful_shutdown_timeout() -> usize {
    const DEFAULT_TIMEOUT_SECS: usize = 5;

    match std::env::var(env_llm::DYN_HTTP_GRACEFUL_SHUTDOWN_TIMEOUT_SECS) {
        Ok(raw) => raw.parse::<usize>().unwrap_or(DEFAULT_TIMEOUT_SECS),
        Err(_) => DEFAULT_TIMEOUT_SECS,
    }
}
/// Environment variable to set the metrics endpoint path (default: `/metrics`) /// Environment variable to set the metrics endpoint path (default: `/metrics`)
static HTTP_SVC_METRICS_PATH_ENV: &str = "DYN_HTTP_SVC_METRICS_PATH"; static HTTP_SVC_METRICS_PATH_ENV: &str = "DYN_HTTP_SVC_METRICS_PATH";
/// Environment variable to set the models endpoint path (default: `/v1/models`) /// Environment variable to set the models endpoint path (default: `/v1/models`)
...@@ -326,7 +363,9 @@ impl HttpServiceConfigBuilder { ...@@ -326,7 +363,9 @@ impl HttpServiceConfigBuilder {
let config: HttpServiceConfig = self.build_internal()?; let config: HttpServiceConfig = self.build_internal()?;
let model_manager = Arc::new(ModelManager::new()); let model_manager = Arc::new(ModelManager::new());
let state = Arc::new(State::new(model_manager, config.store)); // Create a temporary cancel token for building - will be replaced in spawn/run
let temp_cancel_token = CancellationToken::new();
let state = Arc::new(State::new(model_manager, config.store, temp_cancel_token));
state state
.flags .flags
.set(&EndpointType::Chat, config.enable_chat_endpoints); .set(&EndpointType::Chat, config.enable_chat_endpoints);
...@@ -476,3 +515,48 @@ impl HttpServiceConfigBuilder { ...@@ -476,3 +515,48 @@ impl HttpServiceConfigBuilder {
routes routes
} }
} }
#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;
    use tokio_util::sync::CancellationToken;

    /// Once the token handed to the service is cancelled, the `/live` endpoint must
    /// report `503 Service Unavailable` while the server is still reachable.
    #[tokio::test]
    #[serial]
    async fn test_liveness_endpoint_reflects_cancellation() {
        // 1. Setup service & token.
        // `CancellationToken` clones share the same cancellation state, so the
        // token can be cloned directly without an `Arc` wrapper.
        let cancel_token = CancellationToken::new();
        let service = HttpService::builder().build().unwrap();
        let port = service.port;

        // 2. Spawn service with a clone of the shared token
        let service_token = cancel_token.clone();
        let handle = tokio::spawn(async move {
            service.run(service_token).await.unwrap();
        });
        tokio::time::sleep(std::time::Duration::from_millis(1)).await;

        // 3. Cancel the token
        cancel_token.cancel();

        // 4. Wait a tiny bit for propagation
        tokio::time::sleep(std::time::Duration::from_millis(20)).await;

        // 5. Hit the endpoint
        let client = reqwest::Client::new();
        let resp = client
            .get(format!("http://localhost:{}/live", port))
            .send()
            .await
            .expect("Request failed");

        // 6. ASSERTION: Should be 503 Service Unavailable
        assert_eq!(resp.status(), reqwest::StatusCode::SERVICE_UNAVAILABLE);

        // Clean up: stop the server task instead of waiting for the graceful-shutdown delay
        handle.abort();
    }
}
...@@ -225,6 +225,9 @@ pub mod llm { ...@@ -225,6 +225,9 @@ pub mod llm {
/// HTTP body size limit in MB /// HTTP body size limit in MB
pub const DYN_HTTP_BODY_LIMIT_MB: &str = "DYN_HTTP_BODY_LIMIT_MB"; pub const DYN_HTTP_BODY_LIMIT_MB: &str = "DYN_HTTP_BODY_LIMIT_MB";
/// Graceful-shutdown timeout of the HTTP service, in seconds. The HTTP service reads
/// it when shutdown is requested and falls back to 5 when the variable is unset or
/// is not a valid unsigned integer.
pub const DYN_HTTP_GRACEFUL_SHUTDOWN_TIMEOUT_SECS: &str =
"DYN_HTTP_GRACEFUL_SHUTDOWN_TIMEOUT_SECS";
/// Enable LoRA adapter support (set to "true" to enable) /// Enable LoRA adapter support (set to "true" to enable)
pub const DYN_LORA_ENABLED: &str = "DYN_LORA_ENABLED"; pub const DYN_LORA_ENABLED: &str = "DYN_LORA_ENABLED";
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment