Unverified commit 00b64ae0, authored by Keiven C, committed by GitHub

refactor: remove legacy NATS metrics and stats_handler (#4680)


Signed-off-by: Keiven Chang <keivenchang@users.noreply.github.com>
parent 173201f5
@@ -53,7 +53,7 @@ The hierarchy and naming in etcd and NATS may change over time, and this documen
 For etcd, it also creates a primary lease and spin up a background task to keep the lease alive. All objects registered under this `DistributedRuntime` use this lease_id to maintain their life cycle. There is also a cancellation token that is tied to the primary lease. When the cancellation token is triggered or the background task failed, the primary lease is revoked or expired and the kv pairs stored with this lease_id is removed.
 - `Namespace`: `Namespace`s are primarily a logical grouping mechanism and is not registered in etcd. It provides the root path for all components under this `Namespace`.
-- `Component`: When a `Component` object is created, similar to `Namespace`, it isn't be registered in etcd. When `create_service` is called, it creates a NATS service group using `{namespace_name}.{service_name}` for metrics and registers a service in the registry of the `Component`, where the registry is an internal data structure that tracks all services and endpoints within the `DistributedRuntime`.
+- `Component`: When a `Component` object is created, similar to `Namespace`, it isn't be registered in etcd. When `create_service` is called, it creates a NATS service group using `{namespace_name}.{service_name}` as the service identifier and registers a service in the registry of the `Component`, where the registry is an internal data structure that tracks all services and endpoints within the `DistributedRuntime`.
 - `Endpoint`: When an Endpoint object is created and started, it performs two key registrations:
 - NATS Registration: The endpoint is registered with the NATS service group created during service creation. The endpoint is assigned a unique subject following the naming: `{namespace_name}.{service_name}.{endpoint_name}-{lease_id_hex}`.
 - etcd Registration: The endpoint information is stored in etcd at a path following the naming: `/services/{namespace}/{component}/{endpoint}-{lease_id}`. Note that the endpoints of different workers of the same type (i.e., two `VllmPrefillWorker`s in one deployment) share the same `Namespace`, `Component`, and `Endpoint` name. They are distinguished by their different primary `lease_id` of their `DistributedRuntime`.
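The two naming schemes described in this hunk can be illustrated with a small sketch. The helper names `nats_subject` and `etcd_key` are hypothetical and are not part of `dynamo_runtime`. Rendering the lease ID as lowercase hex in the etcd key is also an assumption, since the text only specifies hex for the NATS subject.

```rust
/// Hypothetical helper (not a dynamo_runtime API): builds the NATS subject
/// `{namespace_name}.{service_name}.{endpoint_name}-{lease_id_hex}`.
fn nats_subject(namespace: &str, service: &str, endpoint: &str, lease_id: i64) -> String {
    format!("{namespace}.{service}.{endpoint}-{lease_id:x}")
}

/// Hypothetical helper (not a dynamo_runtime API): builds the etcd key
/// `/services/{namespace}/{component}/{endpoint}-{lease_id}`.
/// Assumption: the lease ID is rendered as lowercase hex here as well.
fn etcd_key(namespace: &str, component: &str, endpoint: &str, lease_id: i64) -> String {
    format!("/services/{namespace}/{component}/{endpoint}-{lease_id:x}")
}
```

Two workers of the same type would call these helpers with identical namespace, component, and endpoint names, so only the `lease_id` suffix separates their subjects and keys.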
......
@@ -27,13 +27,7 @@ Annotated { data: Some("o"), id: None, event: None, comment: None }
 Annotated { data: Some("r"), id: None, event: None, comment: None }
 Annotated { data: Some("l"), id: None, event: None, comment: None }
 Annotated { data: Some("d"), id: None, event: None, comment: None }
-ServiceSet { services: [ServiceInfo { name: "dynamo_init_backend_720278f8", id: "eOHMc4ndRw8s5flv4WOZx7", version: "0.0.1", started: "2025-02-26T18:54:04.917294605Z", endpoints: [EndpointInfo { name: "dynamo_init_backend_720278f8-generate-694d951a80e06abf", subject: "dynamo_init_backend_720278f8.generate-694d951a80e06abf", data: Some(Metrics(Object {"average_processing_time": Number(53662), "data": Object {"val": Number(10)}, "last_error": String(""), "num_errors": Number(0), "num_requests": Number(2), "processing_time": Number(107325), "queue_group": String("q")})) }] }] }
+ServiceSet { services: [ServiceInfo { name: "dynamo_init_backend_720278f8", id: "eOHMc4ndRw8s5flv4WOZx7", version: "0.0.1", started: "2025-02-26T18:54:04.917294605Z", endpoints: [EndpointInfo { name: "dynamo_init_backend_720278f8-generate-694d951a80e06abf", subject: "dynamo_init_backend_720278f8.generate-694d951a80e06abf", data: Some(Metrics(Object {"average_processing_time": Number(53662), "last_error": String(""), "num_errors": Number(0), "num_requests": Number(2), "processing_time": Number(107325), "queue_group": String("q")})) }] }] }
-```
-Note the following stats in the output demonstrate the custom
-`stats_handler` attached to the service in `server.rs` is being invoked:
-```
-data: Some(Metrics(Object {..., "data": Object {"val": Number(10)}, ...)
 ```
 If you start two copies of the server, you will see two entries being emitted.
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use service_metrics::{DEFAULT_NAMESPACE, MyStats}; use service_metrics::DEFAULT_NAMESPACE;
use dynamo_runtime::{ use dynamo_runtime::{
DistributedRuntime, Runtime, Worker, logging, DistributedRuntime, Runtime, Worker, logging,
...@@ -63,11 +63,6 @@ async fn backend(runtime: DistributedRuntime) -> anyhow::Result<()> { ...@@ -63,11 +63,6 @@ async fn backend(runtime: DistributedRuntime) -> anyhow::Result<()> {
component component
.endpoint("generate") .endpoint("generate")
.endpoint_builder() .endpoint_builder()
.stats_handler(|stats| {
println!("stats: {:?}", stats);
let stats = MyStats { val: 10 };
serde_json::to_value(stats).unwrap()
})
.handler(ingress) .handler(ingress)
.start() .start()
.await .await
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use serde::{Deserialize, Serialize};
pub const DEFAULT_NAMESPACE: &str = "dynamo"; pub const DEFAULT_NAMESPACE: &str = "dynamo";
#[derive(Serialize, Deserialize)]
// Dummy Stats object to demonstrate how to attach a custom stats handler
pub struct MyStats {
pub val: u32,
}
...@@ -18,13 +18,6 @@ pub const DEFAULT_NAMESPACE: &str = "dyn_example_namespace"; ...@@ -18,13 +18,6 @@ pub const DEFAULT_NAMESPACE: &str = "dyn_example_namespace";
pub const DEFAULT_COMPONENT: &str = "dyn_example_component"; pub const DEFAULT_COMPONENT: &str = "dyn_example_component";
pub const DEFAULT_ENDPOINT: &str = "dyn_example_endpoint"; pub const DEFAULT_ENDPOINT: &str = "dyn_example_endpoint";
/// Stats structure returned by the endpoint's stats handler
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
pub struct MyStats {
// Example value for demonstration purposes
pub val: i32,
}
/// Custom metrics for system stats with data bytes tracking /// Custom metrics for system stats with data bytes tracking
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct MySystemStatsMetrics { pub struct MySystemStatsMetrics {
...@@ -103,17 +96,7 @@ pub async fn backend(drt: DistributedRuntime, endpoint_name: Option<&str>) -> an ...@@ -103,17 +96,7 @@ pub async fn backend(drt: DistributedRuntime, endpoint_name: Option<&str>) -> an
// Use the factory pattern - single line factory call with metrics // Use the factory pattern - single line factory call with metrics
let ingress = Ingress::for_engine(RequestHandler::with_metrics(system_metrics))?; let ingress = Ingress::for_engine(RequestHandler::with_metrics(system_metrics))?;
endpoint endpoint.endpoint_builder().handler(ingress).start().await?;
.endpoint_builder()
.stats_handler(|_stats| {
println!("Stats handler called with stats: {:?}", _stats);
// TODO(keivenc): return a real stats object
let stats = MyStats { val: 10 };
serde_json::to_value(stats).unwrap()
})
.handler(ingress)
.start()
.await?;
Ok(()) Ok(())
} }
...@@ -43,7 +43,6 @@ use super::{DistributedRuntime, Runtime, traits::*, transports::nats::Slug, util ...@@ -43,7 +43,6 @@ use super::{DistributedRuntime, Runtime, traits::*, transports::nats::Slug, util
use crate::pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint}; use crate::pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint};
use crate::protocols::EndpointId; use crate::protocols::EndpointId;
use crate::service::ComponentNatsServerPrometheusMetrics;
use async_nats::{ use async_nats::{
rustls::quic, rustls::quic,
service::{Service, ServiceExt}, service::{Service, ServiceExt},
...@@ -52,7 +51,6 @@ use derive_builder::Builder; ...@@ -52,7 +51,6 @@ use derive_builder::Builder;
use derive_getters::Getters; use derive_getters::Getters;
use educe::Educe; use educe::Educe;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use service::EndpointStatsHandler;
use std::{collections::HashMap, hash::Hash, sync::Arc}; use std::{collections::HashMap, hash::Hash, sync::Arc};
use validator::{Validate, ValidationError}; use validator::{Validate, ValidationError};
...@@ -79,8 +77,6 @@ pub enum TransportType { ...@@ -79,8 +77,6 @@ pub enum TransportType {
#[derive(Default)] #[derive(Default)]
pub struct RegistryInner { pub struct RegistryInner {
pub(crate) services: HashMap<String, Service>, pub(crate) services: HashMap<String, Service>,
pub(crate) stats_handlers:
HashMap<String, Arc<parking_lot::Mutex<HashMap<String, EndpointStatsHandler>>>>,
} }
#[derive(Clone)] #[derive(Clone)]
...@@ -279,10 +275,10 @@ impl ComponentBuilder { ...@@ -279,10 +275,10 @@ impl ComponentBuilder {
pub fn build(self) -> Result<Component, anyhow::Error> { pub fn build(self) -> Result<Component, anyhow::Error> {
let component = self.build_internal()?; let component = self.build_internal()?;
// If this component is using NATS, gather it's metrics // If this component is using NATS, register the NATS service in background
let drt = component.drt(); let drt = component.drt();
if drt.request_plane().is_nats() { if drt.request_plane().is_nats() {
drt.start_stats_service(component.clone()); drt.register_nats_service(component.clone());
} }
Ok(component) Ok(component)
} }
......
...@@ -4,14 +4,13 @@ ...@@ -4,14 +4,13 @@
use std::sync::Arc; use std::sync::Arc;
use anyhow::Result; use anyhow::Result;
pub use async_nats::service::endpoint::Stats as EndpointStats;
use derive_builder::Builder; use derive_builder::Builder;
use derive_getters::Dissolve; use derive_getters::Dissolve;
use educe::Educe; use educe::Educe;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use crate::{ use crate::{
component::{Endpoint, Instance, TransportType, service::EndpointStatsHandler}, component::{Endpoint, Instance, TransportType},
distributed::RequestPlaneMode, distributed::RequestPlaneMode,
pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint}, pipeline::network::{PushWorkHandler, ingress::push_endpoint::PushEndpoint},
protocols::EndpointId, protocols::EndpointId,
...@@ -30,11 +29,6 @@ pub struct EndpointConfig { ...@@ -30,11 +29,6 @@ pub struct EndpointConfig {
#[educe(Debug(ignore))] #[educe(Debug(ignore))]
handler: Arc<dyn PushWorkHandler>, handler: Arc<dyn PushWorkHandler>,
/// Stats handler
#[educe(Debug(ignore))]
#[builder(default, private)]
_stats_handler: Option<EndpointStatsHandler>,
/// Additional labels for metrics /// Additional labels for metrics
#[builder(default, setter(into))] #[builder(default, setter(into))]
metrics_labels: Option<Vec<(String, String)>>, metrics_labels: Option<Vec<(String, String)>>,
...@@ -56,13 +50,6 @@ impl EndpointConfigBuilder { ...@@ -56,13 +50,6 @@ impl EndpointConfigBuilder {
Self::default().endpoint(endpoint) Self::default().endpoint(endpoint)
} }
pub fn stats_handler<F>(self, handler: F) -> Self
where
F: FnMut(EndpointStats) -> serde_json::Value + Send + Sync + 'static,
{
self._stats_handler(Some(Box::new(handler)))
}
/// Register an async engine in the local endpoint registry for direct in-process calls /// Register an async engine in the local endpoint registry for direct in-process calls
pub fn register_local_engine( pub fn register_local_engine(
self, self,
...@@ -80,46 +67,19 @@ impl EndpointConfigBuilder { ...@@ -80,46 +67,19 @@ impl EndpointConfigBuilder {
} }
pub async fn start(self) -> Result<()> { pub async fn start(self) -> Result<()> {
let ( let (endpoint, handler, metrics_labels, graceful_shutdown, health_check_payload) =
endpoint, self.build_internal()?.dissolve();
handler,
stats_handler,
metrics_labels,
graceful_shutdown,
health_check_payload,
) = self.build_internal()?.dissolve();
let connection_id = endpoint.drt().connection_id(); let connection_id = endpoint.drt().connection_id();
let endpoint_id = endpoint.id(); let endpoint_id = endpoint.id();
tracing::debug!("Starting endpoint: {endpoint_id}"); tracing::debug!("Starting endpoint: {endpoint_id}");
let service_name = endpoint.component.service_name();
let metrics_labels: Option<Vec<(&str, &str)>> = metrics_labels let metrics_labels: Option<Vec<(&str, &str)>> = metrics_labels
.as_ref() .as_ref()
.map(|v| v.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect()); .map(|v| v.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect());
// Add metrics to the handler. The endpoint provides additional information to the handler. // Add metrics to the handler. The endpoint provides additional information to the handler.
handler.add_metrics(&endpoint, metrics_labels.as_deref())?; handler.add_metrics(&endpoint, metrics_labels.as_deref())?;
// Insert the stats handler. depends on NATS.
if let Some(stats_handler) = stats_handler {
let registry = endpoint.drt().component_registry().inner.lock().await;
let handler_map = registry
.stats_handlers
.get(&service_name)
.cloned()
.expect("no stats handler registry; this is unexpected");
// There is something wrong with the stats handler map I think.
// Here the connection_id is included, but in component/service.rs add_stats_service it uses service_name,
// no connection id so it's per-endpoint not per-instance. Doesn't match.
// To not block current refactor I am keeping previous behavior, but I think needs
// investigation.
handler_map.lock().insert(
nats::instance_subject(&endpoint_id, connection_id),
stats_handler,
);
}
// This creates a child token of the runtime's endpoint_shutdown_token. That token is // This creates a child token of the runtime's endpoint_shutdown_token. That token is
// cancelled first as part of graceful shutdown. See Runtime::shutdown. // cancelled first as part of graceful shutdown. See Runtime::shutdown.
let endpoint_shutdown_token = endpoint.drt().child_token(); let endpoint_shutdown_token = endpoint.drt().child_token();
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use super::*;
use crate::component::Component; use crate::component::Component;
use async_nats::service::Service as NatsService; use async_nats::service::{Service as NatsService, ServiceExt};
use async_nats::service::ServiceExt as _;
use derive_builder::Builder;
use derive_getters::Dissolve;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::Arc;
pub use super::endpoint::EndpointStats;
type StatsHandlerRegistry = Arc<Mutex<HashMap<String, EndpointStatsHandler>>>;
pub type StatsHandler =
Box<dyn FnMut(String, EndpointStats) -> serde_json::Value + Send + Sync + 'static>;
pub type EndpointStatsHandler =
Box<dyn FnMut(EndpointStats) -> serde_json::Value + Send + Sync + 'static>;
pub const PROJECT_NAME: &str = "Dynamo"; pub const PROJECT_NAME: &str = "Dynamo";
const SERVICE_VERSION: &str = env!("CARGO_PKG_VERSION"); const SERVICE_VERSION: &str = env!("CARGO_PKG_VERSION");
/// Minimal NATS service builder to support legacy NATS request plane.
/// This will be removed once all components migrate to TCP request plane.
pub async fn build_nats_service( pub async fn build_nats_service(
nats_client: &crate::transports::nats::Client, nats_client: &crate::transports::nats::Client,
component: &Component, component: &Component,
description: Option<String>, description: Option<String>,
) -> anyhow::Result<(NatsService, StatsHandlerRegistry)> { ) -> anyhow::Result<NatsService> {
let service_name = component.service_name(); let service_name = component.service_name();
tracing::trace!("component: {component}; creating, service_name: {service_name}"); tracing::trace!("component: {component}; creating NATS service, service_name: {service_name}");
let description = description.unwrap_or(format!( let description = description.unwrap_or(format!(
"{PROJECT_NAME} component {} in namespace {}", "{PROJECT_NAME} component {} in namespace {}",
component.name, component.namespace component.name, component.namespace
)); ));
let stats_handler_registry: StatsHandlerRegistry = Arc::new(Mutex::new(HashMap::new())); let nats_service = nats_client
let stats_handler_registry_clone = stats_handler_registry.clone(); .client()
.service_builder()
let nats_service_builder = nats_client.client().service_builder();
let nats_service_builder =
nats_service_builder
.description(description) .description(description)
.stats_handler(move |name, stats| {
tracing::trace!("stats_handler: {name}, {stats:?}");
let mut guard = stats_handler_registry.lock();
match guard.get_mut(&name) {
Some(handler) => handler(stats),
None => serde_json::Value::Null,
}
});
let nats_service = nats_service_builder
.start(service_name, SERVICE_VERSION.to_string()) .start(service_name, SERVICE_VERSION.to_string())
.await .await
.map_err(|e| anyhow::anyhow!("Failed to start NATS service: {e}"))?; .map_err(|e| anyhow::anyhow!("Failed to start NATS service: {e}"))?;
Ok((nats_service, stats_handler_registry_clone)) Ok(nats_service)
} }
...@@ -4,9 +4,8 @@ ...@@ -4,9 +4,8 @@
use crate::component::{Component, Instance}; use crate::component::{Component, Instance};
use crate::pipeline::PipelineError; use crate::pipeline::PipelineError;
use crate::pipeline::network::manager::NetworkManager; use crate::pipeline::network::manager::NetworkManager;
use crate::service::{ComponentNatsServerPrometheusMetrics, ServiceClient, ServiceSet}; use crate::service::{ServiceClient, ServiceSet};
use crate::storage::kv::{self, Store as _}; use crate::storage::kv::{self, Store as _};
use crate::transports::nats::DRTNatsClientPrometheusMetrics;
use crate::{ use crate::{
component::{self, ComponentBuilder, Endpoint, Namespace}, component::{self, ComponentBuilder, Endpoint, Namespace},
discovery::Discovery, discovery::Discovery,
...@@ -174,7 +173,6 @@ impl DistributedRuntime { ...@@ -174,7 +173,6 @@ impl DistributedRuntime {
}; };
let component_registry = component::Registry::new(); let component_registry = component::Registry::new();
let nats_client_for_metrics = nats_client.clone();
// NetworkManager for request plane // NetworkManager for request plane
let network_manager = NetworkManager::new( let network_manager = NetworkManager::new(
...@@ -201,24 +199,6 @@ impl DistributedRuntime { ...@@ -201,24 +199,6 @@ impl DistributedRuntime {
local_endpoint_registry: crate::local_endpoint_registry::LocalEndpointRegistry::new(), local_endpoint_registry: crate::local_endpoint_registry::LocalEndpointRegistry::new(),
}; };
if let Some(nats_client_for_metrics) = nats_client_for_metrics {
let nats_client_metrics = DRTNatsClientPrometheusMetrics::new(
&distributed_runtime,
nats_client_for_metrics.client().clone(),
)?;
// Register a callback to update NATS client metrics on the DRT's metrics registry
let nats_client_callback = Arc::new({
let nats_client_clone = nats_client_metrics.clone();
move || {
nats_client_clone.set_from_client_stats();
Ok(())
}
});
distributed_runtime
.metrics_registry
.add_update_callback(nats_client_callback);
}
// Initialize the uptime gauge in SystemHealth // Initialize the uptime gauge in SystemHealth
distributed_runtime distributed_runtime
.system_health .system_health
...@@ -431,25 +411,16 @@ impl DistributedRuntime { ...@@ -431,25 +411,16 @@ impl DistributedRuntime {
Ok(nats_client.client().subscribe(subject).await?) Ok(nats_client.client().subscribe(subject).await?)
} }
/// Start NATS metrics service in the background to isolate the async, /// DEPRECATED: This method exists only for NATS request plane support.
/// and because we don't need it yet. /// Once everything uses the TCP request plane, this can be removed along with
/// TODO: This and the things it calls should be in a nats module somewhere. /// the NATS service registration infrastructure.
pub fn start_stats_service(&self, component: Component) { pub fn register_nats_service(&self, component: Component) {
let drt = self.clone(); let drt = self.clone();
self.runtime().secondary().spawn(async move { self.runtime().secondary().spawn(async move {
let service_name = component.service_name(); let service_name = component.service_name();
if let Err(err) = drt.add_stats_service(component).await {
tracing::error!(error = %err, component = service_name, "Failed starting stats service");
}
});
}
/// Gather NATS metrics
async fn add_stats_service(&self, component: Component) -> anyhow::Result<()> {
let service_name = component.service_name();
// Pre-check to save cost of creating the service, but don't hold the lock // Pre-check to save cost of creating the service, but don't hold the lock
if self if drt
.component_registry() .component_registry()
.inner .inner
.lock() .lock()
...@@ -460,24 +431,34 @@ impl DistributedRuntime { ...@@ -460,24 +431,34 @@ impl DistributedRuntime {
// The NATS service is per component, but it is called from `serve_endpoint`, and there // The NATS service is per component, but it is called from `serve_endpoint`, and there
// are often multiple endpoints for a component (e.g. `clear_kv_blocks` and `generate`). // are often multiple endpoints for a component (e.g. `clear_kv_blocks` and `generate`).
tracing::trace!("Service {service_name} already exists"); tracing::trace!("Service {service_name} already exists");
return Ok(()); return;
} }
let Some(nats_client) = self.nats_client.as_ref() else { let Some(nats_client) = drt.nats_client.as_ref() else {
anyhow::bail!("Cannot create NATS service without NATS."); tracing::error!("Cannot create NATS service without NATS.");
return;
}; };
let description = None; let description = None;
let (nats_service, stats_reg) = let nats_service = match crate::component::service::build_nats_service(
crate::component::service::build_nats_service(nats_client, &component, description) nats_client,
.await?; &component,
description,
)
.await
{
Ok(service) => service,
Err(err) => {
tracing::error!(error = %err, component = service_name, "Failed to build NATS service");
return;
}
};
let mut guard = self.component_registry().inner.lock().await; let mut guard = drt.component_registry().inner.lock().await;
if !guard.services.contains_key(&service_name) { if !guard.services.contains_key(&service_name) {
// Normal case // Normal case
guard.services.insert(service_name.clone(), nats_service); guard.services.insert(service_name.clone(), nats_service);
guard.stats_handlers.insert(service_name.clone(), stats_reg);
tracing::info!("Added NATS / stats service {service_name}"); tracing::info!("Added NATS service {service_name}");
drop(guard); drop(guard);
} else { } else {
...@@ -486,69 +467,8 @@ impl DistributedRuntime { ...@@ -486,69 +467,8 @@ impl DistributedRuntime {
// The NATS service is per component, but it is called from `serve_endpoint`, and there // The NATS service is per component, but it is called from `serve_endpoint`, and there
// are often multiple endpoints for a component (e.g. `clear_kv_blocks` and `generate`). // are often multiple endpoints for a component (e.g. `clear_kv_blocks` and `generate`).
// TODO: Is this still true? // TODO: Is this still true?
return Ok(());
} }
});
let cancel_token = self.primary_token();
let service_client = self
.nats_client
.as_ref()
.map(|nc| ServiceClient::new(nc.clone()))
.ok_or_else(|| {
anyhow::anyhow!("Stats service requires NATS client to collect service metrics.")
})?;
// If there is another component with the same service name, this will fail.
let component_metrics = ComponentNatsServerPrometheusMetrics::new(&component)?;
self.runtime().secondary().spawn(nats_metrics_worker(
cancel_token,
service_client,
component_metrics,
component,
));
Ok(())
}
}
/// Add Prometheus metrics for this component's NATS service stats.
///
/// Starts a background task that periodically requests service statistics from NATS
/// and updates the corresponding Prometheus metrics. The first scrape happens immediately,
/// then subsequent scrapes occur at a fixed interval of 9.8 seconds (MAX_WAIT_MS),
/// which should be near or smaller than typical Prometheus scraping intervals to ensure
/// metrics are fresh when Prometheus collects them.
async fn nats_metrics_worker(
cancel_token: CancellationToken,
service_client: ServiceClient,
component_metrics: ComponentNatsServerPrometheusMetrics,
component: Component,
) {
const MAX_WAIT_MS: Duration = Duration::from_millis(9800); // Should be <= Prometheus scrape interval
let timeout = Duration::from_millis(500);
let mut interval = tokio::time::interval(MAX_WAIT_MS);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let service_name = component.service_name();
loop {
tokio::select! {
result = service_client.collect_services(&service_name, timeout) => {
match result {
Ok(service_set) => {
component_metrics.update_from_service_set(&service_set);
}
Err(err) => {
tracing::error!("Background scrape failed for {service_name}: {err}",);
component_metrics.reset_to_zeros();
}
}
}
_ = cancel_token.cancelled() => {
tracing::trace!("nats_metrics_worker stopped");
break;
}
}
interval.tick().await;
} }
} }
......
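The pre-check, build, re-check sequence that `register_nats_service` follows in the hunk above can be sketched in isolation. This is a simplified illustration under stated assumptions, not the runtime's code: `Registry` and `register_service` are hypothetical names, a `String` stands in for the NATS `Service`, and a synchronous `std::sync::Mutex` replaces the async lock used in the diff.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical stand-in for the component registry's `services` map.
struct Registry {
    services: Mutex<HashMap<String, String>>,
}

/// Registers `name` unless it already exists.
/// Returns true only when this call inserted the service.
fn register_service(registry: &Registry, name: &str, build: impl FnOnce() -> String) -> bool {
    // Pre-check: skip the costly build if the service is already present.
    // The lock guard is a temporary and is released at the end of this statement.
    let exists = registry.services.lock().unwrap().contains_key(name);
    if exists {
        return false;
    }

    // Build without holding the lock, so other callers are not blocked.
    let service = build();

    // Re-check under the lock: another caller may have registered the
    // same name while this one was building.
    let mut guard = registry.services.lock().unwrap();
    if guard.contains_key(name) {
        return false;
    }
    guard.insert(name.to_string(), service);
    true
}
```

The second check is what makes the sequence safe when several endpoints of one component trigger registration at about the same time: the loser of the race simply discards the service it built.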
...@@ -21,8 +21,8 @@ use std::collections::HashMap; ...@@ -21,8 +21,8 @@ use std::collections::HashMap;
// Import commonly used items to avoid verbose prefixes // Import commonly used items to avoid verbose prefixes
use prometheus_names::{ use prometheus_names::{
COMPONENT_NATS_METRICS, DRT_NATS_METRICS, build_component_metric_name, labels, name_prefix, build_component_metric_name, labels, name_prefix, sanitize_prometheus_label,
nats_client, nats_service, sanitize_prometheus_label, sanitize_prometheus_name, work_handler, sanitize_prometheus_name, work_handler,
}; };
// Pipeline imports for endpoint creation // Pipeline imports for endpoint creation
...@@ -783,7 +783,6 @@ impl Default for MetricsRegistry { ...@@ -783,7 +783,6 @@ impl Default for MetricsRegistry {
#[cfg(test)] #[cfg(test)]
mod test_helpers { mod test_helpers {
use super::prometheus_names::name_prefix; use super::prometheus_names::name_prefix;
use super::prometheus_names::{nats_client, nats_service};
use super::*; use super::*;
/// Base function to filter Prometheus output lines based on a predicate. /// Base function to filter Prometheus output lines based on a predicate.
...@@ -799,36 +798,6 @@ mod test_helpers { ...@@ -799,36 +798,6 @@ mod test_helpers {
.collect::<Vec<_>>() .collect::<Vec<_>>()
} }
/// Filters out all NATS metrics from Prometheus output for test comparisons.
pub fn remove_nats_lines(input: &str) -> Vec<String> {
filter_prometheus_lines(input, |line| {
!line.contains(&format!(
"{}_{}",
name_prefix::COMPONENT,
nats_client::PREFIX
)) && !line.contains(&format!(
"{}_{}",
name_prefix::COMPONENT,
nats_service::PREFIX
)) && !line.trim().is_empty()
})
}
/// Filters to only include NATS metrics from Prometheus output for test comparisons.
pub fn extract_nats_lines(input: &str) -> Vec<String> {
filter_prometheus_lines(input, |line| {
line.contains(&format!(
"{}_{}",
name_prefix::COMPONENT,
nats_client::PREFIX
)) || line.contains(&format!(
"{}_{}",
name_prefix::COMPONENT,
nats_service::PREFIX
))
})
}
/// Extracts all component metrics (excluding help text and type definitions). /// Extracts all component metrics (excluding help text and type definitions).
/// Returns only the actual metric lines with values. /// Returns only the actual metric lines with values.
pub fn extract_metrics(input: &str) -> Vec<String> { pub fn extract_metrics(input: &str) -> Vec<String> {
...@@ -1199,8 +1168,6 @@ mod test_metricsregistry_prefixes { ...@@ -1199,8 +1168,6 @@ mod test_metricsregistry_prefixes {
#[cfg(test)] #[cfg(test)]
mod test_metricsregistry_prometheus_fmt_outputs { mod test_metricsregistry_prometheus_fmt_outputs {
use super::prometheus_names::name_prefix; use super::prometheus_names::name_prefix;
use super::prometheus_names::{COMPONENT_NATS_METRICS, DRT_NATS_METRICS};
use super::prometheus_names::{nats_client, nats_service};
use super::*; use super::*;
use crate::distributed::distributed_test_utils::create_test_drt_async; use crate::distributed::distributed_test_utils::create_test_drt_async;
use prometheus::Counter; use prometheus::Counter;
...@@ -1231,21 +1198,17 @@ mod test_metricsregistry_prometheus_fmt_outputs { ...@@ -1231,21 +1198,17 @@ mod test_metricsregistry_prometheus_fmt_outputs {
println!("Endpoint output:"); println!("Endpoint output:");
println!("{}", endpoint_output_raw); println!("{}", endpoint_output_raw);
// Filter out NATS service metrics for test comparison
let endpoint_output =
super::test_helpers::remove_nats_lines(&endpoint_output_raw).join("\n");
let expected_endpoint_output = r#"# HELP dynamo_component_testcounter A test counter let expected_endpoint_output = r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter # TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789"#.to_string(); dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789"#.to_string();
assert_eq!( assert_eq!(
endpoint_output, expected_endpoint_output, endpoint_output_raw, expected_endpoint_output,
"\n=== ENDPOINT COMPARISON FAILED ===\n\ "\n=== ENDPOINT COMPARISON FAILED ===\n\
Expected:\n{}\n\
Actual:\n{}\n\ Actual:\n{}\n\
Expected:\n{}\n\
==============================", ==============================",
expected_endpoint_output, endpoint_output endpoint_output_raw, expected_endpoint_output
); );
// Test Gauge creation // Test Gauge creation
@@ -1261,10 +1224,6 @@ dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",
 println!("Component output:");
 println!("{}", component_output_raw);
-// Filter out NATS service metrics for test comparison
-let component_output =
-super::test_helpers::remove_nats_lines(&component_output_raw).join("\n");
 let expected_component_output = r#"# HELP dynamo_component_testcounter A test counter
 # TYPE dynamo_component_testcounter counter
 dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
@@ -1273,12 +1232,12 @@ dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",
 dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} 50000"#.to_string();
 assert_eq!(
-component_output, expected_component_output,
+component_output_raw, expected_component_output,
 "\n=== COMPONENT COMPARISON FAILED ===\n\
-Expected:\n{}\n\
 Actual:\n{}\n\
+Expected:\n{}\n\
 ==============================",
-expected_component_output, component_output
+component_output_raw, expected_component_output
 );
 let intcounter = namespace
@@ -1293,10 +1252,6 @@ dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"}
 println!("Namespace output:");
 println!("{}", namespace_output_raw);
-// Filter out NATS service metrics for test comparison
-let namespace_output =
-super::test_helpers::remove_nats_lines(&namespace_output_raw).join("\n");
 let expected_namespace_output = r#"# HELP dynamo_component_testcounter A test counter
 # TYPE dynamo_component_testcounter counter
 dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
@@ -1308,12 +1263,12 @@ dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"}
 dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#.to_string();
 assert_eq!(
-namespace_output, expected_namespace_output,
+namespace_output_raw, expected_namespace_output,
 "\n=== NAMESPACE COMPARISON FAILED ===\n\
-Expected:\n{}\n\
 Actual:\n{}\n\
+Expected:\n{}\n\
 ==============================",
-expected_namespace_output, namespace_output
+namespace_output_raw, expected_namespace_output
 );
 // Test IntGauge creation
@@ -1368,10 +1323,6 @@ dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#.to_string();
 println!("DRT output:");
 println!("{}", drt_output_raw);
-// Filter out all NATS metrics for comparison
-let filtered_drt_output =
-super::test_helpers::remove_nats_lines(&drt_output_raw).join("\n");
 let expected_drt_output = r#"# HELP dynamo_component_testcounter A test counter
 # TYPE dynamo_component_testcounter counter
 dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
@@ -1413,12 +1364,12 @@ dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server2",ser
 dynamo_component_uptime_seconds 0"#.to_string();
 assert_eq!(
-filtered_drt_output, expected_drt_output,
+drt_output_raw, expected_drt_output,
 "\n=== DRT COMPARISON FAILED ===\n\
 Expected:\n{}\n\
 Actual (filtered):\n{}\n\
 ==============================",
-expected_drt_output, filtered_drt_output
+expected_drt_output, drt_output_raw
 );
 println!("✓ All Prometheus format outputs verified successfully!");
@@ -1426,33 +1377,19 @@ dynamo_component_uptime_seconds 0"#.to_string();
 #[test]
 fn test_refactored_filter_functions() {
-// Test data with mixed content
+// Test data with component metrics
 let test_input = r#"# HELP dynamo_component_requests Total requests
 # TYPE dynamo_component_requests counter
 dynamo_component_requests 42
-# HELP dynamo_component_nats_client_connection_state Connection state
-# TYPE dynamo_component_nats_client_connection_state gauge
-dynamo_component_nats_client_connection_state 1
 # HELP dynamo_component_latency Response latency
 # TYPE dynamo_component_latency histogram
 dynamo_component_latency_bucket{le="0.1"} 10
 dynamo_component_latency_bucket{le="0.5"} 25
-dynamo_component_nats_service_requests_total 100
-dynamo_component_nats_service_errors_total 5"#;
+dynamo_component_errors_total 5"#;
-// Test remove_nats_lines (excludes NATS lines but keeps help/type)
-let filtered_out = super::test_helpers::remove_nats_lines(test_input);
-assert_eq!(filtered_out.len(), 7); // 7 non-NATS lines
-assert!(!filtered_out.iter().any(|line| line.contains("nats")));
-// Test extract_nats_lines (includes all NATS lines including help/type)
-let filtered_only = super::test_helpers::extract_nats_lines(test_input);
-assert_eq!(filtered_only.len(), 5); // 5 NATS lines
-assert!(filtered_only.iter().all(|line| line.contains("nats")));
 // Test extract_metrics (only actual metric lines, excluding help/type)
 let metrics_only = super::test_helpers::extract_metrics(test_input);
-assert_eq!(metrics_only.len(), 6); // 6 actual metric lines (excluding help/type)
+assert_eq!(metrics_only.len(), 4); // 4 actual metric lines (excluding help/type)
 assert!(
 metrics_only
 .iter()
@@ -1462,490 +1399,3 @@ dynamo_component_nats_service_errors_total 5"#;
 println!("✓ All refactored filter functions work correctly!");
 }
 }
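Reviewer note: the updated test expects `extract_metrics` to return 4 sample lines from the trimmed input. The helper's body is not part of this diff, so the following is only a minimal sketch of that behaviour under the assumption that it drops blank lines and `# HELP` / `# TYPE` comments. The name `extract_metric_lines` is hypothetical.

```rust
/// Hypothetical stand-in for an `extract_metrics`-style helper (assumption:
/// the real helper in `test_helpers` may differ). Keeps only sample lines,
/// dropping blank lines and `# HELP` / `# TYPE` comment lines.
pub fn extract_metric_lines(exposition: &str) -> Vec<String> {
    exposition
        .lines()
        .map(str::trim)
        .filter(|line| !line.is_empty() && !line.starts_with('#'))
        .map(str::to_string)
        .collect()
}
```

Applied to the new `test_input`, this keeps the `requests`, two `latency_bucket`, and `errors_total` samples, which matches the count asserted in the updated test.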
#[cfg(feature = "integration")]
#[cfg(test)]
mod test_metricsregistry_nats {
use super::prometheus_names::name_prefix;
use super::prometheus_names::{COMPONENT_NATS_METRICS, DRT_NATS_METRICS};
use super::prometheus_names::{nats_client, nats_service};
use super::*;
use crate::distributed::distributed_test_utils::create_test_drt_async;
use crate::pipeline::PushRouter;
use crate::{DistributedRuntime, Runtime};
use tokio::time::{Duration, sleep};
#[ignore = "Deprecated - NATS related code to be deleted soon"]
#[tokio::test]
async fn test_drt_nats_metrics() {
// Setup real DRT and registry using the test-friendly constructor
let drt = create_test_drt_async().await;
// Get DRT output which should include NATS client metrics
let drt_output = drt.metrics().prometheus_expfmt().unwrap();
println!("DRT output with NATS metrics:");
println!("{}", drt_output);
// Additional checks for NATS client metrics (without checking specific values)
let drt_nats_metrics = super::test_helpers::extract_nats_lines(&drt_output);
// Check that NATS client metrics are present
assert!(
!drt_nats_metrics.is_empty(),
"NATS client metrics should be present"
);
// Check for specific NATS client metric names (without values)
// Extract only the metric lines from the already-filtered NATS metrics
let drt_nats_metric_lines =
super::test_helpers::extract_metrics(&drt_nats_metrics.join("\n"));
let actual_drt_nats_metrics_sorted: Vec<&str> = drt_nats_metric_lines
.iter()
.map(|line| {
let without_labels = line.split('{').next().unwrap_or(line);
// Drop the value: keep only the token before the first space
without_labels.split(' ').next().unwrap_or(without_labels)
})
.collect();
let expect_drt_nats_metrics_sorted = {
let mut temp = DRT_NATS_METRICS
.iter()
.map(|metric| build_component_metric_name(metric))
.collect::<Vec<_>>();
temp.sort();
temp
};
// Print both lists for comparison
println!(
"actual_drt_nats_metrics_sorted: {:?}",
actual_drt_nats_metrics_sorted
);
println!(
"expect_drt_nats_metrics_sorted: {:?}",
expect_drt_nats_metrics_sorted
);
// Compare the sorted lists
assert_eq!(
actual_drt_nats_metrics_sorted, expect_drt_nats_metrics_sorted,
"DRT_NATS_METRICS with prefix and expected_nats_metrics should be identical when sorted"
);
println!("✓ DistributedRuntime NATS metrics integration test passed!");
}
#[ignore = "Deprecated - NATS related code to be deleted soon"]
#[tokio::test]
async fn test_nats_metric_names() {
// This test only tests the existence of the NATS metrics. It does not check
// the values of the metrics.
// Setup real DRT and registry using the test-friendly constructor
let drt = create_test_drt_async().await;
// Create a namespace and component from the DRT
let namespace = drt.namespace("ns789").unwrap();
let component = namespace.component("comp789").unwrap();
// Get component output which should include NATS client metrics
// Additional checks for NATS client metrics (without checking specific values)
let component_nats_metrics = super::test_helpers::extract_nats_lines(
&component.metrics().prometheus_expfmt().unwrap(),
);
println!(
"Component NATS metrics count: {}",
component_nats_metrics.len()
);
// Check that NATS client metrics are present
assert!(
!component_nats_metrics.is_empty(),
"NATS client metrics should be present"
);
// Check for specific NATS client metric names (without values)
let component_metrics =
super::test_helpers::extract_metrics(&component.metrics().prometheus_expfmt().unwrap());
let actual_component_nats_metrics_sorted: Vec<&str> = component_metrics
.iter()
.map(|line| {
let without_labels = line.split('{').next().unwrap_or(line);
// Drop the value: keep only the token before the first space
without_labels.split(' ').next().unwrap_or(without_labels)
})
.collect();
let expect_component_nats_metrics_sorted = {
let mut temp = COMPONENT_NATS_METRICS
.iter()
.map(|metric| build_component_metric_name(metric))
.collect::<Vec<_>>();
temp.sort();
temp
};
// Print both lists for comparison
println!(
"actual_component_nats_metrics_sorted: {:?}",
actual_component_nats_metrics_sorted
);
println!(
"expect_component_nats_metrics_sorted: {:?}",
expect_component_nats_metrics_sorted
);
// Compare the sorted lists
assert_eq!(
actual_component_nats_metrics_sorted, expect_component_nats_metrics_sorted,
"COMPONENT_NATS_METRICS with prefix and expected_nats_metrics should be identical when sorted"
);
// Get both DRT and component output and filter for NATS metrics only
let drt_output = drt.metrics().prometheus_expfmt().unwrap();
let drt_nats_lines = super::test_helpers::extract_nats_lines(&drt_output);
let drt_and_component_nats_metrics =
super::test_helpers::extract_metrics(&drt_nats_lines.join("\n"));
println!(
"DRT and component NATS metrics count: {}",
drt_and_component_nats_metrics.len()
);
// Check that the NATS metrics are present in the component output
assert_eq!(
drt_and_component_nats_metrics.len(),
DRT_NATS_METRICS.len() + COMPONENT_NATS_METRICS.len(),
"DRT at this point should have both the DRT and component NATS metrics"
);
println!("✓ Component NATS metrics integration test passed!");
}
/// Tests NATS metric values before and after endpoint activity.
/// Creates an endpoint, sends ten 2,000-byte test messages, and validates metrics (NATS + work handler)
/// in the initial state and in the post-activity state. Ensures byte counts, message counts, and processing
/// times are within expected ranges. Tests end-to-end client-server communication and metrics collection.
#[ignore = "Deprecated - NATS related code to be deleted soon"]
#[tokio::test]
async fn test_nats_metrics_values() -> anyhow::Result<()> {
struct MessageHandler {}
impl MessageHandler {
fn new() -> std::sync::Arc<Self> {
std::sync::Arc::new(Self {})
}
}
#[async_trait]
impl AsyncEngine<SingleIn<String>, ManyOut<Annotated<String>>, Error> for MessageHandler {
async fn generate(
&self,
input: SingleIn<String>,
) -> Result<ManyOut<Annotated<String>>, Error> {
let (data, ctx) = input.into_parts();
let response = data.to_string();
let stream = stream::iter(vec![Annotated::from_data(response)]);
Ok(ResponseStream::new(Box::pin(stream), ctx.context()))
}
}
println!("\n=== Initializing DistributedRuntime ===");
let runtime = Runtime::from_current()?;
let drt = DistributedRuntime::from_settings(runtime.clone()).await?;
let namespace = drt.namespace("ns123").unwrap();
let component = namespace.component("comp123").unwrap();
let ingress = Ingress::for_engine(MessageHandler::new()).unwrap();
let _backend_handle = tokio::spawn(async move {
let endpoint = component
.endpoint("echo")
.endpoint_builder()
.handler(ingress);
endpoint.start().await.unwrap();
});
sleep(Duration::from_millis(500)).await;
println!("✓ Launched endpoint service in background successfully");
let drt_output = drt.metrics().prometheus_expfmt().unwrap();
let parsed_metrics: Vec<_> = drt_output
.lines()
.filter_map(super::test_helpers::parse_prometheus_metric)
.collect();
println!("=== Initial DRT metrics output ===");
println!("{}", drt_output);
println!("\n=== Checking Initial Metric Values ===");
let initial_expected_metric_values = [
// DRT NATS metrics (ordered to match DRT_NATS_METRICS)
(
build_component_metric_name(nats_client::CONNECTION_STATE),
1.0,
1.0,
), // Should be connected
(
build_component_metric_name(nats_client::CURRENT_CONNECTIONS),
1.0,
1.0,
), // Should have 1 connection
(
build_component_metric_name(nats_client::IN_TOTAL_BYTES),
800.0,
4000.0,
), // Wide range around observed value of 1888
(
build_component_metric_name(nats_client::IN_MESSAGES),
0.0,
5.0,
), // Wide range around 2
(
build_component_metric_name(nats_client::OUT_OVERHEAD_BYTES),
1500.0,
5000.0,
), // Wide range around observed value of 2752
(
build_component_metric_name(nats_client::OUT_MESSAGES),
0.0,
5.0,
), // Wide range around 2
// Component NATS metrics (ordered to match COMPONENT_NATS_METRICS)
(
build_component_metric_name(nats_service::PROCESSING_MS_AVG),
0.0,
0.0,
), // No processing yet
(
build_component_metric_name(nats_service::ERRORS_TOTAL),
0.0,
0.0,
), // No errors yet
(
build_component_metric_name(nats_service::REQUESTS_TOTAL),
0.0,
0.0,
), // No requests yet
(
build_component_metric_name(nats_service::PROCESSING_MS_TOTAL),
0.0,
0.0,
), // No processing yet
(
build_component_metric_name(nats_service::ACTIVE_SERVICES),
0.0,
2.0,
), // Service may not be fully active yet
(
build_component_metric_name(nats_service::ACTIVE_ENDPOINTS),
0.0,
2.0,
), // Endpoint may not be fully active yet
];
for (metric_name, min_value, max_value) in &initial_expected_metric_values {
let actual_value = parsed_metrics
.iter()
.find(|(name, _, _)| name == metric_name)
.map(|(_, _, value)| *value)
.unwrap_or_else(|| panic!("Could not find expected metric: {}", metric_name));
assert!(
actual_value >= *min_value && actual_value <= *max_value,
"Initial metric {} should be between {} and {}, but got {}",
metric_name,
min_value,
max_value,
actual_value
);
}
println!("\n=== Client Runtime to hit the endpoint ===");
let client_runtime = Runtime::from_current()?;
let client_distributed = DistributedRuntime::from_settings(client_runtime.clone()).await?;
let namespace = client_distributed.namespace("ns123")?;
let component = namespace.component("comp123")?;
let client = component.endpoint("echo").client().await?;
client.wait_for_instances().await?;
println!("✓ Connected to endpoint, waiting for instances...");
let router =
PushRouter::<String, Annotated<String>>::from_client(client, Default::default())
.await?;
for i in 0..10 {
let msg = i.to_string().repeat(2000); // 2k bytes message
let mut stream = router.random(msg.clone().into()).await?;
while let Some(resp) = stream.next().await {
// Check if response matches the original message
if let Some(data) = &resp.data {
let is_same = data == &msg;
println!(
"Response {}: {} bytes, matches original: {}",
i,
data.len(),
is_same
);
}
}
}
println!("✓ Sent messages and received responses successfully");
println!("\n=== Waiting 500ms for metrics to update ===");
sleep(Duration::from_millis(500)).await;
println!("✓ Wait complete, getting final metrics...");
let final_drt_output = drt.metrics().prometheus_expfmt().unwrap();
println!("\n=== Final Prometheus DRT output ===");
println!("{}", final_drt_output);
let final_drt_nats_output = super::test_helpers::extract_nats_lines(&final_drt_output);
println!("\n=== Filtered NATS metrics from final DRT output ===");
for line in &final_drt_nats_output {
println!("{}", line);
}
let final_parsed_metrics: Vec<_> = super::test_helpers::extract_metrics(&final_drt_output)
.iter()
.filter_map(|line| super::test_helpers::parse_prometheus_metric(line.as_str()))
.collect();
let post_expected_metric_values = [
// DRT NATS metrics
(
build_component_metric_name(nats_client::CONNECTION_STATE),
1.0,
1.0,
), // Connected
(
build_component_metric_name(nats_client::CURRENT_CONNECTIONS),
1.0,
1.0,
), // 1 connection
(
build_component_metric_name(nats_client::IN_TOTAL_BYTES),
20000.0,
32000.0,
), // Wide range around 26117
(
build_component_metric_name(nats_client::IN_MESSAGES),
8.0,
20.0,
), // Wide range around 16
(
build_component_metric_name(nats_client::OUT_OVERHEAD_BYTES),
2500.0,
8000.0,
), // Wide range around 5524
(
build_component_metric_name(nats_client::OUT_MESSAGES),
8.0,
20.0,
), // Wide range around 16
// Component NATS metrics
(
build_component_metric_name(nats_service::PROCESSING_MS_AVG),
0.0,
1.0,
), // Low processing time
(
build_component_metric_name(nats_service::ERRORS_TOTAL),
0.0,
0.0,
), // No errors
(
build_component_metric_name(nats_service::REQUESTS_TOTAL),
0.0,
10.0,
), // NATS service stats requests (may differ from work handler count)
(
build_component_metric_name(nats_service::PROCESSING_MS_TOTAL),
0.0,
5.0,
), // Low total processing time
(
build_component_metric_name(nats_service::ACTIVE_SERVICES),
0.0,
2.0,
), // Service may not be fully active
(
build_component_metric_name(nats_service::ACTIVE_ENDPOINTS),
0.0,
2.0,
), // Endpoint may not be fully active
// Work handler metrics
(
build_component_metric_name(work_handler::REQUESTS_TOTAL),
10.0,
10.0,
), // 10 messages
(
build_component_metric_name(work_handler::REQUEST_BYTES_TOTAL),
21000.0,
26000.0,
), // ~75-125% of 23520
(
build_component_metric_name(work_handler::RESPONSE_BYTES_TOTAL),
18000.0,
23000.0,
), // ~75-125% of 20660
(
build_component_metric_name(work_handler::INFLIGHT_REQUESTS),
0.0,
1.0,
), // 0 or very low
// Histograms have _{count,sum} suffixes
(
format!(
"{}_count",
build_component_metric_name(work_handler::REQUEST_DURATION_SECONDS)
),
10.0,
10.0,
), // 10 messages
(
format!(
"{}_sum",
build_component_metric_name(work_handler::REQUEST_DURATION_SECONDS)
),
0.0001,
1.0,
), // Processing time sum (wide range)
];
println!("\n=== Checking Post-Activity All Metrics (NATS + Work Handler) ===");
for (metric_name, min_value, max_value) in &post_expected_metric_values {
let actual_value = final_parsed_metrics
.iter()
.find(|(name, _, _)| name == metric_name)
.map(|(_, _, value)| *value)
.unwrap_or_else(|| {
panic!(
"Could not find expected post-activity metric: {}",
metric_name
)
});
assert!(
actual_value >= *min_value && actual_value <= *max_value,
"Post-activity metric {} should be between {} and {}, but got {}",
metric_name,
min_value,
max_value,
actual_value
);
println!(
"✓ {}: {} (range: {} to {})",
metric_name, actual_value, min_value, max_value
);
}
println!("✓ All NATS and component metrics parsed successfully!");
println!("✓ Byte metrics verified to be >= 100 bytes!");
println!("✓ Post-activity metrics verified with higher thresholds!");
println!("✓ Work handler metrics reflect increased activity!");
Ok(())
}
}
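Reviewer note: the removed `test_nats_metrics_values` test looks up each metric by name in the scraped text and asserts that its value falls inside a `(min, max)` window. A minimal sketch of that pattern follows. `parse_sample` and `in_range` are hypothetical names, not the crate's `parse_prometheus_metric`, and the parser assumes sample lines carry no trailing timestamp.

```rust
/// Hypothetical parser for one Prometheus sample line. Returns the metric
/// name (labels stripped) and its value; comment and blank lines yield None.
pub fn parse_sample(line: &str) -> Option<(String, f64)> {
    let line = line.trim();
    if line.is_empty() || line.starts_with('#') {
        return None;
    }
    // Assumption: the value is the last space-separated token (no timestamp).
    let (series, value) = line.rsplit_once(' ')?;
    let name = series.split('{').next()?.trim();
    Some((name.to_string(), value.parse::<f64>().ok()?))
}

/// Find `metric` in `exposition` and check that its value lies in [min, max].
pub fn in_range(exposition: &str, metric: &str, min: f64, max: f64) -> Result<f64, String> {
    let value = exposition
        .lines()
        .filter_map(parse_sample)
        .find(|(name, _)| name == metric)
        .map(|(_, v)| v)
        .ok_or_else(|| format!("metric not found: {metric}"))?;
    if value >= min && value <= max {
        Ok(value)
    } else {
        Err(format!("{metric} = {value}, expected {min}..={max}"))
    }
}
```

Returning a `Result` instead of panicking keeps the helper usable outside tests; a test can still call `.unwrap()` on it to get the same failure behaviour as the original assertions.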
@@ -209,90 +209,6 @@ pub mod work_handler {
 }
 }
/// NATS client metrics. (DistributedRuntime contains a NATS client shared by all children.)
pub mod nats_client {
/// Macro to generate NATS client metric names with the prefix
macro_rules! nats_client_name {
($name:expr) => {
concat!("nats_client_", $name)
};
}
/// Prefix for all NATS client metrics
pub const PREFIX: &str = nats_client_name!("");
/// Total number of bytes received by NATS client
pub const IN_TOTAL_BYTES: &str = nats_client_name!("in_total_bytes");
/// Total number of bytes sent by NATS client
pub const OUT_OVERHEAD_BYTES: &str = nats_client_name!("out_overhead_bytes");
/// Total number of messages received by NATS client
pub const IN_MESSAGES: &str = nats_client_name!("in_messages");
/// Total number of messages sent by NATS client
pub const OUT_MESSAGES: &str = nats_client_name!("out_messages");
/// Current number of active connections for NATS client
/// Note: Gauge metric measuring current connections, not cumulative total
pub const CURRENT_CONNECTIONS: &str = nats_client_name!("current_connections");
/// Current connection state of NATS client (0=disconnected, 1=connected, 2=reconnecting)
pub const CONNECTION_STATE: &str = nats_client_name!("connection_state");
}
/// NATS service metrics, from the $SRV.STATS.<service_name> requests on NATS server
pub mod nats_service {
/// Macro to generate NATS service metric names with the prefix
macro_rules! nats_service_name {
($name:expr) => {
concat!("nats_service_", $name)
};
}
/// Prefix for all NATS service metrics
pub const PREFIX: &str = nats_service_name!("");
/// Average processing time in milliseconds (maps to: average_processing_time in ms)
pub const PROCESSING_MS_AVG: &str = nats_service_name!("processing_ms_avg");
/// Total errors across all endpoints (maps to: num_errors)
pub const ERRORS_TOTAL: &str = nats_service_name!("errors_total");
/// Total requests across all endpoints (maps to: num_requests)
pub const REQUESTS_TOTAL: &str = nats_service_name!("requests_total");
/// Total processing time in milliseconds (maps to: processing_time in ms)
pub const PROCESSING_MS_TOTAL: &str = nats_service_name!("processing_ms_total");
/// Number of active services (derived from ServiceSet.services)
pub const ACTIVE_SERVICES: &str = nats_service_name!("active_services");
/// Number of active endpoints (derived from ServiceInfo.endpoints)
pub const ACTIVE_ENDPOINTS: &str = nats_service_name!("active_endpoints");
}
/// All NATS client Prometheus metric names as an array for iteration/validation
pub const DRT_NATS_METRICS: &[&str] = &[
nats_client::CONNECTION_STATE,
nats_client::CURRENT_CONNECTIONS,
nats_client::IN_TOTAL_BYTES,
nats_client::IN_MESSAGES,
nats_client::OUT_OVERHEAD_BYTES,
nats_client::OUT_MESSAGES,
];
/// All component service Prometheus metric names as an array for iteration/validation
/// (ordered to match NatsStatsMetrics fields)
pub const COMPONENT_NATS_METRICS: &[&str] = &[
nats_service::PROCESSING_MS_AVG, // maps to: average_processing_time (nanoseconds)
nats_service::ERRORS_TOTAL, // maps to: num_errors
nats_service::REQUESTS_TOTAL, // maps to: num_requests
nats_service::PROCESSING_MS_TOTAL, // maps to: processing_time (nanoseconds)
nats_service::ACTIVE_SERVICES, // derived from ServiceSet.services
nats_service::ACTIVE_ENDPOINTS, // derived from ServiceInfo.endpoints
];
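Reviewer note: the removed modules build every metric name from a shared prefix with `concat!`, and the removed test code then filters scrape output on `PREFIX`. The sketch below reproduces that pattern in isolation. The macro and the two constants are copied from the removed code, while `without_prefix` is a hypothetical helper added only for illustration.

```rust
/// Prefix pattern from the removed module: each constant is assembled at
/// compile time from one shared prefix.
pub mod nats_client {
    macro_rules! nats_client_name {
        ($name:expr) => {
            concat!("nats_client_", $name)
        };
    }

    /// An empty suffix yields the bare prefix.
    pub const PREFIX: &str = nats_client_name!("");
    pub const CONNECTION_STATE: &str = nats_client_name!("connection_state");
}

/// Hypothetical helper (not in the diff): keep only lines that do not
/// mention `prefix`.
pub fn without_prefix<'a>(exposition: &'a str, prefix: &str) -> Vec<&'a str> {
    exposition
        .lines()
        .filter(|line| !line.contains(prefix))
        .collect()
}
```

Because the names are `&'static str` constants, substring filtering on `PREFIX` needs no allocation, which is presumably why the tests could strip NATS lines so cheaply before this commit removed the need to.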
 /// Task tracker Prometheus metric name suffixes
 pub mod task_tracker {
 /// Total number of tasks issued/submitted
......
@@ -10,7 +10,7 @@
 use crate::{
 DistributedRuntime,
 component::Component,
-metrics::{MetricsHierarchy, prometheus_names, prometheus_names::nats_service},
+metrics::{MetricsHierarchy, prometheus_names},
 traits::*,
 transports::nats,
 utils::stream,
@@ -294,150 +294,3 @@ mod tests {
 assert_eq!(endpoints.len(), 2);
 }
 }
/// Prometheus metrics for component service statistics (ordered to match NatsStatsMetrics)
///
/// ⚠️ IMPORTANT: These Prometheus Gauges are COPIES of NATS data, not live references!
///
/// How it works:
/// 1. NATS provides source data via NatsStatsMetrics
/// 2. Metrics callbacks read current NATS values and update these Prometheus Gauges
/// 3. Prometheus scrapes these Gauge values (snapshots, not live data)
///
/// Flow: NATS Service → NatsStatsMetrics (Counters) → Metrics Callback → Prometheus Gauge
/// Note: These are snapshots updated when execute_prometheus_update_callbacks() is called.
///
/// Prometheus metrics for NATS server components.
/// Note: Metrics with `_total` names use IntGauge because we copy counter values
/// from underlying services rather than incrementing directly.
#[derive(Debug, Clone)]
pub struct ComponentNatsServerPrometheusMetrics {
/// Average processing time in milliseconds (maps to: average_processing_time)
pub service_processing_ms_avg: prometheus::Gauge,
/// Total errors across all endpoints (maps to: num_errors)
pub service_errors_total: prometheus::IntGauge,
/// Total requests across all endpoints (maps to: num_requests)
pub service_requests_total: prometheus::IntGauge,
/// Total processing time in milliseconds (maps to: processing_time)
pub service_processing_ms_total: prometheus::IntGauge,
/// Number of active services (derived from ServiceSet.services)
pub service_active_services: prometheus::IntGauge,
/// Number of active endpoints (derived from ServiceInfo.endpoints)
pub service_active_endpoints: prometheus::IntGauge,
}
impl ComponentNatsServerPrometheusMetrics {
/// Create a new ComponentNatsServerPrometheusMetrics using the Component's DistributedRuntime's Prometheus constructors
pub fn new(component: &Component) -> Result<Self> {
let service_name = component.service_name();
// Build labels: service_name first, then component's labels
let mut labels_vec = vec![("service_name", service_name.as_str())];
// Add component's labels (convert from (String, String) to (&str, &str))
for (key, value) in component.labels() {
labels_vec.push((key.as_str(), value.as_str()));
}
let labels: &[(&str, &str)] = &labels_vec;
let service_processing_ms_avg = component.metrics().create_gauge(
nats_service::PROCESSING_MS_AVG,
"Average processing time across all component endpoints in milliseconds",
labels,
)?;
let service_errors_total = component.metrics().create_intgauge(
nats_service::ERRORS_TOTAL,
"Total number of errors across all component endpoints",
labels,
)?;
let service_requests_total = component.metrics().create_intgauge(
nats_service::REQUESTS_TOTAL,
"Total number of requests across all component endpoints",
labels,
)?;
let service_processing_ms_total = component.metrics().create_intgauge(
nats_service::PROCESSING_MS_TOTAL,
"Total processing time across all component endpoints in milliseconds",
labels,
)?;
let service_active_services = component.metrics().create_intgauge(
nats_service::ACTIVE_SERVICES,
"Number of active services in this component",
labels,
)?;
let service_active_endpoints = component.metrics().create_intgauge(
nats_service::ACTIVE_ENDPOINTS,
"Number of active endpoints across all services",
labels,
)?;
Ok(Self {
service_processing_ms_avg,
service_errors_total,
service_requests_total,
service_processing_ms_total,
service_active_services,
service_active_endpoints,
})
}
/// Update metrics from scraped ServiceSet data
pub fn update_from_service_set(&self, service_set: &ServiceSet) {
// Variables ordered to match NatsStatsMetrics fields
let mut processing_time_samples = 0u64; // for average_processing_time calculation
let mut total_errors = 0u64; // maps to: num_errors
let mut total_requests = 0u64; // maps to: num_requests
let mut total_processing_time_nanos = 0u64; // maps to: processing_time (nanoseconds from NATS)
let mut endpoint_count = 0u64; // for derived metrics
let service_count = service_set.services().len() as i64;
for service in service_set.services() {
for endpoint in &service.endpoints {
endpoint_count += 1;
if let Some(ref stats) = endpoint.data {
total_errors += stats.num_errors;
total_requests += stats.num_requests;
total_processing_time_nanos += stats.processing_time;
if stats.num_requests > 0 {
processing_time_samples += 1;
}
}
}
}
// Update metrics (ordered to match NatsStatsMetrics fields)
// Calculate average processing time in milliseconds (maps to: average_processing_time)
if processing_time_samples > 0 && total_requests > 0 {
let avg_time_nanos = total_processing_time_nanos as f64 / total_requests as f64;
let avg_time_ms = avg_time_nanos / 1_000_000.0; // Convert nanoseconds to milliseconds
self.service_processing_ms_avg.set(avg_time_ms);
} else {
self.service_processing_ms_avg.set(0.0);
}
self.service_errors_total.set(total_errors as i64); // maps to: num_errors
self.service_requests_total.set(total_requests as i64); // maps to: num_requests
self.service_processing_ms_total
.set((total_processing_time_nanos / 1_000_000) as i64); // maps to: processing_time (converted to milliseconds)
self.service_active_services.set(service_count); // derived from ServiceSet.services
self.service_active_endpoints.set(endpoint_count as i64); // derived from ServiceInfo.endpoints
}
/// Reset all metrics to zero. Useful when no data is available or to clear stale values.
pub fn reset_to_zeros(&self) {
self.service_processing_ms_avg.set(0.0);
self.service_errors_total.set(0);
self.service_requests_total.set(0);
self.service_processing_ms_total.set(0);
self.service_active_services.set(0);
self.service_active_endpoints.set(0);
}
}
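Reviewer note: the removed `update_from_service_set` sums per-endpoint counters and derives a request-weighted average, converting NATS nanoseconds to milliseconds. A minimal standalone sketch of that arithmetic is below. `EndpointStats`, `Aggregate`, and `aggregate` are hypothetical names that mirror only the fields read in the removed code, with no Prometheus types involved.

```rust
/// Hypothetical per-endpoint stats, mirroring the fields the removed code reads.
#[derive(Debug, Clone, Copy)]
pub struct EndpointStats {
    pub num_requests: u64,
    pub num_errors: u64,
    /// Cumulative processing time in nanoseconds.
    pub processing_time: u64,
}

/// Hypothetical result type holding the derived values.
#[derive(Debug, PartialEq)]
pub struct Aggregate {
    pub requests_total: u64,
    pub errors_total: u64,
    pub processing_ms_total: u64,
    pub processing_ms_avg: f64,
    pub active_endpoints: u64,
}

pub fn aggregate(endpoints: &[Option<EndpointStats>]) -> Aggregate {
    let (mut requests, mut errors, mut nanos) = (0u64, 0u64, 0u64);
    // Endpoints without stats contribute nothing to the sums but are
    // still counted as active endpoints below.
    for stats in endpoints.iter().flatten() {
        requests += stats.num_requests;
        errors += stats.num_errors;
        nanos += stats.processing_time;
    }
    // Request-weighted mean in milliseconds; 0.0 when nothing was processed.
    let processing_ms_avg = if requests > 0 {
        nanos as f64 / requests as f64 / 1_000_000.0
    } else {
        0.0
    };
    Aggregate {
        requests_total: requests,
        errors_total: errors,
        processing_ms_total: nanos / 1_000_000, // integer division truncates
        processing_ms_avg,
        active_endpoints: endpoints.len() as u64,
    }
}
```

The removed code also tracked `processing_time_samples`, but that counter is positive exactly when `total_requests` is positive, so the single `requests > 0` guard in this sketch selects the same branch.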
@@ -10,7 +10,6 @@ use crate::config::environment_names::runtime::canary as env_canary;
 use crate::config::environment_names::runtime::system as env_system;
 use crate::logging::make_request_span;
 use crate::metrics::MetricsHierarchy;
-use crate::metrics::prometheus_names::{nats_client, nats_service};
 use crate::traits::DistributedRuntimeProvider;
 use axum::{
 Router,
@@ -606,26 +605,17 @@ mod integration_tests {
 let response = drt.metrics().prometheus_expfmt().unwrap();
 println!("Full metrics response:\n{}", response);
-// Filter out NATS client metrics for comparison
-let filtered_response: String = response
-.lines()
-.filter(|line| {
-!line.contains(nats_client::PREFIX) && !line.contains(nats_service::PREFIX)
-})
-.collect::<Vec<_>>()
-.join("\n");
 // Check that uptime_seconds metric is present with correct namespace
 assert!(
-filtered_response.contains("# HELP dynamo_component_uptime_seconds"),
+response.contains("# HELP dynamo_component_uptime_seconds"),
 "Should contain uptime_seconds help text"
 );
 assert!(
-filtered_response.contains("# TYPE dynamo_component_uptime_seconds gauge"),
+response.contains("# TYPE dynamo_component_uptime_seconds gauge"),
 "Should contain uptime_seconds type"
 );
 assert!(
-filtered_response.contains("dynamo_component_uptime_seconds"),
+response.contains("dynamo_component_uptime_seconds"),
 "Should contain uptime_seconds metric with correct namespace"
 );
 })
@@ -918,7 +908,6 @@ mod integration_tests {
 // Start the service and endpoint with a health check payload
 // This will automatically register the endpoint for health monitoring
 tokio::spawn(async move {
-component.add_stats_service().await.unwrap();
 let _ = component.endpoint(ENDPOINT_NAME)
 .endpoint_builder()
 .handler(ingress)
......
@@ -39,7 +39,6 @@ use url::Url;
 use validator::{Validate, ValidationError};
 use crate::config::environment_names::nats as env_nats;
-use crate::metrics::prometheus_names::nats_client as nats_metrics;
 pub use crate::slug::Slug;
 use tracing as log;
...@@ -887,110 +886,6 @@ impl EventPublisher for NatsQueue { ...@@ -887,110 +886,6 @@ impl EventPublisher for NatsQueue {
} }
} }
/// Prometheus metrics that mirror the NATS client statistics (in primitive types)
/// to be used for the System Status Server.
///
/// ⚠️ IMPORTANT: These Prometheus Gauges are COPIES of NATS client data, not live references!
///
/// How it works:
/// 1. NATS client provides source data via client.statistics() and connection_state()
/// 2. set_from_client_stats() reads current NATS values and updates these Prometheus Gauges
/// 3. Prometheus scrapes these Gauge values (snapshots, not live data)
///
/// Flow: NATS Client → Client Statistics → set_from_client_stats() → Prometheus Gauge
/// Note: These are snapshots updated when set_from_client_stats() is called.
#[derive(Debug, Clone)]
pub struct DRTNatsClientPrometheusMetrics {
nats_client: client::Client,
/// Number of bytes received (excluding protocol overhead)
pub in_bytes: IntGauge,
/// Number of bytes sent (excluding protocol overhead)
pub out_bytes: IntGauge,
/// Number of messages received
pub in_messages: IntGauge,
/// Number of messages sent
pub out_messages: IntGauge,
/// Number of times connection was established
pub connects: IntGauge,
/// Current connection state (0 = disconnected, 1 = connected, 2 = reconnecting)
pub connection_state: IntGauge,
}
impl DRTNatsClientPrometheusMetrics {
/// Create a new instance of NATS client metrics using a DistributedRuntime's Prometheus constructors
pub fn new(drt: &crate::DistributedRuntime, nats_client: client::Client) -> Result<Self> {
let metrics = drt.metrics();
let in_bytes = metrics.create_intgauge(
nats_metrics::IN_TOTAL_BYTES,
"Total number of bytes received by NATS client",
&[],
)?;
let out_bytes = metrics.create_intgauge(
nats_metrics::OUT_OVERHEAD_BYTES,
"Total number of bytes sent by NATS client",
&[],
)?;
let in_messages = metrics.create_intgauge(
nats_metrics::IN_MESSAGES,
"Total number of messages received by NATS client",
&[],
)?;
let out_messages = metrics.create_intgauge(
nats_metrics::OUT_MESSAGES,
"Total number of messages sent by NATS client",
&[],
)?;
let connects = metrics.create_intgauge(
nats_metrics::CURRENT_CONNECTIONS,
"Current number of active connections for NATS client",
&[],
)?;
let connection_state = metrics.create_intgauge(
nats_metrics::CONNECTION_STATE,
"Current connection state of NATS client (0=disconnected, 1=connected, 2=reconnecting)",
&[],
)?;
Ok(Self {
nats_client,
in_bytes,
out_bytes,
in_messages,
out_messages,
connects,
connection_state,
})
}
/// Copy statistics from the stored NATS client to these Prometheus metrics
pub fn set_from_client_stats(&self) {
let stats = self.nats_client.statistics();
// Get current values from the client statistics
let in_bytes = stats.in_bytes.load(Ordering::Relaxed);
let out_bytes = stats.out_bytes.load(Ordering::Relaxed);
let in_messages = stats.in_messages.load(Ordering::Relaxed);
let out_messages = stats.out_messages.load(Ordering::Relaxed);
let connects = stats.connects.load(Ordering::Relaxed);
// Get connection state
let connection_state = match self.nats_client.connection_state() {
State::Connected => 1,
// treat Disconnected and Pending as "down"
State::Disconnected | State::Pending => 0,
};
// Update Prometheus metrics
// Using gauges allows us to set absolute values directly
self.in_bytes.set(in_bytes as i64);
self.out_bytes.set(out_bytes as i64);
self.in_messages.set(in_messages as i64);
self.out_messages.set(out_messages as i64);
self.connects.set(connects as i64);
self.connection_state.set(connection_state);
}
}
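The removed `DRTNatsClientPrometheusMetrics` followed a copy-on-demand pattern: gauges hold snapshots of the client's counters and only change when `set_from_client_stats()` runs. Below is a minimal sketch of that pattern using only the standard library. `Gauge`, `ClientStats`, `snapshot`, and `state_to_gauge` are hypothetical stand-ins, not the real `prometheus` or NATS client types. The state mapping mirrors the removed `match`, which only ever produced 0 or 1 even though the doc comment also mentions 2 for reconnecting.

```rust
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};

/// Hypothetical stand-in for a Prometheus integer gauge.
#[derive(Default)]
struct Gauge(AtomicI64);

impl Gauge {
    fn set(&self, v: i64) {
        self.0.store(v, Ordering::Relaxed);
    }
    fn get(&self) -> i64 {
        self.0.load(Ordering::Relaxed)
    }
}

/// Hypothetical stand-in for the client's live counters.
#[derive(Default)]
struct ClientStats {
    in_bytes: AtomicU64,
}

/// Hypothetical connection states, named after the variants in the removed code.
#[allow(dead_code)]
enum State {
    Connected,
    Disconnected,
    Pending,
}

/// Map a connection state to a gauge value: Disconnected and Pending both count as "down".
fn state_to_gauge(state: &State) -> i64 {
    match state {
        State::Connected => 1,
        State::Disconnected | State::Pending => 0,
    }
}

/// Copy the current counter value into the gauge. The gauge stays at this
/// value until the next call, regardless of what the live counter does.
fn snapshot(stats: &ClientStats, gauge: &Gauge) {
    gauge.set(stats.in_bytes.load(Ordering::Relaxed) as i64);
}
```

The consequence of this design is that a scrape sees whatever value was copied last, so the gauge can lag the live counter by up to one update interval.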
/// The NATS subject / inbox to talk to an instance on. /// The NATS subject / inbox to talk to an instance on.
/// TODO: Do we need to sanitize the names? /// TODO: Do we need to sanitize the names?
pub fn instance_subject(endpoint_id: &EndpointId, instance_id: u64) -> String { pub fn instance_subject(endpoint_id: &EndpointId, instance_id: u64) -> String {
......
@@ -317,9 +317,9 @@ class MetricsPayload(BasePayload):
 name=f"{prefix}_*",
 pattern=lambda name: rf"^{prefix}_\w+",
 validator=lambda value: len(set(value))
->= 23, # 80% of typical ~29 metrics (excluding _bucket) as of 2025-10-22 (but will grow)
-error_msg=lambda name, value: f"Expected at least 23 unique {prefix}_* metrics, but found only {len(set(value))}",
-success_msg=lambda name, value: f"SUCCESS: Found {len(set(value))} unique {prefix}_* metrics (minimum required: 23)",
+>= 11, # 80% of typical ~17 metrics (excluding _bucket) as of 2025-12-02
+error_msg=lambda name, value: f"Expected at least 11 unique {prefix}_* metrics, but found only {len(set(value))}",
+success_msg=lambda name, value: f"SUCCESS: Found {len(set(value))} unique {prefix}_* metrics (minimum required: 11)",
 multiline=True,
 ),
 MetricCheck(
......