Unverified Commit 92151e3e authored by Keiven C, committed by GitHub
Browse files

fix: move metrics registration to service creation (#2664)


Co-authored-by: Keiven Chang <keivenchang@users.noreply.github.com>
parent dc56d6ce
...@@ -573,39 +573,11 @@ impl Namespace { ...@@ -573,39 +573,11 @@ impl Namespace {
/// Create a [`Component`] in the namespace whose endpoints can be discovered with etcd /// Create a [`Component`] in the namespace whose endpoints can be discovered with etcd
pub fn component(&self, name: impl Into<String>) -> Result<Component> { pub fn component(&self, name: impl Into<String>) -> Result<Component> {
let component = ComponentBuilder::from_runtime(self.runtime.clone()) Ok(ComponentBuilder::from_runtime(self.runtime.clone())
.name(name) .name(name)
.namespace(self.clone()) .namespace(self.clone())
.is_static(self.is_static) .is_static(self.is_static)
.build()?; .build()?)
// Register the metrics callback for this component.
// If registration fails, log a warning but do not propagate the error,
// as metrics are not mission critical and should not block component creation.
if let Err(err) = component.start_scraping_nats_service_component_metrics() {
let error_str = err.to_string();
// Check if this is a duplicate metrics registration (expected in some cases)
// or a different error (unexpected)
if error_str.contains("Duplicate metrics") {
// This is not a critical error because it's possible for multiple Components
// with the same service_name to register metrics callbacks.
tracing::debug!(
"Duplicate metrics registration for component '{}' (expected when multiple components share the same service_name): {}",
component.service_name(),
error_str
);
} else {
// This is unexpected and should be more visible
tracing::warn!(
"Failed to start scraping metrics for component '{}': {}",
component.service_name(),
err
);
}
}
Ok(component)
} }
/// Create a [`Namespace`] in the parent namespace /// Create a [`Namespace`] in the parent namespace
......
...@@ -99,6 +99,15 @@ impl ServiceConfigBuilder { ...@@ -99,6 +99,15 @@ impl ServiceConfigBuilder {
// drop the guard to unlock the mutex // drop the guard to unlock the mutex
drop(guard); drop(guard);
// Register metrics callback. CRITICAL: Never fail service creation for metrics issues.
if let Err(err) = component.start_scraping_nats_service_component_metrics() {
tracing::debug!(
"Metrics registration failed for '{}': {}",
component.service_name(),
err
);
}
Ok(component) Ok(component)
} }
} }
......
...@@ -272,26 +272,16 @@ impl DistributedRuntime { ...@@ -272,26 +272,16 @@ impl DistributedRuntime {
pub fn add_prometheus_metric( pub fn add_prometheus_metric(
&self, &self,
hierarchy: &str, hierarchy: &str,
metric_name: &str,
prometheus_metric: Box<dyn prometheus::core::Collector>, prometheus_metric: Box<dyn prometheus::core::Collector>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let mut registries = self.hierarchy_to_metricsregistry.write().unwrap(); let mut registries = self.hierarchy_to_metricsregistry.write().unwrap();
let entry = registries.entry(hierarchy.to_string()).or_default(); let entry = registries.entry(hierarchy.to_string()).or_default();
// Try to register the metric and provide better error information // Try to register the metric
match entry.prometheus_registry.register(prometheus_metric) { entry
Ok(_) => Ok(()), .prometheus_registry
Err(e) => { .register(prometheus_metric)
let error_msg = e.to_string(); .map_err(|e| e.into())
tracing::error!(
hierarchy = ?hierarchy,
error = ?error_msg,
metric_name = ?metric_name,
"Metric registration failed"
);
Err(e.into())
}
}
} }
/// Add a callback function to metrics registries for the given hierarchies /// Add a callback function to metrics registries for the given hierarchies
......
...@@ -392,7 +392,7 @@ fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>( ...@@ -392,7 +392,7 @@ fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>(
let collector: Box<dyn prometheus::core::Collector> = Box::new(prometheus_metric.clone()); let collector: Box<dyn prometheus::core::Collector> = Box::new(prometheus_metric.clone());
registry registry
.drt() .drt()
.add_prometheus_metric(&current_hierarchy, &metric_name, collector)?; .add_prometheus_metric(&current_hierarchy, collector)?;
} }
Ok(prometheus_metric) Ok(prometheus_metric)
...@@ -1384,6 +1384,9 @@ mod test_metricsregistry_nats { ...@@ -1384,6 +1384,9 @@ mod test_metricsregistry_nats {
let namespace = drt.namespace("ns789").unwrap(); let namespace = drt.namespace("ns789").unwrap();
let components = namespace.component("comp789").unwrap(); let components = namespace.component("comp789").unwrap();
// Create a service to trigger metrics callback registration
let _service = components.service_builder().create().await.unwrap();
// Get components output which should include NATS client metrics // Get components output which should include NATS client metrics
// Additional checks for NATS client metrics (without checking specific values) // Additional checks for NATS client metrics (without checking specific values)
let component_nats_metrics = let component_nats_metrics =
...@@ -1516,15 +1519,15 @@ mod test_metricsregistry_nats { ...@@ -1516,15 +1519,15 @@ mod test_metricsregistry_nats {
(build_metric_name(nats_client::CONNECTS), 1.0, 1.0), // Should have 1 connection (build_metric_name(nats_client::CONNECTS), 1.0, 1.0), // Should have 1 connection
( (
build_metric_name(nats_client::IN_TOTAL_BYTES), build_metric_name(nats_client::IN_TOTAL_BYTES),
400.0, 800.0,
1500.0, 4000.0,
), // Wide range around 923 ), // Wide range around observed value of 1888
(build_metric_name(nats_client::IN_MESSAGES), 0.0, 5.0), // Wide range around 2 (build_metric_name(nats_client::IN_MESSAGES), 0.0, 5.0), // Wide range around 2
( (
build_metric_name(nats_client::OUT_OVERHEAD_BYTES), build_metric_name(nats_client::OUT_OVERHEAD_BYTES),
700.0, 1500.0,
2500.0, 5000.0,
), // Wide range around 1633 ), // Wide range around observed value of 2752
(build_metric_name(nats_client::OUT_MESSAGES), 0.0, 5.0), // Wide range around 2 (build_metric_name(nats_client::OUT_MESSAGES), 0.0, 5.0), // Wide range around 2
// Component NATS metrics (ordered to match COMPONENT_NATS_METRICS) // Component NATS metrics (ordered to match COMPONENT_NATS_METRICS)
(build_metric_name(nats_service::AVG_PROCESSING_MS), 0.0, 0.0), // No processing yet (build_metric_name(nats_service::AVG_PROCESSING_MS), 0.0, 0.0), // No processing yet
......
...@@ -74,6 +74,8 @@ impl SystemStatusState { ...@@ -74,6 +74,8 @@ impl SystemStatusState {
/// Create new system status server state with the provided metrics registry /// Create new system status server state with the provided metrics registry
pub fn new(drt: Arc<crate::DistributedRuntime>) -> anyhow::Result<Self> { pub fn new(drt: Arc<crate::DistributedRuntime>) -> anyhow::Result<Self> {
// Note: This metric is created at the DRT level (no namespace), so it will be prefixed with "dynamo_component_" // Note: This metric is created at the DRT level (no namespace), so it will be prefixed with "dynamo_component_"
// TODO(keiven): this is part of another upcoming refactor, where we will no longer
// have this duplicate DRT (and Duplicate metrics error).
let uptime_gauge = match drt.as_ref().create_gauge( let uptime_gauge = match drt.as_ref().create_gauge(
"uptime_seconds", "uptime_seconds",
"Total uptime of the DistributedRuntime in seconds", "Total uptime of the DistributedRuntime in seconds",
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment