Unverified Commit bec1dd54 authored by Keiven C's avatar Keiven C Committed by GitHub
Browse files

fix: use tokio spawn / interval.tick(), make nats metric names clearer, fix...


fix: use tokio spawn / interval.tick(), make nats metric names clearer, fix tests sharing environment variables (temp_env) (#2506)
Co-authored-by: default avatarKeiven Chang <keivenchang@users.noreply.github.com>
parent 80d8aa19
......@@ -48,7 +48,7 @@ use super::{
use crate::pipeline::network::{ingress::push_endpoint::PushEndpoint, PushWorkHandler};
use crate::protocols::Endpoint as EndpointId;
use crate::service::ComponentNatsPrometheusMetrics;
use crate::service::ComponentNatsServerPrometheusMetrics;
use async_nats::{
rustls::quic,
service::{Service, ServiceExt},
......@@ -223,6 +223,10 @@ impl Component {
self.name.clone()
}
pub fn labels(&self) -> &[(String, String)] {
&self.labels
}
pub fn endpoint(&self, endpoint: impl Into<String>) -> Endpoint {
Endpoint {
component: self.clone(),
......@@ -267,18 +271,19 @@ impl Component {
.await
}
/// Add Prometheus metrics for this component's service stats.
/// Add Prometheus metrics for this component's NATS service stats.
///
/// Starts a background task that scrapes stats every ~4.7s and updates metrics.
/// The thinking was that it should be a little bit shorter than the Prometheus polling interval.
/// Currently Prometheus polls every 6 seconds, and I wanted every poll to be fresh, so this is set
/// as an arbitrary 4.7 seconds plus 0.3 seconds if it times out. It's a bit of a hand-wavey decision.
pub fn start_scraping_metrics(&self) -> Result<()> {
/// Starts a background task that periodically requests service statistics from NATS
/// and updates the corresponding Prometheus metrics. The scraping interval is set to
/// approximately 873ms (MAX_DELAY_MS), which is arbitrary but any value less than a second
/// is fair game. This frequent scraping provides real-time service statistics updates.
pub fn start_scraping_nats_service_component_metrics(&self) -> Result<()> {
const NATS_TIMEOUT_AND_INITIAL_DELAY_MS: std::time::Duration =
std::time::Duration::from_millis(300);
const MAX_DELAY_MS: std::time::Duration = std::time::Duration::from_millis(4700);
const MAX_DELAY_MS: std::time::Duration = std::time::Duration::from_millis(873);
let component_metrics = ComponentNatsPrometheusMetrics::new(self)?;
// If there is another component with the same service name, this will fail.
let component_metrics = ComponentNatsServerPrometheusMetrics::new(self)?;
let component_clone = self.clone();
let mut hierarchies = self.parent_hierarchy();
......@@ -293,15 +298,18 @@ impl Component {
let m = component_metrics.clone();
let c = component_clone.clone();
// Use std::thread for the background task to avoid runtime context issues
std::thread::spawn(move || {
// Use the existing secondary runtime from drt for background metrics scraping
let rt = c.drt().runtime().secondary();
// Run the background scraping loop
rt.block_on(async {
// Use the DRT's runtime handle to spawn the background task.
// We cannot use regular `tokio::spawn` here because:
// 1. This method may be called from contexts without an active Tokio runtime
// (e.g., tests that create a DRT in a blocking context)
// 2. Tests often create a temporary runtime just to build the DRT, then drop it
// 3. `tokio::spawn` requires being called from within a runtime context
// By using the DRT's own runtime handle, we ensure the task runs in the
// correct runtime that will persist for the lifetime of the component.
c.drt().runtime().secondary().spawn(async move {
let timeout = NATS_TIMEOUT_AND_INITIAL_DELAY_MS;
let mut delay = NATS_TIMEOUT_AND_INITIAL_DELAY_MS;
let mut interval = tokio::time::interval(MAX_DELAY_MS);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
match c.scrape_stats(timeout).await {
......@@ -315,14 +323,11 @@ impl Component {
err
);
m.reset_to_zeros();
// Double delay on failure, capped at MAX_DELAY
delay = std::cmp::min(delay * 2, MAX_DELAY_MS);
}
}
tokio::time::sleep(delay).await;
interval.tick().await;
}
});
});
Ok(())
}
......@@ -576,13 +581,28 @@ impl Namespace {
// Register the metrics callback for this component.
// If registration fails, log a warning but do not propagate the error,
// as metrics are not mission critical and should not block component creation.
if let Err(err) = component.start_scraping_metrics() {
if let Err(err) = component.start_scraping_nats_service_component_metrics() {
let error_str = err.to_string();
// Check if this is a duplicate metrics registration (expected in some cases)
// or a different error (unexpected)
if error_str.contains("Duplicate metrics") {
// This is not a critical error because it's possible for multiple Components
// with the same service_name to register metrics callbacks.
tracing::debug!(
"Duplicate metrics registration for component '{}' (expected when multiple components share the same service_name): {}",
component.service_name(),
error_str
);
} else {
// This is unexpected and should be more visible
tracing::warn!(
"Failed to add metrics callback for component '{}': {}",
"Failed to start scraping metrics for component '{}': {}",
component.service_name(),
err
);
}
}
Ok(component)
}
......
......@@ -14,7 +14,7 @@
// limitations under the License.
pub use crate::component::Component;
use crate::transports::nats::DRTNatsPrometheusMetrics;
use crate::transports::nats::DRTNatsClientPrometheusMetrics;
use crate::{
component::{self, ComponentBuilder, Endpoint, InstanceSource, Namespace},
discovery::DiscoveryClient,
......@@ -95,28 +95,29 @@ impl DistributedRuntime {
system_health,
};
let sys_nats_metrics = DRTNatsPrometheusMetrics::new(
let nats_client_metrics = DRTNatsClientPrometheusMetrics::new(
&distributed_runtime,
nats_client_for_metrics.client().clone(),
)?;
let mut drt_hierarchies = distributed_runtime.parent_hierarchy();
drt_hierarchies.push(distributed_runtime.hierarchy());
// Register a callback to update NATS client metrics
let nats_metrics_callback = Arc::new({
let sys_nats_metrics_clone = sys_nats_metrics.clone();
let nats_client_callback = Arc::new({
let nats_client_clone = nats_client_metrics.clone();
move || {
sys_nats_metrics_clone.set_from_client_stats();
nats_client_clone.set_from_client_stats();
Ok(())
}
});
distributed_runtime.register_metrics_callback(drt_hierarchies, nats_metrics_callback);
distributed_runtime.register_metrics_callback(drt_hierarchies, nats_client_callback);
// Start system status server if enabled
// Handle system status server initialization
if let Some(cancel_token) = cancel_token {
// System server is enabled - start both the state and HTTP server
let host = config.system_host.clone();
let port = config.system_port;
// Start system status server (it spawns its own task internally)
// Start system status server (it creates SystemStatusState internally)
match crate::system_status_server::spawn_system_status_server(
&host,
port,
......@@ -146,7 +147,18 @@ impl DistributedRuntime {
}
}
} else {
tracing::debug!("Health and system status server is disabled via DYN_SYSTEM_ENABLED");
// System server HTTP is disabled, but still create the state for metrics
// This ensures uptime_seconds metric is always registered
let system_status_state = crate::system_status_server::SystemStatusState::new(
Arc::new(distributed_runtime.clone()),
)?;
// Initialize the start time for uptime tracking
if let Err(e) = system_status_state.initialize_start_time() {
tracing::warn!("Failed to initialize system status start time: {}", e);
}
tracing::debug!("System status server HTTP endpoints disabled, but uptime metrics are being tracked");
}
Ok(distributed_runtime)
......
......@@ -33,8 +33,8 @@ use std::collections::HashMap;
// Import commonly used items to avoid verbose prefixes
use prometheus_names::{
build_metric_name, labels, name_prefix, nats, work_handler, COMPONENT_NATS_METRICS,
DRT_NATS_METRICS,
build_metric_name, labels, name_prefix, nats_client, nats_service, work_handler,
COMPONENT_NATS_METRICS, DRT_NATS_METRICS,
};
// Pipeline imports for endpoint creation
......@@ -575,26 +575,14 @@ pub trait MetricsRegistry: Send + Sync + DistributedRuntimeProvider {
#[cfg(test)]
mod test_helpers {
use super::prometheus_names::name_prefix;
use super::prometheus_names::nats as nats_metrics;
use super::prometheus_names::{nats_client, nats_service};
use super::*;
/// Creates a test DistributedRuntime for integration tests.
/// Uses NATS; requires #[cfg(feature = "integration")].
#[cfg(feature = "integration")]
pub fn create_test_drt() -> crate::DistributedRuntime {
let rt = crate::Runtime::single_threaded().unwrap();
tokio::runtime::Runtime::new().unwrap().block_on(async {
crate::DistributedRuntime::from_settings_without_discovery(rt.clone())
.await
.unwrap()
})
}
/// Helper function to create a DRT instance for testing in async contexts
#[cfg(feature = "integration")]
pub async fn create_test_drt_async() -> crate::DistributedRuntime {
let rt = crate::Runtime::single_threaded().unwrap();
crate::DistributedRuntime::from_settings_without_discovery(rt.clone())
let rt = crate::Runtime::from_current().unwrap();
crate::DistributedRuntime::from_settings_without_discovery(rt)
.await
.unwrap()
}
......@@ -618,7 +606,11 @@ mod test_helpers {
!line.contains(&format!(
"{}{}",
name_prefix::COMPONENT,
nats_metrics::PREFIX
nats_client::PREFIX
)) && !line.contains(&format!(
"{}{}",
name_prefix::COMPONENT,
nats_service::PREFIX
)) && !line.trim().is_empty()
})
}
......@@ -629,7 +621,11 @@ mod test_helpers {
line.contains(&format!(
"{}{}",
name_prefix::COMPONENT,
nats_metrics::PREFIX
nats_client::PREFIX
)) || line.contains(&format!(
"{}{}",
name_prefix::COMPONENT,
nats_service::PREFIX
))
})
}
......@@ -929,9 +925,9 @@ mod test_metricsregistry_prefixes {
use super::*;
use prometheus::core::Collector;
#[test]
fn test_hierarchical_prefixes_and_parent_hierarchies() {
let drt = super::test_helpers::create_test_drt();
#[tokio::test]
async fn test_hierarchical_prefixes_and_parent_hierarchies() {
let drt = super::test_helpers::create_test_drt_async().await;
const DRT_NAME: &str = "";
const NAMESPACE_NAME: &str = "ns901";
......@@ -1003,10 +999,10 @@ mod test_metricsregistry_prefixes {
.is_ok());
}
#[test]
fn test_recursive_namespace() {
#[tokio::test]
async fn test_recursive_namespace() {
// Create a distributed runtime for testing
let drt = super::test_helpers::create_test_drt();
let drt = super::test_helpers::create_test_drt_async().await;
// Create a deeply chained namespace: ns1.ns2.ns3
let ns1 = drt.namespace("ns1").unwrap();
......@@ -1055,16 +1051,16 @@ mod test_metricsregistry_prefixes {
#[cfg(test)]
mod test_metricsregistry_prometheus_fmt_outputs {
use super::prometheus_names::name_prefix;
use super::prometheus_names::nats as nats_metrics;
use super::prometheus_names::{nats_client, nats_service};
use super::prometheus_names::{COMPONENT_NATS_METRICS, DRT_NATS_METRICS};
use super::*;
use prometheus::Counter;
use std::sync::Arc;
#[test]
fn test_prometheusfactory_using_metrics_registry_trait() {
#[tokio::test]
async fn test_prometheusfactory_using_metrics_registry_trait() {
// Setup real DRT and registry using the test-friendly constructor
let drt = super::test_helpers::create_test_drt();
let drt = super::test_helpers::create_test_drt_async().await;
// Use a simple constant namespace name
let namespace_name = "ns345";
......@@ -1255,7 +1251,10 @@ dynamo_component_testintgauge{dynamo_namespace="ns345"} 42
# HELP dynamo_component_testintgaugevec A test int gauge vector
# TYPE dynamo_component_testintgaugevec gauge
dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server1",service="api",status="active"} 10
dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server2",service="api",status="inactive"} 0"#.to_string();
dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server2",service="api",status="inactive"} 0
# HELP dynamo_component_uptime_seconds Total uptime of the DistributedRuntime in seconds
# TYPE dynamo_component_uptime_seconds gauge
dynamo_component_uptime_seconds 0"#.to_string();
assert_eq!(
filtered_drt_output, expected_drt_output,
......@@ -1275,15 +1274,15 @@ dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server2",ser
let test_input = r#"# HELP dynamo_component_requests Total requests
# TYPE dynamo_component_requests counter
dynamo_component_requests 42
# HELP dynamo_component_nats_connection_state Connection state
# TYPE dynamo_component_nats_connection_state gauge
dynamo_component_nats_connection_state 1
# HELP dynamo_component_nats_client_connection_state Connection state
# TYPE dynamo_component_nats_client_connection_state gauge
dynamo_component_nats_client_connection_state 1
# HELP dynamo_component_latency Response latency
# TYPE dynamo_component_latency histogram
dynamo_component_latency_bucket{le="0.1"} 10
dynamo_component_latency_bucket{le="0.5"} 25
dynamo_component_nats_total_requests 100
dynamo_component_nats_total_errors 5"#;
dynamo_component_nats_service_total_requests 100
dynamo_component_nats_service_total_errors 5"#;
// Test remove_nats_lines (excludes NATS lines but keeps help/type)
let filtered_out = super::test_helpers::remove_nats_lines(test_input);
......@@ -1310,16 +1309,16 @@ dynamo_component_nats_total_errors 5"#;
#[cfg(test)]
mod test_metricsregistry_nats {
use super::prometheus_names::name_prefix;
use super::prometheus_names::nats as nats_metrics;
use super::prometheus_names::{nats_client, nats_service};
use super::prometheus_names::{COMPONENT_NATS_METRICS, DRT_NATS_METRICS};
use super::*;
use crate::pipeline::PushRouter;
use crate::{DistributedRuntime, Runtime};
use tokio::time::{sleep, Duration};
#[test]
fn test_drt_nats_metrics() {
#[tokio::test]
async fn test_drt_nats_metrics() {
// Setup real DRT and registry using the test-friendly constructor
let drt = super::test_helpers::create_test_drt();
let drt = super::test_helpers::create_test_drt_async().await;
// Get DRT output which should include NATS client metrics
let drt_output = drt.prometheus_metrics_fmt().unwrap();
......@@ -1336,8 +1335,10 @@ mod test_metricsregistry_nats {
);
// Check for specific NATS client metric names (without values)
let drt_metrics = super::test_helpers::extract_metrics(&drt_output);
let actual_drt_nats_metrics_sorted: Vec<&str> = drt_metrics
// Extract only the metric lines from the already-filtered NATS metrics
let drt_nats_metric_lines =
super::test_helpers::extract_metrics(&drt_nats_metrics.join("\n"));
let actual_drt_nats_metrics_sorted: Vec<&str> = drt_nats_metric_lines
.iter()
.map(|line| {
let without_labels = line.split('{').next().unwrap_or(line);
......@@ -1375,13 +1376,13 @@ mod test_metricsregistry_nats {
println!("✓ DistributedRuntime NATS metrics integration test passed!");
}
#[test]
fn test_nats_metric_names() {
#[tokio::test]
async fn test_nats_metric_names() {
// This test only tests the existence of the NATS metrics. It does not check
// the values of the metrics.
// Setup real DRT and registry using the test-friendly constructor
let drt = super::test_helpers::create_test_drt();
let drt = super::test_helpers::create_test_drt_async().await;
// Create a namespace and components from the DRT
let namespace = drt.namespace("ns789").unwrap();
......@@ -1440,17 +1441,19 @@ mod test_metricsregistry_nats {
"COMPONENT_NATS_METRICS with prefix and expected_nats_metrics should be identical when sorted"
);
// Get both DRT and component output and filter for component metrics
let drt_and_component_metrics =
super::test_helpers::extract_metrics(&drt.prometheus_metrics_fmt().unwrap());
// Get both DRT and component output and filter for NATS metrics only
let drt_output = drt.prometheus_metrics_fmt().unwrap();
let drt_nats_lines = super::test_helpers::extract_nats_lines(&drt_output);
let drt_and_component_nats_metrics =
super::test_helpers::extract_metrics(&drt_nats_lines.join("\n"));
println!(
"DRT and component metrics count: {}",
drt_and_component_metrics.len()
"DRT and component NATS metrics count: {}",
drt_and_component_nats_metrics.len()
);
// Check that the NATS metrics are present in the component output
assert_eq!(
drt_and_component_metrics.len(),
drt_and_component_nats_metrics.len(),
DRT_NATS_METRICS.len() + COMPONENT_NATS_METRICS.len(),
"DRT at this point should have both the DRT and component NATS metrics"
);
......@@ -1514,19 +1517,31 @@ mod test_metricsregistry_nats {
let initial_expected_metric_values = [
// DRT NATS metrics (ordered to match DRT_NATS_METRICS)
(build_metric_name(nats::CONNECTION_STATE), 1.0, 1.0), // Should be connected
(build_metric_name(nats::CONNECTS), 1.0, 1.0), // Should have 1 connection
(build_metric_name(nats::IN_TOTAL_BYTES), 400.0, 1500.0), // Wide range around 923
(build_metric_name(nats::IN_MESSAGES), 0.0, 5.0), // Wide range around 2
(build_metric_name(nats::OUT_OVERHEAD_BYTES), 700.0, 2500.0), // Wide range around 1633
(build_metric_name(nats::OUT_MESSAGES), 0.0, 5.0), // Wide range around 2
(build_metric_name(nats_client::CONNECTION_STATE), 1.0, 1.0), // Should be connected
(build_metric_name(nats_client::CONNECTS), 1.0, 1.0), // Should have 1 connection
(
build_metric_name(nats_client::IN_TOTAL_BYTES),
400.0,
1500.0,
), // Wide range around 923
(build_metric_name(nats_client::IN_MESSAGES), 0.0, 5.0), // Wide range around 2
(
build_metric_name(nats_client::OUT_OVERHEAD_BYTES),
700.0,
2500.0,
), // Wide range around 1633
(build_metric_name(nats_client::OUT_MESSAGES), 0.0, 5.0), // Wide range around 2
// Component NATS metrics (ordered to match COMPONENT_NATS_METRICS)
(build_metric_name(nats::AVG_PROCESSING_MS), 0.0, 0.0), // No processing yet
(build_metric_name(nats::TOTAL_ERRORS), 0.0, 0.0), // No errors yet
(build_metric_name(nats::TOTAL_REQUESTS), 0.0, 0.0), // No requests yet
(build_metric_name(nats::TOTAL_PROCESSING_MS), 0.0, 0.0), // No processing yet
(build_metric_name(nats::ACTIVE_SERVICES), 0.0, 2.0), // Service may not be fully active yet
(build_metric_name(nats::ACTIVE_ENDPOINTS), 0.0, 2.0), // Endpoint may not be fully active yet
(build_metric_name(nats_service::AVG_PROCESSING_MS), 0.0, 0.0), // No processing yet
(build_metric_name(nats_service::TOTAL_ERRORS), 0.0, 0.0), // No errors yet
(build_metric_name(nats_service::TOTAL_REQUESTS), 0.0, 0.0), // No requests yet
(
build_metric_name(nats_service::TOTAL_PROCESSING_MS),
0.0,
0.0,
), // No processing yet
(build_metric_name(nats_service::ACTIVE_SERVICES), 0.0, 2.0), // Service may not be fully active yet
(build_metric_name(nats_service::ACTIVE_ENDPOINTS), 0.0, 2.0), // Endpoint may not be fully active yet
];
for (metric_name, min_value, max_value) in &initial_expected_metric_values {
......@@ -1599,19 +1614,31 @@ mod test_metricsregistry_nats {
let post_expected_metric_values = [
// DRT NATS metrics
(build_metric_name(nats::CONNECTION_STATE), 1.0, 1.0), // Connected
(build_metric_name(nats::CONNECTS), 1.0, 1.0), // 1 connection
(build_metric_name(nats::IN_TOTAL_BYTES), 20000.0, 32000.0), // Wide range around 26117
(build_metric_name(nats::IN_MESSAGES), 8.0, 20.0), // Wide range around 16
(build_metric_name(nats::OUT_OVERHEAD_BYTES), 2500.0, 8000.0), // Wide range around 5524
(build_metric_name(nats::OUT_MESSAGES), 8.0, 20.0), // Wide range around 16
(build_metric_name(nats_client::CONNECTION_STATE), 1.0, 1.0), // Connected
(build_metric_name(nats_client::CONNECTS), 1.0, 1.0), // 1 connection
(
build_metric_name(nats_client::IN_TOTAL_BYTES),
20000.0,
32000.0,
), // Wide range around 26117
(build_metric_name(nats_client::IN_MESSAGES), 8.0, 20.0), // Wide range around 16
(
build_metric_name(nats_client::OUT_OVERHEAD_BYTES),
2500.0,
8000.0,
), // Wide range around 5524
(build_metric_name(nats_client::OUT_MESSAGES), 8.0, 20.0), // Wide range around 16
// Component NATS metrics
(build_metric_name(nats::AVG_PROCESSING_MS), 0.0, 1.0), // Low processing time
(build_metric_name(nats::TOTAL_ERRORS), 0.0, 0.0), // No errors
(build_metric_name(nats::TOTAL_REQUESTS), 0.0, 0.0), // No work handler requests
(build_metric_name(nats::TOTAL_PROCESSING_MS), 0.0, 5.0), // Low total processing time
(build_metric_name(nats::ACTIVE_SERVICES), 0.0, 2.0), // Service may not be fully active
(build_metric_name(nats::ACTIVE_ENDPOINTS), 0.0, 2.0), // Endpoint may not be fully active
(build_metric_name(nats_service::AVG_PROCESSING_MS), 0.0, 1.0), // Low processing time
(build_metric_name(nats_service::TOTAL_ERRORS), 0.0, 0.0), // No errors
(build_metric_name(nats_service::TOTAL_REQUESTS), 0.0, 0.0), // No work handler requests
(
build_metric_name(nats_service::TOTAL_PROCESSING_MS),
0.0,
5.0,
), // Low total processing time
(build_metric_name(nats_service::ACTIVE_SERVICES), 0.0, 2.0), // Service may not be fully active
(build_metric_name(nats_service::ACTIVE_ENDPOINTS), 0.0, 2.0), // Endpoint may not be fully active
// Work handler metrics
(build_metric_name(work_handler::REQUESTS_TOTAL), 10.0, 10.0), // 10 messages
(
......
......@@ -32,69 +32,87 @@ pub mod labels {
pub const ENDPOINT: &str = "dynamo_endpoint";
}
/// NATS Prometheus metric names
pub mod nats {
/// NATS client metrics. DistributedRuntime contains a NATS client shared by all children)
pub mod nats_client {
/// Macro to generate NATS client metric names with the prefix
macro_rules! nats_client_name {
($name:expr) => {
concat!("nats_client_", $name)
};
}
/// Prefix for all NATS client metrics
pub const PREFIX: &str = "nats_";
pub const PREFIX: &str = nats_client_name!("");
/// ===== DistributedRuntime metrics =====
/// Total number of bytes received by NATS client
pub const IN_TOTAL_BYTES: &str = "nats_in_total_bytes";
pub const IN_TOTAL_BYTES: &str = nats_client_name!("in_total_bytes");
/// Total number of bytes sent by NATS client
pub const OUT_OVERHEAD_BYTES: &str = "nats_out_overhead_bytes";
pub const OUT_OVERHEAD_BYTES: &str = nats_client_name!("out_overhead_bytes");
/// Total number of messages received by NATS client
pub const IN_MESSAGES: &str = "nats_in_messages";
pub const IN_MESSAGES: &str = nats_client_name!("in_messages");
/// Total number of messages sent by NATS client
pub const OUT_MESSAGES: &str = "nats_out_messages";
pub const OUT_MESSAGES: &str = nats_client_name!("out_messages");
/// Total number of connections established by NATS client
pub const CONNECTS: &str = "nats_connects";
pub const CONNECTS: &str = nats_client_name!("connects");
/// Current connection state of NATS client (0=disconnected, 1=connected, 2=reconnecting)
pub const CONNECTION_STATE: &str = "nats_connection_state";
pub const CONNECTION_STATE: &str = nats_client_name!("connection_state");
}
/// NATS service metrics, from the $SRV.STATS.<service_name> requests on NATS server
pub mod nats_service {
/// Macro to generate NATS service metric names with the prefix
macro_rules! nats_service_name {
($name:expr) => {
concat!("nats_service_", $name)
};
}
/// Prefix for all NATS service metrics
pub const PREFIX: &str = nats_service_name!("");
/// ===== Component metrics (ordered to match NatsStatsMetrics fields) =====
/// Average processing time in milliseconds (maps to: average_processing_time in ms)
pub const AVG_PROCESSING_MS: &str = "nats_avg_processing_time_ms";
pub const AVG_PROCESSING_MS: &str = nats_service_name!("avg_processing_time_ms");
/// Total errors across all endpoints (maps to: num_errors)
pub const TOTAL_ERRORS: &str = "nats_total_errors";
pub const TOTAL_ERRORS: &str = nats_service_name!("total_errors");
/// Total requests across all endpoints (maps to: num_requests)
pub const TOTAL_REQUESTS: &str = "nats_total_requests";
pub const TOTAL_REQUESTS: &str = nats_service_name!("total_requests");
/// Total processing time in milliseconds (maps to: processing_time in ms)
pub const TOTAL_PROCESSING_MS: &str = "nats_total_processing_time_ms";
pub const TOTAL_PROCESSING_MS: &str = nats_service_name!("total_processing_time_ms");
/// Number of active services (derived from ServiceSet.services)
pub const ACTIVE_SERVICES: &str = "nats_active_services";
pub const ACTIVE_SERVICES: &str = nats_service_name!("active_services");
/// Number of active endpoints (derived from ServiceInfo.endpoints)
pub const ACTIVE_ENDPOINTS: &str = "nats_active_endpoints";
pub const ACTIVE_ENDPOINTS: &str = nats_service_name!("active_endpoints");
}
/// All NATS client Prometheus metric names as an array for iteration/validation
pub const DRT_NATS_METRICS: &[&str] = &[
nats::CONNECTION_STATE,
nats::CONNECTS,
nats::IN_TOTAL_BYTES,
nats::IN_MESSAGES,
nats::OUT_OVERHEAD_BYTES,
nats::OUT_MESSAGES,
nats_client::CONNECTION_STATE,
nats_client::CONNECTS,
nats_client::IN_TOTAL_BYTES,
nats_client::IN_MESSAGES,
nats_client::OUT_OVERHEAD_BYTES,
nats_client::OUT_MESSAGES,
];
/// All component service Prometheus metric names as an array for iteration/validation
/// (ordered to match NatsStatsMetrics fields)
pub const COMPONENT_NATS_METRICS: &[&str] = &[
nats::AVG_PROCESSING_MS, // maps to: average_processing_time (nanoseconds)
nats::TOTAL_ERRORS, // maps to: num_errors
nats::TOTAL_REQUESTS, // maps to: num_requests
nats::TOTAL_PROCESSING_MS, // maps to: processing_time (nanoseconds)
nats::ACTIVE_SERVICES, // derived from ServiceSet.services
nats::ACTIVE_ENDPOINTS, // derived from ServiceInfo.endpoints
nats_service::AVG_PROCESSING_MS, // maps to: average_processing_time (nanoseconds)
nats_service::TOTAL_ERRORS, // maps to: num_errors
nats_service::TOTAL_REQUESTS, // maps to: num_requests
nats_service::TOTAL_PROCESSING_MS, // maps to: processing_time (nanoseconds)
nats_service::ACTIVE_SERVICES, // derived from ServiceSet.services
nats_service::ACTIVE_ENDPOINTS, // derived from ServiceInfo.endpoints
];
/// Work handler Prometheus metric names
......
......@@ -22,7 +22,7 @@
use crate::{
component::Component,
error,
metrics::{prometheus_names, MetricsRegistry},
metrics::{prometheus_names, prometheus_names::nats_service, MetricsRegistry},
traits::*,
transports::nats,
utils::stream,
......@@ -318,67 +318,79 @@ mod tests {
/// Flow: NATS Service → NatsStatsMetrics (Counters) → Metrics Callback → Prometheus Gauge
/// Note: These are snapshots updated when execute_metrics_callbacks() is called.
#[derive(Debug, Clone)]
pub struct ComponentNatsPrometheusMetrics {
pub struct ComponentNatsServerPrometheusMetrics {
/// Average processing time in milliseconds (maps to: average_processing_time)
pub avg_processing_ms: prometheus::Gauge,
pub service_avg_processing_ms: prometheus::Gauge,
/// Total errors across all endpoints (maps to: num_errors)
pub total_errors: prometheus::IntGauge,
pub service_total_errors: prometheus::IntGauge,
/// Total requests across all endpoints (maps to: num_requests)
pub total_requests: prometheus::IntGauge,
pub service_total_requests: prometheus::IntGauge,
/// Total processing time in milliseconds (maps to: processing_time)
pub total_processing_ms: prometheus::IntGauge,
pub service_total_processing_ms: prometheus::IntGauge,
/// Number of active services (derived from ServiceSet.services)
pub active_services: prometheus::IntGauge,
pub service_active_services: prometheus::IntGauge,
/// Number of active endpoints (derived from ServiceInfo.endpoints)
pub active_endpoints: prometheus::IntGauge,
pub service_active_endpoints: prometheus::IntGauge,
}
impl ComponentNatsPrometheusMetrics {
impl ComponentNatsServerPrometheusMetrics {
/// Create new ComponentServiceMetrics using Component's DistributedRuntime's Prometheus constructors
pub fn new(component: &Component) -> Result<Self> {
let avg_processing_ms = component.create_gauge(
prometheus_names::nats::AVG_PROCESSING_MS,
let service_name = component.service_name();
// Build labels: service_name first, then component's labels
let mut labels_vec = vec![("service_name", service_name.as_str())];
// Add component's labels (convert from (String, String) to (&str, &str))
for (key, value) in component.labels() {
labels_vec.push((key.as_str(), value.as_str()));
}
let labels: &[(&str, &str)] = &labels_vec;
let service_avg_processing_ms = component.create_gauge(
nats_service::AVG_PROCESSING_MS,
"Average processing time across all component endpoints in milliseconds",
&[],
labels,
)?;
let total_errors = component.create_intgauge(
prometheus_names::nats::TOTAL_ERRORS,
let service_total_errors = component.create_intgauge(
nats_service::TOTAL_ERRORS,
"Total number of errors across all component endpoints",
&[],
labels,
)?;
let total_requests = component.create_intgauge(
prometheus_names::nats::TOTAL_REQUESTS,
let service_total_requests = component.create_intgauge(
nats_service::TOTAL_REQUESTS,
"Total number of requests across all component endpoints",
&[],
labels,
)?;
let total_processing_ms = component.create_intgauge(
prometheus_names::nats::TOTAL_PROCESSING_MS,
let service_total_processing_ms = component.create_intgauge(
nats_service::TOTAL_PROCESSING_MS,
"Total processing time across all component endpoints in milliseconds",
&[],
labels,
)?;
let active_services = component.create_intgauge(
prometheus_names::nats::ACTIVE_SERVICES,
let service_active_services = component.create_intgauge(
nats_service::ACTIVE_SERVICES,
"Number of active services in this component",
&[],
labels,
)?;
let active_endpoints = component.create_intgauge(
prometheus_names::nats::ACTIVE_ENDPOINTS,
let service_active_endpoints = component.create_intgauge(
nats_service::ACTIVE_ENDPOINTS,
"Number of active endpoints across all services",
&[],
labels,
)?;
Ok(Self {
avg_processing_ms,
total_errors,
total_requests,
total_processing_ms,
active_services,
active_endpoints,
service_avg_processing_ms,
service_total_errors,
service_total_requests,
service_total_processing_ms,
service_active_services,
service_active_endpoints,
})
}
......@@ -414,26 +426,26 @@ impl ComponentNatsPrometheusMetrics {
if processing_time_samples > 0 && total_requests > 0 {
let avg_time_nanos = total_processing_time_nanos as f64 / total_requests as f64;
let avg_time_ms = avg_time_nanos / 1_000_000.0; // Convert nanoseconds to milliseconds
self.avg_processing_ms.set(avg_time_ms);
self.service_avg_processing_ms.set(avg_time_ms);
} else {
self.avg_processing_ms.set(0.0);
self.service_avg_processing_ms.set(0.0);
}
self.total_errors.set(total_errors as i64); // maps to: num_errors
self.total_requests.set(total_requests as i64); // maps to: num_requests
self.total_processing_ms
self.service_total_errors.set(total_errors as i64); // maps to: num_errors
self.service_total_requests.set(total_requests as i64); // maps to: num_requests
self.service_total_processing_ms
.set((total_processing_time_nanos / 1_000_000) as i64); // maps to: processing_time (converted to milliseconds)
self.active_services.set(service_count); // derived from ServiceSet.services
self.active_endpoints.set(endpoint_count as i64); // derived from ServiceInfo.endpoints
self.service_active_services.set(service_count); // derived from ServiceSet.services
self.service_active_endpoints.set(endpoint_count as i64); // derived from ServiceInfo.endpoints
}
/// Reset all metrics to zero. Useful when no data is available or to clear stale values.
pub fn reset_to_zeros(&self) {
self.avg_processing_ms.set(0.0);
self.total_errors.set(0);
self.total_requests.set(0);
self.total_processing_ms.set(0);
self.active_services.set(0);
self.active_endpoints.set(0);
self.service_avg_processing_ms.set(0.0);
self.service_total_errors.set(0);
self.service_total_requests.set(0);
self.service_total_processing_ms.set(0);
self.service_active_services.set(0);
self.service_active_endpoints.set(0);
}
}
......@@ -78,11 +78,28 @@ impl SystemStatusState {
/// Create new system status server state with the provided metrics registry
pub fn new(drt: Arc<crate::DistributedRuntime>) -> anyhow::Result<Self> {
// Note: This metric is created at the DRT level (no namespace), so it will be prefixed with "dynamo_component_"
let uptime_gauge = drt.as_ref().create_gauge(
let uptime_gauge = match drt.as_ref().create_gauge(
"uptime_seconds",
"Total uptime of the DistributedRuntime in seconds",
&[],
)?;
) {
Ok(gauge) => gauge,
Err(e) if e.to_string().contains("Duplicate metrics") => {
// If the metric already exists, get it from the registry
// This can happen when SystemStatusState is created multiple times in tests
tracing::debug!(
"uptime_seconds metric already registered, retrieving existing metric"
);
// Create a non-http gauge since we can't retrieve the existing one easily
// The important thing is that the metric is registered in the registry
prometheus::Gauge::new(
"uptime_seconds",
"Total uptime of the DistributedRuntime in seconds",
)
.map_err(|e| anyhow::anyhow!("Failed to create dummy gauge: {}", e))?
}
Err(e) => return Err(e),
};
let state = Self {
root_drt: drt,
start_time: OnceLock::new(),
......@@ -387,30 +404,37 @@ mod tests {
// Test that metrics have correct namespace
temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
let drt = create_test_drt_async().await;
let system_status = SystemStatusState::new(Arc::new(drt)).unwrap();
// Initialize start time
system_status.initialize_start_time().unwrap();
// SystemStatusState is already created in distributed.rs when DYN_SYSTEM_ENABLED=false
// so we don't need to create it again here
system_status.uptime_gauge.set(42.0);
let response = system_status.drt().prometheus_metrics_fmt().unwrap();
// The uptime_seconds metric should already be registered and available
let response = drt.prometheus_metrics_fmt().unwrap();
println!("Full metrics response:\n{}", response);
// Filter out NATS client metrics for comparison
use crate::metrics::prometheus_names::nats as nats_metrics;
use crate::metrics::prometheus_names::{nats_client, nats_service};
let filtered_response: String = response
.lines()
.filter(|line| !line.contains(nats_metrics::PREFIX))
.filter(|line| {
!line.contains(nats_client::PREFIX) && !line.contains(nats_service::PREFIX)
})
.collect::<Vec<_>>()
.join("\n");
let expected = "\
# HELP dynamo_component_uptime_seconds Total uptime of the DistributedRuntime in seconds
# TYPE dynamo_component_uptime_seconds gauge
dynamo_component_uptime_seconds 42";
assert_eq!(filtered_response, expected);
// Check that uptime_seconds metric is present with correct namespace
assert!(
filtered_response.contains("# HELP dynamo_component_uptime_seconds"),
"Should contain uptime_seconds help text"
);
assert!(
filtered_response.contains("# TYPE dynamo_component_uptime_seconds gauge"),
"Should contain uptime_seconds type"
);
assert!(
filtered_response.contains("dynamo_component_uptime_seconds"),
"Should contain uptime_seconds metric with correct namespace"
);
})
.await;
}
......
......@@ -44,7 +44,7 @@ use tokio::time;
use url::Url;
use validator::{Validate, ValidationError};
use crate::metrics::prometheus_names::nats as nats_metrics;
use crate::metrics::prometheus_names::nats_client as nats_metrics;
pub use crate::slug::Slug;
use tracing as log;
......@@ -520,7 +520,7 @@ impl NatsQueue {
/// Flow: NATS Client → Client Statistics → set_from_client_stats() → Prometheus Gauge
/// Note: These are snapshots updated when set_from_client_stats() is called.
#[derive(Debug, Clone)]
pub struct DRTNatsPrometheusMetrics {
pub struct DRTNatsClientPrometheusMetrics {
nats_client: client::Client,
/// Number of bytes received (excluding protocol overhead)
pub in_bytes: IntGauge,
......@@ -536,7 +536,7 @@ pub struct DRTNatsPrometheusMetrics {
pub connection_state: IntGauge,
}
impl DRTNatsPrometheusMetrics {
impl DRTNatsClientPrometheusMetrics {
/// Create a new instance of NATS client metrics using a DistributedRuntime's Prometheus constructors
pub fn new(drt: &crate::DistributedRuntime, nats_client: client::Client) -> Result<Self> {
let in_bytes = drt.create_intgauge(
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment