Unverified Commit 3d62cc7f authored by Keiven C's avatar Keiven C Committed by GitHub
Browse files

fix: avoid Prometheus collisions via multi-registry scrape (#5678)


Signed-off-by: default avatarKeiven Chang <keivenchang@users.noreply.github.com>
Co-authored-by: default avatarKeiven Chang <keivenchang@users.noreply.github.com>
parent ed5e4fd0
......@@ -236,12 +236,16 @@ impl Component {
}
pub fn endpoint(&self, endpoint: impl Into<String>) -> Endpoint {
Endpoint {
let endpoint = Endpoint {
component: self.clone(),
name: endpoint.into(),
labels: Vec::new(),
metrics_registry: crate::MetricsRegistry::new(),
}
};
// Attach endpoint registry so scrapes traverse separate registries (avoids collisions).
self.get_metrics_registry()
.add_child_registry(endpoint.get_metrics_registry());
endpoint
}
pub async fn list_instances(&self) -> anyhow::Result<Vec<Instance>> {
......@@ -453,27 +457,40 @@ impl std::fmt::Display for Namespace {
impl Namespace {
pub(crate) fn new(runtime: DistributedRuntime, name: String) -> anyhow::Result<Self> {
Ok(NamespaceBuilder::default()
let ns = NamespaceBuilder::default()
.runtime(Arc::new(runtime))
.name(name)
.build()?)
.build()?;
// Attach namespace registry so scrapes traverse separate registries (avoids collisions).
ns.drt()
.get_metrics_registry()
.add_child_registry(ns.get_metrics_registry());
Ok(ns)
}
/// Create a [`Component`] in the namespace who's endpoints can be discovered with etcd
pub fn component(&self, name: impl Into<String>) -> anyhow::Result<Component> {
ComponentBuilder::from_runtime(self.runtime.clone())
let component = ComponentBuilder::from_runtime(self.runtime.clone())
.name(name)
.namespace(self.clone())
.build()
.build()?;
// Attach component registry so scrapes traverse separate registries (avoids collisions).
self.get_metrics_registry()
.add_child_registry(component.get_metrics_registry());
Ok(component)
}
/// Create a [`Namespace`] in the parent namespace
pub fn namespace(&self, name: impl Into<String>) -> anyhow::Result<Namespace> {
Ok(NamespaceBuilder::default()
let child = NamespaceBuilder::default()
.runtime(self.runtime.clone())
.name(name.into())
.parent(Some(Arc::new(self.clone())))
.build()?)
.build()?;
// Attach child namespace registry so scrapes traverse separate registries (avoids collisions).
self.get_metrics_registry()
.add_child_registry(child.get_metrics_registry());
Ok(child)
}
pub fn name(&self) -> String {
......
......@@ -369,14 +369,6 @@ pub fn create_metric<T: PrometheusMetric, H: MetricsHierarchy + ?Sized>(
T::with_opts(opts)?
};
// Register the metric at all hierarchy levels (parents + self)
// First register at all parent levels
for parent in parent_hierarchies {
let collector: Box<dyn prometheus::core::Collector> = Box::new(prometheus_metric.clone());
parent.get_metrics_registry().add_metric(collector)?;
}
// Then register at this level
let collector: Box<dyn prometheus::core::Collector> = Box::new(prometheus_metric.clone());
hierarchy.get_metrics_registry().add_metric(collector)?;
......@@ -540,45 +532,9 @@ impl<H: MetricsHierarchy> Metrics<H> {
/// Get metrics in Prometheus text format
pub fn prometheus_expfmt(&self) -> anyhow::Result<String> {
// Execute callbacks first to ensure any new metrics are added to the registry
let callback_results = self
.hierarchy
self.hierarchy
.get_metrics_registry()
.execute_update_callbacks();
// Log any callback errors but continue
for result in callback_results {
if let Err(e) = result {
tracing::error!("Error executing metrics callback: {}", e);
}
}
// Get the Prometheus registry for this hierarchy
let prometheus_registry = self
.hierarchy
.get_metrics_registry()
.get_prometheus_registry();
// Encode metrics from the registry
let metric_families = prometheus_registry.gather();
let encoder = prometheus::TextEncoder::new();
let mut buffer = Vec::new();
encoder.encode(&metric_families, &mut buffer)?;
let mut result = String::from_utf8(buffer)?;
// Execute and append exposition text callback results
let expfmt = self
.hierarchy
.get_metrics_registry()
.execute_expfmt_callbacks();
if !expfmt.is_empty() {
if !result.ends_with('\n') {
result.push('\n');
}
result.push_str(&expfmt);
}
Ok(result)
.prometheus_expfmt_combined()
}
}
......@@ -656,6 +612,21 @@ pub struct MetricsRegistry {
/// Arc-wrapped so clones share the same registry (metrics registered on clones are visible everywhere).
pub prometheus_registry: Arc<std::sync::RwLock<prometheus::Registry>>,
/// Child registries included when emitting combined `/metrics` output.
///
/// Why this exists:
/// - Previously, `create_metric()` registered every collector into *all* parent registries
/// (Endpoint → Component → Namespace → DRT) so scraping the root registry included everything.
/// - That fan-out caused Prometheus collisions when different endpoints tried to register the
/// same metric name with different const-labels (descriptor mismatch).
///
/// We now register metrics only into the local hierarchy registry to avoid collisions.
/// `child_registries` rebuilds “what to scrape” as a tree of registries so `/metrics` can:
/// - traverse registries recursively,
/// - merge metric families into one exposition payload,
/// - warn/drop exact duplicate series, while allowing same metric name with different labels.
child_registries: Arc<std::sync::RwLock<Vec<MetricsRegistry>>>,
/// Update callbacks invoked before metrics are scraped.
/// Wrapped in Arc to preserve callbacks across clones (prevents callback loss when MetricsRegistry is cloned).
pub prometheus_update_callbacks: Arc<std::sync::RwLock<Vec<PrometheusUpdateCallback>>>,
......@@ -693,11 +664,171 @@ impl MetricsRegistry {
pub fn new() -> Self {
Self {
prometheus_registry: Arc::new(std::sync::RwLock::new(prometheus::Registry::new())),
child_registries: Arc::new(std::sync::RwLock::new(Vec::new())),
prometheus_update_callbacks: Arc::new(std::sync::RwLock::new(Vec::new())),
prometheus_expfmt_callbacks: Arc::new(std::sync::RwLock::new(Vec::new())),
}
}
/// Add a child registry to be included in combined /metrics output.
///
/// Dedup is by underlying Prometheus registry pointer, so repeated registration via clones is safe.
pub fn add_child_registry(&self, child: &MetricsRegistry) {
let child_ptr = Arc::as_ptr(&child.prometheus_registry);
let mut guard = self.child_registries.write().unwrap();
if guard
.iter()
.any(|r| Arc::as_ptr(&r.prometheus_registry) == child_ptr)
{
return;
}
guard.push(child.clone());
}
fn registries_for_combined_scrape(&self) -> Vec<MetricsRegistry> {
// Traverse child registries recursively so `prometheus_expfmt()` on any hierarchy
// (DRT/namespace/component/endpoint) includes metrics from its descendants.
//
// Dedup by underlying Prometheus registry pointer so multiple paths (e.g. also registering
// directly on the root) won't duplicate output.
fn visit(
registry: &MetricsRegistry,
out: &mut Vec<MetricsRegistry>,
seen: &mut HashSet<*const std::sync::RwLock<prometheus::Registry>>,
) {
let ptr = Arc::as_ptr(&registry.prometheus_registry);
if !seen.insert(ptr) {
return;
}
out.push(registry.clone());
let children: Vec<MetricsRegistry> = registry
.child_registries
.read()
.unwrap()
.iter()
.cloned()
.collect();
for child in children {
visit(&child, out, seen);
}
}
let mut out = Vec::new();
let mut seen: HashSet<*const std::sync::RwLock<prometheus::Registry>> = HashSet::new();
visit(self, &mut out, &mut seen);
out
}
/// Combine metrics across this registry and all registered children into one Prometheus exposition output.
///
/// - Families are merged by name; HELP and TYPE must match.
/// - Multiple series for the same name are allowed if labels differ.
/// - Exact duplicate series (same name + identical label pairs) are warned and dropped.
pub fn prometheus_expfmt_combined(&self) -> anyhow::Result<String> {
let registries = self.registries_for_combined_scrape();
// Run per-registry update callbacks first.
for registry in &registries {
for result in registry.execute_update_callbacks() {
if let Err(e) = result {
tracing::error!("Error executing metrics callback: {}", e);
}
}
}
// Merge metric families.
let mut by_name: HashMap<String, prometheus::proto::MetricFamily> = HashMap::new();
let mut seen_series: HashSet<String> = HashSet::new();
for (registry_idx, registry) in registries.iter().enumerate() {
let families = registry.get_prometheus_registry().gather();
for mut family in families {
let name = family.name().to_string();
let entry = by_name.entry(name.clone()).or_insert_with(|| {
let mut out = prometheus::proto::MetricFamily::new();
out.set_name(name.clone());
out.set_help(family.help().to_string());
out.set_field_type(family.get_field_type());
out
});
if entry.help() != family.help()
|| entry.get_field_type() != family.get_field_type()
{
return Err(anyhow::anyhow!(
"Metric family '{}' has inconsistent help/type across registries (idx={})",
name,
registry_idx
));
}
let mut metrics = family.take_metric();
for metric in metrics.drain(..) {
let mut labels: Vec<(String, String)> = metric
.get_label()
.iter()
.map(|lp| (lp.name().to_string(), lp.value().to_string()))
.collect();
labels.sort_by(|(ka, va), (kb, vb)| (ka, va).cmp(&(kb, vb)));
let key = format!(
"{}|{}",
name,
labels
.iter()
.map(|(k, v)| format!("{}={}", k, v))
.collect::<Vec<_>>()
.join(",")
);
if !seen_series.insert(key) {
tracing::warn!(
metric_name = %name,
labels = ?labels,
registry_idx,
"Duplicate Prometheus series while merging registries; dropping later sample"
);
continue;
}
entry.mut_metric().push(metric);
}
}
}
let mut merged: Vec<prometheus::proto::MetricFamily> = by_name.into_values().collect();
merged.sort_by(|a, b| a.name().cmp(b.name()));
let encoder = prometheus::TextEncoder::new();
let mut buffer = Vec::new();
encoder.encode(&merged, &mut buffer)?;
let mut result = String::from_utf8(buffer)?;
// Append expfmt callbacks deterministically in registry order.
let mut expfmt = String::new();
for registry in registries {
let text = registry.execute_expfmt_callbacks();
if !text.is_empty() {
if !expfmt.is_empty() && !expfmt.ends_with('\n') {
expfmt.push('\n');
}
expfmt.push_str(&text);
}
}
if !expfmt.is_empty() {
if !result.ends_with('\n') {
result.push('\n');
}
result.push_str(&expfmt);
}
Ok(result)
}
/// Add a callback function that receives a reference to any MetricsHierarchy
pub fn add_update_callback(&self, callback: PrometheusUpdateCallback) {
self.prometheus_update_callbacks
......@@ -1203,12 +1334,14 @@ mod test_metricsregistry_prometheus_fmt_outputs {
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789"#.to_string();
assert_eq!(
endpoint_output_raw, expected_endpoint_output,
endpoint_output_raw.trim_end_matches('\n'),
expected_endpoint_output.trim_end_matches('\n'),
"\n=== ENDPOINT COMPARISON FAILED ===\n\
Actual:\n{}\n\
Expected:\n{}\n\
==============================",
endpoint_output_raw, expected_endpoint_output
endpoint_output_raw,
expected_endpoint_output
);
// Test Gauge creation
......@@ -1232,12 +1365,14 @@ dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",
dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000"#.to_string();
assert_eq!(
component_output_raw, expected_component_output,
component_output_raw.trim_end_matches('\n'),
expected_component_output.trim_end_matches('\n'),
"\n=== COMPONENT COMPARISON FAILED ===\n\
Actual:\n{}\n\
Expected:\n{}\n\
==============================",
component_output_raw, expected_component_output
component_output_raw,
expected_component_output
);
let intcounter = namespace
......@@ -1263,12 +1398,14 @@ dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"}
dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#.to_string();
assert_eq!(
namespace_output_raw, expected_namespace_output,
namespace_output_raw.trim_end_matches('\n'),
expected_namespace_output.trim_end_matches('\n'),
"\n=== NAMESPACE COMPARISON FAILED ===\n\
Actual:\n{}\n\
Expected:\n{}\n\
==============================",
namespace_output_raw, expected_namespace_output
namespace_output_raw,
expected_namespace_output
);
// Test IntGauge creation
......@@ -1364,12 +1501,14 @@ dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server2",ser
dynamo_component_uptime_seconds 0"#.to_string();
assert_eq!(
drt_output_raw, expected_drt_output,
drt_output_raw.trim_end_matches('\n'),
expected_drt_output.trim_end_matches('\n'),
"\n=== DRT COMPARISON FAILED ===\n\
Expected:\n{}\n\
Actual (filtered):\n{}\n\
==============================",
expected_drt_output, drt_output_raw
expected_drt_output,
drt_output_raw
);
println!("✓ All Prometheus format outputs verified successfully!");
......@@ -1398,4 +1537,96 @@ dynamo_component_errors_total 5"#;
println!("✓ All refactored filter functions work correctly!");
}
#[tokio::test]
async fn test_same_metric_name_different_endpoints() {
// Test that the same metric name can exist in different endpoints without collision.
// This validates the multi-registry approach: each endpoint has its own registry,
// and metrics are merged at scrape time with distinct labels.
let drt = create_test_drt_async().await;
let namespace = drt.namespace("ns_test").unwrap();
let component = namespace.component("comp_test").unwrap();
// Create two endpoints with the same metric name
let ep1 = component.endpoint("ep1");
let ep2 = component.endpoint("ep2");
let counter1 = ep1
.metrics()
.create_counter("requests_total", "Total requests", &[])
.unwrap();
counter1.inc_by(100.0);
let counter2 = ep2
.metrics()
.create_counter("requests_total", "Total requests", &[])
.unwrap();
counter2.inc_by(200.0);
// Get merged Prometheus output from component level
let output = component.metrics().prometheus_expfmt().unwrap();
let expected_output = r#"# HELP dynamo_component_requests_total Total requests
# TYPE dynamo_component_requests_total counter
dynamo_component_requests_total{dynamo_component="comp_test",dynamo_endpoint="ep1",dynamo_namespace="ns_test"} 100
dynamo_component_requests_total{dynamo_component="comp_test",dynamo_endpoint="ep2",dynamo_namespace="ns_test"} 200"#;
assert_eq!(
output.trim_end_matches('\n'),
expected_output.trim_end_matches('\n'),
"\n=== MULTI-REGISTRY COMPARISON FAILED ===\n\
Actual:\n{}\n\
Expected:\n{}\n\
==============================",
output,
expected_output
);
println!("✓ Multi-registry prevents Prometheus collisions!");
}
#[tokio::test]
async fn test_duplicate_series_warning() {
// Test that duplicate series (same metric name + same labels) are detected and deduplicated.
// This should log a warning and keep only one of the duplicate series.
let drt = create_test_drt_async().await;
let namespace = drt.namespace("ns_dup").unwrap();
let component = namespace.component("comp_dup").unwrap();
// Create two endpoints with counters that will have identical labels when scraped
let ep1 = component.endpoint("ep_same");
let ep2 = component.endpoint("ep_same"); // Same endpoint name = duplicate labels
let counter1 = ep1
.metrics()
.create_counter("dup_metric", "Duplicate metric test", &[])
.unwrap();
counter1.inc_by(50.0);
let counter2 = ep2
.metrics()
.create_counter("dup_metric", "Duplicate metric test", &[])
.unwrap();
counter2.inc_by(75.0);
// Get merged output - duplicates should be deduplicated
let output = component.metrics().prometheus_expfmt().unwrap();
let expected_output = r#"# HELP dynamo_component_dup_metric Duplicate metric test
# TYPE dynamo_component_dup_metric counter
dynamo_component_dup_metric{dynamo_component="comp_dup",dynamo_endpoint="ep_same",dynamo_namespace="ns_dup"} 50"#;
assert_eq!(
output.trim_end_matches('\n'),
expected_output.trim_end_matches('\n'),
"\n=== DEDUPLICATION COMPARISON FAILED ===\n\
Actual:\n{}\n\
Expected:\n{}\n\
==============================",
output,
expected_output
);
println!("✓ Duplicate series detection and deduplication works!");
}
}
......@@ -291,11 +291,10 @@ async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
// Update the uptime gauge with current value
state.drt().system_health().lock().update_uptime_gauge();
// Get all metrics from DistributedRuntime
// Note: In the new hierarchy-based architecture, metrics are automatically registered
// at all parent levels, so DRT's metrics include all metrics from children
// (Namespace, Component, Endpoint). The prometheus_expfmt() method also executes
// all update callbacks and expfmt callbacks before returning the metrics.
// Get all metrics from the DistributedRuntime.
//
// NOTE: We use a multi-registry model (e.g. one registry per endpoint) and merge at scrape time,
// so /metrics traverses registered child registries and produces a single combined output.
let response = match state.drt().metrics().prometheus_expfmt() {
Ok(r) => r,
Err(e) => {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment