"examples/llm/vscode:/vscode.git/clone" did not exist on "9cd9993d7b8b82dd3a49f17481e527c411e7e19a"
Unverified Commit cbe0b177 authored by Keiven C's avatar Keiven C Committed by GitHub
Browse files

refactor: redesign the metrics API from Trait to composition to make the code...


refactor: redesign the metrics API from Trait to composition to make the code cleaner and easier to understand (#3687)
Signed-off-by: default avatarKeiven Chang <keivenchang@users.noreply.github.com>
parent eb8d07cb
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
<!--
SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
# Dynamo MetricsRegistry
......@@ -25,11 +13,11 @@ Dynamo provides built-in metrics capabilities through the `MetricsRegistry` trai
Dynamo automatically exposes metrics with the `dynamo_` name prefixes. It also adds the following labels `dynamo_namespace`, `dynamo_component`, and `dynamo_endpoint` to indicate which component is providing the metric.
**Frontend Metrics**: When using Dynamo HTTP Frontend (`--framework VLLM` or `--framework TRTLLM`), these metrics are automatically exposed with the `dynamo_frontend_*` prefix and include `model` labels containing the model name. These cover request handling, token processing, and latency measurements. See the [Available Metrics section](../../deploy/metrics/README.md#available-metrics) for the complete list of frontend metrics.
**Frontend Metrics**: When using Dynamo HTTP Frontend (`--framework VLLM` or `--framework TRTLLM`), these metrics are automatically exposed with the `dynamo_frontend_*` prefix and include `model` labels containing the model name. These cover request handling, token processing, and latency measurements. See [prometheus-grafana.md](prometheus-grafana.md#available-metrics) for the complete list of frontend metrics.
**Component Metrics**: The core Dynamo backend system automatically exposes metrics with the `dynamo_component_*` prefix for all components that use the `DistributedRuntime` framework. These include request counts, processing times, byte transfers, and system uptime metrics. See the [Available Metrics section](../../deploy/metrics/README.md#available-metrics) for the complete list of component metrics.
**Component Metrics**: The core Dynamo backend system automatically exposes metrics with the `dynamo_component_*` prefix for all components that use the `DistributedRuntime` framework. These include request counts, processing times, byte transfers, and system uptime metrics. See [prometheus-grafana.md](prometheus-grafana.md#available-metrics) for the complete list of component metrics.
**Specialized Component Metrics**: Components can also expose additional metrics specific to their functionality. For example, a `preprocessor` component exposes metrics with the `dynamo_preprocessor_*` prefix. See the [Available Metrics section](../../deploy/metrics/README.md#available-metrics) for details on specialized component metrics.
**Specialized Component Metrics**: Components can also expose additional metrics specific to their functionality. For example, a `preprocessor` component exposes metrics with the `dynamo_preprocessor_*` prefix. See [prometheus-grafana.md](prometheus-grafana.md#available-metrics) for details on specialized component metrics.
**Kubernetes Integration**: For comprehensive Kubernetes deployment and monitoring setup, see the [Kubernetes Metrics Guide](../kubernetes/observability/metrics.md). This includes Prometheus Operator setup, metrics collection configuration, and visualization in Grafana.
......@@ -47,7 +35,7 @@ This hierarchical structure allows you to create metrics at the appropriate leve
## Getting Started
For a complete setup guide including Docker Compose configuration, Prometheus setup, and Grafana dashboards, see the [Getting Started section](../../deploy/metrics/README.md#getting-started) in the deploy metrics documentation.
For a complete setup guide including Docker Compose configuration, Prometheus setup, and Grafana dashboards, see the [Getting Started section](prometheus-grafana.md#getting-started) in the Prometheus and Grafana guide.
The quick start includes:
- Docker Compose setup for Prometheus and Grafana
......@@ -57,7 +45,7 @@ The quick start includes:
## Implementation Examples
See [Implementation Examples](../../deploy/metrics/README.md#implementation-examples) for detailed examples of creating metrics at different hierarchy levels and using dynamic labels.
Examples of creating metrics at different hierarchy levels and using dynamic labels are included in this document below.
### Grafana Dashboards
......@@ -90,12 +78,22 @@ graph TD
The metrics system includes a pre-configured Grafana dashboard for visualizing service metrics:
![Grafana Dynamo Dashboard](../../deploy/metrics/grafana-dynamo-composite.png)
![Grafana Dynamo Dashboard](./grafana-dynamo-composite.png)
## Detailed Setup Guide
For complete setup instructions including Docker Compose, Prometheus configuration, and Grafana dashboards, see:
```{toctree}
:hidden:
prometheus-grafana
```
- [Prometheus and Grafana Setup Guide](prometheus-grafana.md)
## Related Documentation
- [Distributed Runtime Architecture](../design_docs/distributed_runtime.md)
- [Dynamo Architecture Overview](../design_docs/architecture.md)
- [Backend Guide](../development/backend-guide.md)
- [Metrics Implementation Examples](../../deploy/metrics/README.md#implementation-examples)
- [Complete Metrics Setup Guide](../../deploy/metrics/README.md)
\ No newline at end of file
......@@ -3,7 +3,7 @@
This directory contains configuration for visualizing metrics from the metrics aggregation service using Prometheus and Grafana.
> [!NOTE]
> For detailed information about Dynamo's metrics system, including hierarchical metrics, automatic labeling, and usage examples, see the [Metrics Guide](../../docs/observability/metrics.md).
> For detailed information about Dynamo's metrics system, including hierarchical metrics, automatic labeling, and usage examples, see the [Metrics Guide](./metrics.md).
## Overview
......@@ -165,14 +165,14 @@ $ python -m dynamo.vllm --model Qwen/Qwen3-0.6B \
### Required Files
The following configuration files should be present in this directory:
- [docker-compose.yml](../docker-compose.yml): Defines the Prometheus and Grafana services
- [prometheus.yml](./prometheus.yml): Contains Prometheus scraping configuration
- [grafana-datasources.yml](./grafana-datasources.yml): Contains Grafana datasource configuration
- [grafana_dashboards/grafana-dashboard-providers.yml](./grafana_dashboards/grafana-dashboard-providers.yml): Contains Grafana dashboard provider configuration
- [grafana_dashboards/grafana-dynamo-dashboard.json](./grafana_dashboards/grafana-dynamo-dashboard.json): A general Dynamo Dashboard for both SW and HW metrics.
- [grafana_dashboards/grafana-dcgm-metrics.json](./grafana_dashboards/grafana-dcgm-metrics.json): Contains Grafana dashboard configuration for DCGM GPU metrics
- [grafana_dashboards/grafana-kvbm-dashboard.json](./grafana_dashboards/grafana-kvbm-dashboard.json): Contains Grafana dashboard configuration for KVBM metrics
The following configuration files are located in the `deploy/metrics/` directory:
- [docker-compose.yml](../../deploy/docker-compose.yml): Defines the Prometheus and Grafana services
- [prometheus.yml](../../deploy/metrics/prometheus.yml): Contains Prometheus scraping configuration
- [grafana-datasources.yml](../../deploy/metrics/grafana-datasources.yml): Contains Grafana datasource configuration
- [grafana_dashboards/grafana-dashboard-providers.yml](../../deploy/metrics/grafana_dashboards/grafana-dashboard-providers.yml): Contains Grafana dashboard provider configuration
- [grafana_dashboards/grafana-dynamo-dashboard.json](../../deploy/metrics/grafana_dashboards/grafana-dynamo-dashboard.json): A general Dynamo Dashboard for both SW and HW metrics.
- [grafana_dashboards/grafana-dcgm-metrics.json](../../deploy/metrics/grafana_dashboards/grafana-dcgm-metrics.json): Contains Grafana dashboard configuration for DCGM GPU metrics
- [grafana_dashboards/grafana-kvbm-dashboard.json](../../deploy/metrics/grafana_dashboards/grafana-kvbm-dashboard.json): Contains Grafana dashboard configuration for KVBM metrics
### Metric Name Constants
......@@ -241,7 +241,7 @@ This centralized approach ensures all Dynamo components use consistent, valid Pr
#### Prometheus
The Prometheus configuration is specified in [prometheus.yml](./prometheus.yml). This file is set up to collect metrics from the metrics aggregation service endpoint.
The Prometheus configuration is specified in [prometheus.yml](../../deploy/metrics/prometheus.yml). This file is set up to collect metrics from the metrics aggregation service endpoint.
Please be aware that you might need to modify the target settings to align with your specific host configuration and network environment.
......@@ -288,13 +288,13 @@ let component = namespace.component("my_component")?;
let endpoint = component.endpoint("my_endpoint")?;
// Create endpoint-level counters (this is a Prometheus Counter type)
let requests_total = endpoint.create_counter(
let requests_total = endpoint.metrics().create_counter(
"requests_total",
"Total requests across all namespaces",
&[]
)?;
let active_connections = endpoint.create_gauge(
let active_connections = endpoint.metrics().create_gauge(
"active_connections",
"Number of active client connections",
&[]
......@@ -307,17 +307,17 @@ let active_connections = endpoint.create_gauge(
let namespace = runtime.namespace("my_model")?;
// Namespace-scoped metrics
let model_requests = namespace.create_counter(
let model_requests = namespace.metrics().create_counter(
"model_requests",
"Requests for this specific model",
&[]
)?;
let model_latency = namespace.create_histogram(
let model_latency = namespace.metrics().create_histogram(
"model_latency_seconds",
"Model inference latency",
&[],
&[0.001, 0.01, 0.1, 1.0, 10.0]
Some(vec![0.001, 0.01, 0.1, 1.0, 10.0])
)?;
```
......@@ -327,13 +327,13 @@ let model_latency = namespace.create_histogram(
let component = namespace.component("backend")?;
// Component-specific metrics
let backend_requests = component.create_counter(
let backend_requests = component.metrics().create_counter(
"backend_requests",
"Requests handled by this backend component",
&[]
)?;
let gpu_memory_usage = component.create_gauge(
let gpu_memory_usage = component.metrics().create_gauge(
"gpu_memory_bytes",
"GPU memory usage in bytes",
&[]
......@@ -346,17 +346,17 @@ let gpu_memory_usage = component.create_gauge(
let endpoint = component.endpoint("generate")?;
// Endpoint-specific metrics
let generate_requests = endpoint.create_counter(
let generate_requests = endpoint.metrics().create_counter(
"generate_requests",
"Generate endpoint requests",
&[]
)?;
let generate_latency = endpoint.create_histogram(
let generate_latency = endpoint.metrics().create_histogram(
"generate_latency_seconds",
"Generate endpoint latency",
&[],
&[0.001, 0.01, 0.1, 1.0, 10.0]
Some(vec![0.001, 0.01, 0.1, 1.0, 10.0])
)?;
```
......@@ -366,10 +366,11 @@ Use vector metrics when you need to track metrics with different label values:
```rust
// Counter with labels
let requests_by_model = endpoint.create_counter_vec(
let requests_by_model = endpoint.metrics().create_countervec(
"requests_by_model",
"Requests by model type",
&["model_type", "model_size"]
&["model_type", "model_size"],
&[] // no constant labels
)?;
// Increment with specific labels
......@@ -377,10 +378,11 @@ requests_by_model.with_label_values(&["llama", "7b"]).inc();
requests_by_model.with_label_values(&["gpt", "13b"]).inc();
// Gauge with labels
let memory_by_gpu = component.create_gauge_vec(
let memory_by_gpu = component.metrics().create_gaugevec(
"gpu_memory_bytes",
"GPU memory usage by device",
&["gpu_id", "memory_type"]
&["gpu_id", "memory_type"],
&[] // no constant labels
)?;
memory_by_gpu.with_label_values(&["0", "allocated"]).set(8192.0);
......@@ -392,11 +394,11 @@ memory_by_gpu.with_label_values(&["0", "cached"]).set(4096.0);
Histograms are useful for measuring distributions of values like latency:
```rust
let latency_histogram = endpoint.create_histogram(
let latency_histogram = endpoint.metrics().create_histogram(
"request_latency_seconds",
"Request latency distribution",
&[],
&[0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0]
Some(vec![0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0])
)?;
// Record latency values
......@@ -429,7 +431,7 @@ counter.inc();
#### After (Dynamo MetricsRegistry)
```rust
let counter = endpoint.create_counter(
let counter = endpoint.metrics().create_counter(
"my_counter",
"My custom counter",
&[]
......@@ -438,10 +440,10 @@ let counter = endpoint.create_counter(
counter.inc();
```
**Note:** The metric is automatically registered when created via the endpoint's `create_counter` factory method.
**Note:** The metric is automatically registered when created via the endpoint's `metrics().create_counter()` factory method.
**Benefits of Dynamo's approach:**
- **Automatic registration**: Metrics created via endpoint's `create_*` factory methods are automatically registered with the system
- **Automatic registration**: Metrics created via endpoint's `metrics().create_*()` factory methods are automatically registered with the system
- Automatic labeling with namespace, component, and endpoint information
- Consistent metric naming with `dynamo_` prefix
- Built-in HTTP metrics endpoint when enabled with `DYN_SYSTEM_ENABLED=true`
......@@ -454,11 +456,11 @@ counter.inc();
```rust
// Define custom buckets for your use case
let custom_buckets = vec![0.001, 0.01, 0.1, 1.0, 10.0];
let latency = endpoint.create_histogram(
let latency = endpoint.metrics().create_histogram(
"api_latency_seconds",
"API latency in seconds",
&[],
&custom_buckets
Some(custom_buckets)
)?;
```
......@@ -466,7 +468,7 @@ let latency = endpoint.create_histogram(
```rust
// Aggregate metrics across multiple endpoints
let requests_total = namespace.create_counter(
let requests_total = namespace.metrics().create_counter(
"requests_total",
"Total requests across all endpoints",
&[]
......
......@@ -92,7 +92,7 @@ When you need to add or modify metrics in Method 1 (ForwardPassMetrics Pub/Sub v
// ... existing gauges ...
// Manually create and register new Prometheus gauge
let new_metric_gauge = component.create_gauge(
let new_metric_gauge = component.metrics().create_gauge(
"new_metric_name",
"Description of new metric",
&[], // labels
......@@ -345,7 +345,7 @@ graph TD
end
PY -->|endpoint.metrics.create_intgauge| PM
PM -->|endpoint.create_intgauge| EP
PM -->|endpoint.metrics.create_intgauge| EP
EP -->|create & register| PROM
PM -->|wrap & return| MT
MT -->|return to Python| PY
......
......@@ -652,28 +652,28 @@ impl Histogram {
#[pyclass]
#[derive(Clone)]
pub struct RuntimeMetrics {
metricsregistry: Arc<dyn rs::metrics::MetricsRegistry>,
hierarchy: Arc<dyn rs::metrics::MetricsHierarchy>,
}
impl RuntimeMetrics {
/// Create from Endpoint
pub fn from_endpoint(endpoint: dynamo_runtime::component::Endpoint) -> Self {
Self {
metricsregistry: Arc::new(endpoint),
hierarchy: Arc::new(endpoint),
}
}
/// Create from Component
pub fn from_component(component: dynamo_runtime::component::Component) -> Self {
Self {
metricsregistry: Arc::new(component),
hierarchy: Arc::new(component),
}
}
/// Create from Namespace
pub fn from_namespace(namespace: dynamo_runtime::component::Namespace) -> Self {
Self {
metricsregistry: Arc::new(namespace),
hierarchy: Arc::new(namespace),
}
}
......@@ -690,20 +690,15 @@ impl RuntimeMetrics {
names.iter().map(|s| s.as_str()).collect()
}
/// Generic helper to register metrics callbacks for any type implementing MetricsRegistry
/// Generic helper to register metrics callbacks for any type implementing MetricsHierarchy
/// This allows Endpoint, Component, and Namespace to share the same callback registration logic
pub fn register_callback_for<T>(registry_item: &T, callback: PyObject) -> PyResult<()>
where
T: rs::metrics::MetricsRegistry + rs::traits::DistributedRuntimeProvider + ?Sized,
T: rs::metrics::MetricsHierarchy + ?Sized,
{
let hierarchy = registry_item.hierarchy();
// Store the callback in the DRT's metrics callback registry using the registry_item's hierarchy
// TODO: rename this to register_callback, once we move the the MetricsRegistry trait
// out of the runtime, and make it into a composed module.
registry_item.drt().register_prometheus_update_callback(
vec![hierarchy.clone()],
Arc::new(move || {
// Get the metrics registry from the hierarchy and register the callback directly
let metrics_registry = registry_item.get_metrics_registry();
metrics_registry.add_update_callback(Arc::new(move || {
// Execute the Python callback in the Python event loop
Python::with_gil(|py| {
if let Err(e) = callback.call0(py) {
......@@ -711,8 +706,7 @@ impl RuntimeMetrics {
}
});
Ok(())
}),
);
}));
Ok(())
}
......@@ -723,19 +717,15 @@ impl RuntimeMetrics {
/// Register a Python callback to be invoked before metrics are scraped
/// This callback will be called for this endpoint's metrics hierarchy
fn register_callback(&self, callback: PyObject, _py: Python) -> PyResult<()> {
Self::register_callback_for(self.metricsregistry.as_ref(), callback)
Self::register_callback_for(self.hierarchy.as_ref(), callback)
}
/// Register a Python callback that returns Prometheus exposition text
/// The returned text will be appended to the /metrics endpoint output
/// The callback should return a string in Prometheus text exposition format
fn register_prometheus_expfmt_callback(&self, callback: PyObject, _py: Python) -> PyResult<()> {
let hierarchy = self.metricsregistry.hierarchy();
// Store the callback in the DRT's metrics exposition text callback registry
self.metricsregistry.drt().register_prometheus_expfmt_callback(
vec![hierarchy.clone()],
Arc::new(move || {
// Create the callback once (Arc allows sharing across registries)
let callback_arc = Arc::new(move || {
// Execute the Python callback in the Python event loop
Python::with_gil(|py| {
match callback.call0(py) {
......@@ -744,7 +734,10 @@ impl RuntimeMetrics {
match result.extract::<String>(py) {
Ok(text) => Ok(text),
Err(e) => {
tracing::error!("Metrics exposition text callback must return a string: {}", e);
tracing::error!(
"Metrics exposition text callback must return a string: {}",
e
);
Ok(String::new())
}
}
......@@ -755,8 +748,21 @@ impl RuntimeMetrics {
}
}
})
}),
);
});
// Register the callback at this hierarchy level
self.hierarchy
.get_metrics_registry()
.add_expfmt_callback(callback_arc.clone());
// Also register at all parent hierarchy levels so the callback is accessible
// when prometheus_expfmt() is called on any parent (e.g., DRT)
let parents = self.hierarchy.parent_hierarchies();
for parent in parents.iter() {
parent
.get_metrics_registry()
.add_expfmt_callback(callback_arc.clone());
}
Ok(())
}
......@@ -774,9 +780,14 @@ impl RuntimeMetrics {
py: Python,
) -> PyResult<Py<Counter>> {
let labels_vec = Self::convert_py_to_rust_labels(&labels);
let counter = self
.metricsregistry
.create_counter(&name, &description, &labels_vec)
let counter: prometheus::Counter = rs::metrics::create_metric(
self.hierarchy.as_ref(),
&name,
&description,
&labels_vec,
None,
None,
)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
let metric = Counter::from_prometheus(counter);
......@@ -795,9 +806,14 @@ impl RuntimeMetrics {
) -> PyResult<Py<CounterVec>> {
let label_names_str = Self::convert_py_to_rust_label_names(&label_names);
let const_labels_vec = Self::convert_py_to_rust_labels(&const_labels);
let counter_vec = self
.metricsregistry
.create_countervec(&name, &description, &label_names_str, &const_labels_vec)
let counter_vec: prometheus::CounterVec = rs::metrics::create_metric(
self.hierarchy.as_ref(),
&name,
&description,
&const_labels_vec,
None,
Some(&label_names_str),
)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
let metric = CounterVec::from_prometheus(counter_vec);
......@@ -815,9 +831,14 @@ impl RuntimeMetrics {
) -> PyResult<Py<Gauge>> {
let labels_vec = Self::convert_py_to_rust_labels(&labels);
let gauge = self
.metricsregistry
.create_gauge(&name, &description, &labels_vec)
let gauge: prometheus::Gauge = rs::metrics::create_metric(
self.hierarchy.as_ref(),
&name,
&description,
&labels_vec,
None,
None,
)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
let metric = Gauge::from_prometheus(gauge);
......@@ -836,9 +857,14 @@ impl RuntimeMetrics {
) -> PyResult<Py<GaugeVec>> {
let label_names_str = Self::convert_py_to_rust_label_names(&label_names);
let const_labels_vec = Self::convert_py_to_rust_labels(&const_labels);
let gauge_vec = self
.metricsregistry
.create_gaugevec(&name, &description, &label_names_str, &const_labels_vec)
let gauge_vec: prometheus::GaugeVec = rs::metrics::create_metric(
self.hierarchy.as_ref(),
&name,
&description,
&const_labels_vec,
None,
Some(&label_names_str),
)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
let metric = GaugeVec::from_prometheus(gauge_vec);
......@@ -856,9 +882,14 @@ impl RuntimeMetrics {
) -> PyResult<Py<Histogram>> {
let labels_vec = Self::convert_py_to_rust_labels(&labels);
let histogram = self
.metricsregistry
.create_histogram(&name, &description, &labels_vec, None)
let histogram: prometheus::Histogram = rs::metrics::create_metric(
self.hierarchy.as_ref(),
&name,
&description,
&labels_vec,
None,
None,
)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
let metric = Histogram::from_prometheus(histogram);
......@@ -876,9 +907,14 @@ impl RuntimeMetrics {
) -> PyResult<Py<IntCounter>> {
let labels_vec = Self::convert_py_to_rust_labels(&labels);
let counter = self
.metricsregistry
.create_intcounter(&name, &description, &labels_vec)
let counter: prometheus::IntCounter = rs::metrics::create_metric(
self.hierarchy.as_ref(),
&name,
&description,
&labels_vec,
None,
None,
)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
let metric = IntCounter::from_prometheus(counter);
......@@ -897,9 +933,14 @@ impl RuntimeMetrics {
) -> PyResult<Py<IntCounterVec>> {
let label_names_str = Self::convert_py_to_rust_label_names(&label_names);
let const_labels_vec = Self::convert_py_to_rust_labels(&const_labels);
let counter_vec = self
.metricsregistry
.create_intcountervec(&name, &description, &label_names_str, &const_labels_vec)
let counter_vec: prometheus::IntCounterVec = rs::metrics::create_metric(
self.hierarchy.as_ref(),
&name,
&description,
&const_labels_vec,
None,
Some(&label_names_str),
)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
let metric = IntCounterVec::from_prometheus(counter_vec);
......@@ -917,9 +958,14 @@ impl RuntimeMetrics {
) -> PyResult<Py<IntGauge>> {
let labels_vec = Self::convert_py_to_rust_labels(&labels);
let gauge = self
.metricsregistry
.create_intgauge(&name, &description, &labels_vec)
let gauge: prometheus::IntGauge = rs::metrics::create_metric(
self.hierarchy.as_ref(),
&name,
&description,
&labels_vec,
None,
None,
)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
let metric = IntGauge::from_prometheus(gauge);
......@@ -938,9 +984,14 @@ impl RuntimeMetrics {
) -> PyResult<Py<IntGaugeVec>> {
let label_names_str = Self::convert_py_to_rust_label_names(&label_names);
let const_labels_vec = Self::convert_py_to_rust_labels(&const_labels);
let gauge_vec = self
.metricsregistry
.create_intgaugevec(&name, &description, &label_names_str, &const_labels_vec)
let gauge_vec: prometheus::IntGaugeVec = rs::metrics::create_metric(
self.hierarchy.as_ref(),
&name,
&description,
&const_labels_vec,
None,
Some(&label_names_str),
)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(e.to_string()))?;
let metric = IntGaugeVec::from_prometheus(gauge_vec);
......
......@@ -35,7 +35,7 @@ use async_trait::async_trait;
use bytes::Bytes;
use dynamo_runtime::{
component::Component,
metrics::{MetricsRegistry, prometheus_names::kvrouter},
metrics::{MetricsHierarchy, prometheus_names::kvrouter},
};
use prometheus::{IntCounterVec, Opts};
use serde::{Deserialize, Serialize};
......@@ -589,7 +589,7 @@ impl KvIndexerMetrics {
/// KV_INDEXER_METRICS to avoid duplicate registration issues.
pub fn from_component(component: &Component) -> Arc<Self> {
KV_INDEXER_METRICS.get_or_init(|| {
match component.create_intcountervec(
match component.metrics().create_intcountervec(
kvrouter::KV_CACHE_EVENTS_APPLIED,
"Total number of KV cache events applied to index",
&["event_type", "status"],
......
......@@ -7,7 +7,7 @@ use crate::kv_router::{
protocols::*,
scoring::LoadEvent,
};
use dynamo_runtime::metrics::{MetricsRegistry, prometheus_names::kvstats};
use dynamo_runtime::metrics::{MetricsHierarchy, prometheus_names::kvstats};
use dynamo_runtime::traits::{DistributedRuntimeProvider, events::EventPublisher};
use dynamo_runtime::{
Result,
......@@ -700,25 +700,25 @@ struct KvStatsPrometheusGauges {
impl KvStatsPrometheusGauges {
/// Create a new KvStatsPrometheusGauges instance with all metrics registered
fn new(component: &Component) -> Result<Self> {
let kv_active_blocks_gauge = component.create_gauge(
let kv_active_blocks_gauge = component.metrics().create_gauge(
kvstats::ACTIVE_BLOCKS,
"Number of active KV cache blocks currently in use",
&[],
)?;
let kv_total_blocks_gauge = component.create_gauge(
let kv_total_blocks_gauge = component.metrics().create_gauge(
kvstats::TOTAL_BLOCKS,
"Total number of KV cache blocks available",
&[],
)?;
let gpu_cache_usage_gauge = component.create_gauge(
let gpu_cache_usage_gauge = component.metrics().create_gauge(
kvstats::GPU_CACHE_USAGE_PERCENT,
"GPU cache usage as a percentage (0.0-1.0)",
&[],
)?;
let gpu_prefix_cache_hit_rate_gauge = component.create_gauge(
let gpu_prefix_cache_hit_rate_gauge = component.metrics().create_gauge(
kvstats::GPU_PREFIX_CACHE_HIT_RATE,
"GPU prefix cache hit rate as a percentage (0.0-1.0)",
&[],
......@@ -1333,7 +1333,6 @@ mod test_integration_publisher {
#[ignore] // Mark as ignored as requested, because CI's integrations still don't have NATS
async fn test_kvstats_prometheus_gauge_updates() {
use crate::kv_router::publisher::kvstats;
use dynamo_runtime::metrics::MetricsRegistry;
// Test that publish() updates Prometheus gauges correctly using real Component
let publisher = WorkerMetricsPublisher::new().unwrap();
......@@ -1388,7 +1387,7 @@ mod test_integration_publisher {
// Test 4: Verify metrics are properly registered in the component's registry
// Component implements MetricsRegistry trait which provides prometheus_expfmt()
let prometheus_output = component.prometheus_expfmt().unwrap();
let prometheus_output = component.metrics().prometheus_expfmt().unwrap();
// Verify metric names are present
assert!(prometheus_output.contains(kvstats::ACTIVE_BLOCKS));
......
......@@ -3,7 +3,7 @@
use dynamo_runtime::{
DistributedRuntime, Result,
metrics::MetricsRegistry,
metrics::MetricsHierarchy,
pipeline::{
AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, ResponseStream, SingleIn,
async_trait, network::Ingress,
......@@ -33,7 +33,7 @@ pub struct MySystemStatsMetrics {
impl MySystemStatsMetrics {
pub fn from_endpoint(endpoint: &dynamo_runtime::component::Endpoint) -> anyhow::Result<Self> {
let data_bytes_processed = endpoint.create_intcounter(
let data_bytes_processed = endpoint.metrics().create_intcounter(
"my_custom_bytes_processed_total",
"Example of a custom metric. Total number of data bytes processed by system handler",
&[],
......
......@@ -34,7 +34,7 @@ use std::fmt;
use crate::{
config::HealthStatus,
discovery::Lease,
metrics::{MetricsRegistry, prometheus_names},
metrics::{MetricsHierarchy, MetricsRegistry, prometheus_names},
service::ServiceSet,
transports::etcd::{ETCD_ROOT_PATH, EtcdPath},
};
......@@ -170,6 +170,10 @@ pub struct Component {
// A static component's endpoints cannot be discovered via etcd, they are
// fixed at startup time.
is_static: bool,
/// This hierarchy's own metrics registry
#[builder(default = "crate::MetricsRegistry::new()")]
metrics_registry: crate::MetricsRegistry,
}
impl Hash for Component {
......@@ -208,17 +212,25 @@ impl RuntimeProvider for Component {
}
}
impl MetricsRegistry for Component {
impl MetricsHierarchy for Component {
fn basename(&self) -> String {
self.name.clone()
}
fn parent_hierarchy(&self) -> Vec<String> {
[
self.namespace.parent_hierarchy(),
vec![self.namespace.basename()],
]
.concat()
fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
let mut parents = vec![];
// Get all ancestors of namespace (DRT, parent namespaces, etc.)
parents.extend(self.namespace.parent_hierarchies());
// Add namespace itself
parents.push(&self.namespace as &dyn MetricsHierarchy);
parents
}
fn get_metrics_registry(&self) -> &MetricsRegistry {
&self.metrics_registry
}
}
......@@ -262,6 +274,7 @@ impl Component {
name: endpoint.into(),
is_static: self.is_static,
labels: Vec::new(),
metrics_registry: crate::MetricsRegistry::new(),
}
}
......@@ -312,15 +325,6 @@ impl Component {
let component_metrics = ComponentNatsServerPrometheusMetrics::new(self)?;
let component_clone = self.clone();
let mut hierarchies = self.parent_hierarchy();
hierarchies.push(self.hierarchy());
debug_assert!(
hierarchies
.last()
.map(|x| x.as_str())
.unwrap_or_default()
.eq_ignore_ascii_case(&self.service_name())
); // it happens that in component, hierarchy and service name are the same
// Start a background task that scrapes stats every 5 seconds
let m = component_metrics.clone();
......@@ -434,6 +438,9 @@ pub struct Endpoint {
/// Additional labels for metrics
labels: Vec<(String, String)>,
/// This hierarchy's own metrics registry
metrics_registry: crate::MetricsRegistry,
}
impl Hash for Endpoint {
......@@ -466,17 +473,25 @@ impl RuntimeProvider for Endpoint {
}
}
impl MetricsRegistry for Endpoint {
impl MetricsHierarchy for Endpoint {
fn basename(&self) -> String {
self.name.clone()
}
fn parent_hierarchy(&self) -> Vec<String> {
[
self.component.parent_hierarchy(),
vec![self.component.basename()],
]
.concat()
fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
let mut parents = vec![];
// Get all ancestors of component (DRT, Namespace, etc.)
parents.extend(self.component.parent_hierarchies());
// Add component itself
parents.push(&self.component as &dyn MetricsHierarchy);
parents
}
fn get_metrics_registry(&self) -> &MetricsRegistry {
&self.metrics_registry
}
}
......@@ -603,6 +618,10 @@ pub struct Namespace {
/// Additional labels for metrics
#[builder(default = "Vec::new()")]
labels: Vec<(String, String)>,
/// This hierarchy's own metrics registry
#[builder(default = "crate::MetricsRegistry::new()")]
metrics_registry: crate::MetricsRegistry,
}
impl DistributedRuntimeProvider for Namespace {
......
......@@ -7,7 +7,7 @@ use futures::stream::StreamExt;
use futures::{Stream, TryStreamExt};
use super::*;
use crate::metrics::MetricsRegistry;
use crate::metrics::{MetricsHierarchy, MetricsRegistry};
use crate::traits::events::{EventPublisher, EventSubscriber};
#[async_trait]
......@@ -68,25 +68,31 @@ impl EventSubscriber for Namespace {
}
}
impl MetricsRegistry for Namespace {
impl MetricsHierarchy for Namespace {
fn basename(&self) -> String {
self.name.clone()
}
fn parent_hierarchy(&self) -> Vec<String> {
// Build as: [ "" (DRT), non-empty parent basenames from root -> leaf ]
let mut names = vec![String::new()]; // Start with empty string for DRT
fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
let mut parents = vec![];
// Collect parent basenames from root to leaf
let parent_names: Vec<String> =
std::iter::successors(self.parent.as_deref(), |ns| ns.parent.as_deref())
.map(|ns| ns.basename())
.filter(|name| !name.is_empty())
.collect();
// Walk up the namespace parent chain (grandparents to immediate parent)
let parent_chain: Vec<&Namespace> =
std::iter::successors(self.parent.as_deref(), |ns| ns.parent.as_deref()).collect();
// Append parent names in reverse order (root to leaf)
names.extend(parent_names.into_iter().rev());
names
// Add DRT first (root)
parents.push(&*self.runtime as &dyn MetricsHierarchy);
// Then add parent namespaces in reverse order (root -> leaf)
for parent_ns in parent_chain.iter().rev() {
parents.push(*parent_ns as &dyn MetricsHierarchy);
}
parents
}
fn get_metrics_registry(&self) -> &MetricsRegistry {
&self.metrics_registry
}
}
......
......@@ -7,10 +7,11 @@ use crate::storage::key_value_store::{
};
use crate::transports::nats::DRTNatsClientPrometheusMetrics;
use crate::{
ErrorContext, PrometheusUpdateCallback,
ErrorContext,
component::{self, ComponentBuilder, Endpoint, InstanceSource, Namespace},
discovery::DiscoveryClient,
metrics::MetricsRegistry,
metrics::PrometheusUpdateCallback,
metrics::{MetricsHierarchy, MetricsRegistry},
service::ServiceClient,
transports::{etcd, nats, tcp},
};
......@@ -25,13 +26,17 @@ use std::collections::HashMap;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
impl MetricsRegistry for DistributedRuntime {
impl MetricsHierarchy for DistributedRuntime {
fn basename(&self) -> String {
"".to_string() // drt has no basename. Basename only begins with the Namespace.
}
fn parent_hierarchy(&self) -> Vec<String> {
vec![] // drt is the root, so no parent hierarchy
fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
vec![] // drt is the root, so no parent hierarchies
}
fn get_metrics_registry(&self) -> &MetricsRegistry {
&self.metrics_registry
}
}
......@@ -89,10 +94,7 @@ impl DistributedRuntime {
component_registry: component::Registry::new(),
is_static,
instance_sources: Arc::new(Mutex::new(HashMap::new())),
hierarchy_to_metricsregistry: Arc::new(std::sync::RwLock::new(HashMap::<
String,
crate::MetricsRegistryEntry,
>::new())),
metrics_registry: crate::MetricsRegistry::new(),
system_health,
};
......@@ -101,9 +103,7 @@ impl DistributedRuntime {
&distributed_runtime,
nats_client_for_metrics.client().clone(),
)?;
let mut drt_hierarchies = distributed_runtime.parent_hierarchy();
drt_hierarchies.push(distributed_runtime.hierarchy());
// Register a callback to update NATS client metrics
// Register a callback to update NATS client metrics on the DRT's metrics registry
let nats_client_callback = Arc::new({
let nats_client_clone = nats_client_metrics.clone();
move || {
......@@ -112,7 +112,8 @@ impl DistributedRuntime {
}
});
distributed_runtime
.register_prometheus_update_callback(drt_hierarchies, nats_client_callback);
.metrics_registry
.add_update_callback(nats_client_callback);
}
// Initialize the uptime gauge in SystemHealth
......@@ -301,78 +302,6 @@ impl DistributedRuntime {
pub fn instance_sources(&self) -> Arc<Mutex<HashMap<Endpoint, Weak<InstanceSource>>>> {
self.instance_sources.clone()
}
/// Add a Prometheus metric to a specific hierarchy's registry. Note that it is possible
/// to register the same metric name multiple times, as long as the labels are different.
pub fn add_prometheus_metric(
&self,
hierarchy: &str,
prometheus_metric: Box<dyn prometheus::core::Collector>,
) -> anyhow::Result<()> {
let mut registries = self.hierarchy_to_metricsregistry.write().unwrap();
let entry = registries.entry(hierarchy.to_string()).or_default();
// Try to register the metric
entry
.prometheus_registry
.register(prometheus_metric)
.map_err(|e| e.into())
}
/// Add a Prometheus update callback to the given hierarchies
/// TODO: rename this to register_callback, once we move the the MetricsRegistry trait
/// out of the runtime, and make it into a composed module.
pub fn register_prometheus_update_callback(
&self,
hierarchies: Vec<String>,
callback: PrometheusUpdateCallback,
) {
let mut registries = self.hierarchy_to_metricsregistry.write().unwrap();
for hierarchy in &hierarchies {
registries
.entry(hierarchy.clone())
.or_default()
.add_prometheus_update_callback(callback.clone());
}
}
/// Execute all Prometheus update callbacks for a given hierarchy and return their results
pub fn execute_prometheus_update_callbacks(&self, hierarchy: &str) -> Vec<anyhow::Result<()>> {
// Clone callbacks while holding read lock (fast operation)
let callbacks = {
let registries = self.hierarchy_to_metricsregistry.read().unwrap();
registries
.get(hierarchy)
.map(|entry| entry.prometheus_update_callbacks.clone())
}; // Read lock released here
// Execute callbacks without holding the lock
match callbacks {
Some(callbacks) => callbacks.iter().map(|callback| callback()).collect(),
None => Vec::new(),
}
}
/// Add a Prometheus exposition text callback that returns Prometheus text for the given hierarchies
pub fn register_prometheus_expfmt_callback(
&self,
hierarchies: Vec<String>,
callback: crate::PrometheusExpositionFormatCallback,
) {
let mut registries = self.hierarchy_to_metricsregistry.write().unwrap();
for hierarchy in &hierarchies {
registries
.entry(hierarchy.clone())
.or_default()
.add_prometheus_expfmt_callback(callback.clone());
}
}
/// Get all registered hierarchy keys. Private because it is only used for testing.
fn get_registered_hierarchies(&self) -> Vec<String> {
let registries = self.hierarchy_to_metricsregistry.read().unwrap();
registries.keys().cloned().collect()
}
}
#[derive(Dissolve)]
......
......@@ -47,6 +47,7 @@ pub mod worker;
pub mod distributed;
pub use distributed::distributed_test_utils;
pub use futures::stream;
pub use metrics::MetricsRegistry;
pub use system_health::{HealthCheckTarget, SystemHealth};
pub use tokio_util::sync::CancellationToken;
pub use worker::Worker;
......@@ -81,104 +82,6 @@ pub struct Runtime {
block_in_place_permits: Option<Arc<tokio::sync::Semaphore>>,
}
/// Type alias for runtime callback functions to reduce complexity
///
/// This type represents an Arc-wrapped callback function that can be:
/// - Shared efficiently across multiple threads and contexts
/// - Cloned without duplicating the underlying closure
/// - Used in generic contexts requiring 'static lifetime
///
/// The Arc wrapper is included in the type to make sharing explicit.
type PrometheusUpdateCallback = Arc<dyn Fn() -> anyhow::Result<()> + Send + Sync + 'static>;
/// Type alias for exposition text callback functions that return Prometheus text
type PrometheusExpositionFormatCallback =
Arc<dyn Fn() -> anyhow::Result<String> + Send + Sync + 'static>;
/// Structure to hold Prometheus registries and associated callbacks for a given hierarchy
pub struct MetricsRegistryEntry {
/// The Prometheus registry for this prefix
pub prometheus_registry: prometheus::Registry,
/// List of update callbacks invoked before metrics are scraped
pub prometheus_update_callbacks: Vec<PrometheusUpdateCallback>,
/// List of callbacks that return Prometheus exposition text to be appended to metrics output
pub prometheus_expfmt_callbacks: Vec<PrometheusExpositionFormatCallback>,
}
impl MetricsRegistryEntry {
/// Create a new metrics registry entry with an empty registry and no callbacks
pub fn new() -> Self {
Self {
prometheus_registry: prometheus::Registry::new(),
prometheus_update_callbacks: Vec::new(),
prometheus_expfmt_callbacks: Vec::new(),
}
}
/// Add a callback function that receives a reference to any MetricsRegistry
pub fn add_prometheus_update_callback(&mut self, callback: PrometheusUpdateCallback) {
self.prometheus_update_callbacks.push(callback);
}
/// Add an exposition text callback that returns Prometheus text
pub fn add_prometheus_expfmt_callback(&mut self, callback: PrometheusExpositionFormatCallback) {
self.prometheus_expfmt_callbacks.push(callback);
}
/// Execute all update callbacks and return their results
pub fn execute_prometheus_update_callbacks(&self) -> Vec<anyhow::Result<()>> {
self.prometheus_update_callbacks
.iter()
.map(|callback| callback())
.collect()
}
/// Execute all exposition text callbacks and return their concatenated text
pub fn execute_prometheus_expfmt_callbacks(&self) -> String {
let mut result = String::new();
for callback in &self.prometheus_expfmt_callbacks {
match callback() {
Ok(text) => {
if !text.is_empty() {
if !result.is_empty() && !result.ends_with('\n') {
result.push('\n');
}
result.push_str(&text);
}
}
Err(e) => {
tracing::error!("Error executing exposition text callback: {}", e);
}
}
}
result
}
/// Returns true if a metric with the given name already exists in the Prometheus registry
pub fn has_metric_named(&self, metric_name: &str) -> bool {
self.prometheus_registry
.gather()
.iter()
.any(|mf| mf.name() == metric_name)
}
}
impl Default for MetricsRegistryEntry {
fn default() -> Self {
Self::new()
}
}
impl Clone for MetricsRegistryEntry {
fn clone(&self) -> Self {
Self {
prometheus_registry: self.prometheus_registry.clone(),
prometheus_update_callbacks: Vec::new(), // Callbacks cannot be cloned, so we start with an empty list
prometheus_expfmt_callbacks: Vec::new(), // Callbacks cannot be cloned, so we start with an empty list
}
}
}
/// Distributed [Runtime] which provides access to shared resources across the cluster, this includes
/// communication protocols and transports.
#[derive(Clone)]
......@@ -209,7 +112,6 @@ pub struct DistributedRuntime {
// Health Status
system_health: Arc<parking_lot::Mutex<SystemHealth>>,
// This map associates metric prefixes with their corresponding Prometheus registries and callbacks.
// Uses RwLock for better concurrency - multiple threads can read (execute callbacks) simultaneously.
hierarchy_to_metricsregistry: Arc<std::sync::RwLock<HashMap<String, MetricsRegistryEntry>>>,
// This hierarchy's own metrics registry
metrics_registry: MetricsRegistry,
}
......@@ -56,6 +56,9 @@ fn validate_no_duplicate_label_keys(labels: &[(&str, &str)]) -> anyhow::Result<(
Ok(())
}
/// ==============================
/// Prometheus section
/// ==============================
/// Trait that defines common behavior for Prometheus metric types
pub trait PrometheusMetric: prometheus::core::Collector + Clone + Send + Sync + 'static {
/// Create a new metric with the given options
......@@ -192,9 +195,12 @@ impl PrometheusMetric for prometheus::CounterVec {
}
}
/// Private helper function to create metrics - not accessible to trait implementors
fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>(
registry: &R,
/// ==============================
/// Metrics section
/// ==============================
/// Public helper function to create metrics - accessible for Python bindings
pub fn create_metric<T: PrometheusMetric, H: MetricsHierarchy + ?Sized>(
hierarchy: &H,
metric_name: &str,
metric_desc: &str,
labels: &[(&str, &str)],
......@@ -205,11 +211,13 @@ fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>(
validate_no_duplicate_label_keys(labels)?;
// Note: stored labels functionality has been removed
let basename = registry.basename();
let parent_hierarchy = registry.parent_hierarchy();
let basename = hierarchy.basename();
let parent_hierarchies = hierarchy.parent_hierarchies();
// Build hierarchy: parent_hierarchy + [basename]
let hierarchy = [parent_hierarchy.clone(), vec![basename.clone()]].concat();
// Build hierarchy path as vector of strings: parent names + [basename]
let mut hierarchy_names: Vec<String> =
parent_hierarchies.iter().map(|p| p.basename()).collect();
hierarchy_names.push(basename.clone());
let metric_name = build_component_metric_name(metric_name);
......@@ -228,8 +236,8 @@ fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>(
}
// Add auto-generated labels with sanitized values
if hierarchy.len() > 1 {
let namespace = &hierarchy[1];
if hierarchy_names.len() > 1 {
let namespace = &hierarchy_names[1];
if !namespace.is_empty() {
let valid_namespace = sanitize_prometheus_label(namespace)?;
if !valid_namespace.is_empty() {
......@@ -237,8 +245,8 @@ fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>(
}
}
}
if hierarchy.len() > 2 {
let component = &hierarchy[2];
if hierarchy_names.len() > 2 {
let component = &hierarchy_names[2];
if !component.is_empty() {
let valid_component = sanitize_prometheus_label(component)?;
if !valid_component.is_empty() {
......@@ -246,8 +254,8 @@ fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>(
}
}
}
if hierarchy.len() > 3 {
let endpoint = &hierarchy[3];
if hierarchy_names.len() > 3 {
let endpoint = &hierarchy_names[3];
if !endpoint.is_empty() {
let valid_endpoint = sanitize_prometheus_label(endpoint)?;
if !valid_endpoint.is_empty() {
......@@ -361,59 +369,36 @@ fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>(
T::with_opts(opts)?
};
// Iterate over the DRT's registry and register this metric across all hierarchical levels.
// The accumulated hierarchy is structured as: ["", "testnamespace", "testnamespace_testcomponent", "testnamespace_testcomponent_testendpoint"]
// This accumulation is essential to differentiate between the names of children and grandchildren.
// Build accumulated hierarchy and register metrics in a single loop
// current_prefix accumulates the hierarchical path as we iterate through hierarchy
// For example, if hierarchy = ["", "testnamespace", "testcomponent"], then:
// - Iteration 1: current_prefix = "" (empty string from DRT)
// - Iteration 2: current_prefix = "testnamespace"
// - Iteration 3: current_prefix = "testnamespace_testcomponent"
let mut current_hierarchy = String::new();
for name in &hierarchy {
if !current_hierarchy.is_empty() && !name.is_empty() {
current_hierarchy.push('_');
}
current_hierarchy.push_str(name);
// Register metric at this hierarchical level using the new helper function
// Register the metric at all hierarchy levels (parents + self)
// First register at all parent levels
for parent in parent_hierarchies {
let collector: Box<dyn prometheus::core::Collector> = Box::new(prometheus_metric.clone());
registry
.drt()
.add_prometheus_metric(&current_hierarchy, collector)?;
parent.get_metrics_registry().add_metric(collector)?;
}
// Then register at this level
let collector: Box<dyn prometheus::core::Collector> = Box::new(prometheus_metric.clone());
hierarchy.get_metrics_registry().add_metric(collector)?;
Ok(prometheus_metric)
}
/// This trait should be implemented by all metric registries, including Prometheus, Envy, OpenTelemetry, and others.
/// It offers a unified interface for creating and managing metrics, organizing sub-registries, and
/// generating output in Prometheus text format.
use crate::traits::DistributedRuntimeProvider;
pub trait MetricsRegistry: Send + Sync + DistributedRuntimeProvider {
// Get the name of this registry (without any hierarchy prefix)
fn basename(&self) -> String;
/// Wrapper struct that provides access to metrics functionality
/// This struct is accessed via the `.metrics()` method on DistributedRuntime, Namespace, Component, and Endpoint
pub struct Metrics<H: MetricsHierarchy> {
hierarchy: H,
}
/// Retrieve the complete hierarchy and basename for this registry. Currently, the hierarchy for drt is an empty string,
/// so we must account for the leading underscore. The existing code remains unchanged to accommodate any future
/// scenarios where drt's prefix might be assigned a value.
fn hierarchy(&self) -> String {
[self.parent_hierarchy(), vec![self.basename()]]
.concat()
.join("_")
.trim_start_matches('_')
.to_string()
impl<H: MetricsHierarchy> Metrics<H> {
pub fn new(hierarchy: H) -> Self {
Self { hierarchy }
}
// Get the parent hierarchy for this registry (just the base names, NOT the flattened hierarchy key)
fn parent_hierarchy(&self) -> Vec<String>;
// TODO: Add support for additional Prometheus metric types:
// - Counter: ✅ IMPLEMENTED - create_counter()
// - CounterVec: ✅ IMPLEMENTED - create_countervec()
// - Gauge: ✅ IMPLEMENTED - create_gauge()
// - GaugeVec: ✅ IMPLEMENTED - create_gaugevec()
// - GaugeHistogram: create_gauge_histogram() - for gauge histograms
// - Histogram: ✅ IMPLEMENTED - create_histogram()
// - HistogramVec with custom buckets: create_histogram_with_buckets()
......@@ -431,17 +416,17 @@ pub trait MetricsRegistry: Send + Sync + DistributedRuntimeProvider {
// Keep them synchronized when adding new metric types
/// Create a Counter metric
fn create_counter(
pub fn create_counter(
&self,
name: &str,
description: &str,
labels: &[(&str, &str)],
) -> anyhow::Result<prometheus::Counter> {
create_metric(self, name, description, labels, None, None)
create_metric(&self.hierarchy, name, description, labels, None, None)
}
/// Create a CounterVec metric with label names (for dynamic labels)
fn create_countervec(
pub fn create_countervec(
&self,
name: &str,
description: &str,
......@@ -449,7 +434,7 @@ pub trait MetricsRegistry: Send + Sync + DistributedRuntimeProvider {
const_label_values: &[(&str, &str)],
) -> anyhow::Result<prometheus::CounterVec> {
create_metric(
self,
&self.hierarchy,
name,
description,
const_label_values,
......@@ -459,17 +444,17 @@ pub trait MetricsRegistry: Send + Sync + DistributedRuntimeProvider {
}
/// Create a Gauge metric
fn create_gauge(
pub fn create_gauge(
&self,
name: &str,
description: &str,
labels: &[(&str, &str)],
) -> anyhow::Result<prometheus::Gauge> {
create_metric(self, name, description, labels, None, None)
create_metric(&self.hierarchy, name, description, labels, None, None)
}
/// Create a GaugeVec metric with label names (for dynamic labels)
fn create_gaugevec(
pub fn create_gaugevec(
&self,
name: &str,
description: &str,
......@@ -477,7 +462,7 @@ pub trait MetricsRegistry: Send + Sync + DistributedRuntimeProvider {
const_label_values: &[(&str, &str)],
) -> anyhow::Result<prometheus::GaugeVec> {
create_metric(
self,
&self.hierarchy,
name,
description,
const_label_values,
......@@ -487,28 +472,28 @@ pub trait MetricsRegistry: Send + Sync + DistributedRuntimeProvider {
}
/// Create a Histogram metric with custom buckets
fn create_histogram(
pub fn create_histogram(
&self,
name: &str,
description: &str,
labels: &[(&str, &str)],
buckets: Option<Vec<f64>>,
) -> anyhow::Result<prometheus::Histogram> {
create_metric(self, name, description, labels, buckets, None)
create_metric(&self.hierarchy, name, description, labels, buckets, None)
}
/// Create an IntCounter metric
fn create_intcounter(
pub fn create_intcounter(
&self,
name: &str,
description: &str,
labels: &[(&str, &str)],
) -> anyhow::Result<prometheus::IntCounter> {
create_metric(self, name, description, labels, None, None)
create_metric(&self.hierarchy, name, description, labels, None, None)
}
/// Create an IntCounterVec metric with label names (for dynamic labels)
fn create_intcountervec(
pub fn create_intcountervec(
&self,
name: &str,
description: &str,
......@@ -516,7 +501,7 @@ pub trait MetricsRegistry: Send + Sync + DistributedRuntimeProvider {
const_label_values: &[(&str, &str)],
) -> anyhow::Result<prometheus::IntCounterVec> {
create_metric(
self,
&self.hierarchy,
name,
description,
const_label_values,
......@@ -526,17 +511,17 @@ pub trait MetricsRegistry: Send + Sync + DistributedRuntimeProvider {
}
/// Create an IntGauge metric
fn create_intgauge(
pub fn create_intgauge(
&self,
name: &str,
description: &str,
labels: &[(&str, &str)],
) -> anyhow::Result<prometheus::IntGauge> {
create_metric(self, name, description, labels, None, None)
create_metric(&self.hierarchy, name, description, labels, None, None)
}
/// Create an IntGaugeVec metric with label names (for dynamic labels)
fn create_intgaugevec(
pub fn create_intgaugevec(
&self,
name: &str,
description: &str,
......@@ -544,7 +529,7 @@ pub trait MetricsRegistry: Send + Sync + DistributedRuntimeProvider {
const_label_values: &[(&str, &str)],
) -> anyhow::Result<prometheus::IntGaugeVec> {
create_metric(
self,
&self.hierarchy,
name,
description,
const_label_values,
......@@ -554,11 +539,12 @@ pub trait MetricsRegistry: Send + Sync + DistributedRuntimeProvider {
}
/// Get metrics in Prometheus text format
fn prometheus_expfmt(&self) -> anyhow::Result<String> {
pub fn prometheus_expfmt(&self) -> anyhow::Result<String> {
// Execute callbacks first to ensure any new metrics are added to the registry
let callback_results = self
.drt()
.execute_prometheus_update_callbacks(&self.hierarchy());
.hierarchy
.get_metrics_registry()
.execute_update_callbacks();
// Log any callback errors but continue
for result in callback_results {
......@@ -567,14 +553,11 @@ pub trait MetricsRegistry: Send + Sync + DistributedRuntimeProvider {
}
}
// Get the Prometheus registry for this hierarchy and execute exposition text callbacks
let (prometheus_registry, expfmt) = {
let mut registry_entry = self.drt().hierarchy_to_metricsregistry.write().unwrap();
let entry = registry_entry.entry(self.hierarchy()).or_default();
let registry = entry.prometheus_registry.clone();
let text = entry.execute_prometheus_expfmt_callbacks();
(registry, text)
};
// Get the Prometheus registry for this hierarchy
let prometheus_registry = self
.hierarchy
.get_metrics_registry()
.get_prometheus_registry();
// Encode metrics from the registry
let metric_families = prometheus_registry.gather();
......@@ -583,7 +566,11 @@ pub trait MetricsRegistry: Send + Sync + DistributedRuntimeProvider {
encoder.encode(&metric_families, &mut buffer)?;
let mut result = String::from_utf8(buffer)?;
// Append exposition text callback results if any
// Execute and append exposition text callback results
let expfmt = self
.hierarchy
.get_metrics_registry()
.execute_expfmt_callbacks();
if !expfmt.is_empty() {
if !result.ends_with('\n') {
result.push('\n');
......@@ -595,6 +582,213 @@ pub trait MetricsRegistry: Send + Sync + DistributedRuntimeProvider {
}
}
/// This trait should be implemented by all metric registries, including Prometheus, Envy, OpenTelemetry, and others.
/// It offers a unified interface for creating and managing metrics, organizing sub-registries, and
/// generating output in Prometheus text format.
use crate::traits::DistributedRuntimeProvider;
pub trait MetricsHierarchy: Send + Sync {
// ========================================================================
// Required methods - must be implemented by all types
// ========================================================================
/// Get the name of this hierarchy (without any hierarchy prefix)
fn basename(&self) -> String;
/// Get the parent hierarchies as actual objects (not strings)
/// Returns a vector of hierarchy references, ordered from root to immediate parent.
/// For example, an Endpoint would return [DRT, Namespace, Component].
fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy>;
/// Get a reference to this hierarchy's metrics registry
fn get_metrics_registry(&self) -> &MetricsRegistry;
// ========================================================================
// Provided methods - have default implementations
// ========================================================================
/// Access the metrics interface for this hierarchy
/// This is a provided method that works for any type implementing MetricsHierarchy
fn metrics(&self) -> Metrics<&Self>
where
Self: Sized,
{
Metrics::new(self)
}
}
// Blanket implementation for references to types that implement MetricsHierarchy
impl<T: MetricsHierarchy + ?Sized> MetricsHierarchy for &T {
fn basename(&self) -> String {
(**self).basename()
}
fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
(**self).parent_hierarchies()
}
fn get_metrics_registry(&self) -> &MetricsRegistry {
(**self).get_metrics_registry()
}
}
/// Type alias for runtime callback functions to reduce complexity
///
/// This type represents an Arc-wrapped callback function that can be:
/// - Shared efficiently across multiple threads and contexts
/// - Cloned without duplicating the underlying closure
/// - Used in generic contexts requiring 'static lifetime
///
/// The Arc wrapper is included in the type to make sharing explicit.
pub type PrometheusUpdateCallback = Arc<dyn Fn() -> anyhow::Result<()> + Send + Sync + 'static>;
/// Type alias for exposition text callback functions that return Prometheus text
pub type PrometheusExpositionFormatCallback =
Arc<dyn Fn() -> anyhow::Result<String> + Send + Sync + 'static>;
/// Structure to hold Prometheus registries and associated callbacks for a given hierarchy
pub struct MetricsRegistry {
/// The Prometheus registry for this hierarchy (with interior mutability for thread-safe access)
pub prometheus_registry: std::sync::RwLock<prometheus::Registry>,
/// Update callbacks invoked before metrics are scraped.
/// Wrapped in Arc to preserve callbacks across clones (prevents callback loss when MetricsRegistry is cloned).
pub prometheus_update_callbacks: Arc<std::sync::RwLock<Vec<PrometheusUpdateCallback>>>,
/// Callbacks that return Prometheus exposition text appended to metrics output.
/// Wrapped in Arc to preserve callbacks across clones (e.g., vLLM callbacks registered at Endpoint remain accessible at DRT).
pub prometheus_expfmt_callbacks:
Arc<std::sync::RwLock<Vec<PrometheusExpositionFormatCallback>>>,
}
impl std::fmt::Debug for MetricsRegistry {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MetricsRegistry")
.field("prometheus_registry", &"<RwLock<Registry>>")
.field(
"prometheus_update_callbacks",
&format!(
"<RwLock<Vec<Callback>>> with {} callbacks",
self.prometheus_update_callbacks.read().unwrap().len()
),
)
.field(
"prometheus_expfmt_callbacks",
&format!(
"<RwLock<Vec<Callback>>> with {} callbacks",
self.prometheus_expfmt_callbacks.read().unwrap().len()
),
)
.finish()
}
}
impl Clone for MetricsRegistry {
fn clone(&self) -> Self {
Self {
prometheus_registry: std::sync::RwLock::new(
self.prometheus_registry.read().unwrap().clone(),
),
// Clone the Arc to share callbacks across all clones (prevents callback loss).
// Previously used Vec::new() here, which caused vllm: metrics to disappear.
prometheus_update_callbacks: Arc::clone(&self.prometheus_update_callbacks),
prometheus_expfmt_callbacks: Arc::clone(&self.prometheus_expfmt_callbacks),
}
}
}
impl MetricsRegistry {
/// Create a new metrics registry with an empty Prometheus registry and callback lists
pub fn new() -> Self {
Self {
prometheus_registry: std::sync::RwLock::new(prometheus::Registry::new()),
prometheus_update_callbacks: Arc::new(std::sync::RwLock::new(Vec::new())),
prometheus_expfmt_callbacks: Arc::new(std::sync::RwLock::new(Vec::new())),
}
}
/// Add a callback function that receives a reference to any MetricsHierarchy
pub fn add_update_callback(&self, callback: PrometheusUpdateCallback) {
self.prometheus_update_callbacks
.write()
.unwrap()
.push(callback);
}
/// Add an exposition text callback that returns Prometheus text
pub fn add_expfmt_callback(&self, callback: PrometheusExpositionFormatCallback) {
self.prometheus_expfmt_callbacks
.write()
.unwrap()
.push(callback);
}
/// Execute all update callbacks and return their results
pub fn execute_update_callbacks(&self) -> Vec<anyhow::Result<()>> {
self.prometheus_update_callbacks
.read()
.unwrap()
.iter()
.map(|callback| callback())
.collect()
}
/// Execute all exposition text callbacks and return their concatenated text
pub fn execute_expfmt_callbacks(&self) -> String {
let callbacks = self.prometheus_expfmt_callbacks.read().unwrap();
let mut result = String::new();
for callback in callbacks.iter() {
match callback() {
Ok(text) => {
if !text.is_empty() {
if !result.is_empty() && !result.ends_with('\n') {
result.push('\n');
}
result.push_str(&text);
}
}
Err(e) => {
tracing::error!("Error executing exposition text callback: {}", e);
}
}
}
result
}
/// Add a Prometheus metric collector to this registry
pub fn add_metric(
&self,
collector: Box<dyn prometheus::core::Collector>,
) -> anyhow::Result<()> {
self.prometheus_registry
.write()
.unwrap()
.register(collector)
.map_err(|e| anyhow::anyhow!("Failed to register metric: {}", e))
}
/// Get a read guard to the Prometheus registry for scraping
pub fn get_prometheus_registry(&self) -> std::sync::RwLockReadGuard<'_, prometheus::Registry> {
self.prometheus_registry.read().unwrap()
}
/// Returns true if a metric with the given name already exists in the Prometheus registry
pub fn has_metric_named(&self, metric_name: &str) -> bool {
self.prometheus_registry
.read()
.unwrap()
.gather()
.iter()
.any(|mf| mf.name() == metric_name)
}
}
impl Default for MetricsRegistry {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod test_helpers {
use super::prometheus_names::name_prefix;
......@@ -771,18 +965,18 @@ mod test_metricsregistry_units {
#[test]
fn test_metrics_registry_entry_callbacks() {
use crate::MetricsRegistryEntry;
use crate::MetricsRegistry;
use std::sync::atomic::{AtomicUsize, Ordering};
// Test 1: Basic callback execution with counter increments
{
let mut entry = MetricsRegistryEntry::new();
let registry = MetricsRegistry::new();
let counter = Arc::new(AtomicUsize::new(0));
// Add callbacks with different increment values
for increment in [1, 10, 100] {
let counter_clone = counter.clone();
entry.add_prometheus_update_callback(Arc::new(move || {
registry.add_update_callback(Arc::new(move || {
counter_clone.fetch_add(increment, Ordering::SeqCst);
Ok(())
}));
......@@ -792,52 +986,50 @@ mod test_metricsregistry_units {
assert_eq!(counter.load(Ordering::SeqCst), 0);
// First execution
let results = entry.execute_prometheus_update_callbacks();
let results = registry.execute_update_callbacks();
assert_eq!(results.len(), 3);
assert!(results.iter().all(|r| r.is_ok()));
assert_eq!(counter.load(Ordering::SeqCst), 111); // 1 + 10 + 100
// Second execution - callbacks should be reusable
let results = entry.execute_prometheus_update_callbacks();
let results = registry.execute_update_callbacks();
assert_eq!(results.len(), 3);
assert_eq!(counter.load(Ordering::SeqCst), 222); // 111 + 111
// Test cloning - cloned entry should have no callbacks
let cloned = entry.clone();
assert_eq!(cloned.execute_prometheus_update_callbacks().len(), 0);
assert_eq!(counter.load(Ordering::SeqCst), 222); // No change
// Original still has callbacks
entry.execute_prometheus_update_callbacks();
// Test cloning - cloned entry shares callbacks (callbacks are Arc-wrapped)
let cloned = registry.clone();
assert_eq!(cloned.execute_update_callbacks().len(), 3);
assert_eq!(counter.load(Ordering::SeqCst), 333); // 222 + 111
// Original still has callbacks and shares the same Arc
registry.execute_update_callbacks();
assert_eq!(counter.load(Ordering::SeqCst), 444); // 333 + 111
}
// Test 2: Mixed success and error callbacks
{
let mut entry = MetricsRegistryEntry::new();
let registry = MetricsRegistry::new();
let counter = Arc::new(AtomicUsize::new(0));
// Successful callback
let counter_clone = counter.clone();
entry.add_prometheus_update_callback(Arc::new(move || {
registry.add_update_callback(Arc::new(move || {
counter_clone.fetch_add(1, Ordering::SeqCst);
Ok(())
}));
// Error callback
entry.add_prometheus_update_callback(Arc::new(|| {
Err(anyhow::anyhow!("Simulated error"))
}));
registry.add_update_callback(Arc::new(|| Err(anyhow::anyhow!("Simulated error"))));
// Another successful callback
let counter_clone = counter.clone();
entry.add_prometheus_update_callback(Arc::new(move || {
registry.add_update_callback(Arc::new(move || {
counter_clone.fetch_add(10, Ordering::SeqCst);
Ok(())
}));
// Execute and verify mixed results
let results = entry.execute_prometheus_update_callbacks();
let results = registry.execute_update_callbacks();
assert_eq!(results.len(), 3);
assert!(results[0].is_ok());
assert!(results[1].is_err());
......@@ -853,15 +1045,15 @@ mod test_metricsregistry_units {
assert_eq!(counter.load(Ordering::SeqCst), 11); // 1 + 10
// Execute again - errors should be consistent
let results = entry.execute_prometheus_update_callbacks();
let results = registry.execute_update_callbacks();
assert!(results[1].is_err());
assert_eq!(counter.load(Ordering::SeqCst), 22); // 11 + 11
}
// Test 3: Empty registry
{
let entry = MetricsRegistryEntry::new();
let results = entry.execute_prometheus_update_callbacks();
let registry = MetricsRegistry::new();
let results = registry.execute_update_callbacks();
assert_eq!(results.len(), 0);
}
}
......@@ -888,56 +1080,64 @@ mod test_metricsregistry_prefixes {
// DRT
assert_eq!(drt.basename(), DRT_NAME);
assert_eq!(drt.parent_hierarchy(), Vec::<String>::new());
assert_eq!(drt.hierarchy(), DRT_NAME);
assert_eq!(drt.parent_hierarchies().len(), 0);
// DRT hierarchy is just its basename (empty string)
// Namespace
assert_eq!(namespace.basename(), NAMESPACE_NAME);
assert_eq!(namespace.parent_hierarchy(), vec!["".to_string()]);
assert_eq!(namespace.hierarchy(), NAMESPACE_NAME);
assert_eq!(namespace.parent_hierarchies().len(), 1);
assert_eq!(namespace.parent_hierarchies()[0].basename(), DRT_NAME);
// Namespace hierarchy is just its basename since parent is empty
// Component
assert_eq!(component.basename(), COMPONENT_NAME);
assert_eq!(
component.parent_hierarchy(),
vec!["".to_string(), NAMESPACE_NAME.to_string()]
);
assert_eq!(
component.hierarchy(),
format!("{}_{}", NAMESPACE_NAME, COMPONENT_NAME)
);
assert_eq!(component.parent_hierarchies().len(), 2);
assert_eq!(component.parent_hierarchies()[0].basename(), DRT_NAME);
assert_eq!(component.parent_hierarchies()[1].basename(), NAMESPACE_NAME);
// Component hierarchy structure is validated by the individual assertions above
// Endpoint
assert_eq!(endpoint.basename(), ENDPOINT_NAME);
assert_eq!(
endpoint.parent_hierarchy(),
vec![
"".to_string(),
NAMESPACE_NAME.to_string(),
COMPONENT_NAME.to_string(),
]
);
assert_eq!(
endpoint.hierarchy(),
format!("{}_{}_{}", NAMESPACE_NAME, COMPONENT_NAME, ENDPOINT_NAME)
);
assert_eq!(endpoint.parent_hierarchies().len(), 3);
assert_eq!(endpoint.parent_hierarchies()[0].basename(), DRT_NAME);
assert_eq!(endpoint.parent_hierarchies()[1].basename(), NAMESPACE_NAME);
assert_eq!(endpoint.parent_hierarchies()[2].basename(), COMPONENT_NAME);
// Endpoint hierarchy structure is validated by the individual assertions above
// Relationships
assert!(namespace.parent_hierarchy().contains(&drt.basename()));
assert!(component.parent_hierarchy().contains(&namespace.basename()));
assert!(endpoint.parent_hierarchy().contains(&component.basename()));
assert!(
namespace
.parent_hierarchies()
.iter()
.any(|h| h.basename() == drt.basename())
);
assert!(
component
.parent_hierarchies()
.iter()
.any(|h| h.basename() == namespace.basename())
);
assert!(
endpoint
.parent_hierarchies()
.iter()
.any(|h| h.basename() == component.basename())
);
// Depth
assert_eq!(drt.parent_hierarchy().len(), 0);
assert_eq!(namespace.parent_hierarchy().len(), 1);
assert_eq!(component.parent_hierarchy().len(), 2);
assert_eq!(endpoint.parent_hierarchy().len(), 3);
assert_eq!(drt.parent_hierarchies().len(), 0);
assert_eq!(namespace.parent_hierarchies().len(), 1);
assert_eq!(component.parent_hierarchies().len(), 2);
assert_eq!(endpoint.parent_hierarchies().len(), 3);
// Invalid namespace behavior - sanitizes to "_123" and succeeds
// @ryanolson intended to enable validation (see TODO comment in component.rs) but didn't turn it on,
// so invalid characters are sanitized in MetricsRegistry rather than rejected.
let invalid_namespace = drt.namespace("@@123").unwrap();
let result = invalid_namespace.create_counter("test_counter", "A test counter", &[]);
let result =
invalid_namespace
.metrics()
.create_counter("test_counter", "A test counter", &[]);
assert!(result.is_ok());
if let Ok(counter) = &result {
// Verify the namespace was sanitized to "_123" in the label
......@@ -954,6 +1154,7 @@ mod test_metricsregistry_prefixes {
let valid_namespace = drt.namespace("ns567").unwrap();
assert!(
valid_namespace
.metrics()
.create_counter("test_counter", "A test counter", &[])
.is_ok()
);
......@@ -974,34 +1175,30 @@ mod test_metricsregistry_prefixes {
// Verify the hierarchy structure
assert_eq!(ns1.basename(), "ns1");
assert_eq!(ns1.parent_hierarchy(), vec!("".to_string()));
assert_eq!(ns1.hierarchy(), "ns1");
assert_eq!(ns1.parent_hierarchies().len(), 1);
assert_eq!(ns1.parent_hierarchies()[0].basename(), "");
// ns1 hierarchy is just its basename since parent is empty
assert_eq!(ns2.basename(), "ns2");
assert_eq!(
ns2.parent_hierarchy(),
vec!["".to_string(), "ns1".to_string()]
);
assert_eq!(ns2.hierarchy(), "ns1_ns2");
assert_eq!(ns2.parent_hierarchies().len(), 2);
assert_eq!(ns2.parent_hierarchies()[0].basename(), "");
assert_eq!(ns2.parent_hierarchies()[1].basename(), "ns1");
// ns2 hierarchy structure validated by parent assertions above
assert_eq!(ns3.basename(), "ns3");
assert_eq!(
ns3.parent_hierarchy(),
vec!["".to_string(), "ns1".to_string(), "ns2".to_string()]
);
assert_eq!(ns3.hierarchy(), "ns1_ns2_ns3");
assert_eq!(ns3.parent_hierarchies().len(), 3);
assert_eq!(ns3.parent_hierarchies()[0].basename(), "");
assert_eq!(ns3.parent_hierarchies()[1].basename(), "ns1");
assert_eq!(ns3.parent_hierarchies()[2].basename(), "ns2");
// ns3 hierarchy structure validated by parent assertions above
assert_eq!(component.basename(), "test-component");
assert_eq!(
component.parent_hierarchy(),
vec![
"".to_string(),
"ns1".to_string(),
"ns2".to_string(),
"ns3".to_string()
]
);
assert_eq!(component.hierarchy(), "ns1_ns2_ns3_test-component");
assert_eq!(component.parent_hierarchies().len(), 4);
assert_eq!(component.parent_hierarchies()[0].basename(), "");
assert_eq!(component.parent_hierarchies()[1].basename(), "ns1");
assert_eq!(component.parent_hierarchies()[2].basename(), "ns2");
assert_eq!(component.parent_hierarchies()[3].basename(), "ns3");
// component hierarchy structure validated by parent assertions above
println!("✓ Chained namespace test passed - all prefixes correct");
}
......@@ -1032,13 +1229,14 @@ mod test_metricsregistry_prometheus_fmt_outputs {
// Test Counter creation
let counter = endpoint
.metrics()
.create_counter("testcounter", "A test counter", &[])
.unwrap();
counter.inc_by(123.456789);
let epsilon = 0.01;
assert!((counter.get() - 123.456789).abs() < epsilon);
let endpoint_output_raw = endpoint.prometheus_expfmt().unwrap();
let endpoint_output_raw = endpoint.metrics().prometheus_expfmt().unwrap();
println!("Endpoint output:");
println!("{}", endpoint_output_raw);
......@@ -1061,13 +1259,14 @@ dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",
// Test Gauge creation
let gauge = component
.metrics()
.create_gauge("testgauge", "A test gauge", &[])
.unwrap();
gauge.set(50000.0);
assert_eq!(gauge.get(), 50000.0);
// Test Prometheus format output for Component (gauge + histogram)
let component_output_raw = component.prometheus_expfmt().unwrap();
let component_output_raw = component.metrics().prometheus_expfmt().unwrap();
println!("Component output:");
println!("{}", component_output_raw);
......@@ -1092,13 +1291,14 @@ dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"}
);
let intcounter = namespace
.metrics()
.create_intcounter("testintcounter", "A test int counter", &[])
.unwrap();
intcounter.inc_by(12345);
assert_eq!(intcounter.get(), 12345);
// Test Prometheus format output for Namespace (int_counter + gauge + histogram)
let namespace_output_raw = namespace.prometheus_expfmt().unwrap();
let namespace_output_raw = namespace.metrics().prometheus_expfmt().unwrap();
println!("Namespace output:");
println!("{}", namespace_output_raw);
......@@ -1127,6 +1327,7 @@ dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#.to_string();
// Test IntGauge creation
let intgauge = namespace
.metrics()
.create_intgauge("testintgauge", "A test int gauge", &[])
.unwrap();
intgauge.set(42);
......@@ -1134,6 +1335,7 @@ dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#.to_string();
// Test IntGaugeVec creation
let intgaugevec = namespace
.metrics()
.create_intgaugevec(
"testintgaugevec",
"A test int gauge vector",
......@@ -1150,6 +1352,7 @@ dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#.to_string();
// Test CounterVec creation
let countervec = endpoint
.metrics()
.create_countervec(
"testcountervec",
"A test counter vector",
......@@ -1162,6 +1365,7 @@ dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#.to_string();
// Test Histogram creation
let histogram = component
.metrics()
.create_histogram("testhistogram", "A test histogram", &[], None)
.unwrap();
histogram.observe(1.0);
......@@ -1169,7 +1373,7 @@ dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#.to_string();
histogram.observe(4.0);
// Test Prometheus format output for DRT (all metrics combined)
let drt_output_raw = drt.prometheus_expfmt().unwrap();
let drt_output_raw = drt.metrics().prometheus_expfmt().unwrap();
println!("DRT output:");
println!("{}", drt_output_raw);
......@@ -1285,7 +1489,7 @@ mod test_metricsregistry_nats {
let drt = create_test_drt_async().await;
// Get DRT output which should include NATS client metrics
let drt_output = drt.prometheus_expfmt().unwrap();
let drt_output = drt.metrics().prometheus_expfmt().unwrap();
println!("DRT output with NATS metrics:");
println!("{}", drt_output);
......@@ -1356,8 +1560,9 @@ mod test_metricsregistry_nats {
// Get component output which should include NATS client metrics
// Additional checks for NATS client metrics (without checking specific values)
let component_nats_metrics =
super::test_helpers::extract_nats_lines(&component.prometheus_expfmt().unwrap());
let component_nats_metrics = super::test_helpers::extract_nats_lines(
&component.metrics().prometheus_expfmt().unwrap(),
);
println!(
"Component NATS metrics count: {}",
component_nats_metrics.len()
......@@ -1371,7 +1576,7 @@ mod test_metricsregistry_nats {
// Check for specific NATS client metric names (without values)
let component_metrics =
super::test_helpers::extract_metrics(&component.prometheus_expfmt().unwrap());
super::test_helpers::extract_metrics(&component.metrics().prometheus_expfmt().unwrap());
let actual_component_nats_metrics_sorted: Vec<&str> = component_metrics
.iter()
.map(|line| {
......@@ -1407,7 +1612,7 @@ mod test_metricsregistry_nats {
);
// Get both DRT and component output and filter for NATS metrics only
let drt_output = drt.prometheus_expfmt().unwrap();
let drt_output = drt.metrics().prometheus_expfmt().unwrap();
let drt_nats_lines = super::test_helpers::extract_nats_lines(&drt_output);
let drt_and_component_nats_metrics =
super::test_helpers::extract_metrics(&drt_nats_lines.join("\n"));
......@@ -1472,7 +1677,7 @@ mod test_metricsregistry_nats {
sleep(Duration::from_millis(500)).await;
println!("✓ Launched endpoint service in background successfully");
let drt_output = drt.prometheus_expfmt().unwrap();
let drt_output = drt.metrics().prometheus_expfmt().unwrap();
let parsed_metrics: Vec<_> = drt_output
.lines()
.filter_map(super::test_helpers::parse_prometheus_metric)
......@@ -1601,7 +1806,7 @@ mod test_metricsregistry_nats {
sleep(Duration::from_millis(500)).await;
println!("✓ Wait complete, getting final metrics...");
let final_drt_output = drt.prometheus_expfmt().unwrap();
let final_drt_output = drt.metrics().prometheus_expfmt().unwrap();
println!("\n=== Final Prometheus DRT output ===");
println!("{}", final_drt_output);
......
......@@ -31,7 +31,7 @@ use ingress::push_handler::WorkHandlerMetrics;
pub const STREAM_ERR_MSG: &str = "Stream ended before generation completed";
// Add Prometheus metrics types
use crate::metrics::MetricsRegistry;
use crate::metrics::MetricsHierarchy;
use prometheus::{CounterVec, Histogram, IntCounter, IntCounterVec, IntGauge};
pub trait Codable: PipelineIO + Serialize + for<'de> Deserialize<'de> {}
......
......@@ -47,38 +47,39 @@ impl WorkHandlerMetrics {
metrics_labels: Option<&[(&str, &str)]>,
) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
let metrics_labels = metrics_labels.unwrap_or(&[]);
let request_counter = endpoint.create_intcounter(
let metrics = endpoint.metrics();
let request_counter = metrics.create_intcounter(
work_handler::REQUESTS_TOTAL,
"Total number of requests processed by work handler",
metrics_labels,
)?;
let request_duration = endpoint.create_histogram(
let request_duration = metrics.create_histogram(
work_handler::REQUEST_DURATION_SECONDS,
"Time spent processing requests by work handler",
metrics_labels,
None,
)?;
let inflight_requests = endpoint.create_intgauge(
let inflight_requests = metrics.create_intgauge(
work_handler::INFLIGHT_REQUESTS,
"Number of requests currently being processed by work handler",
metrics_labels,
)?;
let request_bytes = endpoint.create_intcounter(
let request_bytes = metrics.create_intcounter(
work_handler::REQUEST_BYTES_TOTAL,
"Total number of bytes received in requests by work handler",
metrics_labels,
)?;
let response_bytes = endpoint.create_intcounter(
let response_bytes = metrics.create_intcounter(
work_handler::RESPONSE_BYTES_TOTAL,
"Total number of bytes sent in responses by work handler",
metrics_labels,
)?;
let error_counter = endpoint.create_intcountervec(
let error_counter = metrics.create_intcountervec(
work_handler::ERRORS_TOTAL,
"Total number of errors in work handler processing",
&[work_handler::ERROR_TYPE_LABEL],
......
......@@ -11,7 +11,7 @@ use crate::{
DistributedRuntime, Result,
component::Component,
error,
metrics::{MetricsRegistry, prometheus_names, prometheus_names::nats_service},
metrics::{MetricsHierarchy, prometheus_names, prometheus_names::nats_service},
traits::*,
transports::nats,
utils::stream,
......@@ -339,37 +339,37 @@ impl ComponentNatsServerPrometheusMetrics {
let labels: &[(&str, &str)] = &labels_vec;
let service_processing_ms_avg = component.create_gauge(
let service_processing_ms_avg = component.metrics().create_gauge(
nats_service::PROCESSING_MS_AVG,
"Average processing time across all component endpoints in milliseconds",
labels,
)?;
let service_errors_total = component.create_intgauge(
let service_errors_total = component.metrics().create_intgauge(
nats_service::ERRORS_TOTAL,
"Total number of errors across all component endpoints",
labels,
)?;
let service_requests_total = component.create_intgauge(
let service_requests_total = component.metrics().create_intgauge(
nats_service::REQUESTS_TOTAL,
"Total number of requests across all component endpoints",
labels,
)?;
let service_processing_ms_total = component.create_intgauge(
let service_processing_ms_total = component.metrics().create_intgauge(
nats_service::PROCESSING_MS_TOTAL,
"Total processing time across all component endpoints in milliseconds",
labels,
)?;
let service_active_services = component.create_intgauge(
let service_active_services = component.metrics().create_intgauge(
nats_service::ACTIVE_SERVICES,
"Number of active services in this component",
labels,
)?;
let service_active_endpoints = component.create_intgauge(
let service_active_endpoints = component.metrics().create_intgauge(
nats_service::ACTIVE_ENDPOINTS,
"Number of active endpoints across all services",
labels,
......
......@@ -24,7 +24,7 @@ use tokio::sync::mpsc;
use crate::component;
use crate::config::HealthStatus;
use crate::metrics::prometheus_names::distributed_runtime;
use crate::metrics::{MetricsHierarchy, prometheus_names::distributed_runtime};
/// Health check target containing instance info and payload
#[derive(Clone, Debug)]
......@@ -242,11 +242,8 @@ impl SystemHealth {
}
/// Initialize the uptime gauge using the provided metrics registry
pub fn initialize_uptime_gauge<T: crate::metrics::MetricsRegistry>(
&self,
registry: &T,
) -> anyhow::Result<()> {
let gauge = registry.create_gauge(
pub fn initialize_uptime_gauge<T: MetricsHierarchy>(&self, registry: &T) -> anyhow::Result<()> {
let gauge = registry.metrics().create_gauge(
distributed_runtime::UPTIME_SECONDS,
"Total uptime of the DistributedRuntime in seconds",
&[],
......
......@@ -3,7 +3,7 @@
use crate::config::HealthStatus;
use crate::logging::make_request_span;
use crate::metrics::MetricsRegistry;
use crate::metrics::MetricsHierarchy;
use crate::metrics::prometheus_names::{nats_client, nats_service};
use crate::traits::DistributedRuntimeProvider;
use axum::{Router, http::StatusCode, response::IntoResponse, routing::get};
......@@ -186,27 +186,12 @@ async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
// Update the uptime gauge with current value
state.drt().system_health.lock().update_uptime_gauge();
// Execute all the callbacks for all registered hierarchies
let all_hierarchies: Vec<String> = {
let registries = state.drt().hierarchy_to_metricsregistry.read().unwrap();
registries.keys().cloned().collect()
};
for hierarchy in &all_hierarchies {
let callback_results = state.drt().execute_prometheus_update_callbacks(hierarchy);
for result in callback_results {
if let Err(e) = result {
tracing::error!(
"Error executing metrics callback for hierarchy '{}': {}",
hierarchy,
e
);
}
}
}
// Get all metrics from DistributedRuntime (top-level)
let mut response = match state.drt().prometheus_expfmt() {
// Get all metrics from DistributedRuntime
// Note: In the new hierarchy-based architecture, metrics are automatically registered
// at all parent levels, so DRT's metrics include all metrics from children
// (Namespace, Component, Endpoint). The prometheus_expfmt() method also executes
// all update callbacks and expfmt callbacks before returning the metrics.
let response = match state.drt().metrics().prometheus_expfmt() {
Ok(r) => r,
Err(e) => {
tracing::error!("Failed to get metrics from registry: {}", e);
......@@ -217,25 +202,6 @@ async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
}
};
// Collect and append Prometheus exposition text from all hierarchies
for hierarchy in &all_hierarchies {
let expfmt = {
let registries = state.drt().hierarchy_to_metricsregistry.read().unwrap();
if let Some(entry) = registries.get(hierarchy) {
entry.execute_prometheus_expfmt_callbacks()
} else {
String::new()
}
};
if !expfmt.is_empty() {
if !response.ends_with('\n') {
response.push('\n');
}
response.push_str(&expfmt);
}
}
(StatusCode::OK, response)
}
......@@ -281,7 +247,7 @@ mod tests {
mod integration_tests {
use super::*;
use crate::distributed::distributed_test_utils::create_test_drt_async;
use crate::metrics::MetricsRegistry;
use crate::metrics::MetricsHierarchy;
use anyhow::Result;
use rstest::rstest;
use std::sync::Arc;
......@@ -315,7 +281,7 @@ mod integration_tests {
// so we don't need to create it again here
// The uptime_seconds metric should already be registered and available
let response = drt.prometheus_expfmt().unwrap();
let response = drt.metrics().prometheus_expfmt().unwrap();
println!("Full metrics response:\n{}", response);
// Filter out NATS client metrics for comparison
......
......@@ -20,11 +20,9 @@ impl RuntimeProvider for DistributedRuntime {
}
}
// This implementation is required because:
// 1. MetricsRegistry has a supertrait bound: `MetricsRegistry: Send + Sync + DistributedRuntimeProvider`
// 2. DistributedRuntime implements MetricsRegistry (in distributed.rs)
// 3. Therefore, DistributedRuntime must implement DistributedRuntimeProvider to satisfy the trait bound
// 4. This enables DistributedRuntime to serve as both a provider (of itself) and a metrics registry
// This implementation allows DistributedRuntime to provide access to itself
// when used in contexts that require DistributedRuntimeProvider.
// Components, Namespaces, and Endpoints use this trait to access their DRT.
impl DistributedRuntimeProvider for DistributedRuntime {
fn drt(&self) -> &DistributedRuntime {
self
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment