Unverified Commit 254f4819 authored by Yingge He's avatar Yingge He Committed by GitHub
Browse files

refactor: Rename HTTP server to metrics server in worker process (#2318)

parent 2cf67765
...@@ -119,7 +119,7 @@ dynamo_component_response_bytes_total{dynamo_component="example_component",dynam ...@@ -119,7 +119,7 @@ dynamo_component_response_bytes_total{dynamo_component="example_component",dynam
# HELP uptime_seconds Total uptime of the DistributedRuntime in seconds # HELP uptime_seconds Total uptime of the DistributedRuntime in seconds
# TYPE uptime_seconds gauge # TYPE uptime_seconds gauge
uptime_seconds{dynamo_namespace="http_server"} 1.8226759879999999 uptime_seconds{dynamo_namespace="metrics_server"} 1.8226759879999999
``` ```
## Example ## Example
...@@ -176,13 +176,13 @@ if enable_custom_metrics { ...@@ -176,13 +176,13 @@ if enable_custom_metrics {
## Running the Example ## Running the Example
**Important**: You must set the `DYN_SYSTEM_PORT` environment variable to specify which port the HTTP server will run on. **Important**: You must set the `DYN_SYSTEM_PORT` environment variable to specify which port the HTTP system metrics server will run on.
```bash ```bash
# Run the system metrics example # Run the system metrics example
DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=8081 cargo run --bin system_server DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=8081 cargo run --bin system_server
``` ```
The server will start an HTTP server on the specified port (8081 in this example) that exposes the Prometheus metrics endpoint at `/metrics`. The server will start an HTTP system metrics server on the specified port (8081 in this example) that exposes the Prometheus metrics endpoint at `/metrics`.
To Run an actual LLM frontend + server (aggregated example), launch both of them. By default, the frontend listens to port 8080. To Run an actual LLM frontend + server (aggregated example), launch both of them. By default, the frontend listens to port 8080.
......
...@@ -42,15 +42,15 @@ async fn test_backend_with_metrics() -> Result<()> { ...@@ -42,15 +42,15 @@ async fn test_backend_with_metrics() -> Result<()> {
let runtime = Runtime::from_current()?; let runtime = Runtime::from_current()?;
let distributed = DistributedRuntime::from_settings(runtime.clone()).await?; let distributed = DistributedRuntime::from_settings(runtime.clone()).await?;
// Get the HTTP server info to find the actual port // Get the Metrics server info to find the actual port
let http_server_info = distributed.http_server_info(); let metrics_server_info = distributed.metrics_server_info();
let metrics_port = match http_server_info { let metrics_port = match metrics_server_info {
Some(info) => { Some(info) => {
println!("HTTP server running on: {}", info.address()); println!("Metrics server running on: {}", info.address());
info.port() info.port()
} }
None => { None => {
panic!("HTTP server not started - check DYN_SYSTEM_ENABLED environment variable"); panic!("Metrics server not started - check DYN_SYSTEM_ENABLED environment variable");
} }
}; };
......
...@@ -77,7 +77,7 @@ impl DistributedRuntime { ...@@ -77,7 +77,7 @@ impl DistributedRuntime {
}) })
.await??; .await??;
// Start HTTP server for health and metrics if enabled in configuration // Start system metrics server for health and metrics if enabled in configuration
let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default(); let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default();
// IMPORTANT: We must extract cancel_token from runtime BEFORE moving runtime into the struct below. // IMPORTANT: We must extract cancel_token from runtime BEFORE moving runtime into the struct below.
// This is because after moving, runtime is no longer accessible in this scope (ownership rules). // This is because after moving, runtime is no longer accessible in this scope (ownership rules).
...@@ -102,7 +102,7 @@ impl DistributedRuntime { ...@@ -102,7 +102,7 @@ impl DistributedRuntime {
etcd_client, etcd_client,
nats_client, nats_client,
tcp_server: Arc::new(OnceCell::new()), tcp_server: Arc::new(OnceCell::new()),
http_server: Arc::new(OnceLock::new()), metrics_server: Arc::new(OnceLock::new()),
component_registry: component::Registry::new(), component_registry: component::Registry::new(),
is_static, is_static,
instance_sources: Arc::new(Mutex::new(HashMap::new())), instance_sources: Arc::new(Mutex::new(HashMap::new())),
...@@ -113,13 +113,13 @@ impl DistributedRuntime { ...@@ -113,13 +113,13 @@ impl DistributedRuntime {
system_health, system_health,
}; };
// Start HTTP server if enabled // Start metrics server if enabled
if let Some(cancel_token) = cancel_token { if let Some(cancel_token) = cancel_token {
let host = config.system_host.clone(); let host = config.system_host.clone();
let port = config.system_port; let port = config.system_port;
// Start HTTP server (it spawns its own task internally) // Start metrics server (it spawns its own task internally)
match crate::http_server::spawn_http_server( match crate::metrics_server::spawn_metrics_server(
&host, &host,
port, port,
cancel_token, cancel_token,
...@@ -128,24 +128,24 @@ impl DistributedRuntime { ...@@ -128,24 +128,24 @@ impl DistributedRuntime {
.await .await
{ {
Ok((addr, handle)) => { Ok((addr, handle)) => {
tracing::info!("HTTP server started successfully on {}", addr); tracing::info!("Metrics server started successfully on {}", addr);
// Store HTTP server information // Store metrics server information
let http_server_info = let metrics_server_info =
crate::http_server::HttpServerInfo::new(addr, Some(handle)); crate::metrics_server::MetricsServerInfo::new(addr, Some(handle));
// Initialize the http_server field // Initialize the metrics_server field
distributed_runtime distributed_runtime
.http_server .metrics_server
.set(Arc::new(http_server_info)) .set(Arc::new(metrics_server_info))
.expect("HTTP server info should only be set once"); .expect("Metrics server info should only be set once");
} }
Err(e) => { Err(e) => {
tracing::error!("HTTP server startup failed: {}", e); tracing::error!("Metrics server startup failed: {}", e);
} }
} }
} else { } else {
tracing::debug!("Health and metrics HTTP server is disabled via DYN_SYSTEM_ENABLED"); tracing::debug!("Health and metrics server is disabled via DYN_SYSTEM_ENABLED");
} }
Ok(distributed_runtime) Ok(distributed_runtime)
...@@ -226,9 +226,9 @@ impl DistributedRuntime { ...@@ -226,9 +226,9 @@ impl DistributedRuntime {
self.nats_client.clone() self.nats_client.clone()
} }
/// Get HTTP server information if available /// Get metrics server information if available
pub fn http_server_info(&self) -> Option<Arc<crate::http_server::HttpServerInfo>> { pub fn metrics_server_info(&self) -> Option<Arc<crate::metrics_server::MetricsServerInfo>> {
self.http_server.get().cloned() self.metrics_server.get().cloned()
} }
// todo(ryan): deprecate this as we move to Discovery traits and Component Identifiers // todo(ryan): deprecate this as we move to Discovery traits and Component Identifiers
......
...@@ -36,8 +36,8 @@ pub use config::RuntimeConfig; ...@@ -36,8 +36,8 @@ pub use config::RuntimeConfig;
pub mod component; pub mod component;
pub mod discovery; pub mod discovery;
pub mod engine; pub mod engine;
pub mod http_server; pub mod metrics_server;
pub use http_server::HttpServerInfo; pub use metrics_server::MetricsServerInfo;
pub mod instances; pub mod instances;
pub mod logging; pub mod logging;
pub mod metrics; pub mod metrics;
...@@ -158,7 +158,7 @@ pub struct DistributedRuntime { ...@@ -158,7 +158,7 @@ pub struct DistributedRuntime {
etcd_client: Option<transports::etcd::Client>, etcd_client: Option<transports::etcd::Client>,
nats_client: transports::nats::Client, nats_client: transports::nats::Client,
tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>, tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
http_server: Arc<OnceLock<Arc<http_server::HttpServerInfo>>>, metrics_server: Arc<OnceLock<Arc<metrics_server::MetricsServerInfo>>>,
// local registry for components // local registry for components
// the registry allows us to use share runtime resources across instances of the same component object. // the registry allows us to use share runtime resources across instances of the same component object.
......
...@@ -28,14 +28,14 @@ use tokio_util::sync::CancellationToken; ...@@ -28,14 +28,14 @@ use tokio_util::sync::CancellationToken;
use tracing; use tracing;
use tracing::Instrument; use tracing::Instrument;
/// HTTP server information containing socket address and handle /// Metrics server information containing socket address and handle
#[derive(Debug)] #[derive(Debug)]
pub struct HttpServerInfo { pub struct MetricsServerInfo {
pub socket_addr: std::net::SocketAddr, pub socket_addr: std::net::SocketAddr,
pub handle: Option<Arc<JoinHandle<()>>>, pub handle: Option<Arc<JoinHandle<()>>>,
} }
impl HttpServerInfo { impl MetricsServerInfo {
pub fn new(socket_addr: std::net::SocketAddr, handle: Option<JoinHandle<()>>) -> Self { pub fn new(socket_addr: std::net::SocketAddr, handle: Option<JoinHandle<()>>) -> Self {
Self { Self {
socket_addr, socket_addr,
...@@ -56,7 +56,7 @@ impl HttpServerInfo { ...@@ -56,7 +56,7 @@ impl HttpServerInfo {
} }
} }
impl Clone for HttpServerInfo { impl Clone for MetricsServerInfo {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
socket_addr: self.socket_addr, socket_addr: self.socket_addr,
...@@ -65,16 +65,16 @@ impl Clone for HttpServerInfo { ...@@ -65,16 +65,16 @@ impl Clone for HttpServerInfo {
} }
} }
/// HTTP server state containing metrics and uptime tracking /// Metrics server state containing metrics and uptime tracking
pub struct HttpServerState { pub struct MetricsServerState {
// global drt registry is for printing out the entire Prometheus format output // global drt registry is for printing out the entire Prometheus format output
root_drt: Arc<crate::DistributedRuntime>, root_drt: Arc<crate::DistributedRuntime>,
start_time: OnceLock<Instant>, start_time: OnceLock<Instant>,
uptime_gauge: Arc<prometheus::Gauge>, uptime_gauge: Arc<prometheus::Gauge>,
} }
impl HttpServerState { impl MetricsServerState {
/// Create new HTTP server state with the provided metrics registry /// Create new metrics server state with the provided metrics registry
pub fn new(drt: Arc<crate::DistributedRuntime>) -> anyhow::Result<Self> { pub fn new(drt: Arc<crate::DistributedRuntime>) -> anyhow::Result<Self> {
// Note: This metric is created at the DRT level (no namespace), so we manually add "dynamo_" prefix // Note: This metric is created at the DRT level (no namespace), so we manually add "dynamo_" prefix
// to maintain consistency with the project's metric naming convention // to maintain consistency with the project's metric naming convention
...@@ -121,15 +121,15 @@ impl HttpServerState { ...@@ -121,15 +121,15 @@ impl HttpServerState {
} }
} }
/// Start HTTP server with metrics support /// Start metrics server with metrics support
pub async fn spawn_http_server( pub async fn spawn_metrics_server(
host: &str, host: &str,
port: u16, port: u16,
cancel_token: CancellationToken, cancel_token: CancellationToken,
drt: Arc<crate::DistributedRuntime>, drt: Arc<crate::DistributedRuntime>,
) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> { ) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> {
// Create HTTP server state with the provided metrics registry // Create metrics server state with the provided metrics registry
let server_state = Arc::new(HttpServerState::new(drt)?); let server_state = Arc::new(MetricsServerState::new(drt)?);
let health_path = server_state let health_path = server_state
.drt() .drt()
.system_health .system_health
...@@ -187,14 +187,14 @@ pub async fn spawn_http_server( ...@@ -187,14 +187,14 @@ pub async fn spawn_http_server(
}); });
let address = format!("{}:{}", host, port); let address = format!("{}:{}", host, port);
tracing::info!("[spawn_http_server] binding to: {}", address); tracing::info!("[spawn_metrics_server] binding to: {}", address);
let listener = match TcpListener::bind(&address).await { let listener = match TcpListener::bind(&address).await {
Ok(listener) => { Ok(listener) => {
// get the actual address and port, print in debug level // get the actual address and port, print in debug level
let actual_address = listener.local_addr()?; let actual_address = listener.local_addr()?;
tracing::info!( tracing::info!(
"[spawn_http_server] HTTP server bound to: {}", "[spawn_metrics_server] metrics server bound to: {}",
actual_address actual_address
); );
(listener, actual_address) (listener, actual_address)
...@@ -213,7 +213,7 @@ pub async fn spawn_http_server( ...@@ -213,7 +213,7 @@ pub async fn spawn_http_server(
.with_graceful_shutdown(observer.cancelled_owned()) .with_graceful_shutdown(observer.cancelled_owned())
.await .await
{ {
tracing::error!("HTTP server error: {}", e); tracing::error!("Metrics server error: {}", e);
} }
}); });
Ok((actual_address, handle)) Ok((actual_address, handle))
...@@ -226,7 +226,7 @@ pub async fn spawn_http_server( ...@@ -226,7 +226,7 @@ pub async fn spawn_http_server(
x_request_id= trace_parent.x_request_id, x_request_id= trace_parent.x_request_id,
tracestate= trace_parent.tracestate))] tracestate= trace_parent.tracestate))]
async fn health_handler( async fn health_handler(
state: Arc<HttpServerState>, state: Arc<MetricsServerState>,
route: &'static str, // Used for tracing only route: &'static str, // Used for tracing only
trace_parent: TraceParent, // Used for tracing only trace_parent: TraceParent, // Used for tracing only
) -> impl IntoResponse { ) -> impl IntoResponse {
...@@ -270,7 +270,7 @@ async fn health_handler( ...@@ -270,7 +270,7 @@ async fn health_handler(
x_request_id = trace_parent.x_request_id, x_request_id = trace_parent.x_request_id,
tracestate = trace_parent.tracestate))] tracestate = trace_parent.tracestate))]
async fn metrics_handler( async fn metrics_handler(
state: Arc<HttpServerState>, state: Arc<MetricsServerState>,
route: &'static str, // Used for tracing only route: &'static str, // Used for tracing only
trace_parent: TraceParent, // Used for tracing only trace_parent: TraceParent, // Used for tracing only
) -> impl IntoResponse { ) -> impl IntoResponse {
...@@ -290,8 +290,8 @@ async fn metrics_handler( ...@@ -290,8 +290,8 @@ async fn metrics_handler(
} }
} }
// Regular tests: cargo test http_server --lib // Regular tests: cargo test metrics_server --lib
// Integration tests: cargo test http_server --lib --features integration // Integration tests: cargo test metrics_server --lib --features integration
#[cfg(test)] #[cfg(test)]
/// Helper function to create a DRT instance for async testing /// Helper function to create a DRT instance for async testing
...@@ -354,7 +354,7 @@ mod tests { ...@@ -354,7 +354,7 @@ mod tests {
async fn test_runtime_metrics_initialization_and_namespace() { async fn test_runtime_metrics_initialization_and_namespace() {
// Test that metrics have correct namespace // Test that metrics have correct namespace
let drt = create_test_drt_async().await; let drt = create_test_drt_async().await;
let runtime_metrics = HttpServerState::new(Arc::new(drt)).unwrap(); let runtime_metrics = MetricsServerState::new(Arc::new(drt)).unwrap();
// Initialize start time // Initialize start time
runtime_metrics.initialize_start_time().unwrap(); runtime_metrics.initialize_start_time().unwrap();
...@@ -377,7 +377,7 @@ dynamo_component_dynamo_uptime_seconds 42 ...@@ -377,7 +377,7 @@ dynamo_component_dynamo_uptime_seconds 42
async fn test_start_time_initialization() { async fn test_start_time_initialization() {
// Test that start time can only be initialized once // Test that start time can only be initialized once
let drt = create_test_drt_async().await; let drt = create_test_drt_async().await;
let runtime_metrics = HttpServerState::new(Arc::new(drt)).unwrap(); let runtime_metrics = MetricsServerState::new(Arc::new(drt)).unwrap();
// First initialization should succeed // First initialization should succeed
assert!(runtime_metrics.initialize_start_time().is_ok()); assert!(runtime_metrics.initialize_start_time().is_ok());
...@@ -440,7 +440,7 @@ dynamo_component_dynamo_uptime_seconds 42 ...@@ -440,7 +440,7 @@ dynamo_component_dynamo_uptime_seconds 42
.unwrap(), .unwrap(),
); );
let cancel_token = CancellationToken::new(); let cancel_token = CancellationToken::new();
let (addr, _) = spawn_http_server("127.0.0.1", 0, cancel_token.clone(), drt) let (addr, _) = spawn_metrics_server("127.0.0.1", 0, cancel_token.clone(), drt)
.await .await
.unwrap(); .unwrap();
println!("[test] Waiting for server to start..."); println!("[test] Waiting for server to start...");
...@@ -525,7 +525,7 @@ dynamo_component_dynamo_uptime_seconds 42 ...@@ -525,7 +525,7 @@ dynamo_component_dynamo_uptime_seconds 42
.unwrap(), .unwrap(),
); );
let cancel_token = CancellationToken::new(); let cancel_token = CancellationToken::new();
let (addr, _) = spawn_http_server("127.0.0.1", 0, cancel_token.clone(), drt) let (addr, _) = spawn_metrics_server("127.0.0.1", 0, cancel_token.clone(), drt)
.await .await
.unwrap(); .unwrap();
sleep(std::time::Duration::from_millis(1000)).await; sleep(std::time::Duration::from_millis(1000)).await;
...@@ -562,7 +562,7 @@ dynamo_component_dynamo_uptime_seconds 42 ...@@ -562,7 +562,7 @@ dynamo_component_dynamo_uptime_seconds 42
async fn test_uptime_without_initialization() { async fn test_uptime_without_initialization() {
// Test that uptime returns an error if start time is not initialized // Test that uptime returns an error if start time is not initialized
let drt = create_test_drt_async().await; let drt = create_test_drt_async().await;
let runtime_metrics = HttpServerState::new(Arc::new(drt)).unwrap(); let runtime_metrics = MetricsServerState::new(Arc::new(drt)).unwrap();
// This should return an error because start time is not initialized // This should return an error because start time is not initialized
let result = runtime_metrics.uptime(); let result = runtime_metrics.uptime();
...@@ -572,7 +572,7 @@ dynamo_component_dynamo_uptime_seconds 42 ...@@ -572,7 +572,7 @@ dynamo_component_dynamo_uptime_seconds 42
#[cfg(feature = "integration")] #[cfg(feature = "integration")]
#[tokio::test] #[tokio::test]
async fn test_spawn_http_server_endpoints() { async fn test_spawn_metrics_server_endpoints() {
// use reqwest for HTTP requests // use reqwest for HTTP requests
temp_env::async_with_vars( temp_env::async_with_vars(
[("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready"))], [("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready"))],
...@@ -580,7 +580,7 @@ dynamo_component_dynamo_uptime_seconds 42 ...@@ -580,7 +580,7 @@ dynamo_component_dynamo_uptime_seconds 42
let cancel_token = CancellationToken::new(); let cancel_token = CancellationToken::new();
let drt = create_test_drt_async().await; let drt = create_test_drt_async().await;
let (addr, server_handle) = let (addr, server_handle) =
spawn_http_server("127.0.0.1", 0, cancel_token.clone(), Arc::new(drt)) spawn_metrics_server("127.0.0.1", 0, cancel_token.clone(), Arc::new(drt))
.await .await
.unwrap(); .unwrap();
println!("[test] Waiting for server to start..."); println!("[test] Waiting for server to start...");
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment