// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-License-Identifier: Apache-2.0 //! Metrics registry trait and implementation for Prometheus metrics //! //! This module provides a trait-based interface for creating and managing Prometheus metrics //! with automatic label injection and hierarchical naming support. pub mod prometheus_names; use parking_lot::Mutex; use std::collections::HashSet; use std::sync::Arc; use crate::component::ComponentBuilder; use anyhow; use once_cell::sync::Lazy; use regex::Regex; use std::any::Any; use std::collections::HashMap; // Import commonly used items to avoid verbose prefixes use prometheus_names::{ build_component_metric_name, labels, name_prefix, sanitize_prometheus_label, sanitize_prometheus_name, work_handler, }; // Pipeline imports for endpoint creation use crate::pipeline::{ AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, ResponseStream, SingleIn, async_trait, network::Ingress, }; use crate::protocols::annotated::Annotated; use crate::stream; use crate::stream::StreamExt; // Prometheus imports use prometheus::Encoder; /// Validate that a label slice has no duplicate keys. /// Returns Ok(()) when all keys are unique; otherwise returns an error naming the duplicate key. fn validate_no_duplicate_label_keys(labels: &[(&str, &str)]) -> anyhow::Result<()> { let mut seen_keys = std::collections::HashSet::new(); for (key, _) in labels { if !seen_keys.insert(*key) { return Err(anyhow::anyhow!( "Duplicate label key '{}' found in labels", key )); } } Ok(()) } /// ============================== /// Prometheus section /// ============================== /// Trait that defines common behavior for Prometheus metric types pub trait PrometheusMetric: prometheus::core::Collector + Clone + Send + Sync + 'static { /// Create a new metric with the given options fn with_opts(opts: prometheus::Opts) -> Result where Self: Sized; /// Create a new metric with histogram options and custom buckets /// This is a default implementation that will panic for non-histogram metrics fn with_histogram_opts_and_buckets( _opts: prometheus::HistogramOpts, _buckets: Option>, ) -> Result where Self: Sized, { panic!("with_histogram_opts_and_buckets is not implemented for this metric type"); } /// Create a new metric with counter options and label names (for CounterVec) /// This is a default implementation that will panic for non-countervec metrics fn with_opts_and_label_names( _opts: prometheus::Opts, _label_names: &[&str], ) -> Result where Self: Sized, { panic!("with_opts_and_label_names is not implemented for this metric type"); } } // Implement the trait for Counter, IntCounter, and Gauge impl PrometheusMetric for prometheus::Counter { fn with_opts(opts: prometheus::Opts) -> Result { prometheus::Counter::with_opts(opts) } } impl PrometheusMetric for prometheus::IntCounter { fn with_opts(opts: prometheus::Opts) -> Result { prometheus::IntCounter::with_opts(opts) } } impl PrometheusMetric for prometheus::Gauge { fn with_opts(opts: prometheus::Opts) -> Result { prometheus::Gauge::with_opts(opts) } } impl PrometheusMetric for prometheus::IntGauge { fn with_opts(opts: prometheus::Opts) -> Result { prometheus::IntGauge::with_opts(opts) } } impl PrometheusMetric for prometheus::GaugeVec { fn with_opts(_opts: prometheus::Opts) -> Result { Err(prometheus::Error::Msg( "GaugeVec requires label names, use with_opts_and_label_names instead".to_string(), )) } fn with_opts_and_label_names( opts: prometheus::Opts, label_names: &[&str], ) -> Result { prometheus::GaugeVec::new(opts, label_names) } } impl PrometheusMetric for prometheus::IntGaugeVec { fn with_opts(_opts: prometheus::Opts) -> Result { Err(prometheus::Error::Msg( "IntGaugeVec requires label names, use with_opts_and_label_names instead".to_string(), )) } fn with_opts_and_label_names( opts: prometheus::Opts, label_names: &[&str], ) -> Result { prometheus::IntGaugeVec::new(opts, label_names) } } impl PrometheusMetric for prometheus::IntCounterVec { fn with_opts(_opts: prometheus::Opts) -> Result { Err(prometheus::Error::Msg( "IntCounterVec requires label names, use with_opts_and_label_names instead".to_string(), )) } fn with_opts_and_label_names( opts: prometheus::Opts, label_names: &[&str], ) -> Result { prometheus::IntCounterVec::new(opts, label_names) } } // Implement the trait for Histogram impl PrometheusMetric for prometheus::Histogram { fn with_opts(opts: prometheus::Opts) -> Result { // Convert Opts to HistogramOpts let histogram_opts = prometheus::HistogramOpts::new(opts.name, opts.help); prometheus::Histogram::with_opts(histogram_opts) } fn with_histogram_opts_and_buckets( mut opts: prometheus::HistogramOpts, buckets: Option>, ) -> Result { if let Some(custom_buckets) = buckets { opts = opts.buckets(custom_buckets); } prometheus::Histogram::with_opts(opts) } } // Implement the trait for CounterVec impl PrometheusMetric for prometheus::CounterVec { fn with_opts(_opts: prometheus::Opts) -> Result { // This will panic - CounterVec needs label names panic!("CounterVec requires label names, use with_opts_and_label_names instead"); } fn with_opts_and_label_names( opts: prometheus::Opts, label_names: &[&str], ) -> Result { prometheus::CounterVec::new(opts, label_names) } } /// ============================== /// Metrics section /// ============================== /// Public helper function to create metrics - accessible for Python bindings pub fn create_metric( hierarchy: &H, metric_name: &str, metric_desc: &str, labels: &[(&str, &str)], buckets: Option>, const_labels: Option<&[&str]>, ) -> anyhow::Result { // Validate that user-provided labels don't have duplicate keys validate_no_duplicate_label_keys(labels)?; // Note: stored labels functionality has been removed let basename = hierarchy.basename(); let parent_hierarchies = hierarchy.parent_hierarchies(); // Build hierarchy path as vector of strings: parent names + [basename] let mut hierarchy_names: Vec = parent_hierarchies.iter().map(|p| p.basename()).collect(); hierarchy_names.push(basename.clone()); let metric_name = build_component_metric_name(metric_name); // Build updated_labels: auto-labels first, then `labels` + stored labels let mut updated_labels: Vec<(String, String)> = Vec::new(); // Auto-label injection: Always add dynamo_namespace, dynamo_component, dynamo_endpoint labels // based on the hierarchy. Label constants defined in prometheus_names.rs labels module. // // Python counterpart: components/src/dynamo/common/utils/prometheus.py register_engine_metrics_callback() // Validate that user-provided labels don't conflict with auto-generated labels for (key, _) in labels { if *key == labels::NAMESPACE || *key == labels::COMPONENT || *key == labels::ENDPOINT { return Err(anyhow::anyhow!( "Label '{}' is automatically added by auto-label injection and cannot be manually set", key )); } } // Add auto-generated labels with sanitized values // Hierarchy: [drt, namespace, component, endpoint] if hierarchy_names.len() > 1 { let namespace = &hierarchy_names[1]; if !namespace.is_empty() { let valid_namespace = sanitize_prometheus_label(namespace)?; if !valid_namespace.is_empty() { updated_labels.push((labels::NAMESPACE.to_string(), valid_namespace)); } } } if hierarchy_names.len() > 2 { let component = &hierarchy_names[2]; if !component.is_empty() { let valid_component = sanitize_prometheus_label(component)?; if !valid_component.is_empty() { updated_labels.push((labels::COMPONENT.to_string(), valid_component)); } } } if hierarchy_names.len() > 3 { let endpoint = &hierarchy_names[3]; if !endpoint.is_empty() { let valid_endpoint = sanitize_prometheus_label(endpoint)?; if !valid_endpoint.is_empty() { updated_labels.push((labels::ENDPOINT.to_string(), valid_endpoint)); } } } // Add user labels updated_labels.extend( labels .iter() .map(|(k, v)| ((*k).to_string(), (*v).to_string())), ); // Note: stored labels functionality has been removed // Handle different metric types let prometheus_metric = if std::any::TypeId::of::() == std::any::TypeId::of::() { // Special handling for CounterVec with label names // const_labels parameter is required for CounterVec if buckets.is_some() { return Err(anyhow::anyhow!( "buckets parameter is not valid for CounterVec" )); } let mut opts = prometheus::Opts::new(&metric_name, metric_desc); for (key, value) in &updated_labels { opts = opts.const_label(key.clone(), value.clone()); } let label_names = const_labels .ok_or_else(|| anyhow::anyhow!("CounterVec requires const_labels parameter"))?; T::with_opts_and_label_names(opts, label_names)? } else if std::any::TypeId::of::() == std::any::TypeId::of::() { // Special handling for GaugeVec with label names // const_labels parameter is required for GaugeVec if buckets.is_some() { return Err(anyhow::anyhow!( "buckets parameter is not valid for GaugeVec" )); } let mut opts = prometheus::Opts::new(&metric_name, metric_desc); for (key, value) in &updated_labels { opts = opts.const_label(key.clone(), value.clone()); } let label_names = const_labels .ok_or_else(|| anyhow::anyhow!("GaugeVec requires const_labels parameter"))?; T::with_opts_and_label_names(opts, label_names)? } else if std::any::TypeId::of::() == std::any::TypeId::of::() { // Special handling for Histogram with custom buckets // buckets parameter is valid for Histogram, const_labels is not used if const_labels.is_some() { return Err(anyhow::anyhow!( "const_labels parameter is not valid for Histogram" )); } let mut opts = prometheus::HistogramOpts::new(&metric_name, metric_desc); for (key, value) in &updated_labels { opts = opts.const_label(key.clone(), value.clone()); } T::with_histogram_opts_and_buckets(opts, buckets)? } else if std::any::TypeId::of::() == std::any::TypeId::of::() { // Special handling for IntCounterVec with label names // const_labels parameter is required for IntCounterVec if buckets.is_some() { return Err(anyhow::anyhow!( "buckets parameter is not valid for IntCounterVec" )); } let mut opts = prometheus::Opts::new(&metric_name, metric_desc); for (key, value) in &updated_labels { opts = opts.const_label(key.clone(), value.clone()); } let label_names = const_labels .ok_or_else(|| anyhow::anyhow!("IntCounterVec requires const_labels parameter"))?; T::with_opts_and_label_names(opts, label_names)? } else if std::any::TypeId::of::() == std::any::TypeId::of::() { // Special handling for IntGaugeVec with label names // const_labels parameter is required for IntGaugeVec if buckets.is_some() { return Err(anyhow::anyhow!( "buckets parameter is not valid for IntGaugeVec" )); } let mut opts = prometheus::Opts::new(&metric_name, metric_desc); for (key, value) in &updated_labels { opts = opts.const_label(key.clone(), value.clone()); } let label_names = const_labels .ok_or_else(|| anyhow::anyhow!("IntGaugeVec requires const_labels parameter"))?; T::with_opts_and_label_names(opts, label_names)? } else { // Standard handling for Counter, IntCounter, Gauge, IntGauge // buckets and const_labels parameters are not valid for these types if buckets.is_some() { return Err(anyhow::anyhow!( "buckets parameter is not valid for Counter, IntCounter, Gauge, or IntGauge" )); } if const_labels.is_some() { return Err(anyhow::anyhow!( "const_labels parameter is not valid for Counter, IntCounter, Gauge, or IntGauge" )); } let mut opts = prometheus::Opts::new(&metric_name, metric_desc); for (key, value) in &updated_labels { opts = opts.const_label(key.clone(), value.clone()); } T::with_opts(opts)? }; let collector: Box = Box::new(prometheus_metric.clone()); hierarchy.get_metrics_registry().add_metric(collector)?; Ok(prometheus_metric) } /// Wrapper struct that provides access to metrics functionality /// This struct is accessed via the `.metrics()` method on DistributedRuntime, Namespace, Component, and Endpoint pub struct Metrics { hierarchy: H, } impl Metrics { pub fn new(hierarchy: H) -> Self { Self { hierarchy } } // TODO: Add support for additional Prometheus metric types: // - Counter: ✅ IMPLEMENTED - create_counter() // - CounterVec: ✅ IMPLEMENTED - create_countervec() // - Gauge: ✅ IMPLEMENTED - create_gauge() // - GaugeVec: ✅ IMPLEMENTED - create_gaugevec() // - GaugeHistogram: create_gauge_histogram() - for gauge histograms // - Histogram: ✅ IMPLEMENTED - create_histogram() // - HistogramVec with custom buckets: create_histogram_with_buckets() // - Info: create_info() - for info metrics with labels // - IntCounter: ✅ IMPLEMENTED - create_intcounter() // - IntCounterVec: ✅ IMPLEMENTED - create_intcountervec() // - IntGauge: ✅ IMPLEMENTED - create_intgauge() // - IntGaugeVec: ✅ IMPLEMENTED - create_intgaugevec() // - Stateset: create_stateset() - for state-based metrics // - Summary: create_summary() - for quantiles and sum/count metrics // - SummaryVec: create_summary_vec() - for labeled summaries // - Untyped: create_untyped() - for untyped metrics // // NOTE: The order of create_* methods below is mirrored in lib/bindings/python/rust/lib.rs::Metrics // Keep them synchronized when adding new metric types /// Create a Counter metric pub fn create_counter( &self, name: &str, description: &str, labels: &[(&str, &str)], ) -> anyhow::Result { create_metric(&self.hierarchy, name, description, labels, None, None) } /// Create a CounterVec metric with label names (for dynamic labels) pub fn create_countervec( &self, name: &str, description: &str, const_labels: &[&str], const_label_values: &[(&str, &str)], ) -> anyhow::Result { create_metric( &self.hierarchy, name, description, const_label_values, None, Some(const_labels), ) } /// Create a Gauge metric pub fn create_gauge( &self, name: &str, description: &str, labels: &[(&str, &str)], ) -> anyhow::Result { create_metric(&self.hierarchy, name, description, labels, None, None) } /// Create a GaugeVec metric with label names (for dynamic labels) pub fn create_gaugevec( &self, name: &str, description: &str, const_labels: &[&str], const_label_values: &[(&str, &str)], ) -> anyhow::Result { create_metric( &self.hierarchy, name, description, const_label_values, None, Some(const_labels), ) } /// Create a Histogram metric with custom buckets pub fn create_histogram( &self, name: &str, description: &str, labels: &[(&str, &str)], buckets: Option>, ) -> anyhow::Result { create_metric(&self.hierarchy, name, description, labels, buckets, None) } /// Create an IntCounter metric pub fn create_intcounter( &self, name: &str, description: &str, labels: &[(&str, &str)], ) -> anyhow::Result { create_metric(&self.hierarchy, name, description, labels, None, None) } /// Create an IntCounterVec metric with label names (for dynamic labels) pub fn create_intcountervec( &self, name: &str, description: &str, const_labels: &[&str], const_label_values: &[(&str, &str)], ) -> anyhow::Result { create_metric( &self.hierarchy, name, description, const_label_values, None, Some(const_labels), ) } /// Create an IntGauge metric pub fn create_intgauge( &self, name: &str, description: &str, labels: &[(&str, &str)], ) -> anyhow::Result { create_metric(&self.hierarchy, name, description, labels, None, None) } /// Create an IntGaugeVec metric with label names (for dynamic labels) pub fn create_intgaugevec( &self, name: &str, description: &str, const_labels: &[&str], const_label_values: &[(&str, &str)], ) -> anyhow::Result { create_metric( &self.hierarchy, name, description, const_label_values, None, Some(const_labels), ) } /// Get metrics in Prometheus text format pub fn prometheus_expfmt(&self) -> anyhow::Result { self.hierarchy .get_metrics_registry() .prometheus_expfmt_combined() } } /// This trait should be implemented by all metric registries, including Prometheus, Envy, OpenTelemetry, and others. /// It offers a unified interface for creating and managing metrics, organizing sub-registries, and /// generating output in Prometheus text format. use crate::traits::DistributedRuntimeProvider; pub trait MetricsHierarchy: Send + Sync { // ======================================================================== // Required methods - must be implemented by all types // ======================================================================== /// Get the name of this hierarchy (without any hierarchy prefix) fn basename(&self) -> String; /// Get the parent hierarchies as actual objects (not strings) /// Returns a vector of hierarchy references, ordered from root to immediate parent. /// For example, an Endpoint would return [DRT, Namespace, Component]. fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy>; /// Get a reference to this hierarchy's metrics registry fn get_metrics_registry(&self) -> &MetricsRegistry; // ======================================================================== // Provided methods - have default implementations // ======================================================================== /// Access the metrics interface for this hierarchy /// This is a provided method that works for any type implementing MetricsHierarchy fn metrics(&self) -> Metrics<&Self> where Self: Sized, { Metrics::new(self) } } // Blanket implementation for references to types that implement MetricsHierarchy impl MetricsHierarchy for &T { fn basename(&self) -> String { (**self).basename() } fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> { (**self).parent_hierarchies() } fn get_metrics_registry(&self) -> &MetricsRegistry { (**self).get_metrics_registry() } } /// Type alias for runtime callback functions to reduce complexity /// /// This type represents an Arc-wrapped callback function that can be: /// - Shared efficiently across multiple threads and contexts /// - Cloned without duplicating the underlying closure /// - Used in generic contexts requiring 'static lifetime /// /// The Arc wrapper is included in the type to make sharing explicit. pub type PrometheusUpdateCallback = Arc anyhow::Result<()> + Send + Sync + 'static>; /// Type alias for exposition text callback functions that return Prometheus text pub type PrometheusExpositionFormatCallback = Arc anyhow::Result + Send + Sync + 'static>; /// Structure to hold Prometheus registries and associated callbacks for a given hierarchy. /// /// All fields are Arc-wrapped, so cloning shares state. This ensures metrics registered /// on cloned instances (e.g., cloned Client/Endpoint) are visible to the original. #[derive(Clone)] pub struct MetricsRegistry { /// The Prometheus registry for this hierarchy. /// Arc-wrapped so clones share the same registry (metrics registered on clones are visible everywhere). pub prometheus_registry: Arc>, /// Child registries included when emitting combined `/metrics` output. /// /// Why this exists: /// - Previously, `create_metric()` registered every collector into *all* parent registries /// (Endpoint → Component → Namespace → DRT) so scraping the root registry included everything. /// - That fan-out caused Prometheus collisions when different endpoints tried to register the /// same metric name with different const-labels (descriptor mismatch). /// /// We now register metrics only into the local hierarchy registry to avoid collisions. /// `child_registries` rebuilds “what to scrape” as a tree of registries so `/metrics` can: /// - traverse registries recursively, /// - merge metric families into one exposition payload, /// - warn/drop exact duplicate series, while allowing same metric name with different labels. child_registries: Arc>>, /// Update callbacks invoked before metrics are scraped. /// Wrapped in Arc to preserve callbacks across clones (prevents callback loss when MetricsRegistry is cloned). pub prometheus_update_callbacks: Arc>>, /// Callbacks that return Prometheus exposition text appended to metrics output. /// Wrapped in Arc to preserve callbacks across clones (e.g., vLLM callbacks registered at Endpoint remain accessible at DRT). pub prometheus_expfmt_callbacks: Arc>>, } impl std::fmt::Debug for MetricsRegistry { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("MetricsRegistry") .field("prometheus_registry", &">") .field( "prometheus_update_callbacks", &format!( ">> with {} callbacks", self.prometheus_update_callbacks.read().unwrap().len() ), ) .field( "prometheus_expfmt_callbacks", &format!( ">> with {} callbacks", self.prometheus_expfmt_callbacks.read().unwrap().len() ), ) .finish() } } impl MetricsRegistry { /// Create a new metrics registry with an empty Prometheus registry and callback lists pub fn new() -> Self { Self { prometheus_registry: Arc::new(std::sync::RwLock::new(prometheus::Registry::new())), child_registries: Arc::new(std::sync::RwLock::new(Vec::new())), prometheus_update_callbacks: Arc::new(std::sync::RwLock::new(Vec::new())), prometheus_expfmt_callbacks: Arc::new(std::sync::RwLock::new(Vec::new())), } } /// Add a child registry to be included in combined /metrics output. /// /// Dedup is by underlying Prometheus registry pointer, so repeated registration via clones is safe. pub fn add_child_registry(&self, child: &MetricsRegistry) { let child_ptr = Arc::as_ptr(&child.prometheus_registry); let mut guard = self.child_registries.write().unwrap(); if guard .iter() .any(|r| Arc::as_ptr(&r.prometheus_registry) == child_ptr) { return; } guard.push(child.clone()); } fn registries_for_combined_scrape(&self) -> Vec { // Traverse child registries recursively so `prometheus_expfmt()` on any hierarchy // (DRT/namespace/component/endpoint) includes metrics from its descendants. // // Dedup by underlying Prometheus registry pointer so multiple paths (e.g. also registering // directly on the root) won't duplicate output. fn visit( registry: &MetricsRegistry, out: &mut Vec, seen: &mut HashSet<*const std::sync::RwLock>, ) { let ptr = Arc::as_ptr(®istry.prometheus_registry); if !seen.insert(ptr) { return; } out.push(registry.clone()); let children: Vec = registry .child_registries .read() .unwrap() .iter() .cloned() .collect(); for child in children { visit(&child, out, seen); } } let mut out = Vec::new(); let mut seen: HashSet<*const std::sync::RwLock> = HashSet::new(); visit(self, &mut out, &mut seen); out } /// Combine metrics across this registry and all registered children into one Prometheus exposition output. /// /// - Families are merged by name; HELP and TYPE must match. /// - Multiple series for the same name are allowed if labels differ. /// - Exact duplicate series (same name + identical label pairs) are warned and dropped. pub fn prometheus_expfmt_combined(&self) -> anyhow::Result { let registries = self.registries_for_combined_scrape(); // Run per-registry update callbacks first. for registry in ®istries { for result in registry.execute_update_callbacks() { if let Err(e) = result { tracing::error!("Error executing metrics callback: {}", e); } } } // Merge metric families. let mut by_name: HashMap = HashMap::new(); let mut seen_series: HashSet = HashSet::new(); for (registry_idx, registry) in registries.iter().enumerate() { let families = registry.get_prometheus_registry().gather(); for mut family in families { let name = family.name().to_string(); let entry = by_name.entry(name.clone()).or_insert_with(|| { let mut out = prometheus::proto::MetricFamily::new(); out.set_name(name.clone()); out.set_help(family.help().to_string()); out.set_field_type(family.get_field_type()); out }); if entry.help() != family.help() || entry.get_field_type() != family.get_field_type() { return Err(anyhow::anyhow!( "Metric family '{}' has inconsistent help/type across registries (idx={})", name, registry_idx )); } let mut metrics = family.take_metric(); for metric in metrics.drain(..) { let mut labels: Vec<(String, String)> = metric .get_label() .iter() .map(|lp| (lp.name().to_string(), lp.value().to_string())) .collect(); labels.sort_by(|(ka, va), (kb, vb)| (ka, va).cmp(&(kb, vb))); let key = format!( "{}|{}", name, labels .iter() .map(|(k, v)| format!("{}={}", k, v)) .collect::>() .join(",") ); if !seen_series.insert(key) { tracing::warn!( metric_name = %name, labels = ?labels, registry_idx, "Duplicate Prometheus series while merging registries; dropping later sample" ); continue; } entry.mut_metric().push(metric); } } } let mut merged: Vec = by_name.into_values().collect(); merged.sort_by(|a, b| a.name().cmp(b.name())); let encoder = prometheus::TextEncoder::new(); let mut buffer = Vec::new(); encoder.encode(&merged, &mut buffer)?; let mut result = String::from_utf8(buffer)?; // Append expfmt callbacks deterministically in registry order. let mut expfmt = String::new(); for registry in registries { let text = registry.execute_expfmt_callbacks(); if !text.is_empty() { if !expfmt.is_empty() && !expfmt.ends_with('\n') { expfmt.push('\n'); } expfmt.push_str(&text); } } if !expfmt.is_empty() { if !result.ends_with('\n') { result.push('\n'); } result.push_str(&expfmt); } Ok(result) } /// Add a callback function that receives a reference to any MetricsHierarchy pub fn add_update_callback(&self, callback: PrometheusUpdateCallback) { self.prometheus_update_callbacks .write() .unwrap() .push(callback); } /// Add an exposition text callback that returns Prometheus text pub fn add_expfmt_callback(&self, callback: PrometheusExpositionFormatCallback) { self.prometheus_expfmt_callbacks .write() .unwrap() .push(callback); } /// Execute all update callbacks and return their results pub fn execute_update_callbacks(&self) -> Vec> { self.prometheus_update_callbacks .read() .unwrap() .iter() .map(|callback| callback()) .collect() } /// Execute all exposition text callbacks and return their concatenated text pub fn execute_expfmt_callbacks(&self) -> String { let callbacks = self.prometheus_expfmt_callbacks.read().unwrap(); let mut result = String::new(); for callback in callbacks.iter() { match callback() { Ok(text) => { if !text.is_empty() { if !result.is_empty() && !result.ends_with('\n') { result.push('\n'); } result.push_str(&text); } } Err(e) => { tracing::error!("Error executing exposition text callback: {}", e); } } } result } /// Add a Prometheus metric collector to this registry pub fn add_metric( &self, collector: Box, ) -> anyhow::Result<()> { self.prometheus_registry .write() .unwrap() .register(collector) .map_err(|e| anyhow::anyhow!("Failed to register metric: {}", e)) } /// Get a read guard to the Prometheus registry for scraping pub fn get_prometheus_registry(&self) -> std::sync::RwLockReadGuard<'_, prometheus::Registry> { self.prometheus_registry.read().unwrap() } /// Returns true if a metric with the given name already exists in the Prometheus registry pub fn has_metric_named(&self, metric_name: &str) -> bool { self.prometheus_registry .read() .unwrap() .gather() .iter() .any(|mf| mf.name() == metric_name) } } impl Default for MetricsRegistry { fn default() -> Self { Self::new() } } #[cfg(test)] mod test_helpers { use super::prometheus_names::name_prefix; use super::*; /// Base function to filter Prometheus output lines based on a predicate. /// Returns lines that match the predicate, converted to String. fn filter_prometheus_lines(input: &str, mut predicate: F) -> Vec where F: FnMut(&str) -> bool, { input .lines() .filter(|line| predicate(line)) .map(|line| line.to_string()) .collect::>() } /// Extracts all component metrics (excluding help text and type definitions). /// Returns only the actual metric lines with values. pub fn extract_metrics(input: &str) -> Vec { filter_prometheus_lines(input, |line| { line.starts_with(&format!("{}_", name_prefix::COMPONENT)) && !line.starts_with("#") && !line.trim().is_empty() }) } /// Parses a Prometheus metric line and extracts the name, labels, and value. /// Used instead of fetching metrics directly to test end-to-end results, not intermediate state. /// /// # Example /// ``` /// let line = "http_requests_total{method=\"GET\"} 1234"; /// let (name, labels, value) = parse_prometheus_metric(line).unwrap(); /// assert_eq!(name, "http_requests_total"); /// assert_eq!(labels.get("method"), Some(&"GET".to_string())); /// assert_eq!(value, 1234.0); /// ``` pub fn parse_prometheus_metric( line: &str, ) -> Option<(String, std::collections::HashMap, f64)> { if line.trim().is_empty() || line.starts_with('#') { return None; } let parts: Vec<&str> = line.split_whitespace().collect(); if parts.len() < 2 { return None; } let metric_part = parts[0]; let value: f64 = parts[1].parse().ok()?; let (name, labels) = if metric_part.contains('{') { let brace_start = metric_part.find('{').unwrap(); let brace_end = metric_part.rfind('}').unwrap_or(metric_part.len()); let name = &metric_part[..brace_start]; let labels_str = &metric_part[brace_start + 1..brace_end]; let mut labels = std::collections::HashMap::new(); for pair in labels_str.split(',') { if let Some((k, v)) = pair.split_once('=') { let v = v.trim_matches('"'); labels.insert(k.trim().to_string(), v.to_string()); } } (name.to_string(), labels) } else { (metric_part.to_string(), std::collections::HashMap::new()) }; Some((name, labels, value)) } } #[cfg(test)] mod test_metricsregistry_units { use super::*; #[test] fn test_build_component_metric_name_with_prefix() { // Test that build_component_metric_name correctly prepends the dynamo_component prefix let result = build_component_metric_name("requests"); assert_eq!(result, "dynamo_component_requests"); let result = build_component_metric_name("counter"); assert_eq!(result, "dynamo_component_counter"); } #[test] fn test_parse_prometheus_metric() { use super::test_helpers::parse_prometheus_metric; use std::collections::HashMap; // Test parsing a metric with labels let line = "http_requests_total{method=\"GET\",status=\"200\"} 1234"; let parsed = parse_prometheus_metric(line); assert!(parsed.is_some()); let (name, labels, value) = parsed.unwrap(); assert_eq!(name, "http_requests_total"); let mut expected_labels = HashMap::new(); expected_labels.insert("method".to_string(), "GET".to_string()); expected_labels.insert("status".to_string(), "200".to_string()); assert_eq!(labels, expected_labels); assert_eq!(value, 1234.0); // Test parsing a metric without labels let line = "cpu_usage 98.5"; let parsed = parse_prometheus_metric(line); assert!(parsed.is_some()); let (name, labels, value) = parsed.unwrap(); assert_eq!(name, "cpu_usage"); assert!(labels.is_empty()); assert_eq!(value, 98.5); // Test parsing a metric with float value let line = "response_time{service=\"api\"} 0.123"; let parsed = parse_prometheus_metric(line); assert!(parsed.is_some()); let (name, labels, value) = parsed.unwrap(); assert_eq!(name, "response_time"); let mut expected_labels = HashMap::new(); expected_labels.insert("service".to_string(), "api".to_string()); assert_eq!(labels, expected_labels); assert_eq!(value, 0.123); // Test parsing invalid lines assert!(parse_prometheus_metric("").is_none()); // Empty line assert!(parse_prometheus_metric("# HELP metric description").is_none()); // Help text assert!(parse_prometheus_metric("# TYPE metric counter").is_none()); // Type definition assert!(parse_prometheus_metric("metric_name").is_none()); // No value println!("✓ Prometheus metric parsing works correctly!"); } #[test] fn test_metrics_registry_entry_callbacks() { use crate::MetricsRegistry; use std::sync::atomic::{AtomicUsize, Ordering}; // Test 1: Basic callback execution with counter increments { let registry = MetricsRegistry::new(); let counter = Arc::new(AtomicUsize::new(0)); // Add callbacks with different increment values for increment in [1, 10, 100] { let counter_clone = counter.clone(); registry.add_update_callback(Arc::new(move || { counter_clone.fetch_add(increment, Ordering::SeqCst); Ok(()) })); } // Verify counter starts at 0 assert_eq!(counter.load(Ordering::SeqCst), 0); // First execution let results = registry.execute_update_callbacks(); assert_eq!(results.len(), 3); assert!(results.iter().all(|r| r.is_ok())); assert_eq!(counter.load(Ordering::SeqCst), 111); // 1 + 10 + 100 // Second execution - callbacks should be reusable let results = registry.execute_update_callbacks(); assert_eq!(results.len(), 3); assert_eq!(counter.load(Ordering::SeqCst), 222); // 111 + 111 // Test cloning - cloned entry shares callbacks (callbacks are Arc-wrapped) let cloned = registry.clone(); assert_eq!(cloned.execute_update_callbacks().len(), 3); assert_eq!(counter.load(Ordering::SeqCst), 333); // 222 + 111 // Original still has callbacks and shares the same Arc registry.execute_update_callbacks(); assert_eq!(counter.load(Ordering::SeqCst), 444); // 333 + 111 } // Test 2: Mixed success and error callbacks { let registry = MetricsRegistry::new(); let counter = Arc::new(AtomicUsize::new(0)); // Successful callback let counter_clone = counter.clone(); registry.add_update_callback(Arc::new(move || { counter_clone.fetch_add(1, Ordering::SeqCst); Ok(()) })); // Error callback registry.add_update_callback(Arc::new(|| Err(anyhow::anyhow!("Simulated error")))); // Another successful callback let counter_clone = counter.clone(); registry.add_update_callback(Arc::new(move || { counter_clone.fetch_add(10, Ordering::SeqCst); Ok(()) })); // Execute and verify mixed results let results = registry.execute_update_callbacks(); assert_eq!(results.len(), 3); assert!(results[0].is_ok()); assert!(results[1].is_err()); assert!(results[2].is_ok()); // Verify error message assert_eq!( results[1].as_ref().unwrap_err().to_string(), "Simulated error" ); // Verify successful callbacks still executed assert_eq!(counter.load(Ordering::SeqCst), 11); // 1 + 10 // Execute again - errors should be consistent let results = registry.execute_update_callbacks(); assert!(results[1].is_err()); assert_eq!(counter.load(Ordering::SeqCst), 22); // 11 + 11 } // Test 3: Empty registry { let registry = MetricsRegistry::new(); let results = registry.execute_update_callbacks(); assert_eq!(results.len(), 0); } } } #[cfg(feature = "integration")] #[cfg(test)] mod test_metricsregistry_prefixes { use super::*; use crate::distributed::distributed_test_utils::create_test_drt_async; use prometheus::core::Collector; #[tokio::test] async fn test_hierarchical_prefixes_and_parent_hierarchies() { let drt = create_test_drt_async().await; const DRT_NAME: &str = ""; const NAMESPACE_NAME: &str = "ns901"; const COMPONENT_NAME: &str = "comp901"; const ENDPOINT_NAME: &str = "ep901"; let namespace = drt.namespace(NAMESPACE_NAME).unwrap(); let component = namespace.component(COMPONENT_NAME).unwrap(); let endpoint = component.endpoint(ENDPOINT_NAME); // DRT assert_eq!(drt.basename(), DRT_NAME); assert_eq!(drt.parent_hierarchies().len(), 0); // DRT hierarchy is just its basename (empty string) // Namespace assert_eq!(namespace.basename(), NAMESPACE_NAME); assert_eq!(namespace.parent_hierarchies().len(), 1); assert_eq!(namespace.parent_hierarchies()[0].basename(), DRT_NAME); // Namespace hierarchy is just its basename since parent is empty // Component assert_eq!(component.basename(), COMPONENT_NAME); assert_eq!(component.parent_hierarchies().len(), 2); assert_eq!(component.parent_hierarchies()[0].basename(), DRT_NAME); assert_eq!(component.parent_hierarchies()[1].basename(), NAMESPACE_NAME); // Component hierarchy structure is validated by the individual assertions above // Endpoint assert_eq!(endpoint.basename(), ENDPOINT_NAME); assert_eq!(endpoint.parent_hierarchies().len(), 3); assert_eq!(endpoint.parent_hierarchies()[0].basename(), DRT_NAME); assert_eq!(endpoint.parent_hierarchies()[1].basename(), NAMESPACE_NAME); assert_eq!(endpoint.parent_hierarchies()[2].basename(), COMPONENT_NAME); // Endpoint hierarchy structure is validated by the individual assertions above // Relationships assert!( namespace .parent_hierarchies() .iter() .any(|h| h.basename() == drt.basename()) ); assert!( component .parent_hierarchies() .iter() .any(|h| h.basename() == namespace.basename()) ); assert!( endpoint .parent_hierarchies() .iter() .any(|h| h.basename() == component.basename()) ); // Depth assert_eq!(drt.parent_hierarchies().len(), 0); assert_eq!(namespace.parent_hierarchies().len(), 1); assert_eq!(component.parent_hierarchies().len(), 2); assert_eq!(endpoint.parent_hierarchies().len(), 3); // Invalid namespace behavior - sanitizes to "_123" and succeeds // @ryanolson intended to enable validation (see TODO comment in component.rs) but didn't turn it on, // so invalid characters are sanitized in MetricsRegistry rather than rejected. let invalid_namespace = drt.namespace("@@123").unwrap(); let result = invalid_namespace .metrics() .create_counter("test_counter", "A test counter", &[]); assert!(result.is_ok()); if let Ok(counter) = &result { // Verify the namespace was sanitized to "_123" in the label let desc = counter.desc(); let namespace_label = desc[0] .const_label_pairs .iter() .find(|l| l.name() == "dynamo_namespace") .expect("Should have dynamo_namespace label"); assert_eq!(namespace_label.value(), "_123"); } // Valid namespace works let valid_namespace = drt.namespace("ns567").unwrap(); assert!( valid_namespace .metrics() .create_counter("test_counter", "A test counter", &[]) .is_ok() ); } #[tokio::test] async fn test_expfmt_callback_only_registered_on_endpoint_is_included_once() { // Sanity test: if an expfmt callback is registered only on the endpoint registry, // scraping from the root (DRT) should still include it exactly once via the // child-registry traversal. let drt = create_test_drt_async().await; let namespace = drt.namespace("ns_expfmt_ep_only").unwrap(); let component = namespace.component("comp_expfmt_ep_only").unwrap(); let endpoint = component.endpoint("ep_expfmt_ep_only"); let metric_line = "dynamo_component_active_decode_blocks{dp_rank=\"0\"} 0\n"; let callback: PrometheusExpositionFormatCallback = Arc::new(move || Ok(metric_line.to_string())); endpoint .get_metrics_registry() .add_expfmt_callback(callback); let output = drt.metrics().prometheus_expfmt().unwrap(); let occurrences = output .lines() .filter(|line| line == &metric_line.trim_end_matches('\n')) .count(); assert_eq!( occurrences, 1, "endpoint-registered exposition callback should appear once, got {} occurrences\n\n{}", occurrences, output ); } #[tokio::test] async fn test_recursive_namespace() { // Create a distributed runtime for testing let drt = create_test_drt_async().await; // Create a deeply chained namespace: ns1.ns2.ns3 let ns1 = drt.namespace("ns1").unwrap(); let ns2 = ns1.namespace("ns2").unwrap(); let ns3 = ns2.namespace("ns3").unwrap(); // Create a component in the deepest namespace let component = ns3.component("test-component").unwrap(); // Verify the hierarchy structure assert_eq!(ns1.basename(), "ns1"); assert_eq!(ns1.parent_hierarchies().len(), 1); assert_eq!(ns1.parent_hierarchies()[0].basename(), ""); // ns1 hierarchy is just its basename since parent is empty assert_eq!(ns2.basename(), "ns2"); assert_eq!(ns2.parent_hierarchies().len(), 2); assert_eq!(ns2.parent_hierarchies()[0].basename(), ""); assert_eq!(ns2.parent_hierarchies()[1].basename(), "ns1"); // ns2 hierarchy structure validated by parent assertions above assert_eq!(ns3.basename(), "ns3"); assert_eq!(ns3.parent_hierarchies().len(), 3); assert_eq!(ns3.parent_hierarchies()[0].basename(), ""); assert_eq!(ns3.parent_hierarchies()[1].basename(), "ns1"); assert_eq!(ns3.parent_hierarchies()[2].basename(), "ns2"); // ns3 hierarchy structure validated by parent assertions above assert_eq!(component.basename(), "test-component"); assert_eq!(component.parent_hierarchies().len(), 4); assert_eq!(component.parent_hierarchies()[0].basename(), ""); assert_eq!(component.parent_hierarchies()[1].basename(), "ns1"); assert_eq!(component.parent_hierarchies()[2].basename(), "ns2"); assert_eq!(component.parent_hierarchies()[3].basename(), "ns3"); // component hierarchy structure validated by parent assertions above println!("✓ Chained namespace test passed - all prefixes correct"); } } #[cfg(feature = "integration")] #[cfg(test)] mod test_metricsregistry_prometheus_fmt_outputs { use super::prometheus_names::name_prefix; use super::*; use crate::distributed::distributed_test_utils::create_test_drt_async; use prometheus::Counter; use std::sync::Arc; #[tokio::test] async fn test_prometheusfactory_using_metrics_registry_trait() { // Setup real DRT and registry using the test-friendly constructor let drt = create_test_drt_async().await; // Use a simple constant namespace name let namespace_name = "ns345"; let namespace = drt.namespace(namespace_name).unwrap(); let component = namespace.component("comp345").unwrap(); let endpoint = component.endpoint("ep345"); // Test Counter creation let counter = endpoint .metrics() .create_counter("testcounter", "A test counter", &[]) .unwrap(); counter.inc_by(123.456789); let epsilon = 0.01; assert!((counter.get() - 123.456789).abs() < epsilon); let endpoint_output_raw = endpoint.metrics().prometheus_expfmt().unwrap(); println!("Endpoint output:"); println!("{}", endpoint_output_raw); let expected_endpoint_output = r#"# HELP dynamo_component_testcounter A test counter # TYPE dynamo_component_testcounter counter dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789"#.to_string(); assert_eq!( endpoint_output_raw.trim_end_matches('\n'), expected_endpoint_output.trim_end_matches('\n'), "\n=== ENDPOINT COMPARISON FAILED ===\n\ Actual:\n{}\n\ Expected:\n{}\n\ ==============================", endpoint_output_raw, expected_endpoint_output ); // Test Gauge creation let gauge = component .metrics() .create_gauge("testgauge", "A test gauge", &[]) .unwrap(); gauge.set(50000.0); assert_eq!(gauge.get(), 50000.0); // Test Prometheus format output for Component (gauge + histogram) let component_output_raw = component.metrics().prometheus_expfmt().unwrap(); println!("Component output:"); println!("{}", component_output_raw); let expected_component_output = r#"# HELP dynamo_component_testcounter A test counter # TYPE dynamo_component_testcounter counter dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789 # HELP dynamo_component_testgauge A test gauge # TYPE dynamo_component_testgauge gauge dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000"#.to_string(); assert_eq!( component_output_raw.trim_end_matches('\n'), expected_component_output.trim_end_matches('\n'), "\n=== COMPONENT COMPARISON FAILED ===\n\ Actual:\n{}\n\ Expected:\n{}\n\ ==============================", component_output_raw, expected_component_output ); let intcounter = namespace .metrics() .create_intcounter("testintcounter", "A test int counter", &[]) .unwrap(); intcounter.inc_by(12345); assert_eq!(intcounter.get(), 12345); // Test Prometheus format output for Namespace (int_counter + gauge + histogram) let namespace_output_raw = namespace.metrics().prometheus_expfmt().unwrap(); println!("Namespace output:"); println!("{}", namespace_output_raw); let expected_namespace_output = r#"# HELP dynamo_component_testcounter A test counter # TYPE dynamo_component_testcounter counter dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789 # HELP dynamo_component_testgauge A test gauge # TYPE dynamo_component_testgauge gauge dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000 # HELP dynamo_component_testintcounter A test int counter # TYPE dynamo_component_testintcounter counter dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#.to_string(); assert_eq!( namespace_output_raw.trim_end_matches('\n'), expected_namespace_output.trim_end_matches('\n'), "\n=== NAMESPACE COMPARISON FAILED ===\n\ Actual:\n{}\n\ Expected:\n{}\n\ ==============================", namespace_output_raw, expected_namespace_output ); // Test IntGauge creation let intgauge = namespace .metrics() .create_intgauge("testintgauge", "A test int gauge", &[]) .unwrap(); intgauge.set(42); assert_eq!(intgauge.get(), 42); // Test IntGaugeVec creation let intgaugevec = namespace .metrics() .create_intgaugevec( "testintgaugevec", "A test int gauge vector", &["instance", "status"], &[("service", "api")], ) .unwrap(); intgaugevec .with_label_values(&["server1", "active"]) .set(10); intgaugevec .with_label_values(&["server2", "inactive"]) .set(0); // Test CounterVec creation let countervec = endpoint .metrics() .create_countervec( "testcountervec", "A test counter vector", &["method", "status"], &[("service", "api")], ) .unwrap(); countervec.with_label_values(&["GET", "200"]).inc_by(10.0); countervec.with_label_values(&["POST", "201"]).inc_by(5.0); // Test Histogram creation let histogram = component .metrics() .create_histogram("testhistogram", "A test histogram", &[], None) .unwrap(); histogram.observe(1.0); histogram.observe(2.5); histogram.observe(4.0); // Test Prometheus format output for DRT (all metrics combined) let drt_output_raw = drt.metrics().prometheus_expfmt().unwrap(); println!("DRT output:"); println!("{}", drt_output_raw); let expected_drt_output = r#"# HELP dynamo_component_testcounter A test counter # TYPE dynamo_component_testcounter counter dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789 # HELP dynamo_component_testcountervec A test counter vector # TYPE dynamo_component_testcountervec counter dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="GET",service="api",status="200"} 10 dynamo_component_testcountervec{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345",method="POST",service="api",status="201"} 5 # HELP dynamo_component_testgauge A test gauge # TYPE dynamo_component_testgauge gauge dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000 # HELP dynamo_component_testhistogram A test histogram # TYPE dynamo_component_testhistogram histogram dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.005"} 0 dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.01"} 0 dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.025"} 0 dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.05"} 0 dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.1"} 0 dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.25"} 0 dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="0.5"} 0 dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="1"} 1 dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="2.5"} 2 dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="5"} 3 dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="10"} 3 dynamo_component_testhistogram_bucket{dynamo_component="comp345",dynamo_namespace="ns345",le="+Inf"} 3 dynamo_component_testhistogram_sum{dynamo_component="comp345",dynamo_namespace="ns345"} 7.5 dynamo_component_testhistogram_count{dynamo_component="comp345",dynamo_namespace="ns345"} 3 # HELP dynamo_component_testintcounter A test int counter # TYPE dynamo_component_testintcounter counter dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345 # HELP dynamo_component_testintgauge A test int gauge # TYPE dynamo_component_testintgauge gauge dynamo_component_testintgauge{dynamo_namespace="ns345"} 42 # HELP dynamo_component_testintgaugevec A test int gauge vector # TYPE dynamo_component_testintgaugevec gauge dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server1",service="api",status="active"} 10 dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server2",service="api",status="inactive"} 0 # HELP dynamo_component_uptime_seconds Total uptime of the DistributedRuntime in seconds # TYPE dynamo_component_uptime_seconds gauge dynamo_component_uptime_seconds 0"#.to_string(); assert_eq!( drt_output_raw.trim_end_matches('\n'), expected_drt_output.trim_end_matches('\n'), "\n=== DRT COMPARISON FAILED ===\n\ Expected:\n{}\n\ Actual (filtered):\n{}\n\ ==============================", expected_drt_output, drt_output_raw ); println!("✓ All Prometheus format outputs verified successfully!"); } #[test] fn test_refactored_filter_functions() { // Test data with component metrics let test_input = r#"# HELP dynamo_component_requests Total requests # TYPE dynamo_component_requests counter dynamo_component_requests 42 # HELP dynamo_component_latency Response latency # TYPE dynamo_component_latency histogram dynamo_component_latency_bucket{le="0.1"} 10 dynamo_component_latency_bucket{le="0.5"} 25 dynamo_component_errors_total 5"#; // Test extract_metrics (only actual metric lines, excluding help/type) let metrics_only = super::test_helpers::extract_metrics(test_input); assert_eq!(metrics_only.len(), 4); // 4 actual metric lines (excluding help/type) assert!( metrics_only .iter() .all(|line| line.starts_with("dynamo_component") && !line.starts_with("#")) ); println!("✓ All refactored filter functions work correctly!"); } #[tokio::test] async fn test_same_metric_name_different_endpoints() { // Test that the same metric name can exist in different endpoints without collision. // This validates the multi-registry approach: each endpoint has its own registry, // and metrics are merged at scrape time with distinct labels. let drt = create_test_drt_async().await; let namespace = drt.namespace("ns_test").unwrap(); let component = namespace.component("comp_test").unwrap(); // Create two endpoints with the same metric name let ep1 = component.endpoint("ep1"); let ep2 = component.endpoint("ep2"); let counter1 = ep1 .metrics() .create_counter("requests_total", "Total requests", &[]) .unwrap(); counter1.inc_by(100.0); let counter2 = ep2 .metrics() .create_counter("requests_total", "Total requests", &[]) .unwrap(); counter2.inc_by(200.0); // Get merged Prometheus output from component level let output = component.metrics().prometheus_expfmt().unwrap(); let expected_output = r#"# HELP dynamo_component_requests_total Total requests # TYPE dynamo_component_requests_total counter dynamo_component_requests_total{dynamo_component="comp_test",dynamo_endpoint="ep1",dynamo_namespace="ns_test"} 100 dynamo_component_requests_total{dynamo_component="comp_test",dynamo_endpoint="ep2",dynamo_namespace="ns_test"} 200"#; assert_eq!( output.trim_end_matches('\n'), expected_output.trim_end_matches('\n'), "\n=== MULTI-REGISTRY COMPARISON FAILED ===\n\ Actual:\n{}\n\ Expected:\n{}\n\ ==============================", output, expected_output ); println!("✓ Multi-registry prevents Prometheus collisions!"); } #[tokio::test] async fn test_duplicate_series_warning() { // Test that duplicate series (same metric name + same labels) are detected and deduplicated. // This should log a warning and keep only one of the duplicate series. let drt = create_test_drt_async().await; let namespace = drt.namespace("ns_dup").unwrap(); let component = namespace.component("comp_dup").unwrap(); // Create two endpoints with counters that will have identical labels when scraped let ep1 = component.endpoint("ep_same"); let ep2 = component.endpoint("ep_same"); // Same endpoint name = duplicate labels let counter1 = ep1 .metrics() .create_counter("dup_metric", "Duplicate metric test", &[]) .unwrap(); counter1.inc_by(50.0); let counter2 = ep2 .metrics() .create_counter("dup_metric", "Duplicate metric test", &[]) .unwrap(); counter2.inc_by(75.0); // Get merged output - duplicates should be deduplicated let output = component.metrics().prometheus_expfmt().unwrap(); let expected_output = r#"# HELP dynamo_component_dup_metric Duplicate metric test # TYPE dynamo_component_dup_metric counter dynamo_component_dup_metric{dynamo_component="comp_dup",dynamo_endpoint="ep_same",dynamo_namespace="ns_dup"} 50"#; assert_eq!( output.trim_end_matches('\n'), expected_output.trim_end_matches('\n'), "\n=== DEDUPLICATION COMPARISON FAILED ===\n\ Actual:\n{}\n\ Expected:\n{}\n\ ==============================", output, expected_output ); println!("✓ Duplicate series detection and deduplication works!"); } }