Unverified Commit 04442173 authored by Keiven C's avatar Keiven C Committed by GitHub
Browse files

fix: replace metrics callback with background scraping to prevent tim… (#2480)


Co-authored-by: default avatarKeiven Chang <keivenchang@users.noreply.github.com>
parent dea0b201
......@@ -42,6 +42,7 @@ scrape_configs:
- targets: ['host.docker.internal:8080'] # on the "monitoring" network
# Launch via: DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=8081 dynamo.<backend> ...
# If you want to update the scrape_interval, you may want to also update component.rs's MAX_DELAY
- job_name: 'dynamo-backend'
scrape_interval: 6s
static_configs:
......
......@@ -1134,7 +1134,7 @@ dependencies = [
[[package]]
name = "dynamo-llm"
version = "0.4.0"
version = "0.4.0+post0"
dependencies = [
"ahash",
"akin",
......@@ -1202,7 +1202,7 @@ dependencies = [
[[package]]
name = "dynamo-py3"
version = "0.4.0"
version = "0.4.0+post0"
dependencies = [
"anyhow",
"async-openai",
......@@ -1229,7 +1229,7 @@ dependencies = [
[[package]]
name = "dynamo-runtime"
version = "0.4.0"
version = "0.4.0+post0"
dependencies = [
"anyhow",
"arc-swap",
......
......@@ -648,7 +648,7 @@ dependencies = [
[[package]]
name = "dynamo-runtime"
version = "0.4.0"
version = "0.4.0+post0"
dependencies = [
"anyhow",
"arc-swap",
......@@ -1020,7 +1020,7 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]]
name = "hello_world"
version = "0.4.0"
version = "0.4.0+post0"
dependencies = [
"dynamo-runtime",
]
......@@ -2492,7 +2492,7 @@ dependencies = [
[[package]]
name = "service_metrics"
version = "0.4.0"
version = "0.4.0+post0"
dependencies = [
"dynamo-runtime",
"futures",
......@@ -2668,7 +2668,7 @@ dependencies = [
[[package]]
name = "system_metrics"
version = "0.4.0"
version = "0.4.0+post0"
dependencies = [
"anyhow",
"dynamo-runtime",
......
......@@ -43,7 +43,7 @@ async fn test_backend_with_metrics() -> Result<()> {
let distributed = DistributedRuntime::from_settings(runtime.clone()).await?;
// Get the System status server info to find the actual port
let system_status_info = distributed.system_status_info();
let system_status_info = distributed.system_status_server_info();
let system_status_port = match system_status_info {
Some(info) => {
println!("System status server running on: {}", info.address());
......
......@@ -259,6 +259,7 @@ impl Component {
/// Scrape ServiceSet, which contains NATS stats as well as user defined stats
/// embedded in data field of ServiceInfo.
pub async fn scrape_stats(&self, timeout: Duration) -> Result<ServiceSet> {
// Debug: scraping stats for component
let service_name = self.service_name();
let service_client = self.drt().service_client();
service_client
......@@ -268,9 +269,15 @@ impl Component {
/// Add Prometheus metrics for this component's service stats.
///
/// Uses a channel to synchronize with the spawned async task, ensuring
/// metrics are updated before the callback returns.
pub fn add_metrics_callback(&self) -> Result<()> {
/// Starts a background task that scrapes stats every ~4.7s and updates metrics.
/// The thinking was that it should be a little bit shorter than the Prometheus polling interval.
/// Currently Prometheus polls every 6 seconds, and I wanted every poll to be fresh, so this is set
/// as an arbitrary 4.7 seconds plus 0.3 seconds if it times out. It's a bit of a hand-wavey decision.
pub fn start_scraping_metrics(&self) -> Result<()> {
const NATS_TIMEOUT_AND_INITIAL_DELAY_MS: std::time::Duration =
std::time::Duration::from_millis(300);
const MAX_DELAY_MS: std::time::Duration = std::time::Duration::from_millis(4700);
let component_metrics = ComponentNatsPrometheusMetrics::new(self)?;
let component_clone = self.clone();
......@@ -281,59 +288,40 @@ impl Component {
self.service_name()
); // it happens that in component, hierarchy and service name are the same
// Register a metrics callback that scrapes component statistics
let metrics_callback = Arc::new(move || {
// Timeout for scraping metrics from components (in milliseconds)
// This value is also used by KV Router metrics aggregator (300ms) and other components
const METRICS_SCRAPE_TIMEOUT_MS: u64 = 300;
// Get the current Tokio runtime handle
let handle = tokio::runtime::Handle::try_current()
.map_err(|err| anyhow::anyhow!("No Tokio runtime handle available: {}", err))?;
// Start a background task that scrapes stats every 5 seconds
let m = component_metrics.clone();
let c = component_clone.clone();
// Create a channel to synchronize with the spawned task
let (tx, rx) = std::sync::mpsc::channel::<anyhow::Result<()>>();
// Use std::thread for the background task to avoid runtime context issues
std::thread::spawn(move || {
// Use the existing secondary runtime from drt for background metrics scraping
let rt = c.drt().runtime().secondary();
// Run the background scraping loop
rt.block_on(async {
let timeout = NATS_TIMEOUT_AND_INITIAL_DELAY_MS;
let mut delay = NATS_TIMEOUT_AND_INITIAL_DELAY_MS;
let timeout = std::time::Duration::from_millis(METRICS_SCRAPE_TIMEOUT_MS);
handle.spawn(async move {
let result = match c.scrape_stats(timeout).await {
loop {
match c.scrape_stats(timeout).await {
Ok(service_set) => {
m.update_from_service_set(&service_set);
Ok(())
}
Err(err) => {
// Reset metrics on failure
tracing::error!(
"Background scrape failed for {}: {}",
c.service_name(),
err
);
m.reset_to_zeros();
Err(anyhow::anyhow!("Failed to scrape stats: {}", err))
// Double delay on failure, capped at MAX_DELAY
delay = std::cmp::min(delay * 2, MAX_DELAY_MS);
}
};
// Send the result back to the waiting thread
// If send fails, the receiver has already given up waiting
let _ = tx.send(result);
});
// Wait for the spawned task to complete (with a timeout to prevent hanging)
// Add 100ms buffer to the scrape timeout to account for processing overhead
let recv_timeout = std::time::Duration::from_millis(METRICS_SCRAPE_TIMEOUT_MS + 100);
match rx.recv_timeout(recv_timeout) {
Ok(result) => result, // Return the actual result from scraping
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
component_metrics.reset_to_zeros();
Err(anyhow::anyhow!("Metrics collection timed out"))
}
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
component_metrics.reset_to_zeros();
Err(anyhow::anyhow!("Metrics collection task failed"))
}
tokio::time::sleep(delay).await;
}
});
self.drt()
.register_metrics_callback(hierarchies, metrics_callback);
});
Ok(())
}
......@@ -587,7 +575,7 @@ impl Namespace {
// Register the metrics callback for this component.
// If registration fails, log a warning but do not propagate the error,
// as metrics are not mission critical and should not block component creation.
if let Err(err) = component.add_metrics_callback() {
if let Err(err) = component.start_scraping_metrics() {
tracing::warn!(
"Failed to add metrics callback for component '{}': {}",
component.service_name(),
......
......@@ -247,7 +247,8 @@ impl DistributedRuntime {
self.instance_sources.clone()
}
/// Add a Prometheus metric to a specific hierarchy's registry
/// Add a Prometheus metric to a specific hierarchy's registry. Note that it is possible
/// to register the same metric name multiple times, as long as the labels are different.
pub fn add_prometheus_metric(
&self,
hierarchy: &str,
......@@ -257,16 +258,6 @@ impl DistributedRuntime {
let mut registries = self.hierarchy_to_metricsregistry.write().unwrap();
let entry = registries.entry(hierarchy.to_string()).or_default();
// If a metric with this name already exists for the hierarchy, warn and skip registration
if entry.has_metric_named(metric_name) {
tracing::warn!(
hierarchy = ?hierarchy,
metric_name = ?metric_name,
"Metric already exists in registry; skipping registration"
);
return Ok(());
}
// Try to register the metric and provide better error information
match entry.prometheus_registry.register(prometheus_metric) {
Ok(_) => Ok(()),
......
......@@ -1089,11 +1089,9 @@ mod test_metricsregistry_prometheus_fmt_outputs {
let endpoint_output =
super::test_helpers::remove_nats_lines(&endpoint_output_raw).join("\n");
let expected_endpoint_output = format!(
r#"# HELP dynamo_component_testcounter A test counter
let expected_endpoint_output = r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"}} 123.456789"#
);
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789"#.to_string();
assert_eq!(
endpoint_output, expected_endpoint_output,
......@@ -1120,14 +1118,12 @@ dynamo_component_testcounter{{dynamo_component="comp345",dynamo_endpoint="ep345"
let component_output =
super::test_helpers::remove_nats_lines(&component_output_raw).join("\n");
let expected_component_output = format!(
r#"# HELP dynamo_component_testcounter A test counter
let expected_component_output = r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"}} 123.456789
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
# HELP dynamo_component_testgauge A test gauge
# TYPE dynamo_component_testgauge gauge
dynamo_component_testgauge{{dynamo_component="comp345",dynamo_namespace="ns345"}} 50000"#
);
dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000"#.to_string();
assert_eq!(
component_output, expected_component_output,
......@@ -1153,17 +1149,15 @@ dynamo_component_testgauge{{dynamo_component="comp345",dynamo_namespace="ns345"}
let namespace_output =
super::test_helpers::remove_nats_lines(&namespace_output_raw).join("\n");
let expected_namespace_output = format!(
r#"# HELP dynamo_component_testcounter A test counter
let expected_namespace_output = r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"}} 123.456789
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
# HELP dynamo_component_testgauge A test gauge
# TYPE dynamo_component_testgauge gauge
dynamo_component_testgauge{{dynamo_component="comp345",dynamo_namespace="ns345"}} 50000
dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
# HELP dynamo_component_testintcounter A test int counter
# TYPE dynamo_component_testintcounter counter
dynamo_component_testintcounter{{dynamo_namespace="ns345"}} 12345"#
);
dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#.to_string();
assert_eq!(
namespace_output, expected_namespace_output,
......@@ -1186,7 +1180,7 @@ dynamo_component_testintcounter{{dynamo_namespace="ns345"}} 12345"#
.create_intgaugevec(
"testintgaugevec",
"A test int gauge vector",
&["instance", "service", "status"],
&["instance", "status"],
&[("service", "api")],
)
.unwrap();
......@@ -1226,37 +1220,42 @@ dynamo_component_testintcounter{{dynamo_namespace="ns345"}} 12345"#
let filtered_drt_output =
super::test_helpers::remove_nats_lines(&drt_output_raw).join("\n");
let expected_drt_output = format!(
r#"# HELP dynamo_component_testcounter A test counter
let expected_drt_output = r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"}} 123.456789
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
# HELP dynamo_component_testcountervec A test counter vector
# TYPE dynamo_component_testcountervec counter
dynamo_component_testcountervec{{method="GET",service="api",status="200"}} 10
dynamo_component_testcountervec{{method="POST",service="api",status="201"}} 5
dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="GET",service="api",status="200"} 10
dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="POST",service="api",status="201"} 5
# HELP dynamo_component_testgauge A test gauge
# TYPE dynamo_component_testgauge gauge
dynamo_component_testgauge{{dynamo_component="comp345",dynamo_namespace="ns345"}} 50000
dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000
# HELP dynamo_component_testhistogram A test histogram
# TYPE dynamo_component_testhistogram histogram
dynamo_component_testhistogram_bucket{{le="1"}} 0
dynamo_component_testhistogram_bucket{{le="2.5"}} 2
dynamo_component_testhistogram_bucket{{le="5"}} 3
dynamo_component_testhistogram_bucket{{le="10"}} 3
dynamo_component_testhistogram_bucket{{le="+Inf"}} 3
dynamo_component_testhistogram_sum 7.5
dynamo_component_testhistogram_count 3
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.005"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.01"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.025"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.05"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.1"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.25"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.5"} 0
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="1"} 1
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="2.5"} 2
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="5"} 3
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="10"} 3
dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="+Inf"} 3
dynamo_component_testhistogram_sum{dynamo_component="comp345",dynamo_namespace="ns345"} 7.5
dynamo_component_testhistogram_count{dynamo_component="comp345",dynamo_namespace="ns345"} 3
# HELP dynamo_component_testintcounter A test int counter
# TYPE dynamo_component_testintcounter counter
dynamo_component_testintcounter{{dynamo_namespace="ns345"}} 12345
dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345
# HELP dynamo_component_testintgauge A test int gauge
# TYPE dynamo_component_testintgauge gauge
dynamo_component_testintgauge 42
dynamo_component_testintgauge{dynamo_namespace="ns345"} 42
# HELP dynamo_component_testintgaugevec A test int gauge vector
# TYPE dynamo_component_testintgaugevec gauge
dynamo_component_testintgaugevec{{instance="server1",service="api",status="active"}} 10
dynamo_component_testintgaugevec{{instance="server2",service="api",status="inactive"}} 0"#
);
dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server1",service="api",status="active"} 10
dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server2",service="api",status="inactive"} 0"#.to_string();
assert_eq!(
filtered_drt_output, expected_drt_output,
......@@ -1480,7 +1479,7 @@ mod test_metricsregistry_nats {
input: SingleIn<String>,
) -> Result<ManyOut<Annotated<String>>, Error> {
let (data, ctx) = input.into_parts();
let response = format!("{}", data);
let response = data.to_string();
let stream = stream::iter(vec![Annotated::from_data(response)]);
Ok(ResponseStream::new(Box::pin(stream), ctx.context()))
}
......@@ -1505,7 +1504,7 @@ mod test_metricsregistry_nats {
let drt_output = drt.prometheus_metrics_fmt().unwrap();
let parsed_metrics: Vec<_> = drt_output
.lines()
.filter_map(|line| super::test_helpers::parse_prometheus_metric(line))
.filter_map(super::test_helpers::parse_prometheus_metric)
.collect();
println!("=== Initial DRT metrics output ===");
......@@ -1517,17 +1516,17 @@ mod test_metricsregistry_nats {
// DRT NATS metrics (ordered to match DRT_NATS_METRICS)
(build_metric_name(nats::CONNECTION_STATE), 1.0, 1.0), // Should be connected
(build_metric_name(nats::CONNECTS), 1.0, 1.0), // Should have 1 connection
(build_metric_name(nats::IN_TOTAL_BYTES), 300.0, 500.0), // ~75% to ~125% of 417
(build_metric_name(nats::IN_MESSAGES), 0.0, 0.0), // No messages yet
(build_metric_name(nats::OUT_OVERHEAD_BYTES), 500.0, 700.0), // ~75% to ~125% of 612 (includes endpoint creation overhead)
(build_metric_name(nats::OUT_MESSAGES), 0.0, 0.0), // No messages yet
(build_metric_name(nats::IN_TOTAL_BYTES), 400.0, 1500.0), // Wide range around 923
(build_metric_name(nats::IN_MESSAGES), 0.0, 5.0), // Wide range around 2
(build_metric_name(nats::OUT_OVERHEAD_BYTES), 700.0, 2500.0), // Wide range around 1633
(build_metric_name(nats::OUT_MESSAGES), 0.0, 5.0), // Wide range around 2
// Component NATS metrics (ordered to match COMPONENT_NATS_METRICS)
(build_metric_name(nats::AVG_PROCESSING_MS), 0.0, 0.0), // No processing yet
(build_metric_name(nats::TOTAL_ERRORS), 0.0, 0.0), // No errors yet
(build_metric_name(nats::TOTAL_REQUESTS), 0.0, 0.0), // No requests yet
(build_metric_name(nats::TOTAL_PROCESSING_MS), 0.0, 0.0), // No processing yet
(build_metric_name(nats::ACTIVE_SERVICES), 0.0, 0.0), // No services yet
(build_metric_name(nats::ACTIVE_ENDPOINTS), 0.0, 0.0), // No endpoints yet
(build_metric_name(nats::ACTIVE_SERVICES), 0.0, 2.0), // Service may not be fully active yet
(build_metric_name(nats::ACTIVE_ENDPOINTS), 0.0, 2.0), // Endpoint may not be fully active yet
];
for (metric_name, min_value, max_value) in &initial_expected_metric_values {
......@@ -1576,7 +1575,6 @@ mod test_metricsregistry_nats {
);
}
}
sleep(Duration::from_millis(100)).await;
}
println!("✓ Sent messages and received responses successfully");
......@@ -1592,42 +1590,46 @@ mod test_metricsregistry_nats {
let final_parsed_metrics: Vec<_> = super::test_helpers::extract_metrics(&final_drt_output)
.iter()
.filter_map(|line| super::test_helpers::parse_prometheus_metric(line))
.filter_map(|line| super::test_helpers::parse_prometheus_metric(line.as_str()))
.collect();
println!("\n=== Waiting 1 second for metrics to stabilize ===");
sleep(Duration::from_secs(1)).await;
println!("✓ Wait complete, checking final metrics...");
let post_expected_metric_values = [
// DRT NATS metrics (ordered to match DRT_NATS_METRICS)
(build_metric_name(nats::CONNECTION_STATE), 1.0, 1.0), // Should remain connected
(build_metric_name(nats::CONNECTS), 1.0, 1.0), // Should remain 1 connection
(build_metric_name(nats::IN_TOTAL_BYTES), 22000.0, 28000.0), // ~75% to ~125% of 24977 (10 messages × 2000 bytes + overhead)
(build_metric_name(nats::IN_MESSAGES), 10.0, 12.0), // Allow small drift (callback may run twice)
(build_metric_name(nats::OUT_OVERHEAD_BYTES), 2076.0, 3461.0), // ~75% to ~125% of 2769 (synchronous metrics collection overhead)
(build_metric_name(nats::OUT_MESSAGES), 10.0, 12.0), // Allow small drift (callback may run twice)
// Component NATS metrics (ordered to match COMPONENT_NATS_METRICS)
(build_metric_name(nats::AVG_PROCESSING_MS), 0.0, 1.0), // Should be low processing time
(build_metric_name(nats::TOTAL_ERRORS), 0.0, 0.0), // Should have no errors
(build_metric_name(nats::TOTAL_REQUESTS), 0.0, 0.0), // NATS metrics don't track work handler requests
(build_metric_name(nats::TOTAL_PROCESSING_MS), 0.0, 5.0), // Should be low total processing time
(build_metric_name(nats::ACTIVE_SERVICES), 0.0, 0.0), // NATS metrics don't track work handler services
(build_metric_name(nats::ACTIVE_ENDPOINTS), 0.0, 0.0), // NATS metrics don't track work handler endpoints
// Work handler metrics with ranges
(build_metric_name(work_handler::REQUESTS_TOTAL), 10.0, 10.0), // Exact count (10 messages)
// DRT NATS metrics
(build_metric_name(nats::CONNECTION_STATE), 1.0, 1.0), // Connected
(build_metric_name(nats::CONNECTS), 1.0, 1.0), // 1 connection
(build_metric_name(nats::IN_TOTAL_BYTES), 20000.0, 32000.0), // Wide range around 26117
(build_metric_name(nats::IN_MESSAGES), 8.0, 20.0), // Wide range around 16
(build_metric_name(nats::OUT_OVERHEAD_BYTES), 2500.0, 8000.0), // Wide range around 5524
(build_metric_name(nats::OUT_MESSAGES), 8.0, 20.0), // Wide range around 16
// Component NATS metrics
(build_metric_name(nats::AVG_PROCESSING_MS), 0.0, 1.0), // Low processing time
(build_metric_name(nats::TOTAL_ERRORS), 0.0, 0.0), // No errors
(build_metric_name(nats::TOTAL_REQUESTS), 0.0, 0.0), // No work handler requests
(build_metric_name(nats::TOTAL_PROCESSING_MS), 0.0, 5.0), // Low total processing time
(build_metric_name(nats::ACTIVE_SERVICES), 0.0, 2.0), // Service may not be fully active
(build_metric_name(nats::ACTIVE_ENDPOINTS), 0.0, 2.0), // Endpoint may not be fully active
// Work handler metrics
(build_metric_name(work_handler::REQUESTS_TOTAL), 10.0, 10.0), // 10 messages
(
build_metric_name(work_handler::REQUEST_BYTES_TOTAL),
21000.0,
26000.0,
), // ~75% to ~125% of 23520 (10 × 2000 bytes + overhead)
), // ~75-125% of 23520
(
build_metric_name(work_handler::RESPONSE_BYTES_TOTAL),
18000.0,
23000.0,
), // ~75% to ~125% of 20660 (10 × 2000 bytes + overhead, but response size varies)
// Additional component metrics
), // ~75-125% of 20660
(
build_metric_name(work_handler::CONCURRENT_REQUESTS),
0.0,
1.0,
), // Should be 0 or very low
), // 0 or very low
// Histograms have _{count,sum} suffixes
(
format!(
"{}_count",
......@@ -1635,15 +1637,15 @@ mod test_metricsregistry_nats {
),
10.0,
10.0,
), // Exact count (10 messages)
), // 10 messages
(
format!(
"{}_sum",
build_metric_name(work_handler::REQUEST_DURATION_SECONDS)
),
0.001,
0.999,
), // Processing time sum (10 messages)
0.0001,
1.0,
), // Processing time sum (wide range)
];
println!("\n=== Checking Post-Activity All Metrics (NATS + Work Handler) ===");
......
......@@ -77,10 +77,9 @@ pub struct SystemStatusState {
impl SystemStatusState {
/// Create new system status server state with the provided metrics registry
pub fn new(drt: Arc<crate::DistributedRuntime>) -> anyhow::Result<Self> {
// Note: This metric is created at the DRT level (no namespace), so we manually add "dynamo_" prefix
// to maintain consistency with the project's metric naming convention
// Note: This metric is created at the DRT level (no namespace), so it will be prefixed with "dynamo_component_"
let uptime_gauge = drt.as_ref().create_gauge(
"dynamo_uptime_seconds",
"uptime_seconds",
"Total uptime of the DistributedRuntime in seconds",
&[],
)?;
......@@ -283,8 +282,8 @@ async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
// Integration tests: cargo test system_status_server --lib --features integration
#[cfg(test)]
/// Helper function to create a DRT instance for async testing
/// Uses the test-friendly constructor without discovery
/// Helper function to create a DRT instance for basic unit tests
/// Uses from_current to leverage existing tokio runtime without environment configuration
async fn create_test_drt_async() -> crate::DistributedRuntime {
let rt = crate::Runtime::from_current().unwrap();
crate::DistributedRuntime::from_settings_without_discovery(rt)
......@@ -292,6 +291,34 @@ async fn create_test_drt_async() -> crate::DistributedRuntime {
.unwrap()
}
#[cfg(test)]
/// Helper function to create a DRT instance for integration tests
/// Uses spawn_blocking to create runtime safely without ownership issues
/// Enables system status server for integration testing
/// Note: This function uses environment variables to configure and create the DistributedRuntime.
async fn create_test_drt_with_settings_async() -> crate::DistributedRuntime {
// Create runtime in blocking context where it can be safely dropped
let handle = tokio::task::spawn_blocking(|| {
// Load configuration from environment/settings
let config = crate::config::RuntimeConfig::from_settings().unwrap();
// Create runtime with the configuration and extract handle
let runtime = config.create_runtime().unwrap();
let handle = runtime.handle().clone();
// Runtime will be automatically dropped when it goes out of scope
handle
})
.await
.unwrap();
// Create Runtime using external handle (no ownership)
let rt = crate::Runtime::from_handle(handle).unwrap();
crate::DistributedRuntime::from_settings_without_discovery(rt)
.await
.unwrap()
}
#[cfg(test)]
mod tests {
use super::*;
......@@ -308,6 +335,7 @@ mod tests {
use stdio_override::*;
use tokio::time::{sleep, Duration};
// This is a basic test to verify the HTTP server is working before testing other more complicated tests
#[tokio::test]
async fn test_http_server_lifecycle() {
let cancel_token = CancellationToken::new();
......@@ -324,8 +352,7 @@ mod tests {
.await;
});
// wait for a while to let the server start
sleep(Duration::from_millis(100)).await;
// server starts immediately, no need to wait
// cancel token
cancel_token.cancel();
......@@ -338,19 +365,36 @@ mod tests {
);
}
#[cfg(feature = "integration")]
#[tokio::test]
async fn test_uptime_without_initialization() {
// Test that uptime returns an error if start time is not initialized
temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
let drt = create_test_drt_async().await;
let system_status = SystemStatusState::new(Arc::new(drt)).unwrap();
// This should return an error because start time is not initialized
let result = system_status.uptime();
assert!(result.is_err());
assert_eq!(result.unwrap_err(), "Start time not initialized");
})
.await;
}
#[cfg(feature = "integration")]
#[tokio::test]
async fn test_runtime_metrics_initialization_and_namespace() {
// Test that metrics have correct namespace
temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
let drt = create_test_drt_async().await;
let runtime_metrics = SystemStatusState::new(Arc::new(drt)).unwrap();
let system_status = SystemStatusState::new(Arc::new(drt)).unwrap();
// Initialize start time
runtime_metrics.initialize_start_time().unwrap();
system_status.initialize_start_time().unwrap();
runtime_metrics.uptime_gauge.set(42.0);
system_status.uptime_gauge.set(42.0);
let response = runtime_metrics.drt().prometheus_metrics_fmt().unwrap();
let response = system_status.drt().prometheus_metrics_fmt().unwrap();
println!("Full metrics response:\n{}", response);
// Filter out NATS client metrics for comparison
......@@ -363,30 +407,48 @@ mod tests {
.join("\n");
let expected = "\
# HELP dynamo_component_dynamo_uptime_seconds Total uptime of the DistributedRuntime in seconds
# TYPE dynamo_component_dynamo_uptime_seconds gauge
dynamo_component_dynamo_uptime_seconds 42";
# HELP dynamo_component_uptime_seconds Total uptime of the DistributedRuntime in seconds
# TYPE dynamo_component_uptime_seconds gauge
dynamo_component_uptime_seconds 42";
assert_eq!(filtered_response, expected);
})
.await;
}
#[cfg(feature = "integration")]
#[tokio::test]
async fn test_start_time_initialization() {
// Test that start time can only be initialized once
temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
let drt = create_test_drt_async().await;
let runtime_metrics = SystemStatusState::new(Arc::new(drt)).unwrap();
let system_status = SystemStatusState::new(Arc::new(drt)).unwrap();
// First initialization should succeed
assert!(runtime_metrics.initialize_start_time().is_ok());
assert!(system_status.initialize_start_time().is_ok());
// Second initialization should fail
assert!(runtime_metrics.initialize_start_time().is_err());
assert!(system_status.initialize_start_time().is_err());
// Sleep for 100ms and verify uptime increases
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let uptime_after_sleep = system_status.uptime().unwrap();
assert!(
uptime_after_sleep >= std::time::Duration::from_millis(100),
"Uptime should be at least 100ms after sleep, got: {:?}",
uptime_after_sleep
);
// Uptime should work after initialization
let _uptime = runtime_metrics.uptime().unwrap();
// If we get here, uptime calculation works correctly
})
.await;
}
/// This test verifies the health and liveness endpoints of the system status server.
/// It checks that the endpoints respond with the correct HTTP status codes and bodies
/// based on the initial health status and any custom endpoint paths provided via environment variables.
/// The test is parameterized using multiple #[case] attributes to cover various scenarios,
/// including different initial health states ("ready" and "notready"), default and custom endpoint paths,
/// and expected response codes and bodies.
#[rstest]
#[case("ready", 200, "ready", None, None, 3)]
#[case("notready", 503, "notready", None, None, 3)]
......@@ -410,8 +472,6 @@ dynamo_component_dynamo_uptime_seconds 42";
#[case] expected_num_tests: usize,
) {
use std::sync::Arc;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
// use tokio::io::{AsyncReadExt, AsyncWriteExt};
// use reqwest for HTTP requests
......@@ -422,6 +482,8 @@ dynamo_component_dynamo_uptime_seconds 42";
#[allow(clippy::redundant_closure_call)]
temp_env::async_with_vars(
[
("DYN_SYSTEM_ENABLED", Some("true")),
("DYN_SYSTEM_PORT", Some("0")),
(
"DYN_SYSTEM_STARTING_HEALTH_STATUS",
Some(starting_health_status),
......@@ -430,20 +492,14 @@ dynamo_component_dynamo_uptime_seconds 42";
("DYN_SYSTEM_LIVE_PATH", custom_live_path),
],
(async || {
let runtime = crate::Runtime::from_settings().unwrap();
let drt = Arc::new(
crate::DistributedRuntime::from_settings_without_discovery(runtime)
.await
.unwrap(),
);
let cancel_token = CancellationToken::new();
let (addr, _) =
spawn_system_status_server("127.0.0.1", 0, cancel_token.clone(), drt)
.await
.unwrap();
println!("[test] Waiting for server to start...");
sleep(std::time::Duration::from_millis(1000)).await;
println!("[test] Server should be up, starting requests...");
let drt = Arc::new(create_test_drt_with_settings_async().await);
// Get system status server info from DRT (instead of manually spawning)
let system_info = drt
.system_status_server_info()
.expect("System status server should be started by DRT");
let addr = system_info.socket_addr;
let client = reqwest::Client::new();
// Prepare test cases
......@@ -498,14 +554,14 @@ dynamo_component_dynamo_uptime_seconds 42";
#[cfg(feature = "integration")]
async fn test_health_endpoint_tracing() -> Result<()> {
use std::sync::Arc;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
// Closure call is needed here to satisfy async_with_vars
#[allow(clippy::redundant_closure_call)]
let _ = temp_env::async_with_vars(
[
("DYN_SYSTEM_ENABLED", Some("true")),
("DYN_SYSTEM_PORT", Some("0")),
("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready")),
("DYN_LOGGING_JSONL", Some("1")),
("DYN_LOG", Some("trace")),
......@@ -516,18 +572,13 @@ dynamo_component_dynamo_uptime_seconds 42";
crate::logging::init();
let runtime = crate::Runtime::from_settings().unwrap();
let drt = Arc::new(
crate::DistributedRuntime::from_settings_without_discovery(runtime)
.await
.unwrap(),
);
let cancel_token = CancellationToken::new();
let (addr, _) =
spawn_system_status_server("127.0.0.1", 0, cancel_token.clone(), drt)
.await
.unwrap();
sleep(std::time::Duration::from_millis(1000)).await;
let drt = Arc::new(create_test_drt_with_settings_async().await);
// Get system status server info from DRT (instead of manually spawning)
let system_info = drt
.system_status_server_info()
.expect("System status server should be started by DRT");
let addr = system_info.socket_addr;
let client = reqwest::Client::new();
for path in [("/health"), ("/live"), ("/someRandomPathNotFoundHere")] {
let traceparent_value =
......@@ -558,15 +609,116 @@ dynamo_component_dynamo_uptime_seconds 42";
#[cfg(feature = "integration")]
#[tokio::test]
async fn test_uptime_without_initialization() {
// Test that uptime returns an error if start time is not initialized
let drt = create_test_drt_async().await;
let runtime_metrics = SystemStatusState::new(Arc::new(drt)).unwrap();
async fn test_health_endpoint_with_changing_health_status() {
// Test health endpoint starts in not ready status, then becomes ready
// when endpoints are created (DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS=generate)
const ENDPOINT_NAME: &str = "generate";
const ENDPOINT_HEALTH_CONFIG: &str = "[\"generate\"]";
temp_env::async_with_vars(
[
("DYN_SYSTEM_ENABLED", Some("true")),
("DYN_SYSTEM_PORT", Some("0")),
("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("notready")),
("DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Some(ENDPOINT_HEALTH_CONFIG)),
],
async {
let drt = Arc::new(create_test_drt_with_settings_async().await);
// This should return an error because start time is not initialized
let result = runtime_metrics.uptime();
assert!(result.is_err());
assert_eq!(result.unwrap_err(), "Start time not initialized");
// Check if system status server was started
let system_info_opt = drt.system_status_server_info();
// Ensure system status server was spawned by DRT
assert!(
system_info_opt.is_some(),
"System status server was not spawned by DRT. Expected DRT to spawn server when DYN_SYSTEM_ENABLED=true, but system_status_server_info() returned None. Environment: DYN_SYSTEM_ENABLED={:?}, DYN_SYSTEM_PORT={:?}",
std::env::var("DYN_SYSTEM_ENABLED"),
std::env::var("DYN_SYSTEM_PORT")
);
// Get the system status server info from DRT - this should never fail now due to above check
let system_info = system_info_opt.unwrap();
let addr = system_info.socket_addr;
// Initially check health - should be not ready
let client = reqwest::Client::new();
let health_url = format!("http://{}/health", addr);
let response = client.get(&health_url).send().await.unwrap();
let status = response.status();
let body = response.text().await.unwrap();
// Health should be not ready (503) initially
assert_eq!(status, 503, "Health should be 503 (not ready) initially, got: {}", status);
assert!(body.contains("\"status\":\"notready\""), "Health should contain status notready");
// Now create a namespace, component, and endpoint to make the system healthy
let namespace = drt.namespace("ns1234").unwrap();
let component = namespace.component("comp1234").unwrap();
// Create a simple test handler
use crate::pipeline::{async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, SingleIn};
use crate::protocols::annotated::Annotated;
struct TestHandler;
#[async_trait]
impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for TestHandler {
async fn generate(&self, input: SingleIn<String>) -> crate::Result<ManyOut<Annotated<String>>> {
let (data, ctx) = input.into_parts();
let response = Annotated::from_data(format!("You responded: {}", data));
Ok(crate::pipeline::ResponseStream::new(
Box::pin(crate::stream::iter(vec![response])),
ctx.context()
))
}
}
// Create the ingress and start the endpoint service
let ingress = Ingress::for_engine(std::sync::Arc::new(TestHandler)).unwrap();
// Start the service and endpoint
tokio::spawn(async move {
let _ = component
.service_builder()
.create()
.await
.unwrap()
.endpoint(ENDPOINT_NAME)
.endpoint_builder()
.handler(ingress)
.start()
.await;
});
// Hit health endpoint 200 times to verify consistency
let mut success_count = 0;
let mut failures = Vec::new();
for i in 1..=200 {
let response = client.get(&health_url).send().await.unwrap();
let status = response.status();
let body = response.text().await.unwrap();
if status == 200 && body.contains("\"status\":\"ready\"") {
success_count += 1;
} else {
failures.push((i, status.as_u16(), body.clone()));
if failures.len() <= 5 { // Only log first 5 failures
tracing::warn!("Request {}: status={}, body={}", i, status, body);
}
}
}
tracing::info!("Health endpoint test results: {}/200 requests succeeded", success_count);
if !failures.is_empty() {
tracing::warn!("Failed requests: {}", failures.len());
}
// Expect at least 150 out of 200 requests to be successful
assert!(success_count >= 150, "Expected at least 150 out of 200 requests to succeed, but only {} succeeded", success_count);
},
)
.await;
}
#[cfg(feature = "integration")]
......@@ -574,17 +726,19 @@ dynamo_component_dynamo_uptime_seconds 42";
async fn test_spawn_system_status_server_endpoints() {
// use reqwest for HTTP requests
temp_env::async_with_vars(
[("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready"))],
[
("DYN_SYSTEM_ENABLED", Some("true")),
("DYN_SYSTEM_PORT", Some("0")),
("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready")),
],
async {
let cancel_token = CancellationToken::new();
let drt = create_test_drt_async().await;
let (addr, server_handle) =
spawn_system_status_server("127.0.0.1", 0, cancel_token.clone(), Arc::new(drt))
.await
.unwrap();
println!("[test] Waiting for server to start...");
sleep(std::time::Duration::from_millis(1000)).await;
println!("[test] Server should be up, starting requests...");
let drt = Arc::new(create_test_drt_with_settings_async().await);
// Get system status server info from DRT (instead of manually spawning)
let system_info = drt
.system_status_server_info()
.expect("System status server should be started by DRT");
let addr = system_info.socket_addr;
let client = reqwest::Client::new();
for (path, expect_200, expect_body) in [
("/health", true, "ready"),
......@@ -612,51 +766,9 @@ dynamo_component_dynamo_uptime_seconds 42";
body
);
}
cancel_token.cancel();
match server_handle.await {
Ok(_) => println!("[test] Server shut down normally"),
Err(e) => {
if e.is_panic() {
println!("[test] Server panicked: {:?}", e);
} else {
println!("[test] Server cancelled: {:?}", e);
}
}
}
// DRT handles server cleanup automatically
},
)
.await;
}
#[cfg(feature = "integration")]
#[tokio::test]
async fn test_http_server_basic_functionality() {
// Test basic HTTP server functionality without requiring etcd
let cancel_token = CancellationToken::new();
let cancel_token_for_server = cancel_token.clone();
// Test basic HTTP server lifecycle
let app = Router::new().route("/test", get(|| async { (StatusCode::OK, "test") }));
// start HTTP server
let server_handle = tokio::spawn(async move {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let _ = axum::serve(listener, app)
.with_graceful_shutdown(cancel_token_for_server.cancelled_owned())
.await;
});
// wait for a while to let the server start
sleep(Duration::from_millis(100)).await;
// cancel token
cancel_token.cancel();
// wait for the server to shut down
let result = tokio::time::timeout(Duration::from_secs(5), server_handle).await;
assert!(
result.is_ok(),
"HTTP server should shut down when cancel token is cancelled"
);
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment