"ssh:/git@developer.sourcefind.cn:2222/OpenDAS/vllm_cscc.git" did not exist on "0ee349b5534e3d02b499b1126f2abde73b798fe9"
Unverified Commit b48d4c3b authored by heisenberglit's avatar heisenberglit Committed by GitHub
Browse files

feat(health): extend /health endpoint to include instances (#1312) (#2011)

parent 9dba3c3f
......@@ -22,18 +22,22 @@ use dynamo_runtime::{DistributedRuntime, Runtime};
/// Build and run an HTTP service
pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Result<()> {
let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
let etcd_client = distributed_runtime.etcd_client().clone();
let http_service = service_v2::HttpService::builder()
.port(engine_config.local_model().http_port())
.enable_chat_endpoints(true)
.enable_cmpl_endpoints(true)
.enable_embeddings_endpoints(true)
.with_request_template(engine_config.local_model().request_template())
.with_etcd_client(etcd_client.clone())
.build()?;
match engine_config {
EngineConfig::Dynamic(_) => {
let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
match distributed_runtime.etcd_client() {
Some(etcd_client) => {
match etcd_client {
Some(ref etcd_client) => {
let router_config = engine_config.local_model().router_config();
// Listen for models registering themselves in etcd, add them to HTTP service
run_watcher(
......
......@@ -30,6 +30,7 @@
use super::{service_v2, RouteDoc};
use axum::{http::Method, http::StatusCode, response::IntoResponse, routing::get, Json, Router};
use dynamo_runtime::instances::list_all_instances;
use serde_json::json;
use std::sync::Arc;
......@@ -79,13 +80,25 @@ async fn health_handler(
axum::extract::State(state): axum::extract::State<Arc<service_v2::State>>,
) -> impl IntoResponse {
let model_entries = state.manager().get_model_entries();
let instances = if let Some(etcd_client) = state.etcd_client() {
match list_all_instances(etcd_client).await {
Ok(instances) => instances,
Err(err) => {
tracing::warn!("Failed to fetch instances from etcd: {}", err);
vec![]
}
}
} else {
vec![]
};
if model_entries.is_empty() {
(
StatusCode::SERVICE_UNAVAILABLE,
Json(json!({
"status": "unhealthy",
"message": "No endpoints available"
"message": "No endpoints available",
"instances": instances
})),
)
} else {
......@@ -97,7 +110,8 @@ async fn health_handler(
StatusCode::OK,
Json(json!({
"status": "healthy",
"endpoints": endpoints
"endpoints": endpoints,
"instances": instances
})),
)
}
......
......@@ -12,6 +12,7 @@ use crate::discovery::ModelManager;
use crate::request_template::RequestTemplate;
use anyhow::Result;
use derive_builder::Builder;
use dynamo_runtime::transports::etcd;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
......@@ -19,6 +20,7 @@ use tokio_util::sync::CancellationToken;
/// Shared state for the HTTP service; handlers receive it wrapped in an `Arc`.
pub struct State {
/// Metrics handle; the visible constructors initialise it from `Metrics::default()`.
metrics: Arc<Metrics>,
/// Model manager; the health handler reads its model entries.
manager: Arc<ModelManager>,
/// Optional etcd client; `None` unless one is supplied through `new_with_etcd`.
etcd_client: Option<etcd::Client>,
}
impl State {
......@@ -26,6 +28,15 @@ impl State {
Self {
manager,
metrics: Arc::new(Metrics::default()),
etcd_client: None,
}
}
/// Builds the state around `manager`, with freshly defaulted metrics and the
/// supplied (optional) etcd client.
///
/// Passing `None` leaves the state without an etcd client.
pub fn new_with_etcd(manager: Arc<ModelManager>, etcd_client: Option<etcd::Client>) -> Self {
    let metrics = Arc::new(Metrics::default());
    Self {
        metrics,
        manager,
        etcd_client,
    }
}
......@@ -42,6 +53,10 @@ impl State {
self.manager.clone()
}
/// Borrows the etcd client held by this state, or returns `None` when no
/// client was configured.
pub fn etcd_client(&self) -> Option<&etcd::Client> {
    Option::as_ref(&self.etcd_client)
}
// TODO
pub fn sse_keep_alive(&self) -> Option<Duration> {
None
......@@ -84,6 +99,9 @@ pub struct HttpServiceConfig {
#[builder(default = "None")]
request_template: Option<RequestTemplate>,
#[builder(default = "None")]
etcd_client: Option<etcd::Client>,
}
impl HttpService {
......@@ -155,7 +173,7 @@ impl HttpServiceConfigBuilder {
let config: HttpServiceConfig = self.build_internal()?;
let model_manager = Arc::new(ModelManager::new());
let state = Arc::new(State::new(model_manager));
let state = Arc::new(State::new_with_etcd(model_manager, config.etcd_client));
// enable prometheus metrics
let registry = metrics::Registry::new();
......@@ -225,4 +243,9 @@ impl HttpServiceConfigBuilder {
self.request_template = Some(request_template);
self
}
pub fn with_etcd_client(mut self, etcd_client: Option<etcd::Client>) -> Self {
self.etcd_client = Some(etcd_client);
self
}
}
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Instance management functions for the distributed runtime.
//!
//! This module provides functionality to list and manage instances across
//! the entire distributed system, complementing the component-specific
//! instance listing in `component.rs`.
use crate::component::{Instance, INSTANCE_ROOT_PATH};
use crate::transports::etcd::Client as EtcdClient;
/// Lists every instance registered in etcd under the instance root prefix.
///
/// All key/value pairs whose key starts with `INSTANCE_ROOT_PATH` followed by
/// `/` are fetched, and each value is decoded as a JSON-encoded [`Instance`].
/// Entries that fail to decode are logged at `warn` level and left out of the
/// returned list.
///
/// # Errors
///
/// Returns the error from the etcd prefix lookup if that request fails.
pub async fn list_all_instances(etcd_client: &EtcdClient) -> anyhow::Result<Vec<Instance>> {
    let prefix = format!("{}/", INSTANCE_ROOT_PATH);
    let entries = etcd_client.kv_get_prefix(prefix).await?;

    let instances: Vec<Instance> = entries
        .into_iter()
        .filter_map(|kv| match serde_json::from_slice::<Instance>(kv.value()) {
            Ok(instance) => Some(instance),
            Err(err) => {
                // A malformed entry is reported and skipped rather than failing the listing.
                tracing::warn!(
                    "Failed to parse instance from etcd: {}. Key: {}, Value: {}",
                    err,
                    kv.key_str().unwrap_or("invalid_key"),
                    kv.value_str().unwrap_or("invalid_value")
                );
                None
            }
        })
        .collect();

    Ok(instances)
}
......@@ -38,6 +38,7 @@ pub mod discovery;
pub mod engine;
pub mod http_server;
pub use http_server::HttpServerInfo;
pub mod instances;
pub mod logging;
pub mod metrics;
pub mod pipeline;
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment