Unverified Commit ece76a62 authored by ZichengMa's avatar ZichengMa Committed by GitHub
Browse files

feat: Build DistributedRuntime-level HTTP server with /health /metrics (#1656)

parent 789c8284
......@@ -1816,6 +1816,7 @@ dependencies = [
"async-stream",
"async-trait",
"async_zmq",
"axum 0.8.3",
"blake3",
"bytes",
"chrono",
......
......@@ -73,6 +73,7 @@ thiserror = { version = "2.0.11" }
tokio = { version = "1", features = ["full"] }
tokio-stream = { version = "0.1" }
tokio-util = { version = "0.7", features = ["codec", "net"] }
axum = { version = "0.8" }
tracing = { version = "0.1" }
tracing-subscriber = { version = "0.3", features = ["env-filter", "local-time", "json"] }
validator = { version = "0.20.0", features = ["derive"] }
......
......@@ -1706,6 +1706,7 @@ dependencies = [
"async-stream",
"async-trait",
"async_zmq",
"axum 0.8.3",
"blake3",
"bytes",
"chrono",
......
......@@ -94,7 +94,7 @@ nix = { version = "0.26", optional = true }
unicode-segmentation = "1.12"
# http-service
axum = "0.8"
axum = { workspace = true }
# tokenizers
tokenizers = { version = "0.21.1", default-features = false, features = [
......
......@@ -37,6 +37,7 @@ async-nats = { workspace = true }
async-stream = { workspace = true }
async-trait = { workspace = true }
async_zmq = { workspace = true }
axum = { workspace = true }
blake3 = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true }
......
......@@ -176,14 +176,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f"
dependencies = [
"async-trait",
"axum-core",
"axum-core 0.4.5",
"bytes",
"futures-util",
"http",
"http-body",
"http-body-util",
"itoa",
"matchit",
"matchit 0.7.3",
"memchr",
"mime",
"percent-encoding",
......@@ -196,6 +196,40 @@ dependencies = [
"tower-service",
]
[[package]]
name = "axum"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "021e862c184ae977658b36c4500f7feac3221ca5da43e3f25bd04ab6c79a29b5"
dependencies = [
"axum-core 0.5.2",
"bytes",
"form_urlencoded",
"futures-util",
"http",
"http-body",
"http-body-util",
"hyper",
"hyper-util",
"itoa",
"matchit 0.8.4",
"memchr",
"mime",
"percent-encoding",
"pin-project-lite",
"rustversion",
"serde",
"serde_json",
"serde_path_to_error",
"serde_urlencoded",
"sync_wrapper",
"tokio",
"tower 0.5.2",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "axum-core"
version = "0.4.5"
......@@ -216,6 +250,26 @@ dependencies = [
"tower-service",
]
[[package]]
name = "axum-core"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68464cd0412f486726fb3373129ef5d2993f90c34bc2bc1c1e9943b2f4fc7ca6"
dependencies = [
"bytes",
"futures-core",
"http",
"http-body",
"http-body-util",
"mime",
"pin-project-lite",
"rustversion",
"sync_wrapper",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "backtrace"
version = "0.3.74"
......@@ -631,6 +685,7 @@ dependencies = [
"async-stream",
"async-trait",
"async_zmq",
"axum 0.8.4",
"blake3",
"bytes",
"chrono",
......@@ -1431,6 +1486,12 @@ version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
[[package]]
name = "matchit"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3"
[[package]]
name = "memchr"
version = "2.7.4"
......@@ -2284,6 +2345,16 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_path_to_error"
version = "0.1.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59fab13f937fa393d08645bf3a84bdfe86e296747b506ada67bb15f10f218b2a"
dependencies = [
"itoa",
"serde",
]
[[package]]
name = "serde_repr"
version = "0.1.20"
......@@ -2304,6 +2375,18 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_urlencoded"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd"
dependencies = [
"form_urlencoded",
"itoa",
"ryu",
"serde",
]
[[package]]
name = "service_metrics"
version = "0.3.2"
......@@ -2722,7 +2805,7 @@ checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52"
dependencies = [
"async-stream",
"async-trait",
"axum",
"axum 0.7.9",
"base64",
"bytes",
"h2",
......@@ -2790,8 +2873,10 @@ dependencies = [
"futures-util",
"pin-project-lite",
"sync_wrapper",
"tokio",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
......@@ -2812,6 +2897,7 @@ version = "0.1.41"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0"
dependencies = [
"log",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
......
......@@ -22,6 +22,12 @@ use figment::{
use serde::{Deserialize, Serialize};
use validator::Validate;
/// Default HTTP server host
const DEFAULT_HTTP_SERVER_HOST: &str = "0.0.0.0";
/// Default HTTP server port
const DEFAULT_HTTP_SERVER_PORT: u16 = 9090;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerConfig {
/// Grace shutdown period for http-service.
......@@ -71,6 +77,22 @@ pub struct RuntimeConfig {
#[builder(default = "512")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub max_blocking_threads: usize,
/// HTTP server host for health and metrics endpoints
#[builder(default = "DEFAULT_HTTP_SERVER_HOST.to_string()")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub http_server_host: String,
/// HTTP server port for health and metrics endpoints
/// If set to 0, the system will assign a random available port
#[builder(default = "DEFAULT_HTTP_SERVER_PORT")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub http_server_port: u16,
/// Health and metrics HTTP server enabled
#[builder(default = "false")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub http_enabled: bool,
}
impl RuntimeConfig {
......@@ -107,10 +129,20 @@ impl RuntimeConfig {
Ok(config)
}
/// Check if HTTP server should be enabled
/// HTTP server is enabled by default, but can be disabled by setting DYN_RUNTIME_HTTP_ENABLED to false
/// If a port is explicitly provided, HTTP server will be enabled regardless
pub fn http_server_enabled(&self) -> bool {
self.http_enabled
}
pub fn single_threaded() -> Self {
RuntimeConfig {
num_worker_threads: 1,
max_blocking_threads: 1,
http_server_host: DEFAULT_HTTP_SERVER_HOST.to_string(),
http_server_port: DEFAULT_HTTP_SERVER_PORT,
http_enabled: false,
}
}
......@@ -129,6 +161,9 @@ impl Default for RuntimeConfig {
Self {
num_worker_threads: 16,
max_blocking_threads: 16,
http_server_host: DEFAULT_HTTP_SERVER_HOST.to_string(),
http_server_port: DEFAULT_HTTP_SERVER_PORT,
http_enabled: false,
}
}
}
......@@ -142,6 +177,22 @@ impl RuntimeConfigBuilder {
}
}
/// Check if a string is truthy
/// This will be used to evaluate environment variables or any other subjective
/// configuration parameters that can be set by the user that should be evaluated
/// as a boolean value.
pub fn is_truthy(val: &str) -> bool {
matches!(val.to_lowercase().as_str(), "1" | "true" | "on" | "yes")
}
/// Check if a string is falsey
/// This will be used to evaluate environment variables or any other subjective
/// configuration parameters that can be set by the user that should be evaluated
/// as a boolean value (opposite of is_truthy).
pub fn is_falsey(val: &str) -> bool {
matches!(val.to_lowercase().as_str(), "0" | "false" | "off" | "no")
}
/// Check if an environment variable is truthy
pub fn env_is_truthy(env: &str) -> bool {
match std::env::var(env) {
......@@ -150,12 +201,12 @@ pub fn env_is_truthy(env: &str) -> bool {
}
}
/// Check if a string is truthy
/// This will be used to evaluate environment variables or any other subjective
/// configuration parameters that can be set by the user that should be evaluated
/// as a boolean value.
pub fn is_truthy(val: &str) -> bool {
matches!(val.to_lowercase().as_str(), "1" | "true" | "on" | "yes")
/// Check if an environment variable is falsey
pub fn env_is_falsey(env: &str) -> bool {
match std::env::var(env) {
Ok(val) => is_falsey(val.as_str()),
Err(_) => false,
}
}
/// Check whether JSONL logging enabled
......@@ -239,4 +290,91 @@ mod tests {
},
)
}
#[test]
fn test_runtime_config_http_server_env_vars() -> Result<()> {
temp_env::with_vars(
vec![
("DYN_RUNTIME_HTTP_SERVER_HOST", Some("127.0.0.1")),
("DYN_RUNTIME_HTTP_SERVER_PORT", Some("9090")),
],
|| {
let config = RuntimeConfig::from_settings()?;
assert_eq!(config.http_server_host, "127.0.0.1");
assert_eq!(config.http_server_port, 9090);
Ok(())
},
)
}
#[test]
fn test_http_server_enabled_by_default() {
temp_env::with_vars(vec![("DYN_RUNTIME_HTTP_ENABLED", None::<&str>)], || {
let config = RuntimeConfig::from_settings().unwrap();
assert!(!config.http_server_enabled());
});
}
#[test]
fn test_http_server_disabled_explicitly() {
temp_env::with_vars(vec![("DYN_RUNTIME_HTTP_ENABLED", Some("false"))], || {
let config = RuntimeConfig::from_settings().unwrap();
assert!(!config.http_server_enabled());
});
}
#[test]
fn test_http_server_enabled_explicitly() {
temp_env::with_vars(vec![("DYN_RUNTIME_HTTP_ENABLED", Some("true"))], || {
let config = RuntimeConfig::from_settings().unwrap();
assert!(config.http_server_enabled());
});
}
#[test]
fn test_http_server_enabled_by_port() {
temp_env::with_vars(vec![("DYN_RUNTIME_HTTP_SERVER_PORT", Some("8080"))], || {
let config = RuntimeConfig::from_settings().unwrap();
assert!(!config.http_server_enabled());
assert_eq!(config.http_server_port, 8080);
});
}
#[test]
fn test_is_truthy_and_falsey() {
// Test truthy values
assert!(is_truthy("1"));
assert!(is_truthy("true"));
assert!(is_truthy("TRUE"));
assert!(is_truthy("on"));
assert!(is_truthy("yes"));
// Test falsey values
assert!(is_falsey("0"));
assert!(is_falsey("false"));
assert!(is_falsey("FALSE"));
assert!(is_falsey("off"));
assert!(is_falsey("no"));
// Test opposite behavior
assert!(!is_truthy("0"));
assert!(!is_falsey("1"));
// Test env functions
temp_env::with_vars(vec![("TEST_TRUTHY", Some("true"))], || {
assert!(env_is_truthy("TEST_TRUTHY"));
assert!(!env_is_falsey("TEST_TRUTHY"));
});
temp_env::with_vars(vec![("TEST_FALSEY", Some("false"))], || {
assert!(!env_is_truthy("TEST_FALSEY"));
assert!(env_is_falsey("TEST_FALSEY"));
});
// Test missing env vars
temp_env::with_vars(vec![("TEST_MISSING", None::<&str>)], || {
assert!(!env_is_truthy("TEST_MISSING"));
assert!(!env_is_falsey("TEST_MISSING"));
});
}
}
......@@ -65,7 +65,7 @@ impl DistributedRuntime {
})
.await??;
Ok(Self {
let distributed_runtime = Self {
runtime,
etcd_client,
nats_client,
......@@ -73,7 +73,35 @@ impl DistributedRuntime {
component_registry: component::Registry::new(),
is_static,
instance_sources: Arc::new(Mutex::new(HashMap::new())),
})
start_time: std::time::Instant::now(),
};
// Start HTTP server for health and metrics (if enabled)
let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default();
if config.http_server_enabled() {
let drt_arc = Arc::new(distributed_runtime.clone());
let runtime_clone = distributed_runtime.runtime.clone();
secondary.spawn(async move {
if let Err(e) = crate::http_server::start_http_server(
&config.http_server_host,
config.http_server_port,
runtime_clone.child_token(),
drt_arc,
)
.await
{
tracing::error!("HTTP server startup failed: {}", e);
} else {
tracing::debug!("HTTP server started successfully");
}
});
} else {
tracing::debug!(
"Health and metrics HTTP server is disabled via DYN_RUNTIME_HTTP_ENABLED"
);
}
Ok(distributed_runtime)
}
pub async fn from_settings(runtime: Runtime) -> Result<Self> {
......@@ -163,6 +191,11 @@ impl DistributedRuntime {
pub fn instance_sources(&self) -> Arc<Mutex<HashMap<Endpoint, Weak<InstanceSource>>>> {
self.instance_sources.clone()
}
/// Get the uptime of this DistributedRuntime in seconds
pub fn uptime(&self) -> std::time::Duration {
self.start_time.elapsed()
}
}
#[derive(Dissolve)]
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use axum::{body, http::StatusCode, response::IntoResponse, routing::get, Router};
use prometheus::{
proto, register_gauge_with_registry, Encoder, Gauge, Opts, Registry, TextEncoder,
};
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio_util::sync::CancellationToken;
use tracing;
/// Runtime metrics for HTTP server
pub struct RuntimeMetrics {
uptime_gauge: Gauge,
}
impl RuntimeMetrics {
pub fn new(metrics_registry: &Arc<Registry>) -> anyhow::Result<Arc<Self>> {
let uptime_opts = Opts::new(
"uptime_seconds",
"Total uptime of the DistributedRuntime in seconds",
)
.namespace("dynamo")
.subsystem("runtime");
let uptime_gauge = register_gauge_with_registry!(uptime_opts, metrics_registry)?;
Ok(Arc::new(Self { uptime_gauge }))
}
pub fn update_uptime(&self, uptime_seconds: f64) {
self.uptime_gauge.set(uptime_seconds);
}
}
/// HTTP server state containing pre-created metrics
pub struct HttpServerState {
drt: Arc<crate::DistributedRuntime>,
registry: Arc<Registry>,
runtime_metrics: Arc<RuntimeMetrics>,
}
impl HttpServerState {
/// Create new HTTP server state with pre-created metrics
pub fn new(drt: Arc<crate::DistributedRuntime>) -> anyhow::Result<Self> {
let registry = Arc::new(Registry::new());
// Create runtime metrics
let runtime_metrics = RuntimeMetrics::new(&registry)?;
Ok(Self {
drt,
registry,
runtime_metrics,
})
}
}
/// Start HTTP server with DistributedRuntime support
pub async fn start_http_server(
host: &str,
port: u16,
cancel_token: CancellationToken,
drt: Arc<crate::DistributedRuntime>,
) -> anyhow::Result<()> {
// Create HTTP server state with pre-created metrics
let server_state = Arc::new(HttpServerState::new(drt)?);
let app = Router::new()
// .route(
// "/health",
// get({
// let state = Arc::clone(&server_state);
// move || health_handler(state)
// }),
// )
.route(
"/metrics",
get({
let state = Arc::clone(&server_state);
move || metrics_handler(state)
}),
);
let address = format!("{}:{}", host, port);
tracing::debug!("Starting HTTP server on: {}", address);
let listener = match TcpListener::bind(&address).await {
Ok(listener) => {
// get the actual address and port, print in debug level
let actual_address = listener.local_addr()?;
tracing::debug!("HTTP server bound to: {}", actual_address);
listener
}
Err(e) => {
tracing::error!("Failed to bind to address {}: {}", address, e);
return Err(anyhow::anyhow!("Failed to bind to address: {}", e));
}
};
let observer = cancel_token.child_token();
if let Err(e) = axum::serve(listener, app)
.with_graceful_shutdown(observer.cancelled_owned())
.await
{
tracing::error!("HTTP server error: {}", e);
}
Ok(())
}
// /// Health handler
// async fn health_handler(state: Arc<HttpServerState>) -> impl IntoResponse {
// let uptime = state.drt.uptime();
// let response = format!("OK\nUptime: {} seconds", uptime.as_secs());
// (StatusCode::OK, response)
// }
/// Metrics handler with DistributedRuntime uptime
async fn metrics_handler(state: Arc<HttpServerState>) -> impl IntoResponse {
// Update the uptime gauge with current value
let uptime_seconds = state.drt.uptime().as_secs_f64();
state.runtime_metrics.update_uptime(uptime_seconds);
// Gather metrics from the registry
let metric_families = state.registry.gather();
let encoder = TextEncoder::new();
let mut buffer = Vec::new();
match encoder.encode(&metric_families, &mut buffer) {
Ok(()) => match String::from_utf8(buffer) {
Ok(response) => (StatusCode::OK, response),
Err(e) => {
tracing::error!("Failed to encode metrics as UTF-8: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to encode metrics as UTF-8".to_string(),
)
}
},
Err(e) => {
tracing::error!("Failed to encode metrics: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to encode metrics".to_string(),
)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::time::{sleep, Duration};
#[tokio::test]
async fn test_http_server_lifecycle() {
let cancel_token = CancellationToken::new();
let cancel_token_for_server = cancel_token.clone();
// Test basic HTTP server lifecycle without DistributedRuntime
let app = Router::new().route("/test", get(|| async { (StatusCode::OK, "test") }));
// start HTTP server
let server_handle = tokio::spawn(async move {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let _ = axum::serve(listener, app)
.with_graceful_shutdown(cancel_token_for_server.cancelled_owned())
.await;
});
// wait for a while to let the server start
sleep(Duration::from_millis(100)).await;
// cancel token
cancel_token.cancel();
// wait for the server to shut down
let result = tokio::time::timeout(Duration::from_secs(5), server_handle).await;
assert!(
result.is_ok(),
"HTTP server should shut down when cancel token is cancelled"
);
}
#[tokio::test]
async fn test_runtime_metrics_creation() {
// Test RuntimeMetrics creation and functionality
let registry = Arc::new(Registry::new());
let runtime_metrics = RuntimeMetrics::new(&registry).unwrap();
// Wait a bit to ensure uptime is measurable
tokio::time::sleep(Duration::from_millis(10)).await;
// Test updating uptime
let uptime_seconds = 123.456;
runtime_metrics.update_uptime(uptime_seconds);
// Gather metrics from the registry
let metric_families = registry.gather();
let encoder = TextEncoder::new();
let mut buffer = Vec::new();
encoder.encode(&metric_families, &mut buffer).unwrap();
let response = String::from_utf8(buffer).unwrap();
assert!(response.contains("dynamo_runtime_uptime_seconds"));
assert!(response.contains("123.456"));
}
#[tokio::test]
async fn test_runtime_metrics_namespace() {
// Test that metrics have correct namespace
let registry = Arc::new(Registry::new());
let runtime_metrics = RuntimeMetrics::new(&registry).unwrap();
runtime_metrics.update_uptime(42.0);
let metric_families = registry.gather();
let encoder = TextEncoder::new();
let mut buffer = Vec::new();
encoder.encode(&metric_families, &mut buffer).unwrap();
let response = String::from_utf8(buffer).unwrap();
// Check for the full metric name with namespace and subsystem
assert!(response.contains("dynamo_runtime_uptime_seconds"));
assert!(response.contains("Total uptime of the DistributedRuntime in seconds"));
}
}
......@@ -36,6 +36,7 @@ pub use config::RuntimeConfig;
pub mod component;
pub mod discovery;
pub mod engine;
pub mod http_server;
pub mod logging;
pub mod pipeline;
pub mod prelude;
......@@ -97,4 +98,7 @@ pub struct DistributedRuntime {
is_static: bool,
instance_sources: Arc<Mutex<HashMap<Endpoint, Weak<InstanceSource>>>>,
// Start time for tracking uptime
start_time: std::time::Instant,
}
......@@ -60,6 +60,7 @@ pub const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_RELEASE: u64 = 30;
#[derive(Debug, Clone)]
pub struct Worker {
runtime: Runtime,
config: RuntimeConfig,
}
impl Worker {
......@@ -84,7 +85,7 @@ impl Worker {
})?;
let runtime = Runtime::from_handle(rt.handle().clone())?;
Ok(Worker { runtime })
Ok(Worker { runtime, config })
}
pub fn tokio_runtime(&self) -> Result<&'static tokio::runtime::Runtime> {
......@@ -202,7 +203,8 @@ impl Worker {
return Err(error!("Worker already initialized"));
}
let runtime = Runtime::from_current()?;
Ok(Worker { runtime })
let config = RuntimeConfig::from_settings()?;
Ok(Worker { runtime, config })
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment