Commit 68fb3d95 (unverified), authored by Keiven C, committed by GitHub
Browse files

refactor: move uptime tracking from system_status_server(HTTP) to DRT level (#2587)


Co-authored-by: Keiven Chang <keivenchang@users.noreply.github.com>
parent f08729ae
...@@ -117,6 +117,13 @@ impl DistributedRuntime { ...@@ -117,6 +117,13 @@ impl DistributedRuntime {
}); });
distributed_runtime.register_metrics_callback(drt_hierarchies, nats_client_callback); distributed_runtime.register_metrics_callback(drt_hierarchies, nats_client_callback);
// Initialize the uptime gauge in SystemHealth
distributed_runtime
.system_health
.lock()
.unwrap()
.initialize_uptime_gauge(&distributed_runtime)?;
// Handle system status server initialization // Handle system status server initialization
if let Some(cancel_token) = cancel_token { if let Some(cancel_token) = cancel_token {
// System server is enabled - start both the state and HTTP server // System server is enabled - start both the state and HTTP server
...@@ -153,17 +160,7 @@ impl DistributedRuntime { ...@@ -153,17 +160,7 @@ impl DistributedRuntime {
} }
} }
} else { } else {
// System server HTTP is disabled, but still create the state for metrics // System server HTTP is disabled, but uptime metrics are still being tracked via SystemHealth
// This ensures uptime_seconds metric is always registered
let system_status_state = crate::system_status_server::SystemStatusState::new(
Arc::new(distributed_runtime.clone()),
)?;
// Initialize the start time for uptime tracking
if let Err(e) = system_status_state.initialize_start_time() {
tracing::warn!("Failed to initialize system status start time: {}", e);
}
tracing::debug!( tracing::debug!(
"System status server HTTP endpoints disabled, but uptime metrics are being tracked" "System status server HTTP endpoints disabled, but uptime metrics are being tracked"
); );
...@@ -349,7 +346,7 @@ impl DistributedConfig { ...@@ -349,7 +346,7 @@ impl DistributedConfig {
} }
#[cfg(test)] #[cfg(test)]
pub mod test_helpers { pub mod distributed_test_utils {
//! Common test helper functions for DistributedRuntime tests //! Common test helper functions for DistributedRuntime tests
// TODO: Use in-memory DistributedRuntime for tests instead of full runtime when available. // TODO: Use in-memory DistributedRuntime for tests instead of full runtime when available.
...@@ -364,3 +361,61 @@ pub mod test_helpers { ...@@ -364,3 +361,61 @@ pub mod test_helpers {
.unwrap() .unwrap()
} }
} }
#[cfg(feature = "integration")]
#[cfg(test)]
mod tests {
    //! Integration tests checking that `SystemHealth` uptime is tracked at the
    //! DistributedRuntime level, both with and without the system status server.

    use super::distributed_test_utils::create_test_drt_async;
    use std::time::Duration;

    /// How long each test lets the runtime live before sampling its uptime.
    const MIN_UPTIME: Duration = Duration::from_millis(50);

    /// Starts a DRT with `DYN_SYSTEM_ENABLED` set according to `system_enabled`,
    /// sleeps for [`MIN_UPTIME`], and asserts that the uptime reported by
    /// `SystemHealth` is at least that long.
    ///
    /// # Panics
    ///
    /// Panics if the reported uptime is shorter than [`MIN_UPTIME`], or if the
    /// `system_health` mutex is poisoned.
    async fn assert_uptime_after_delay(system_enabled: bool) {
        let (env_value, label) = if system_enabled {
            ("true", "enabled")
        } else {
            ("false", "disabled")
        };

        temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some(env_value))], async {
            // Start a DRT
            let drt = create_test_drt_async().await;

            // Let some wall-clock time pass so the uptime has a known lower bound
            tokio::time::sleep(MIN_UPTIME).await;

            // The lock guard is a temporary, released at the end of this statement
            let uptime = drt.system_health.lock().unwrap().uptime();
            assert!(
                uptime >= MIN_UPTIME,
                "Expected uptime to be at least 50ms, but got {:?}",
                uptime
            );
            println!(
                "✓ DRT uptime test passed (system {}): uptime = {:?}",
                label, uptime
            );
        })
        .await;
    }

    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_disabled() {
        // Test uptime with system status server disabled
        assert_uptime_after_delay(false).await;
    }

    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_enabled() {
        // Test uptime with system status server enabled
        assert_uptime_after_delay(true).await;
    }
}
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
use std::{ use std::{
collections::HashMap, collections::HashMap,
sync::{Arc, OnceLock, Weak}, sync::{Arc, OnceLock, Weak},
time::Instant,
}; };
use tokio::sync::Mutex; use tokio::sync::Mutex;
...@@ -90,6 +91,8 @@ pub struct SystemHealth { ...@@ -90,6 +91,8 @@ pub struct SystemHealth {
use_endpoint_health_status: Vec<String>, use_endpoint_health_status: Vec<String>,
health_path: String, health_path: String,
live_path: String, live_path: String,
start_time: Instant,
uptime_gauge: OnceLock<prometheus::Gauge>,
} }
impl SystemHealth { impl SystemHealth {
...@@ -109,6 +112,8 @@ impl SystemHealth { ...@@ -109,6 +112,8 @@ impl SystemHealth {
use_endpoint_health_status, use_endpoint_health_status,
health_path, health_path,
live_path, live_path,
start_time: Instant::now(),
uptime_gauge: OnceLock::new(),
} }
} }
pub fn set_health_status(&mut self, status: HealthStatus) { pub fn set_health_status(&mut self, status: HealthStatus) {
...@@ -145,6 +150,34 @@ impl SystemHealth { ...@@ -145,6 +150,34 @@ impl SystemHealth {
(healthy, endpoints) (healthy, endpoints)
} }
/// Create the `uptime_seconds` gauge through `registry` and store it on this
/// `SystemHealth` so later calls can update it.
///
/// # Errors
///
/// Returns an error if `registry.create_gauge` fails, or if a gauge has
/// already been stored by an earlier call. Note that in the second case the
/// gauge has already been created through the registry before the check.
pub fn initialize_uptime_gauge<T: crate::metrics::MetricsRegistry>(
    &self,
    registry: &T,
) -> anyhow::Result<()> {
    let new_gauge = registry.create_gauge(
        "uptime_seconds",
        "Total uptime of the DistributedRuntime in seconds",
        &[],
    )?;

    // `OnceLock::set` hands the value back when the cell is already filled.
    if self.uptime_gauge.set(new_gauge).is_err() {
        anyhow::bail!("uptime_gauge already initialized");
    }
    Ok(())
}
/// Time elapsed since this `SystemHealth` recorded its `start_time`.
pub fn uptime(&self) -> std::time::Duration {
    let started_at = self.start_time;
    started_at.elapsed()
}
/// Write the current uptime, in fractional seconds, into the uptime gauge.
///
/// Does nothing if the gauge has not been initialized yet.
pub fn update_uptime_gauge(&self) {
    let Some(uptime_gauge) = self.uptime_gauge.get() else {
        return;
    };
    let seconds = self.uptime().as_secs_f64();
    uptime_gauge.set(seconds);
}
} }
/// Type alias for runtime callback functions to reduce complexity /// Type alias for runtime callback functions to reduce complexity
......
...@@ -913,7 +913,7 @@ mod test_metricsregistry_units { ...@@ -913,7 +913,7 @@ mod test_metricsregistry_units {
#[cfg(test)] #[cfg(test)]
mod test_metricsregistry_prefixes { mod test_metricsregistry_prefixes {
use super::*; use super::*;
use crate::distributed::test_helpers::create_test_drt_async; use crate::distributed::distributed_test_utils::create_test_drt_async;
use prometheus::core::Collector; use prometheus::core::Collector;
#[tokio::test] #[tokio::test]
...@@ -1047,7 +1047,7 @@ mod test_metricsregistry_prometheus_fmt_outputs { ...@@ -1047,7 +1047,7 @@ mod test_metricsregistry_prometheus_fmt_outputs {
use super::prometheus_names::{COMPONENT_NATS_METRICS, DRT_NATS_METRICS}; use super::prometheus_names::{COMPONENT_NATS_METRICS, DRT_NATS_METRICS};
use super::prometheus_names::{nats_client, nats_service}; use super::prometheus_names::{nats_client, nats_service};
use super::*; use super::*;
use crate::distributed::test_helpers::create_test_drt_async; use crate::distributed::distributed_test_utils::create_test_drt_async;
use prometheus::Counter; use prometheus::Counter;
use std::sync::Arc; use std::sync::Arc;
...@@ -1308,7 +1308,7 @@ mod test_metricsregistry_nats { ...@@ -1308,7 +1308,7 @@ mod test_metricsregistry_nats {
use super::prometheus_names::{COMPONENT_NATS_METRICS, DRT_NATS_METRICS}; use super::prometheus_names::{COMPONENT_NATS_METRICS, DRT_NATS_METRICS};
use super::prometheus_names::{nats_client, nats_service}; use super::prometheus_names::{nats_client, nats_service};
use super::*; use super::*;
use crate::distributed::test_helpers::create_test_drt_async; use crate::distributed::distributed_test_utils::create_test_drt_async;
use crate::pipeline::PushRouter; use crate::pipeline::PushRouter;
use crate::{DistributedRuntime, Runtime}; use crate::{DistributedRuntime, Runtime};
use tokio::time::{Duration, sleep}; use tokio::time::{Duration, sleep};
......
...@@ -19,6 +19,7 @@ use crate::metrics::MetricsRegistry; ...@@ -19,6 +19,7 @@ use crate::metrics::MetricsRegistry;
use crate::traits::DistributedRuntimeProvider; use crate::traits::DistributedRuntimeProvider;
use axum::{Router, http::StatusCode, response::IntoResponse, routing::get}; use axum::{Router, http::StatusCode, response::IntoResponse, routing::get};
use serde_json::json; use serde_json::json;
use std::collections::HashMap;
use std::sync::{Arc, OnceLock}; use std::sync::{Arc, OnceLock};
use std::time::Instant; use std::time::Instant;
use tokio::{net::TcpListener, task::JoinHandle}; use tokio::{net::TcpListener, task::JoinHandle};
...@@ -62,78 +63,22 @@ impl Clone for SystemStatusServerInfo { ...@@ -62,78 +63,22 @@ impl Clone for SystemStatusServerInfo {
} }
} }
/// System status server state containing metrics and uptime tracking /// System status server state containing the distributed runtime reference
pub struct SystemStatusState { pub struct SystemStatusState {
// global drt registry is for printing out the entire Prometheus format output // global drt registry is for printing out the entire Prometheus format output
root_drt: Arc<crate::DistributedRuntime>, root_drt: Arc<crate::DistributedRuntime>,
start_time: OnceLock<Instant>,
uptime_gauge: prometheus::Gauge,
} }
impl SystemStatusState { impl SystemStatusState {
/// Create new system status server state with the provided metrics registry /// Create new system status server state with the provided distributed runtime
pub fn new(drt: Arc<crate::DistributedRuntime>) -> anyhow::Result<Self> { pub fn new(drt: Arc<crate::DistributedRuntime>) -> anyhow::Result<Self> {
// Note: This metric is created at the DRT level (no namespace), so it will be prefixed with "dynamo_component_" Ok(Self { root_drt: drt })
// TODO(keiven): this is part of another upcoming refactor, where we will no longer
// have this duplicate DRT (and Duplicate metrics error).
let uptime_gauge = match drt.as_ref().create_gauge(
"uptime_seconds",
"Total uptime of the DistributedRuntime in seconds",
&[],
) {
Ok(gauge) => gauge,
Err(e) if e.to_string().contains("Duplicate metrics") => {
// If the metric already exists, get it from the registry
// This can happen when SystemStatusState is created multiple times in tests
tracing::debug!(
"uptime_seconds metric already registered, retrieving existing metric"
);
// Create a non-http gauge since we can't retrieve the existing one easily
// The important thing is that the metric is registered in the registry
prometheus::Gauge::new(
"uptime_seconds",
"Total uptime of the DistributedRuntime in seconds",
)
.map_err(|e| anyhow::anyhow!("Failed to create dummy gauge: {}", e))?
}
Err(e) => return Err(e),
};
let state = Self {
root_drt: drt,
start_time: OnceLock::new(),
uptime_gauge,
};
Ok(state)
}
/// Initialize the start time (can only be called once)
pub fn initialize_start_time(&self) -> Result<(), &'static str> {
self.start_time
.set(Instant::now())
.map_err(|_| "Start time already initialized")
}
pub fn uptime(&self) -> Result<std::time::Duration, &'static str> {
self.start_time
.get()
.ok_or("Start time not initialized")
.map(|start_time| start_time.elapsed())
} }
/// Get a reference to the distributed runtime /// Get a reference to the distributed runtime
pub fn drt(&self) -> &crate::DistributedRuntime { pub fn drt(&self) -> &crate::DistributedRuntime {
&self.root_drt &self.root_drt
} }
/// Update the uptime gauge with current value
pub fn update_uptime_gauge(&self) {
if let Ok(uptime) = self.uptime() {
let uptime_seconds = uptime.as_secs_f64();
self.uptime_gauge.set(uptime_seconds);
} else {
tracing::warn!("Failed to update uptime gauge: start time not initialized");
}
}
} }
/// Start system status server with metrics support /// Start system status server with metrics support
...@@ -143,7 +88,7 @@ pub async fn spawn_system_status_server( ...@@ -143,7 +88,7 @@ pub async fn spawn_system_status_server(
cancel_token: CancellationToken, cancel_token: CancellationToken,
drt: Arc<crate::DistributedRuntime>, drt: Arc<crate::DistributedRuntime>,
) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> { ) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> {
// Create system status server state with the provided metrics registry // Create system status server state with the provided distributed runtime
let server_state = Arc::new(SystemStatusState::new(drt)?); let server_state = Arc::new(SystemStatusState::new(drt)?);
let health_path = server_state let health_path = server_state
.drt() .drt()
...@@ -160,11 +105,6 @@ pub async fn spawn_system_status_server( ...@@ -160,11 +105,6 @@ pub async fn spawn_system_status_server(
.live_path .live_path
.clone(); .clone();
// Initialize the start time
server_state
.initialize_start_time()
.map_err(|e| anyhow::anyhow!("Failed to initialize start time: {}", e))?;
let app = Router::new() let app = Router::new()
.route( .route(
&health_path, &health_path,
...@@ -230,20 +170,9 @@ pub async fn spawn_system_status_server( ...@@ -230,20 +170,9 @@ pub async fn spawn_system_status_server(
/// Health handler /// Health handler
#[tracing::instrument(skip_all, level = "trace")] #[tracing::instrument(skip_all, level = "trace")]
async fn health_handler(state: Arc<SystemStatusState>) -> impl IntoResponse { async fn health_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
let (mut healthy, endpoints) = state let system_health = state.drt().system_health.lock().unwrap();
.drt() let (healthy, endpoints) = system_health.get_health_status();
.system_health let uptime = Some(system_health.uptime());
.lock()
.unwrap()
.get_health_status();
let uptime = match state.uptime() {
Ok(uptime_state) => Some(uptime_state),
Err(e) => {
tracing::error!("Failed to get uptime: {}", e);
healthy = false;
None
}
};
let healthy_string = if healthy { "ready" } else { "notready" }; let healthy_string = if healthy { "ready" } else { "notready" };
let status_code = if healthy { let status_code = if healthy {
...@@ -267,7 +196,12 @@ async fn health_handler(state: Arc<SystemStatusState>) -> impl IntoResponse { ...@@ -267,7 +196,12 @@ async fn health_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
#[tracing::instrument(skip_all, level = "trace")] #[tracing::instrument(skip_all, level = "trace")]
async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse { async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
// Update the uptime gauge with current value // Update the uptime gauge with current value
state.update_uptime_gauge(); state
.drt()
.system_health
.lock()
.unwrap()
.update_uptime_gauge();
// Execute all the callbacks starting at the DistributedRuntime level // Execute all the callbacks starting at the DistributedRuntime level
assert!(state.drt().basename() == ""); assert!(state.drt().basename() == "");
...@@ -334,7 +268,7 @@ mod tests { ...@@ -334,7 +268,7 @@ mod tests {
#[cfg(all(test, feature = "integration"))] #[cfg(all(test, feature = "integration"))]
mod integration_tests { mod integration_tests {
use super::*; use super::*;
use crate::distributed::test_helpers::create_test_drt_async; use crate::distributed::distributed_test_utils::create_test_drt_async;
use crate::metrics::MetricsRegistry; use crate::metrics::MetricsRegistry;
use anyhow::Result; use anyhow::Result;
use rstest::rstest; use rstest::rstest;
...@@ -342,16 +276,20 @@ mod integration_tests { ...@@ -342,16 +276,20 @@ mod integration_tests {
use tokio::time::Duration; use tokio::time::Duration;
#[tokio::test] #[tokio::test]
async fn test_uptime_without_initialization() { async fn test_uptime_from_system_health() {
// Test that uptime returns an error if start time is not initialized // Test that uptime is available from SystemHealth
temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async { temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
let drt = create_test_drt_async().await; let drt = create_test_drt_async().await;
let system_status = SystemStatusState::new(Arc::new(drt)).unwrap();
// This should return an error because start time is not initialized // Get uptime from SystemHealth
let result = system_status.uptime(); let uptime = drt.system_health.lock().unwrap().uptime();
assert!(result.is_err()); // Uptime should exist (even if close to zero)
assert_eq!(result.unwrap_err(), "Start time not initialized"); assert!(uptime.as_nanos() > 0 || uptime.is_zero());
// Sleep briefly and check uptime increases
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let uptime_after = drt.system_health.lock().unwrap().uptime();
assert!(uptime_after > uptime);
}) })
.await; .await;
} }
...@@ -397,28 +335,51 @@ mod integration_tests { ...@@ -397,28 +335,51 @@ mod integration_tests {
} }
#[tokio::test] #[tokio::test]
async fn test_start_time_initialization() { async fn test_uptime_gauge_updates() {
// Test that start time can only be initialized once // Test that the uptime gauge is properly updated and increases over time
temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async { temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
let drt = create_test_drt_async().await; let drt = create_test_drt_async().await;
let system_status = SystemStatusState::new(Arc::new(drt)).unwrap();
// First initialization should succeed // Get initial uptime
assert!(system_status.initialize_start_time().is_ok()); let initial_uptime = drt.system_health.lock().unwrap().uptime();
// Second initialization should fail // Update the gauge with initial value
assert!(system_status.initialize_start_time().is_err()); drt.system_health.lock().unwrap().update_uptime_gauge();
// Sleep for 100ms and verify uptime increases // Sleep for 100ms
tokio::time::sleep(std::time::Duration::from_millis(100)).await; tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let uptime_after_sleep = system_status.uptime().unwrap();
// Get uptime after sleep
let uptime_after_sleep = drt.system_health.lock().unwrap().uptime();
// Update the gauge again
drt.system_health.lock().unwrap().update_uptime_gauge();
// Verify uptime increased by at least 100ms
let elapsed = uptime_after_sleep - initial_uptime;
assert!(
elapsed >= std::time::Duration::from_millis(100),
"Uptime should have increased by at least 100ms after sleep, but only increased by {:?}",
elapsed
);
})
.await;
}
#[tokio::test]
async fn test_http_requests_fail_when_system_disabled() {
// Test that system status server is not running when disabled
temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
let drt = create_test_drt_async().await;
// Verify that system status server info is None when disabled
let system_info = drt.system_status_server_info();
assert!( assert!(
uptime_after_sleep >= std::time::Duration::from_millis(100), system_info.is_none(),
"Uptime should be at least 100ms after sleep, got: {:?}", "System status server should not be running when DYN_SYSTEM_ENABLED=false"
uptime_after_sleep
); );
// If we get here, uptime calculation works correctly println!("✓ System status server correctly disabled when DYN_SYSTEM_ENABLED=false");
}) })
.await; .await;
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment