Unverified Commit c91e2e49 authored by Keiven C's avatar Keiven C Committed by GitHub
Browse files

refactor: rename to system status server for consistency (#2354)


Co-authored-by: default avatarKeiven Chang <keivenchang@users.noreply.github.com>
parent fd358991
......@@ -77,7 +77,7 @@ These labels are prefixed with "dynamo_" to avoid collisions with Kubernetes and
## Example Metrics Output
When the system is running, you'll see metrics from the /metrics HTTP path like this:
When the system is running, you'll see metrics from http://<ip>:<port>/metrics like this:
```prometheus
# HELP dynamo_component_concurrent_requests Number of requests currently being processed by component endpoint
......@@ -119,7 +119,7 @@ dynamo_component_response_bytes_total{dynamo_component="example_component",dynam
# HELP uptime_seconds Total uptime of the DistributedRuntime in seconds
# TYPE uptime_seconds gauge
uptime_seconds{dynamo_namespace="metrics_server"} 1.8226759879999999
uptime_seconds{dynamo_namespace="system_status_server"} 1.8226759879999999
```
## Example
......@@ -176,13 +176,13 @@ if enable_custom_metrics {
## Running the Example
**Important**: You must set the `DYN_SYSTEM_PORT` environment variable to specify which port the HTTP system metrics server will run on.
**Important**: You must set the `DYN_SYSTEM_PORT` environment variable to specify which port the system status server will listen on.
```bash
# Run the system metrics example
DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=8081 cargo run --bin system_server
```
The server will start an HTTP system metrics server on the specified port (8081 in this example) that exposes the Prometheus metrics endpoint at `/metrics`.
The server will start a system status server on the specified port (8081 in this example) that exposes the Prometheus metrics endpoint at `/metrics`.
To Run an actual LLM frontend + server (aggregated example), launch both of them. By default, the frontend listens to port 8080.
......
......@@ -82,7 +82,7 @@ impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for Reques
}
}
/// Backend function that sets up the system server with metrics and ingress handler
/// Backend function that sets up the system status server with metrics and ingress handler
/// This function can be reused by integration tests to ensure they use the exact same setup
pub async fn backend(drt: DistributedRuntime, endpoint_name: Option<&str>) -> Result<()> {
let endpoint_name = endpoint_name.unwrap_or(DEFAULT_ENDPOINT);
......
......@@ -42,15 +42,17 @@ async fn test_backend_with_metrics() -> Result<()> {
let runtime = Runtime::from_current()?;
let distributed = DistributedRuntime::from_settings(runtime.clone()).await?;
// Get the Metrics server info to find the actual port
let metrics_server_info = distributed.metrics_server_info();
let metrics_port = match metrics_server_info {
// Ask the runtime for the system status server info so the test can discover the port that
// was actually bound (DYN_SYSTEM_PORT may be 0, in which case a random port is assigned).
// The accessor on DistributedRuntime is `system_status_server_info`.
let system_status_info = distributed.system_status_server_info();
let system_status_port = match system_status_info {
    Some(info) => {
        println!("System status server running on: {}", info.address());
        info.port()
    }
    None => {
        // Without a running server there is no port to query, so the test cannot proceed.
        panic!(
            "System status server not started - check DYN_SYSTEM_ENABLED environment variable"
        );
    }
};
......@@ -96,7 +98,7 @@ async fn test_backend_with_metrics() -> Result<()> {
sleep(Duration::from_millis(500)).await;
// Now fetch the HTTP metrics endpoint using the dynamic port
let metrics_url = format!("http://localhost:{}/metrics", metrics_port);
let metrics_url = format!("http://localhost:{}/metrics", system_status_port);
println!("Fetching metrics from: {}", metrics_url);
......
......@@ -81,20 +81,20 @@ pub struct RuntimeConfig {
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub max_blocking_threads: usize,
/// System server host for health and metrics endpoints
/// System status server host for health and metrics endpoints
/// Set this at runtime with environment variable DYN_SYSTEM_HOST
#[builder(default = "DEFAULT_SYSTEM_HOST.to_string()")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub system_host: String,
/// System server port for health and metrics endpoints
/// System status server port for health and metrics endpoints
/// If set to 0, the system will assign a random available port
/// Set this at runtime with environment variable DYN_SYSTEM_PORT
#[builder(default = "DEFAULT_SYSTEM_PORT")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub system_port: u16,
/// Health and metrics System server enabled
/// Whether the system status server (health and metrics endpoints) is enabled
/// Set this at runtime with environment variable DYN_SYSTEM_ENABLED
#[builder(default = "false")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
......
......@@ -77,7 +77,7 @@ impl DistributedRuntime {
})
.await??;
// Start system metrics server for health and metrics if enabled in configuration
// Start system status server for health and metrics if enabled in configuration
let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default();
// IMPORTANT: We must extract cancel_token from runtime BEFORE moving runtime into the struct below.
// This is because after moving, runtime is no longer accessible in this scope (ownership rules).
......@@ -102,7 +102,7 @@ impl DistributedRuntime {
etcd_client,
nats_client,
tcp_server: Arc::new(OnceCell::new()),
metrics_server: Arc::new(OnceLock::new()),
system_status_server: Arc::new(OnceLock::new()),
component_registry: component::Registry::new(),
is_static,
instance_sources: Arc::new(Mutex::new(HashMap::new())),
......@@ -113,13 +113,13 @@ impl DistributedRuntime {
system_health,
};
// Start metrics server if enabled
// Start system status server if enabled
if let Some(cancel_token) = cancel_token {
let host = config.system_host.clone();
let port = config.system_port;
// Start metrics server (it spawns its own task internally)
match crate::metrics_server::spawn_metrics_server(
// Start system status server (it spawns its own task internally)
match crate::system_status_server::spawn_system_status_server(
&host,
port,
cancel_token,
......@@ -128,24 +128,27 @@ impl DistributedRuntime {
.await
{
Ok((addr, handle)) => {
tracing::info!("Metrics server started successfully on {}", addr);
tracing::info!("System status server started successfully on {}", addr);
// Store metrics server information
let metrics_server_info =
crate::metrics_server::MetricsServerInfo::new(addr, Some(handle));
// Store system status server information
let system_status_server_info =
crate::system_status_server::SystemStatusServerInfo::new(
addr,
Some(handle),
);
// Initialize the metrics_server field
// Initialize the system_status_server field
distributed_runtime
.metrics_server
.set(Arc::new(metrics_server_info))
.expect("Metrics server info should only be set once");
.system_status_server
.set(Arc::new(system_status_server_info))
.expect("System status server info should only be set once");
}
Err(e) => {
tracing::error!("Metrics server startup failed: {}", e);
tracing::error!("System status server startup failed: {}", e);
}
}
} else {
tracing::debug!("Health and metrics server is disabled via DYN_SYSTEM_ENABLED");
tracing::debug!("Health and system status server is disabled via DYN_SYSTEM_ENABLED");
}
Ok(distributed_runtime)
......@@ -226,9 +229,11 @@ impl DistributedRuntime {
self.nats_client.clone()
}
/// Get metrics server information if available
pub fn metrics_server_info(&self) -> Option<Arc<crate::metrics_server::MetricsServerInfo>> {
self.metrics_server.get().cloned()
/// Returns the recorded system status server info (socket address and task handle), if any.
///
/// Yields `None` when nothing has been stored in the `system_status_server` cell.
pub fn system_status_server_info(
    &self,
) -> Option<Arc<crate::system_status_server::SystemStatusServerInfo>> {
    let recorded = self.system_status_server.get();
    recorded.map(Arc::clone)
}
// todo(ryan): deprecate this as we move to Discovery traits and Component Identifiers
......
......@@ -36,8 +36,8 @@ pub use config::RuntimeConfig;
pub mod component;
pub mod discovery;
pub mod engine;
pub mod metrics_server;
pub use metrics_server::MetricsServerInfo;
pub mod system_status_server;
pub use system_status_server::SystemStatusServerInfo;
pub mod instances;
pub mod logging;
pub mod metrics;
......@@ -158,7 +158,7 @@ pub struct DistributedRuntime {
etcd_client: Option<transports::etcd::Client>,
nats_client: transports::nats::Client,
tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
metrics_server: Arc<OnceLock<Arc<metrics_server::MetricsServerInfo>>>,
system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,
// local registry for components
// the registry allows us to share runtime resources across instances of the same component object.
......
......@@ -268,7 +268,9 @@ fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>(
);
// Handle different metric types
let metric = if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::Histogram>() {
let prometheus_metric = if std::any::TypeId::of::<T>()
== std::any::TypeId::of::<prometheus::Histogram>()
{
// Special handling for Histogram with custom buckets
// buckets parameter is valid for Histogram, const_labels is not used
if const_labels.is_some() {
......@@ -369,14 +371,14 @@ fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>(
current_prefix.push_str(name);
// Register metric at this hierarchical level
let collector: Box<dyn prometheus::core::Collector> = Box::new(metric.clone());
let collector: Box<dyn prometheus::core::Collector> = Box::new(prometheus_metric.clone());
let _ = prometheus_registry
.entry(current_prefix.clone())
.or_default()
.register(collector);
}
Ok(metric)
Ok(prometheus_metric)
}
/// This trait should be implemented by all metric registries, including Prometheus, Envy, OpenTelemetry, and others.
......@@ -1040,12 +1042,12 @@ dynamo_component_testintgaugevec{{instance="server2",service="api",status="inact
);
assert_eq!(
drt_output, expected_drt_output,
filtered_drt_output, expected_drt_output,
"\n=== DRT COMPARISON FAILED ===\n\
Expected:\n{}\n\
Actual:\n{}\n\
==============================",
expected_drt_output, drt_output
expected_drt_output, filtered_drt_output
);
println!("✓ All Prometheus format outputs verified successfully!");
......
......@@ -29,14 +29,14 @@ use tokio_util::sync::CancellationToken;
use tower_http::trace::DefaultMakeSpan;
use tower_http::trace::TraceLayer;
/// Metrics server information containing socket address and handle
/// System status server information containing socket address and handle
#[derive(Debug)]
pub struct MetricsServerInfo {
pub struct SystemStatusServerInfo {
pub socket_addr: std::net::SocketAddr,
pub handle: Option<Arc<JoinHandle<()>>>,
}
impl MetricsServerInfo {
impl SystemStatusServerInfo {
pub fn new(socket_addr: std::net::SocketAddr, handle: Option<JoinHandle<()>>) -> Self {
Self {
socket_addr,
......@@ -57,7 +57,7 @@ impl MetricsServerInfo {
}
}
impl Clone for MetricsServerInfo {
impl Clone for SystemStatusServerInfo {
fn clone(&self) -> Self {
Self {
socket_addr: self.socket_addr,
......@@ -66,16 +66,16 @@ impl Clone for MetricsServerInfo {
}
}
/// Metrics server state containing metrics and uptime tracking
pub struct MetricsServerState {
/// System status server state containing metrics and uptime tracking
pub struct SystemStatusState {
// global drt registry is for printing out the entire Prometheus format output
root_drt: Arc<crate::DistributedRuntime>,
start_time: OnceLock<Instant>,
uptime_gauge: prometheus::Gauge,
}
impl MetricsServerState {
/// Create new metrics server state with the provided metrics registry
impl SystemStatusState {
/// Create new system status server state with the provided metrics registry
pub fn new(drt: Arc<crate::DistributedRuntime>) -> anyhow::Result<Self> {
// Note: This metric is created at the DRT level (no namespace), so we manually add "dynamo_" prefix
// to maintain consistency with the project's metric naming convention
......@@ -122,15 +122,15 @@ impl MetricsServerState {
}
}
/// Start metrics server with metrics support
pub async fn spawn_metrics_server(
/// Start system status server with metrics support
pub async fn spawn_system_status_server(
host: &str,
port: u16,
cancel_token: CancellationToken,
drt: Arc<crate::DistributedRuntime>,
) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> {
// Create metrics server state with the provided metrics registry
let server_state = Arc::new(MetricsServerState::new(drt)?);
// Create system status server state with the provided metrics registry
let server_state = Arc::new(SystemStatusState::new(drt)?);
let health_path = server_state
.drt()
.system_health
......@@ -180,14 +180,14 @@ pub async fn spawn_metrics_server(
.layer(TraceLayer::new_for_http().make_span_with(make_request_span));
let address = format!("{}:{}", host, port);
tracing::info!("[spawn_metrics_server] binding to: {}", address);
tracing::info!("[spawn_system_status_server] binding to: {}", address);
let listener = match TcpListener::bind(&address).await {
Ok(listener) => {
// get the actual address and port (relevant when port 0 was requested), log at info level
let actual_address = listener.local_addr()?;
tracing::info!(
"[spawn_metrics_server] metrics server bound to: {}",
"[spawn_system_status_server] system status server bound to: {}",
actual_address
);
(listener, actual_address)
......@@ -206,14 +206,15 @@ pub async fn spawn_metrics_server(
.with_graceful_shutdown(observer.cancelled_owned())
.await
{
tracing::error!("Metrics server error: {}", e);
tracing::error!("System status server error: {}", e);
}
});
Ok((actual_address, handle))
}
/// Health handler
async fn health_handler(state: Arc<MetricsServerState>) -> impl IntoResponse {
#[tracing::instrument(skip_all, level = "trace")]
async fn health_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
let (mut healthy, endpoints) = state
.drt()
.system_health
......@@ -248,7 +249,8 @@ async fn health_handler(state: Arc<MetricsServerState>) -> impl IntoResponse {
}
/// Metrics handler with DistributedRuntime uptime
async fn metrics_handler(state: Arc<MetricsServerState>) -> impl IntoResponse {
#[tracing::instrument(skip_all, level = "trace")]
async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
// Update the uptime gauge with current value
state.update_uptime_gauge();
......@@ -265,8 +267,8 @@ async fn metrics_handler(state: Arc<MetricsServerState>) -> impl IntoResponse {
}
}
// Regular tests: cargo test metrics_server --lib
// Integration tests: cargo test metrics_server --lib --features integration
// Regular tests: cargo test system_status_server --lib
// Integration tests: cargo test system_status_server --lib --features integration
#[cfg(test)]
/// Helper function to create a DRT instance for async testing
......@@ -329,7 +331,7 @@ mod tests {
async fn test_runtime_metrics_initialization_and_namespace() {
// Test that metrics have correct namespace
let drt = create_test_drt_async().await;
let runtime_metrics = MetricsServerState::new(Arc::new(drt)).unwrap();
let runtime_metrics = SystemStatusState::new(Arc::new(drt)).unwrap();
// Initialize start time
runtime_metrics.initialize_start_time().unwrap();
......@@ -352,7 +354,7 @@ dynamo_component_dynamo_uptime_seconds 42
async fn test_start_time_initialization() {
// Test that start time can only be initialized once
let drt = create_test_drt_async().await;
let runtime_metrics = MetricsServerState::new(Arc::new(drt)).unwrap();
let runtime_metrics = SystemStatusState::new(Arc::new(drt)).unwrap();
// First initialization should succeed
assert!(runtime_metrics.initialize_start_time().is_ok());
......@@ -415,7 +417,8 @@ dynamo_component_dynamo_uptime_seconds 42
.unwrap(),
);
let cancel_token = CancellationToken::new();
let (addr, _) = spawn_metrics_server("127.0.0.1", 0, cancel_token.clone(), drt)
let (addr, _) =
spawn_system_status_server("127.0.0.1", 0, cancel_token.clone(), drt)
.await
.unwrap();
println!("[test] Waiting for server to start...");
......@@ -500,7 +503,8 @@ dynamo_component_dynamo_uptime_seconds 42
.unwrap(),
);
let cancel_token = CancellationToken::new();
let (addr, _) = spawn_metrics_server("127.0.0.1", 0, cancel_token.clone(), drt)
let (addr, _) =
spawn_system_status_server("127.0.0.1", 0, cancel_token.clone(), drt)
.await
.unwrap();
sleep(std::time::Duration::from_millis(1000)).await;
......@@ -537,7 +541,7 @@ dynamo_component_dynamo_uptime_seconds 42
async fn test_uptime_without_initialization() {
// Test that uptime returns an error if start time is not initialized
let drt = create_test_drt_async().await;
let runtime_metrics = MetricsServerState::new(Arc::new(drt)).unwrap();
let runtime_metrics = SystemStatusState::new(Arc::new(drt)).unwrap();
// This should return an error because start time is not initialized
let result = runtime_metrics.uptime();
......@@ -547,7 +551,7 @@ dynamo_component_dynamo_uptime_seconds 42
#[cfg(feature = "integration")]
#[tokio::test]
async fn test_spawn_metrics_server_endpoints() {
async fn test_spawn_system_status_server_endpoints() {
// use reqwest for HTTP requests
temp_env::async_with_vars(
[("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready"))],
......@@ -555,7 +559,7 @@ dynamo_component_dynamo_uptime_seconds 42
let cancel_token = CancellationToken::new();
let drt = create_test_drt_async().await;
let (addr, server_handle) =
spawn_metrics_server("127.0.0.1", 0, cancel_token.clone(), Arc::new(drt))
spawn_system_status_server("127.0.0.1", 0, cancel_token.clone(), Arc::new(drt))
.await
.unwrap();
println!("[test] Waiting for server to start...");
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment