Unverified Commit acbdabc4 authored by Keiven C's avatar Keiven C Committed by GitHub
Browse files

feat(metrics): add NATS client metrics to prometheus_metrics_fmt (#2292)


Co-authored-by: default avatarKeiven Chang <keivenchang@users.noreply.github.com>
parent 591f4d56
......@@ -31,7 +31,6 @@ use dynamo_llm::kv_router::scheduler::KVHitRateEvent;
use dynamo_llm::kv_router::KV_HIT_RATE_SUBJECT;
use dynamo_runtime::{
error, logging,
metrics::MetricsRegistry,
traits::events::{EventPublisher, EventSubscriber},
utils::{Duration, Instant},
DistributedRuntime, ErrorContext, Result, Runtime, Worker,
......@@ -137,14 +136,7 @@ async fn app(runtime: Runtime) -> Result<()> {
.await
.context("Unable to create unique instance of Count; possibly one already exists")?;
let target_component = {
let c = namespace.component(&config.component_name)?;
if let Some(ref model) = config.model_name {
c.add_labels(&[("model", model.as_str())])?
} else {
c
}
};
let target_component = namespace.component(&config.component_name)?;
let target_endpoint = target_component.endpoint(&config.endpoint_name);
let service_path = target_endpoint.path();
......
......@@ -485,21 +485,6 @@ impl Component {
Ok(())
})
}
/// Add constant labels to this component (for metrics). Returns a new Component with labels.
/// labels: list of (key, value) tuples.
fn add_labels(&self, labels: Vec<(String, String)>) -> PyResult<Component> {
use rs::metrics::MetricsRegistry as _;
let pairs: Vec<(&str, &str)> = labels
.iter()
.map(|(k, v)| (k.as_str(), v.as_str()))
.collect();
let inner = self.inner.clone().add_labels(&pairs).map_err(to_pyerr)?;
Ok(Component {
inner,
event_loop: self.event_loop.clone(),
})
}
}
#[pymethods]
......
......@@ -7,7 +7,6 @@ use anyhow::Context as _;
use tokio::sync::{mpsc::Receiver, Notify};
use dynamo_runtime::{
metrics::MetricsRegistry,
pipeline::{
network::egress::push_router::PushRouter, ManyOut, Operator, RouterMode, SegmentSource,
ServiceBackend, SingleIn, Source,
......@@ -170,8 +169,7 @@ impl ModelWatcher {
let component = self
.drt
.namespace(&endpoint_id.namespace)?
.component(&endpoint_id.component)
.and_then(|c| c.add_labels(&[("model", &model_entry.name)]))?;
.component(&endpoint_id.component)?;
let client = component.endpoint(&endpoint_id.name).client().await?;
let Some(etcd_client) = self.drt.etcd_client() else {
......
......@@ -27,7 +27,6 @@ use dynamo_runtime::{
component::Client,
distributed::DistributedConfig,
engine::{AsyncEngineStream, Data},
metrics::MetricsRegistry,
pipeline::{
Context, ManyOut, Operator, PushRouter, RouterMode, SegmentSource, ServiceBackend,
ServiceEngine, ServiceFrontend, SingleIn, Source,
......@@ -111,8 +110,7 @@ pub async fn prepare_engine(
let endpoint_id = local_model.endpoint_id();
let component = distributed_runtime
.namespace(&endpoint_id.namespace)?
.component(&endpoint_id.component)
.and_then(|c| c.add_labels(&[("model", card.slug().to_string().as_str())]))?;
.component(&endpoint_id.component)?;
let client = component.endpoint(&endpoint_id.name).client().await?;
......
......@@ -17,7 +17,6 @@ use crate::{
};
use dynamo_runtime::engine::AsyncEngineStream;
use dynamo_runtime::metrics::MetricsRegistry;
use dynamo_runtime::pipeline::{
network::Ingress, Context, ManyOut, Operator, SegmentSource, ServiceBackend, SingleIn, Source,
};
......@@ -33,25 +32,9 @@ pub async fn run(
let cancel_token = distributed_runtime.primary_token().clone();
let endpoint_id: EndpointId = path.parse()?;
let model_name = match &engine_config {
EngineConfig::StaticFull { model, .. } | EngineConfig::StaticCore { model, .. } => {
Some(model.service_name().to_string())
}
EngineConfig::StaticRemote(model) | EngineConfig::Dynamic(model) => {
Some(model.service_name().to_string())
}
};
let component = distributed_runtime
.namespace(&endpoint_id.namespace)?
.component(&endpoint_id.component)
.and_then(|c| {
if let Some(ref name) = model_name {
c.add_labels(&[("model", name.as_str())])
} else {
Ok(c)
}
})?;
.component(&endpoint_id.component)?;
let endpoint = component
.service_builder()
.create()
......
......@@ -14,8 +14,8 @@
// limitations under the License.
use dynamo_runtime::{
logging, metrics::MetricsRegistry, pipeline::PushRouter, protocols::annotated::Annotated,
stream::StreamExt, DistributedRuntime, Result, Runtime, Worker,
logging, pipeline::PushRouter, protocols::annotated::Annotated, stream::StreamExt,
DistributedRuntime, Result, Runtime, Worker,
};
use hello_world::DEFAULT_NAMESPACE;
......@@ -31,7 +31,6 @@ async fn app(runtime: Runtime) -> Result<()> {
let client = distributed
.namespace(DEFAULT_NAMESPACE)?
.component("backend")?
.add_labels(&[("model", "hello_world_model")])?
.endpoint("generate")
.client()
.await?;
......
......@@ -15,7 +15,6 @@
use dynamo_runtime::{
logging,
metrics::MetricsRegistry,
pipeline::{
async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut,
ResponseStream, SingleIn,
......@@ -70,7 +69,6 @@ async fn backend(runtime: DistributedRuntime) -> Result<()> {
runtime
.namespace(DEFAULT_NAMESPACE)?
.component("backend")?
.add_labels(&[("model", "hello_world_model")])?
.service_builder()
.create()
.await?
......
......@@ -17,8 +17,8 @@ use futures::StreamExt;
use service_metrics::DEFAULT_NAMESPACE;
use dynamo_runtime::{
logging, metrics::MetricsRegistry, pipeline::PushRouter, protocols::annotated::Annotated,
utils::Duration, DistributedRuntime, Result, Runtime, Worker,
logging, pipeline::PushRouter, protocols::annotated::Annotated, utils::Duration,
DistributedRuntime, Result, Runtime, Worker,
};
fn main() -> Result<()> {
......@@ -31,9 +31,7 @@ async fn app(runtime: Runtime) -> Result<()> {
let distributed = DistributedRuntime::from_settings(runtime.clone()).await?;
let namespace = distributed.namespace(DEFAULT_NAMESPACE)?;
let component = namespace
.component("backend")?
.add_labels(&[("model", "service_metrics_model")])?;
let component = namespace.component("backend")?;
let client = component.endpoint("generate").client().await?;
......
......@@ -17,7 +17,6 @@ use service_metrics::{MyStats, DEFAULT_NAMESPACE};
use dynamo_runtime::{
logging,
metrics::MetricsRegistry,
pipeline::{
async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut,
ResponseStream, SingleIn,
......@@ -72,7 +71,6 @@ async fn backend(runtime: DistributedRuntime) -> Result<()> {
runtime
.namespace(DEFAULT_NAMESPACE)?
.component("backend")?
.add_labels(&[("model", "service_metrics_model")])?
.service_builder()
.create()
.await?
......
......@@ -91,7 +91,6 @@ pub async fn backend(drt: DistributedRuntime, endpoint_name: Option<&str>) -> Re
let endpoint = drt
.namespace(DEFAULT_NAMESPACE)?
.component(DEFAULT_COMPONENT)?
.add_labels(&[("model", DEFAULT_MODEL_NAME)])?
.service_builder()
.create()
.await?
......
......@@ -30,7 +30,10 @@
//! TODO: Top-level Overview of Endpoints/Functions
use crate::{
config::HealthStatus, discovery::Lease, metrics::MetricsRegistry, service::ServiceSet,
config::HealthStatus,
discovery::Lease,
metrics::{prometheus_names, MetricsRegistry},
service::ServiceSet,
transports::etcd::EtcdPath,
};
......@@ -45,6 +48,7 @@ use super::{
use crate::pipeline::network::{ingress::push_endpoint::PushEndpoint, PushWorkHandler};
use crate::protocols::Endpoint as EndpointId;
use crate::service::ComponentNatsPrometheusMetrics;
use async_nats::{
rustls::quic,
service::{Service, ServiceExt},
......@@ -187,16 +191,6 @@ impl MetricsRegistry for Component {
]
.concat()
}
fn stored_labels(&self) -> Vec<(&str, &str)> {
let mut all_labels = self.namespace.stored_labels();
all_labels.extend(self.labels.iter().map(|(k, v)| (k.as_str(), v.as_str())));
all_labels
}
fn labels_mut(&mut self) -> &mut Vec<(String, String)> {
&mut self.labels
}
}
impl Component {
......@@ -262,6 +256,8 @@ impl Component {
Ok(out)
}
/// Scrape ServiceSet, which contains NATS stats as well as user defined stats
/// embedded in data field of ServiceInfo.
pub async fn scrape_stats(&self, timeout: Duration) -> Result<ServiceSet> {
let service_name = self.service_name();
let service_client = self.drt().service_client();
......@@ -270,6 +266,78 @@ impl Component {
.await
}
/// Add Prometheus metrics for this component's service stats.
///
/// Uses a channel to synchronize with the spawned async task, ensuring
/// metrics are updated before the callback returns.
pub fn add_metrics_callback(&self) -> Result<()> {
let component_metrics = ComponentNatsPrometheusMetrics::new(self)?;
let component_clone = self.clone();
let mut hierarchies = self.parent_hierarchy();
hierarchies.push(self.hierarchy());
debug_assert_eq!(
hierarchies.last().cloned().unwrap_or_default(),
self.service_name()
); // it happens that in component, hierarchy and service name are the same
// Register a metrics callback that scrapes component statistics
let metrics_callback = Arc::new(move || {
// Timeout for scraping metrics from components (in milliseconds)
// This value is also used by KV Router metrics aggregator (300ms) and other components
const METRICS_SCRAPE_TIMEOUT_MS: u64 = 300;
// Get the current Tokio runtime handle
let handle = tokio::runtime::Handle::try_current()
.map_err(|err| anyhow::anyhow!("No Tokio runtime handle available: {}", err))?;
let m = component_metrics.clone();
let c = component_clone.clone();
// Create a channel to synchronize with the spawned task
let (tx, rx) = std::sync::mpsc::channel::<anyhow::Result<()>>();
let timeout = std::time::Duration::from_millis(METRICS_SCRAPE_TIMEOUT_MS);
handle.spawn(async move {
let result = match c.scrape_stats(timeout).await {
Ok(service_set) => {
m.update_from_service_set(&service_set);
Ok(())
}
Err(err) => {
// Reset metrics on failure
m.reset_to_zeros();
Err(anyhow::anyhow!("Failed to scrape stats: {}", err))
}
};
// Send the result back to the waiting thread
// If send fails, the receiver has already given up waiting
let _ = tx.send(result);
});
// Wait for the spawned task to complete (with a timeout to prevent hanging)
// Add 100ms buffer to the scrape timeout to account for processing overhead
let recv_timeout = std::time::Duration::from_millis(METRICS_SCRAPE_TIMEOUT_MS + 100);
match rx.recv_timeout(recv_timeout) {
Ok(result) => result, // Return the actual result from scraping
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
component_metrics.reset_to_zeros();
Err(anyhow::anyhow!("Metrics collection timed out"))
}
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
component_metrics.reset_to_zeros();
Err(anyhow::anyhow!("Metrics collection task failed"))
}
}
});
self.drt()
.register_metrics_callback(hierarchies, metrics_callback);
Ok(())
}
/// TODO
///
/// This method will scrape the stats for all available services
......@@ -347,16 +415,6 @@ impl MetricsRegistry for Endpoint {
]
.concat()
}
fn stored_labels(&self) -> Vec<(&str, &str)> {
let mut all_labels = self.component.stored_labels();
all_labels.extend(self.labels.iter().map(|(k, v)| (k.as_str(), v.as_str())));
all_labels
}
fn labels_mut(&mut self) -> &mut Vec<(String, String)> {
&mut self.labels
}
}
impl Endpoint {
......@@ -520,11 +578,24 @@ impl Namespace {
/// Create a [`Component`] in the namespace who's endpoints can be discovered with etcd
pub fn component(&self, name: impl Into<String>) -> Result<Component> {
Ok(ComponentBuilder::from_runtime(self.runtime.clone())
let component = ComponentBuilder::from_runtime(self.runtime.clone())
.name(name)
.namespace(self.clone())
.is_static(self.is_static)
.build()?)
.build()?;
// Register the metrics callback for this component.
// If registration fails, log a warning but do not propagate the error,
// as metrics are not mission critical and should not block component creation.
if let Err(err) = component.add_metrics_callback() {
tracing::warn!(
"Failed to add metrics callback for component '{}': {}",
component.service_name(),
err
);
}
Ok(component)
}
/// Create a [`Namespace`] in the parent namespace
......
......@@ -84,19 +84,19 @@ impl MetricsRegistry for Namespace {
}
fn parent_hierarchy(&self) -> Vec<String> {
vec![self.drt().basename()]
}
fn stored_labels(&self) -> Vec<(&str, &str)> {
// Convert Vec<(String, String)> to Vec<(&str, &str)>
self.labels
.iter()
.map(|(k, v)| (k.as_str(), v.as_str()))
.collect()
}
fn labels_mut(&mut self) -> &mut Vec<(String, String)> {
&mut self.labels
// Build as: [ "" (DRT), non-empty parent basenames from root -> leaf ]
let mut names = vec![String::new()]; // Start with empty string for DRT
// Collect parent basenames from root to leaf
let parent_names: Vec<String> =
std::iter::successors(self.parent.as_deref(), |ns| ns.parent.as_deref())
.map(|ns| ns.basename())
.filter(|name| !name.is_empty())
.collect();
// Append parent names in reverse order (root to leaf)
names.extend(parent_names.into_iter().rev());
names
}
}
......
......@@ -14,13 +14,14 @@
// limitations under the License.
pub use crate::component::Component;
use crate::transports::nats::DRTNatsPrometheusMetrics;
use crate::{
component::{self, ComponentBuilder, Endpoint, InstanceSource, Namespace},
discovery::DiscoveryClient,
metrics::MetricsRegistry,
service::ServiceClient,
transports::{etcd, nats, tcp},
ErrorContext,
ErrorContext, RuntimeCallback,
};
use super::{error, Arc, DistributedRuntime, OnceCell, Result, Runtime, SystemHealth, Weak, OK};
......@@ -40,18 +41,6 @@ impl MetricsRegistry for DistributedRuntime {
fn parent_hierarchy(&self) -> Vec<String> {
vec![] // drt is the root, so no parent hierarchy
}
fn stored_labels(&self) -> Vec<(&str, &str)> {
// Convert Vec<(String, String)> to Vec<(&str, &str)>
self.labels
.iter()
.map(|(k, v)| (k.as_str(), v.as_str()))
.collect()
}
fn labels_mut(&mut self) -> &mut Vec<(String, String)> {
&mut self.labels
}
}
impl DistributedRuntime {
......@@ -88,6 +77,8 @@ impl DistributedRuntime {
live_endpoint_path,
)));
let nats_client_for_metrics = nats_client.clone();
let distributed_runtime = Self {
runtime,
etcd_client,
......@@ -97,14 +88,29 @@ impl DistributedRuntime {
component_registry: component::Registry::new(),
is_static,
instance_sources: Arc::new(Mutex::new(HashMap::new())),
prometheus_registries_by_prefix: Arc::new(std::sync::Mutex::new(HashMap::<
hierarchy_to_metricsregistry: Arc::new(std::sync::RwLock::new(HashMap::<
String,
prometheus::Registry,
crate::MetricsRegistryEntry,
>::new())),
system_health,
labels: Vec::new(),
};
let sys_nats_metrics = DRTNatsPrometheusMetrics::new(
&distributed_runtime,
nats_client_for_metrics.client().clone(),
)?;
let mut drt_hierarchies = distributed_runtime.parent_hierarchy();
drt_hierarchies.push(distributed_runtime.hierarchy());
// Register a callback to update NATS client metrics
let nats_metrics_callback = Arc::new({
let sys_nats_metrics_clone = sys_nats_metrics.clone();
move || {
sys_nats_metrics_clone.set_from_client_stats();
Ok(())
}
});
distributed_runtime.register_metrics_callback(drt_hierarchies, nats_metrics_callback);
// Start system status server if enabled
if let Some(cancel_token) = cancel_token {
let host = config.system_host.clone();
......@@ -240,6 +246,76 @@ impl DistributedRuntime {
pub fn instance_sources(&self) -> Arc<Mutex<HashMap<Endpoint, Weak<InstanceSource>>>> {
self.instance_sources.clone()
}
/// Add a Prometheus metric to a specific hierarchy's registry
pub fn add_prometheus_metric(
&self,
hierarchy: &str,
metric_name: &str,
prometheus_metric: Box<dyn prometheus::core::Collector>,
) -> anyhow::Result<()> {
let mut registries = self.hierarchy_to_metricsregistry.write().unwrap();
let entry = registries.entry(hierarchy.to_string()).or_default();
// If a metric with this name already exists for the hierarchy, warn and skip registration
if entry.has_metric_named(metric_name) {
tracing::warn!(
hierarchy = ?hierarchy,
metric_name = ?metric_name,
"Metric already exists in registry; skipping registration"
);
return Ok(());
}
// Try to register the metric and provide better error information
match entry.prometheus_registry.register(prometheus_metric) {
Ok(_) => Ok(()),
Err(e) => {
let error_msg = e.to_string();
tracing::error!(
hierarchy = ?hierarchy,
error = ?error_msg,
metric_name = ?metric_name,
"Metric registration failed"
);
Err(e.into())
}
}
}
/// Add a callback function to metrics registries for the given hierarchies
pub fn register_metrics_callback(&self, hierarchies: Vec<String>, callback: RuntimeCallback) {
let mut registries = self.hierarchy_to_metricsregistry.write().unwrap();
for hierarchy in hierarchies {
registries
.entry(hierarchy)
.or_default()
.add_callback(callback.clone());
}
}
/// Execute all callbacks for a given hierarchy key and return their results
pub fn execute_metrics_callbacks(&self, hierarchy: &str) -> Vec<anyhow::Result<()>> {
// Clone callbacks while holding read lock (fast operation)
let callbacks = {
let registries = self.hierarchy_to_metricsregistry.read().unwrap();
registries
.get(hierarchy)
.map(|entry| entry.runtime_callbacks.clone())
}; // Read lock released here
// Execute callbacks without holding the lock
match callbacks {
Some(callbacks) => callbacks.iter().map(|callback| callback()).collect(),
None => Vec::new(),
}
}
/// Get all registered hierarchy keys. Private because it is only used for testing.
fn get_registered_hierarchies(&self) -> Vec<String> {
let registries = self.hierarchy_to_metricsregistry.read().unwrap();
registries.keys().cloned().collect()
}
}
#[derive(Dissolve)]
......
......@@ -147,6 +147,70 @@ impl SystemHealth {
}
}
/// Type alias for runtime callback functions to reduce complexity
///
/// This type represents an Arc-wrapped callback function that can be:
/// - Shared efficiently across multiple threads and contexts
/// - Cloned without duplicating the underlying closure
/// - Used in generic contexts requiring 'static lifetime
///
/// The Arc wrapper is included in the type to make sharing explicit.
type RuntimeCallback = Arc<dyn Fn() -> anyhow::Result<()> + Send + Sync + 'static>;
/// Structure to hold Prometheus registries and associated callbacks for a given hierarchy
pub struct MetricsRegistryEntry {
/// The Prometheus registry for this prefix
pub prometheus_registry: prometheus::Registry,
/// List of function callbacks that receive a reference to any MetricsRegistry
pub runtime_callbacks: Vec<RuntimeCallback>,
}
impl MetricsRegistryEntry {
/// Create a new metrics registry entry with an empty registry and no callbacks
pub fn new() -> Self {
Self {
prometheus_registry: prometheus::Registry::new(),
runtime_callbacks: Vec::new(),
}
}
/// Add a callback function that receives a reference to any MetricsRegistry
pub fn add_callback(&mut self, callback: RuntimeCallback) {
self.runtime_callbacks.push(callback);
}
/// Execute all runtime callbacks and return their results
pub fn execute_callbacks(&self) -> Vec<anyhow::Result<()>> {
self.runtime_callbacks
.iter()
.map(|callback| callback())
.collect()
}
/// Returns true if a metric with the given name already exists in the Prometheus registry
pub fn has_metric_named(&self, metric_name: &str) -> bool {
self.prometheus_registry
.gather()
.iter()
.any(|mf| mf.name() == metric_name)
}
}
impl Default for MetricsRegistryEntry {
fn default() -> Self {
Self::new()
}
}
impl Clone for MetricsRegistryEntry {
fn clone(&self) -> Self {
Self {
prometheus_registry: self.prometheus_registry.clone(),
runtime_callbacks: Vec::new(), // Callbacks cannot be cloned, so we start with an empty list
}
}
}
/// Distributed [Runtime] which provides access to shared resources across the cluster, this includes
/// communication protocols and transports.
#[derive(Clone)]
......@@ -176,9 +240,7 @@ pub struct DistributedRuntime {
// Health Status
system_health: Arc<std::sync::Mutex<SystemHealth>>,
// This map associates metric prefixes with their corresponding Prometheus registries.
prometheus_registries_by_prefix: Arc<std::sync::Mutex<HashMap<String, prometheus::Registry>>>,
// Additional labels for metrics
labels: Vec<(String, String)>,
// This map associates metric prefixes with their corresponding Prometheus registries and callbacks.
// Uses RwLock for better concurrency - multiple threads can read (execute callbacks) simultaneously.
hierarchy_to_metricsregistry: Arc<std::sync::RwLock<HashMap<String, MetricsRegistryEntry>>>,
}
This diff is collapsed.
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Prometheus metric name constants
//!
//! This module provides centralized Prometheus metric name constants for various components
//! to ensure consistency and avoid duplication across the codebase.
/// Builds a full metric name by prepending the component prefix
pub fn build_metric_name(metric_name: &str) -> String {
format!("{}{}", name_prefix::COMPONENT, metric_name)
}
/// Metric name prefixes used across the metrics system
pub mod name_prefix {
/// Prefix for all Prometheus metric names.
pub const COMPONENT: &str = "dynamo_component_";
// TODO(keivenc): uncomment below for the frontend
// pub const FRONTEND: &str = "dynamo_frontend_";
}
/// Automatically inserted Prometheus label names used across the metrics system
pub mod labels {
/// Label for component identification
pub const COMPONENT: &str = "dynamo_component";
/// Label for namespace identification
pub const NAMESPACE: &str = "dynamo_namespace";
/// Label for endpoint identification
pub const ENDPOINT: &str = "dynamo_endpoint";
}
/// NATS Prometheus metric names
pub mod nats {
/// Prefix for all NATS client metrics
pub const PREFIX: &str = "nats_";
/// ===== DistributedRuntime metrics =====
/// Total number of bytes received by NATS client
pub const IN_TOTAL_BYTES: &str = "nats_in_total_bytes";
/// Total number of bytes sent by NATS client
pub const OUT_OVERHEAD_BYTES: &str = "nats_out_overhead_bytes";
/// Total number of messages received by NATS client
pub const IN_MESSAGES: &str = "nats_in_messages";
/// Total number of messages sent by NATS client
pub const OUT_MESSAGES: &str = "nats_out_messages";
/// Total number of connections established by NATS client
pub const CONNECTS: &str = "nats_connects";
/// Current connection state of NATS client (0=disconnected, 1=connected, 2=reconnecting)
pub const CONNECTION_STATE: &str = "nats_connection_state";
/// ===== Component metrics (ordered to match NatsStatsMetrics fields) =====
/// Average processing time in milliseconds (maps to: average_processing_time in ms)
pub const AVG_PROCESSING_MS: &str = "nats_avg_processing_time_ms";
/// Total errors across all endpoints (maps to: num_errors)
pub const TOTAL_ERRORS: &str = "nats_total_errors";
/// Total requests across all endpoints (maps to: num_requests)
pub const TOTAL_REQUESTS: &str = "nats_total_requests";
/// Total processing time in milliseconds (maps to: processing_time in ms)
pub const TOTAL_PROCESSING_MS: &str = "nats_total_processing_time_ms";
/// Number of active services (derived from ServiceSet.services)
pub const ACTIVE_SERVICES: &str = "nats_active_services";
/// Number of active endpoints (derived from ServiceInfo.endpoints)
pub const ACTIVE_ENDPOINTS: &str = "nats_active_endpoints";
}
/// All NATS client Prometheus metric names as an array for iteration/validation
pub const DRT_NATS_METRICS: &[&str] = &[
nats::CONNECTION_STATE,
nats::CONNECTS,
nats::IN_TOTAL_BYTES,
nats::IN_MESSAGES,
nats::OUT_OVERHEAD_BYTES,
nats::OUT_MESSAGES,
];
/// All component service Prometheus metric names as an array for iteration/validation
/// (ordered to match NatsStatsMetrics fields)
pub const COMPONENT_NATS_METRICS: &[&str] = &[
nats::AVG_PROCESSING_MS, // maps to: average_processing_time (nanoseconds)
nats::TOTAL_ERRORS, // maps to: num_errors
nats::TOTAL_REQUESTS, // maps to: num_requests
nats::TOTAL_PROCESSING_MS, // maps to: processing_time (nanoseconds)
nats::ACTIVE_SERVICES, // derived from ServiceSet.services
nats::ACTIVE_ENDPOINTS, // derived from ServiceInfo.endpoints
];
/// Work handler Prometheus metric names
pub mod work_handler {
/// Total number of requests processed by work handler
pub const REQUESTS_TOTAL: &str = "requests_total";
/// Total number of bytes received in requests by work handler
pub const REQUEST_BYTES_TOTAL: &str = "request_bytes_total";
/// Total number of bytes sent in responses by work handler
pub const RESPONSE_BYTES_TOTAL: &str = "response_bytes_total";
/// Number of requests currently being processed by work handler
pub const CONCURRENT_REQUESTS: &str = "concurrent_requests";
/// Time spent processing requests by work handler (histogram)
pub const REQUEST_DURATION_SECONDS: &str = "request_duration_seconds";
}
......@@ -19,13 +19,22 @@
// we will want to associate the components cancellation token with the
// component's "service state"
use crate::{error, transports::nats, utils::stream, Result};
use crate::{
component::Component,
error,
metrics::{prometheus_names, MetricsRegistry},
traits::*,
transports::nats,
utils::stream,
DistributedRuntime, Result,
};
use async_nats::Message;
use async_stream::try_stream;
use bytes::Bytes;
use derive_getters::Dissolve;
use futures::stream::{StreamExt, TryStreamExt};
use prometheus;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::time::Duration;
......@@ -39,11 +48,55 @@ impl ServiceClient {
}
}
/// ServiceSet contains a collection of services with their endpoints and metrics
///
/// Tree structure:
/// Structure:
/// - ServiceSet
/// - services: Vec<ServiceInfo>
/// - name: String
/// - id: String
/// - version: String
/// - started: String
/// - endpoints: Vec<EndpointInfo>
/// - name: String
/// - subject: String
/// - data: Option<NatsStatsMetrics>
/// - average_processing_time: f64
/// - last_error: String
/// - num_errors: u64
/// - num_requests: u64
/// - processing_time: u64
/// - queue_group: String
/// - data: serde_json::Value (custom stats)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceSet {
services: Vec<ServiceInfo>,
}
/// This is a example JSON from `nats req '$SRV.STATS.dynamo_backend'`:
/// {
/// "type": "io.nats.micro.v1.stats_response",
/// "name": "dynamo_backend",
/// "id": "bdu7nA8tbhy9mEkxIWlkBA",
/// "version": "0.0.1",
/// "started": "2025-08-08T05:07:17.720783523Z",
/// "endpoints": [
/// {
/// "name": "dynamo_backend-generate-694d988806b92e39",
/// "subject": "dynamo_backend.generate-694d988806b92e39",
/// "num_requests": 0,
/// "num_errors": 0,
/// "processing_time": 0,
/// "average_processing_time": 0,
/// "last_error": "",
/// "data": {
/// "val": 10
/// },
/// "queue_group": "q"
/// }
/// ]
/// }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceInfo {
pub name: String,
......@@ -53,13 +106,15 @@ pub struct ServiceInfo {
pub endpoints: Vec<EndpointInfo>,
}
/// Each endpoint has name, subject, num_requests, num_errors, processing_time, average_processing_time, last_error, queue_group, and data
#[derive(Debug, Clone, Serialize, Deserialize, Dissolve)]
pub struct EndpointInfo {
pub name: String,
pub subject: String,
/// Extra fields that don't fit in EndpointInfo will be flattened into the Metrics struct.
#[serde(flatten)]
pub data: Option<Metrics>,
pub data: Option<NatsStatsMetrics>,
}
impl EndpointInfo {
......@@ -79,20 +134,21 @@ impl EndpointInfo {
// for easy deserialization. Ideally, this type already exists or can
// be exposed in the library somewhere.
/// Stats structure returned from NATS service API
/// https://github.com/nats-io/nats.rs/blob/main/async-nats/src/service/endpoint.rs
#[derive(Debug, Clone, Serialize, Deserialize, Dissolve)]
pub struct Metrics {
// Standard NATS Service API fields
pub average_processing_time: f64,
pub struct NatsStatsMetrics {
// Standard NATS Stats Service API fields from $SRV.STATS.<service_name> requests
pub average_processing_time: u64, // in nanoseconds according to nats-io
pub last_error: String,
pub num_errors: u64,
pub num_requests: u64,
pub processing_time: u64,
pub processing_time: u64, // in nanoseconds according to nats-io
pub queue_group: String,
// Field containing custom stats handler data
pub data: serde_json::Value,
}
impl Metrics {
impl NatsStatsMetrics {
pub fn decode<T: for<'de> Deserialize<'de>>(self) -> Result<T> {
serde_json::from_value(self.data).map_err(Into::into)
}
......@@ -154,6 +210,11 @@ impl ServiceSet {
.into_iter()
.flat_map(|s| s.endpoints.into_iter())
}
/// Get a reference to the services in this ServiceSet
pub fn services(&self) -> &[ServiceInfo] {
&self.services
}
}
#[cfg(test)]
......@@ -173,8 +234,8 @@ mod tests {
EndpointInfo {
name: "endpoint1".to_string(),
subject: "subject1".to_string(),
data: Some(Metrics {
average_processing_time: 0.1,
data: Some(NatsStatsMetrics {
average_processing_time: 100_000, // 0.1ms = 100,000 nanoseconds
last_error: "none".to_string(),
num_errors: 0,
num_requests: 10,
......@@ -186,8 +247,8 @@ mod tests {
EndpointInfo {
name: "endpoint2-foo".to_string(),
subject: "subject2".to_string(),
data: Some(Metrics {
average_processing_time: 0.1,
data: Some(NatsStatsMetrics {
average_processing_time: 100_000, // 0.1ms = 100,000 nanoseconds
last_error: "none".to_string(),
num_errors: 0,
num_requests: 10,
......@@ -207,8 +268,8 @@ mod tests {
EndpointInfo {
name: "endpoint1".to_string(),
subject: "subject1".to_string(),
data: Some(Metrics {
average_processing_time: 0.1,
data: Some(NatsStatsMetrics {
average_processing_time: 100_000, // 0.1ms = 100,000 nanoseconds
last_error: "none".to_string(),
num_errors: 0,
num_requests: 10,
......@@ -220,8 +281,8 @@ mod tests {
EndpointInfo {
name: "endpoint2-bar".to_string(),
subject: "subject2".to_string(),
data: Some(Metrics {
average_processing_time: 0.1,
data: Some(NatsStatsMetrics {
average_processing_time: 100_000, // 0.1ms = 100,000 nanoseconds
last_error: "none".to_string(),
num_errors: 0,
num_requests: 10,
......@@ -244,3 +305,135 @@ mod tests {
assert_eq!(endpoints.len(), 2);
}
}
/// Prometheus metrics for component service statistics (ordered to match NatsStatsMetrics)
///
/// ⚠️ IMPORTANT: These Prometheus Gauges are COPIES of NATS data, not live references!
///
/// How it works:
/// 1. NATS provides source data via NatsStatsMetrics
/// 2. Metrics callbacks read current NATS values and update these Prometheus Gauges
/// 3. Prometheus scrapes these Gauge values (snapshots, not live data)
///
/// Flow: NATS Service → NatsStatsMetrics (Counters) → Metrics Callback → Prometheus Gauge
/// Note: These are snapshots updated when execute_metrics_callbacks() is called.
#[derive(Debug, Clone)]
pub struct ComponentNatsPrometheusMetrics {
/// Average processing time in milliseconds (maps to: average_processing_time)
pub avg_processing_ms: prometheus::Gauge,
/// Total errors across all endpoints (maps to: num_errors)
pub total_errors: prometheus::IntGauge,
/// Total requests across all endpoints (maps to: num_requests)
pub total_requests: prometheus::IntGauge,
/// Total processing time in milliseconds (maps to: processing_time)
pub total_processing_ms: prometheus::IntGauge,
/// Number of active services (derived from ServiceSet.services)
pub active_services: prometheus::IntGauge,
/// Number of active endpoints (derived from ServiceInfo.endpoints)
pub active_endpoints: prometheus::IntGauge,
}
impl ComponentNatsPrometheusMetrics {
/// Create new ComponentServiceMetrics using Component's DistributedRuntime's Prometheus constructors
pub fn new(component: &Component) -> Result<Self> {
let avg_processing_ms = component.create_gauge(
prometheus_names::nats::AVG_PROCESSING_MS,
"Average processing time across all component endpoints in milliseconds",
&[],
)?;
let total_errors = component.create_intgauge(
prometheus_names::nats::TOTAL_ERRORS,
"Total number of errors across all component endpoints",
&[],
)?;
let total_requests = component.create_intgauge(
prometheus_names::nats::TOTAL_REQUESTS,
"Total number of requests across all component endpoints",
&[],
)?;
let total_processing_ms = component.create_intgauge(
prometheus_names::nats::TOTAL_PROCESSING_MS,
"Total processing time across all component endpoints in milliseconds",
&[],
)?;
let active_services = component.create_intgauge(
prometheus_names::nats::ACTIVE_SERVICES,
"Number of active services in this component",
&[],
)?;
let active_endpoints = component.create_intgauge(
prometheus_names::nats::ACTIVE_ENDPOINTS,
"Number of active endpoints across all services",
&[],
)?;
Ok(Self {
avg_processing_ms,
total_errors,
total_requests,
total_processing_ms,
active_services,
active_endpoints,
})
}
/// Update metrics from scraped ServiceSet data
pub fn update_from_service_set(&self, service_set: &ServiceSet) {
// Variables ordered to match NatsStatsMetrics fields
let mut processing_time_samples = 0u64; // for average_processing_time calculation
let mut total_errors = 0u64; // maps to: num_errors
let mut total_requests = 0u64; // maps to: num_requests
let mut total_processing_time_nanos = 0u64; // maps to: processing_time (nanoseconds from NATS)
let mut endpoint_count = 0u64; // for derived metrics
let service_count = service_set.services().len() as i64;
for service in service_set.services() {
for endpoint in &service.endpoints {
endpoint_count += 1;
if let Some(ref stats) = endpoint.data {
total_errors += stats.num_errors;
total_requests += stats.num_requests;
total_processing_time_nanos += stats.processing_time;
if stats.num_requests > 0 {
processing_time_samples += 1;
}
}
}
}
// Update metrics (ordered to match NatsStatsMetrics fields)
// Calculate average processing time in milliseconds (maps to: average_processing_time)
if processing_time_samples > 0 && total_requests > 0 {
let avg_time_nanos = total_processing_time_nanos as f64 / total_requests as f64;
let avg_time_ms = avg_time_nanos / 1_000_000.0; // Convert nanoseconds to milliseconds
self.avg_processing_ms.set(avg_time_ms);
} else {
self.avg_processing_ms.set(0.0);
}
self.total_errors.set(total_errors as i64); // maps to: num_errors
self.total_requests.set(total_requests as i64); // maps to: num_requests
self.total_processing_ms
.set((total_processing_time_nanos / 1_000_000) as i64); // maps to: processing_time (converted to milliseconds)
self.active_services.set(service_count); // derived from ServiceSet.services
self.active_endpoints.set(endpoint_count as i64); // derived from ServiceInfo.endpoints
}
/// Reset all metrics to zero. Useful when no data is available or to clear stale values.
pub fn reset_to_zeros(&self) {
self.avg_processing_ms.set(0.0);
self.total_errors.set(0);
self.total_requests.set(0);
self.total_processing_ms.set(0);
self.active_services.set(0);
self.active_endpoints.set(0);
}
}
......@@ -209,6 +209,7 @@ pub async fn spawn_system_status_server(
tracing::error!("System status server error: {}", e);
}
});
Ok((actual_address, handle))
}
......@@ -254,7 +255,18 @@ async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
// Update the uptime gauge with current value
state.update_uptime_gauge();
// Get metrics from the registry
// Execute all the callbacks starting at the DistributedRuntime level
assert!(state.drt().basename() == "");
let callback_results = state
.drt()
.execute_metrics_callbacks(&state.drt().hierarchy());
for result in callback_results {
if let Err(e) = result {
tracing::error!("Error executing metrics callback: {}", e);
}
}
// Get all metrics from DistributedRuntime (top-level)
match state.drt().prometheus_metrics_fmt() {
Ok(response) => (StatusCode::OK, response),
Err(e) => {
......@@ -341,12 +353,20 @@ mod tests {
let response = runtime_metrics.drt().prometheus_metrics_fmt().unwrap();
println!("Full metrics response:\n{}", response);
// Filter out NATS client metrics for comparison
use crate::metrics::prometheus_names::nats as nats_metrics;
let filtered_response: String = response
.lines()
.filter(|line| !line.contains(nats_metrics::PREFIX))
.collect::<Vec<_>>()
.join("\n");
let expected = "\
# HELP dynamo_component_dynamo_uptime_seconds Total uptime of the DistributedRuntime in seconds
# TYPE dynamo_component_dynamo_uptime_seconds gauge
dynamo_component_dynamo_uptime_seconds 42
";
assert_eq!(response, expected);
dynamo_component_dynamo_uptime_seconds 42";
assert_eq!(filtered_response, expected);
}
#[cfg(feature = "integration")]
......
......@@ -632,15 +632,11 @@ mod tests {
.id();
// Create the key
let result = client
.kv_create(key.to_string(), value.to_vec(), Some(lease_id))
.await;
let result = client.kv_create(key, value.to_vec(), Some(lease_id)).await;
assert!(result.is_ok(), "");
// Try to create the key again - this should fail
let result = client
.kv_create(key.to_string(), value.to_vec(), Some(lease_id))
.await;
let result = client.kv_create(key, value.to_vec(), Some(lease_id)).await;
assert!(result.is_err());
// Create or validate should succeed as the values match
......
......@@ -28,20 +28,23 @@
//! - `NATS_AUTH_CREDENTIALS_FILE`: the path to the credentials file
//!
//! Note: `NATS_AUTH_USERNAME` and `NATS_AUTH_PASSWORD` must be used together.
use crate::Result;
use crate::{metrics::MetricsRegistry, Result};
use async_nats::connection::State;
use async_nats::{client, jetstream, Subscriber};
use bytes::Bytes;
use derive_builder::Builder;
use futures::{StreamExt, TryStreamExt};
use prometheus::{Counter, Gauge, Histogram, HistogramOpts, IntCounter, IntGauge, Opts, Registry};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::Ordering;
use tokio::fs::File as TokioFile;
use tokio::io::AsyncRead;
use tokio::time;
use url::Url;
use validator::{Validate, ValidationError};
use crate::metrics::prometheus_names::nats as nats_metrics;
pub use crate::slug::Slug;
use tracing as log;
......@@ -504,6 +507,109 @@ impl NatsQueue {
}
}
/// Prometheus metrics that mirror the NATS client statistics (in primitive types)
/// to be used for the System Status Server.
///
/// ⚠️ IMPORTANT: These Prometheus Gauges are COPIES of NATS client data, not live references!
///
/// How it works:
/// 1. NATS client provides source data via client.statistics() and connection_state()
/// 2. set_from_client_stats() reads current NATS values and updates these Prometheus Gauges
/// 3. Prometheus scrapes these Gauge values (snapshots, not live data)
///
/// Flow: NATS Client → Client Statistics → set_from_client_stats() → Prometheus Gauge
/// Note: These are snapshots updated when set_from_client_stats() is called.
#[derive(Debug, Clone)]
pub struct DRTNatsPrometheusMetrics {
nats_client: client::Client,
/// Number of bytes received (excluding protocol overhead)
pub in_bytes: IntGauge,
/// Number of bytes sent (excluding protocol overhead)
pub out_bytes: IntGauge,
/// Number of messages received
pub in_messages: IntGauge,
/// Number of messages sent
pub out_messages: IntGauge,
/// Number of times connection was established
pub connects: IntGauge,
/// Current connection state (0 = disconnected, 1 = connected, 2 = reconnecting)
pub connection_state: IntGauge,
}
impl DRTNatsPrometheusMetrics {
/// Create a new instance of NATS client metrics using a DistributedRuntime's Prometheus constructors
pub fn new(drt: &crate::DistributedRuntime, nats_client: client::Client) -> Result<Self> {
let in_bytes = drt.create_intgauge(
nats_metrics::IN_TOTAL_BYTES,
"Total number of bytes received by NATS client",
&[],
)?;
let out_bytes = drt.create_intgauge(
nats_metrics::OUT_OVERHEAD_BYTES,
"Total number of bytes sent by NATS client",
&[],
)?;
let in_messages = drt.create_intgauge(
nats_metrics::IN_MESSAGES,
"Total number of messages received by NATS client",
&[],
)?;
let out_messages = drt.create_intgauge(
nats_metrics::OUT_MESSAGES,
"Total number of messages sent by NATS client",
&[],
)?;
let connects = drt.create_intgauge(
nats_metrics::CONNECTS,
"Total number of connections established by NATS client",
&[],
)?;
let connection_state = drt.create_intgauge(
nats_metrics::CONNECTION_STATE,
"Current connection state of NATS client (0=disconnected, 1=connected, 2=reconnecting)",
&[],
)?;
Ok(Self {
nats_client,
in_bytes,
out_bytes,
in_messages,
out_messages,
connects,
connection_state,
})
}
/// Copy statistics from the stored NATS client to these Prometheus metrics
pub fn set_from_client_stats(&self) {
let stats = self.nats_client.statistics();
// Get current values from the client statistics
let in_bytes = stats.in_bytes.load(Ordering::Relaxed);
let out_bytes = stats.out_bytes.load(Ordering::Relaxed);
let in_messages = stats.in_messages.load(Ordering::Relaxed);
let out_messages = stats.out_messages.load(Ordering::Relaxed);
let connects = stats.connects.load(Ordering::Relaxed);
// Get connection state
let connection_state = match self.nats_client.connection_state() {
State::Connected => 1,
// treat Disconnected and Pending as "down"
State::Disconnected | State::Pending => 0,
};
// Update Prometheus metrics
// Using gauges allows us to set absolute values directly
self.in_bytes.set(in_bytes as i64);
self.out_bytes.set(out_bytes as i64);
self.in_messages.set(in_messages as i64);
self.out_messages.set(out_messages as i64);
self.connects.set(connects as i64);
self.connection_state.set(connection_state);
}
}
#[cfg(test)]
mod tests {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment