Unverified Commit 55e458d8 authored by Keiven C's avatar Keiven C Committed by GitHub
Browse files

feat: add SGLang and vLLM passthrough metrics on Dynamo backend worker (#3539)


Signed-off-by: default avatarKeiven Chang <keivenchang@users.noreply.github.com>
Co-authored-by: default avatarKeiven Chang <keivenchang@users.noreply.github.com>
parent a4746ab6
# SGLang Prometheus Metrics
**📚 Official Documentation**: [SGLang Production Metrics](https://docs.sglang.ai/references/production_metrics.html)
This document describes how SGLang Prometheus metrics are exposed in Dynamo.
## Overview
When running SGLang through Dynamo, SGLang engine metrics are automatically passed through and exposed on Dynamo's `/metrics` endpoint (default port 8081). This allows you to access both SGLang engine metrics (prefixed with `sglang:`) and Dynamo runtime metrics (prefixed with `dynamo_*`) from a single worker backend endpoint.
For the complete and authoritative list of all SGLang metrics, always refer to the official documentation linked above.
Dynamo runtime metrics are documented in [docs/guides/metrics.md](../../../docs/guides/metrics.md).
## Metric Reference
The official documentation includes:
- Complete metric definitions with HELP and TYPE descriptions
- Example metric output in Prometheus exposition format
- Counter, Gauge, and Histogram metrics
- Metric labels (e.g., `model_name`, `engine_type`, `tp_rank`, `pp_rank`)
- Setup guide for Prometheus + Grafana monitoring
- Troubleshooting tips and configuration examples
## Metric Categories
SGLang provides metrics in the following categories (all prefixed with `sglang:`):
- Throughput metrics
- Resource usage
- Latency metrics
- Disaggregation metrics (when enabled)
**Note:** Specific metrics are subject to change between SGLang versions. Always refer to the [official documentation](https://docs.sglang.ai/references/production_metrics.html) or inspect the `/metrics` endpoint for your SGLang version.
## Enabling Metrics in Dynamo
SGLang metrics are automatically exposed when running SGLang through Dynamo with metrics enabled.
## Inspecting Metrics
To see the actual metrics available in your SGLang version:
### 1. Launch SGLang with Metrics Enabled
```bash
# Set environment variables
export DYN_SYSTEM_ENABLED=true
export DYN_SYSTEM_PORT=8081
# Start SGLang worker with metrics enabled
python -m dynamo.sglang --model <model_name> --enable-metrics
# Wait for engine to initialize
```
Metrics will be available at: `http://localhost:8081/metrics`
### 2. Fetch Metrics via curl
```bash
curl http://localhost:8081/metrics | grep "^sglang:"
```
### 3. Example Output
**Note:** The specific metrics shown below are examples and may vary depending on your SGLang version. Always inspect your actual `/metrics` endpoint for the current list.
```
# HELP sglang:prompt_tokens_total Number of prefill tokens processed.
# TYPE sglang:prompt_tokens_total counter
sglang:prompt_tokens_total{model_name="meta-llama/Llama-3.1-8B-Instruct"} 8128902.0
# HELP sglang:generation_tokens_total Number of generation tokens processed.
# TYPE sglang:generation_tokens_total counter
sglang:generation_tokens_total{model_name="meta-llama/Llama-3.1-8B-Instruct"} 7557572.0
# HELP sglang:cache_hit_rate The cache hit rate
# TYPE sglang:cache_hit_rate gauge
sglang:cache_hit_rate{model_name="meta-llama/Llama-3.1-8B-Instruct"} 0.0075
```
## Implementation Details
- SGLang uses multiprocess metrics collection via `prometheus_client.multiprocess.MultiProcessCollector`
- Metrics are filtered by the `sglang:` prefix before being exposed
- The integration uses Dynamo's `register_engine_metrics_callback()` function
- Metrics appear after SGLang engine initialization completes
## See Also
### SGLang Metrics
- [Official SGLang Production Metrics](https://docs.sglang.ai/references/production_metrics.html)
- [SGLang GitHub - Metrics Collector](https://github.com/sgl-project/sglang/blob/main/python/sglang/srt/metrics/collector.py)
### Dynamo Metrics
- **Dynamo Metrics Guide**: See `docs/guides/metrics.md` for complete documentation on Dynamo runtime metrics
- **Dynamo Runtime Metrics**: Metrics prefixed with `dynamo_*` for runtime, components, endpoints, and namespaces
- Implementation: `lib/runtime/src/metrics.rs` (Rust runtime metrics)
- Metric names: `lib/runtime/src/metrics/prometheus_names.rs` (metric name constants)
- Available at the same `/metrics` endpoint alongside SGLang metrics
- **Integration Code**: `components/src/dynamo/common/utils/prometheus.py` - Prometheus utilities and callback registration
# vLLM Prometheus Metrics
**📚 Official Documentation**: [vLLM Metrics Design](https://docs.vllm.ai/en/latest/design/metrics.html)
This document describes how vLLM Prometheus metrics are exposed in Dynamo.
## Overview
When running vLLM through Dynamo, vLLM engine metrics are automatically passed through and exposed on Dynamo's `/metrics` endpoint (default port 8081). This allows you to access both vLLM engine metrics (prefixed with `vllm:`) and Dynamo runtime metrics (prefixed with `dynamo_*`) from a single worker backend endpoint.
For the complete and authoritative list of all vLLM metrics, always refer to the official documentation linked above.
Dynamo runtime metrics are documented in [docs/guides/metrics.md](../../../docs/guides/metrics.md).
## Metric Reference
The official documentation includes:
- Complete metric definitions with detailed explanations
- Counter, Gauge, and Histogram metrics
- Metric labels (e.g., `model_name`, `finished_reason`, `scheduling_event`)
- Design rationale and implementation details
- Information about v1 metrics migration
- Future work and deprecated metrics
## Metric Categories
vLLM provides metrics in the following categories (all prefixed with `vllm:`):
- Request metrics
- Performance metrics
- Resource usage
- Scheduler metrics
- Disaggregation metrics (when enabled)
**Note:** Specific metrics are subject to change between vLLM versions. Always refer to the [official documentation](https://docs.vllm.ai/en/latest/design/metrics.html) or inspect the `/metrics` endpoint for your vLLM version.
## Enabling Metrics in Dynamo
vLLM metrics are automatically exposed when running vLLM through Dynamo with metrics enabled.
## Inspecting Metrics
To see the actual metrics available in your vLLM version:
### 1. Launch vLLM with Metrics Enabled
```bash
# Set environment variables
export DYN_SYSTEM_ENABLED=true
export DYN_SYSTEM_PORT=8081
# Start vLLM worker (metrics enabled by default via --disable-log-stats=false)
python -m dynamo.vllm --model <model_name>
# Wait for engine to initialize
```
Metrics will be available at: `http://localhost:8081/metrics`
### 2. Fetch Metrics via curl
```bash
curl http://localhost:8081/metrics | grep "^vllm:"
```
### 3. Example Output
**Note:** The specific metrics shown below are examples and may vary depending on your vLLM version. Always inspect your actual `/metrics` endpoint for the current list.
```
# HELP vllm:request_success_total Number of successfully finished requests.
# TYPE vllm:request_success_total counter
vllm:request_success_total{finished_reason="length",model_name="meta-llama/Llama-3.1-8B"} 15.0
vllm:request_success_total{finished_reason="stop",model_name="meta-llama/Llama-3.1-8B"} 150.0
# HELP vllm:time_to_first_token_seconds Histogram of time to first token in seconds.
# TYPE vllm:time_to_first_token_seconds histogram
vllm:time_to_first_token_seconds_bucket{le="0.001",model_name="meta-llama/Llama-3.1-8B"} 0.0
vllm:time_to_first_token_seconds_bucket{le="0.005",model_name="meta-llama/Llama-3.1-8B"} 5.0
vllm:time_to_first_token_seconds_count{model_name="meta-llama/Llama-3.1-8B"} 165.0
vllm:time_to_first_token_seconds_sum{model_name="meta-llama/Llama-3.1-8B"} 89.38
```
## Implementation Details
- vLLM v1 uses multiprocess metrics collection via `prometheus_client.multiprocess`
- `PROMETHEUS_MULTIPROC_DIR`: vLLM sets this environment variable to a temporary directory where multiprocess metrics are stored as memory-mapped files. Each worker process writes its metrics to separate files in this directory, which are aggregated when `/metrics` is scraped.
- Metrics are filtered by the `vllm:` prefix before being exposed
- The integration uses Dynamo's `register_engine_metrics_callback()` function
- Metrics appear after vLLM engine initialization completes
- vLLM v1 metrics are different from v0 - see the [official documentation](https://docs.vllm.ai/en/latest/design/metrics.html) for migration details
## See Also
### vLLM Metrics
- [Official vLLM Metrics Design Documentation](https://docs.vllm.ai/en/latest/design/metrics.html)
- [vLLM Production Metrics User Guide](https://docs.vllm.ai/en/latest/user/production_metrics.html)
- [vLLM GitHub - Metrics Implementation](https://github.com/vllm-project/vllm/tree/main/vllm/engine/metrics)
### Dynamo Metrics
- **Dynamo Metrics Guide**: See `docs/guides/metrics.md` for complete documentation on Dynamo runtime metrics
- **Dynamo Runtime Metrics**: Metrics prefixed with `dynamo_*` for runtime, components, endpoints, and namespaces
- Implementation: `lib/runtime/src/metrics.rs` (Rust runtime metrics)
- Metric names: `lib/runtime/src/metrics/prometheus_names.rs` (metric name constants)
- Available at the same `/metrics` endpoint alongside vLLM metrics
- **Integration Code**: `components/src/dynamo/common/utils/prometheus.py` - Prometheus utilities and callback registration
......@@ -9,9 +9,10 @@ Dynamo backends and components.
Main submodules:
- config_dump: Configuration dumping and system diagnostics utilities
- utils: Common utilities including environment and prometheus helpers
"""
from dynamo.common import config_dump
from dynamo.common import config_dump, utils
try:
from ._version import __version__
......@@ -23,4 +24,4 @@ except Exception:
except Exception:
__version__ = "0.0.0+unknown"
__all__ = ["__version__", "config_dump"]
__all__ = ["__version__", "config_dump", "utils"]
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Dynamo Common Utils Module
This module contains shared utility functions used across multiple
Dynamo backends and components.
Submodules:
- prometheus: Prometheus metrics collection and logging utilities
"""
from dynamo.common.utils import prometheus
__all__ = ["prometheus"]
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Prometheus metrics utilities for Dynamo components.
This module provides shared functionality for collecting and exposing Prometheus metrics
from backend engines (SGLang, vLLM, etc.) via Dynamo's metrics endpoint.
Note: Engine metrics take time to appear after engine initialization,
while Dynamo runtime metrics are available immediately after component creation.
"""
import logging
import re
from typing import TYPE_CHECKING, Optional
from prometheus_client import generate_latest
from dynamo._core import Endpoint
# Import CollectorRegistry only for type hints to avoid importing prometheus_client at module load time.
# prometheus_client must be imported AFTER set_prometheus_multiproc_dir() is called.
# See main.py worker() function for detailed explanation.
if TYPE_CHECKING:
from prometheus_client import CollectorRegistry
def register_engine_metrics_callback(
endpoint: Endpoint,
registry: "CollectorRegistry",
metric_prefix: str,
engine_name: str,
) -> None:
"""
Register a callback to expose engine Prometheus metrics via Dynamo's metrics endpoint.
This registers a callback that is invoked when /metrics is scraped, passing through
engine-specific metrics alongside Dynamo runtime metrics.
Args:
endpoint: Dynamo endpoint object with metrics.register_prometheus_expfmt_callback()
registry: Prometheus registry to collect from (e.g., REGISTRY or CollectorRegistry)
metric_prefix: Prefix to filter metrics (e.g., "vllm:" or "sglang:")
engine_name: Name of the engine for logging (e.g., "vLLM" or "SGLang")
Example:
from prometheus_client import REGISTRY
register_engine_metrics_callback(
generate_endpoint, REGISTRY, "vllm:", "vLLM"
)
"""
def get_expfmt() -> str:
"""Callback to return engine Prometheus metrics in exposition format"""
return get_prometheus_expfmt(registry, metric_prefix_filter=metric_prefix)
endpoint.metrics.register_prometheus_expfmt_callback(get_expfmt)
def get_prometheus_expfmt(
registry,
metric_prefix_filter: Optional[str] = None,
) -> str:
"""
Get Prometheus metrics from a registry formatted as text using the standard text encoder.
Collects all metrics from the registry and returns them in Prometheus text exposition format.
Optionally filters metrics by prefix.
Prometheus exposition format consists of:
- Comment lines starting with # (HELP and TYPE declarations)
- Metric lines with format: metric_name{label="value"} metric_value timestamp
Example output format:
# HELP vllm:request_success_total Number of successful requests
# TYPE vllm:request_success_total counter
vllm:request_success_total{model="llama2",endpoint="generate"} 150.0
# HELP vllm:time_to_first_token_seconds Time to first token
# TYPE vllm:time_to_first_token_seconds histogram
vllm:time_to_first_token_seconds_bucket{model="llama2",le="0.01"} 10.0
vllm:time_to_first_token_seconds_bucket{model="llama2",le="0.1"} 45.0
vllm:time_to_first_token_seconds_count{model="llama2"} 50.0
vllm:time_to_first_token_seconds_sum{model="llama2"} 2.5
Args:
registry: Prometheus registry to collect from.
Pass CollectorRegistry with MultiProcessCollector for SGLang.
Pass REGISTRY for vLLM single-process mode.
metric_prefix_filter: Optional prefix to filter displayed metrics (e.g., "vllm:").
If None, returns all metrics. (default: None)
Returns:
Formatted metrics text in Prometheus exposition format. Returns empty string on error.
Example:
from prometheus_client import REGISTRY
metrics_text = get_prometheus_expfmt(REGISTRY)
print(metrics_text)
# With filter
vllm_metrics = get_prometheus_expfmt(REGISTRY, metric_prefix_filter="vllm:")
"""
try:
# Generate metrics in Prometheus text format
metrics_text = generate_latest(registry).decode("utf-8")
if metric_prefix_filter:
# Filter lines: keep metric lines starting with prefix and their HELP/TYPE comments
escaped_prefix = re.escape(metric_prefix_filter)
pattern = rf"^(?:{escaped_prefix}|# (?:HELP|TYPE) {escaped_prefix})"
filtered_lines = [
line for line in metrics_text.split("\n") if re.match(pattern, line)
]
result = "\n".join(filtered_lines)
if result:
# Ensure result ends with newline
if result and not result.endswith("\n"):
result += "\n"
return result
else:
# Ensure metrics_text ends with newline
if metrics_text and not metrics_text.endswith("\n"):
metrics_text += "\n"
return metrics_text
except Exception as e:
logging.error(f"Error getting metrics: {e}")
return ""
......@@ -9,8 +9,10 @@ from typing import List, Optional, Tuple
import sglang as sgl
import zmq
import zmq.asyncio
from prometheus_client import CollectorRegistry, multiprocess
from sglang.srt.utils import get_local_ip_auto, get_zmq_socket
from dynamo.common.utils.prometheus import register_engine_metrics_callback
from dynamo.llm import (
ForwardPassMetrics,
KvStats,
......@@ -217,6 +219,16 @@ async def setup_sgl_metrics(
publisher.init_engine_metrics_publish()
publisher.init_kv_event_publish()
# Register Prometheus metrics callback if enabled
if engine.server_args.enable_metrics:
# SGLang uses multiprocess architecture where metrics are stored in shared memory.
# MultiProcessCollector aggregates metrics from all worker processes.
registry = CollectorRegistry()
multiprocess.MultiProcessCollector(registry)
register_engine_metrics_callback(
generate_endpoint, registry, "sglang:", "SGLang"
)
task = asyncio.create_task(publisher.run())
logging.info("SGLang metrics loop started")
return publisher, task, metrics_labels
......@@ -11,8 +11,10 @@ import uvloop
from vllm.distributed.kv_events import ZmqEventPublisher
from vllm.usage.usage_lib import UsageContext
from vllm.v1.engine.async_llm import AsyncLLM
from vllm.v1.metrics.prometheus import setup_multiprocess_prometheus
from dynamo.common.config_dump import dump_config
from dynamo.common.utils.prometheus import register_engine_metrics_callback
from dynamo.llm import (
ModelInput,
ModelRuntimeConfig,
......@@ -125,6 +127,11 @@ def setup_kv_event_publisher(
def setup_vllm_engine(config, stat_logger=None):
setup_multiprocess_prometheus()
logger.debug(
f"Prometheus multiproc dir set to: {os.environ.get('PROMETHEUS_MULTIPROC_DIR')}"
)
os.environ["VLLM_NO_USAGE_STATS"] = "1" # Avoid internal HTTP requests
os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn"
......@@ -161,6 +168,7 @@ def setup_vllm_engine(config, stat_logger=None):
logger.info(f"VllmWorker for {config.model} has been initialized with LMCache")
else:
logger.info(f"VllmWorker for {config.model} has been initialized")
return engine_client, vllm_config, default_sampling_params
......@@ -272,6 +280,11 @@ async def init(runtime: DistributedRuntime, config: Config):
if kv_publisher:
handler.kv_publisher = kv_publisher
if config.engine_args.disable_log_stats is False:
from prometheus_client import REGISTRY
register_engine_metrics_callback(generate_endpoint, REGISTRY, "vllm:", "vLLM")
if not config.engine_args.data_parallel_rank: # if rank is 0 or None then register
runtime_config = ModelRuntimeConfig()
......
......@@ -188,7 +188,7 @@ def update_metrics():
request_slots.set(compute_current_slots())
gpu_usage.set(get_gpu_usage())
endpoint.metrics.register_update_callback(update_metrics)
endpoint.metrics.register_callback(update_metrics)
```
Both examples support vector metrics with labels:
......@@ -355,7 +355,7 @@ graph TD
MT -->|return to Python| PY
PY -->|metric.set/get| MT
MT -->|direct FFI call| PROM
PY -.->|endpoint.metrics.register_update_callback| PM
PY -.->|endpoint.metrics.register_callback| PM
PM -.->|drt.register_metrics_callback| DRT
SS ==>|execute_metrics_callbacks| DRT
DRT -.->|invoke Python callback| PY
......
......@@ -83,7 +83,7 @@ async def init(runtime: DistributedRuntime):
gpu_cache_usage_perc.set(0.01 + (count * 0.01))
print(f"[python] Updated metrics (call #{count})")
endpoint.metrics.register_update_callback(update_metrics)
endpoint.metrics.register_callback(update_metrics)
print("[python] update (metrics) callback registered!")
# Step 3: Set initial values and test vector metrics
......
......@@ -699,7 +699,9 @@ impl RuntimeMetrics {
let hierarchy = registry_item.hierarchy();
// Store the callback in the DRT's metrics callback registry using the registry_item's hierarchy
registry_item.drt().register_metrics_callback(
// TODO: rename this to register_callback, once we move the the MetricsRegistry trait
// out of the runtime, and make it into a composed module.
registry_item.drt().register_prometheus_update_callback(
vec![hierarchy.clone()],
Arc::new(move || {
// Execute the Python callback in the Python event loop
......@@ -720,10 +722,45 @@ impl RuntimeMetrics {
impl RuntimeMetrics {
/// Register a Python callback to be invoked before metrics are scraped
/// This callback will be called for this endpoint's metrics hierarchy
fn register_update_callback(&self, callback: PyObject, _py: Python) -> PyResult<()> {
fn register_callback(&self, callback: PyObject, _py: Python) -> PyResult<()> {
Self::register_callback_for(self.metricsregistry.as_ref(), callback)
}
/// Register a Python callback that returns Prometheus exposition text
/// The returned text will be appended to the /metrics endpoint output
/// The callback should return a string in Prometheus text exposition format
fn register_prometheus_expfmt_callback(&self, callback: PyObject, _py: Python) -> PyResult<()> {
let hierarchy = self.metricsregistry.hierarchy();
// Store the callback in the DRT's metrics exposition text callback registry
self.metricsregistry.drt().register_prometheus_expfmt_callback(
vec![hierarchy.clone()],
Arc::new(move || {
// Execute the Python callback in the Python event loop
Python::with_gil(|py| {
match callback.call0(py) {
Ok(result) => {
// Try to extract a string from the result
match result.extract::<String>(py) {
Ok(text) => Ok(text),
Err(e) => {
tracing::error!("Metrics exposition text callback must return a string: {}", e);
Ok(String::new())
}
}
}
Err(e) => {
tracing::error!("Metrics exposition text callback failed: {}", e);
Ok(String::new())
}
}
})
}),
);
Ok(())
}
// NOTE: The order of create_* methods below matches lib/runtime/src/metrics.rs::MetricsRegistry trait
// Keep them synchronized when adding new metric types
......
......@@ -231,7 +231,7 @@ class RuntimeMetrics:
Also provides utilities for registering metrics callbacks.
"""
def register_update_callback(self, callback: Callable[[], None]) -> None:
def register_callback(self, callback: Callable[[], None]) -> None:
"""
Register a Python callback to be invoked before metrics are scraped.
......@@ -250,7 +250,30 @@ class RuntimeMetrics:
def update_metrics():
counter.inc()
metrics.register_update_callback(update_metrics)
metrics.register_callback(update_metrics)
```
"""
...
def register_prometheus_expfmt_callback(self, callback: Callable[[], str]) -> None:
"""
Register a Python callback that returns Prometheus exposition text.
The returned text will be appended to the /metrics endpoint output.
This allows you to integrate external Prometheus metrics (e.g. from vLLM)
directly into the endpoint's metrics output.
Args:
callback: A callable that takes no arguments and returns a string
in Prometheus text exposition format
Example:
```python
def get_external_metrics():
# Fetch metrics from external source
return "# HELP external_metric Some metric\\nexternal_metric 42.0\\n"
metrics.register_prometheus_expfmt_callback(get_external_metrics)
```
"""
...
......
......@@ -1438,8 +1438,8 @@ mod test_integration_publisher {
assert_eq!(hit_rate_gauge.get(), 0.75);
// Test 4: Verify metrics are properly registered in the component's registry
// Component implements MetricsRegistry trait which provides prometheus_metrics_fmt()
let prometheus_output = component.prometheus_metrics_fmt().unwrap();
// Component implements MetricsRegistry trait which provides prometheus_expfmt()
let prometheus_output = component.prometheus_expfmt().unwrap();
// Verify metric names are present
assert!(prometheus_output.contains(kvstats::ACTIVE_BLOCKS));
......
......@@ -5,7 +5,7 @@ pub use crate::component::Component;
use crate::storage::key_value_store::{EtcdStore, KeyValueStore, MemoryStore};
use crate::transports::nats::DRTNatsClientPrometheusMetrics;
use crate::{
ErrorContext, RuntimeCallback,
ErrorContext, PrometheusUpdateCallback,
component::{self, ComponentBuilder, Endpoint, InstanceSource, Namespace},
discovery::DiscoveryClient,
metrics::MetricsRegistry,
......@@ -110,7 +110,8 @@ impl DistributedRuntime {
Ok(())
}
});
distributed_runtime.register_metrics_callback(drt_hierarchies, nats_client_callback);
distributed_runtime
.register_prometheus_update_callback(drt_hierarchies, nats_client_callback);
// Initialize the uptime gauge in SystemHealth
distributed_runtime
......@@ -311,27 +312,31 @@ impl DistributedRuntime {
.map_err(|e| e.into())
}
/// Add a callback function to metrics registries for the given hierarchies
// TODO: Rename to register_metrics_update_callback for consistency with Python API.
// Do this after we move the MetricsRegistry trait to composition pattern.
pub fn register_metrics_callback(&self, hierarchies: Vec<String>, callback: RuntimeCallback) {
/// Add a Prometheus update callback to the given hierarchies
/// TODO: rename this to register_callback, once we move the the MetricsRegistry trait
/// out of the runtime, and make it into a composed module.
pub fn register_prometheus_update_callback(
&self,
hierarchies: Vec<String>,
callback: PrometheusUpdateCallback,
) {
let mut registries = self.hierarchy_to_metricsregistry.write().unwrap();
for hierarchy in &hierarchies {
registries
.entry(hierarchy.clone())
.or_default()
.add_callback(callback.clone());
.add_prometheus_update_callback(callback.clone());
}
}
/// Execute all callbacks for a given hierarchy key and return their results
pub fn execute_metrics_callbacks(&self, hierarchy: &str) -> Vec<anyhow::Result<()>> {
/// Execute all Prometheus update callbacks for a given hierarchy and return their results
pub fn execute_prometheus_update_callbacks(&self, hierarchy: &str) -> Vec<anyhow::Result<()>> {
// Clone callbacks while holding read lock (fast operation)
let callbacks = {
let registries = self.hierarchy_to_metricsregistry.read().unwrap();
registries
.get(hierarchy)
.map(|entry| entry.runtime_callbacks.clone())
.map(|entry| entry.prometheus_update_callbacks.clone())
}; // Read lock released here
// Execute callbacks without holding the lock
......@@ -341,6 +346,21 @@ impl DistributedRuntime {
}
}
/// Add a Prometheus exposition text callback that returns Prometheus text for the given hierarchies
pub fn register_prometheus_expfmt_callback(
&self,
hierarchies: Vec<String>,
callback: crate::PrometheusExpositionFormatCallback,
) {
let mut registries = self.hierarchy_to_metricsregistry.write().unwrap();
for hierarchy in &hierarchies {
registries
.entry(hierarchy.clone())
.or_default()
.add_prometheus_expfmt_callback(callback.clone());
}
}
/// Get all registered hierarchy keys. Private because it is only used for testing.
fn get_registered_hierarchies(&self) -> Vec<String> {
let registries = self.hierarchy_to_metricsregistry.read().unwrap();
......
......@@ -88,14 +88,20 @@ pub struct Runtime {
/// - Used in generic contexts requiring 'static lifetime
///
/// The Arc wrapper is included in the type to make sharing explicit.
type RuntimeCallback = Arc<dyn Fn() -> anyhow::Result<()> + Send + Sync + 'static>;
type PrometheusUpdateCallback = Arc<dyn Fn() -> anyhow::Result<()> + Send + Sync + 'static>;
/// Type alias for exposition text callback functions that return Prometheus text
type PrometheusExpositionFormatCallback =
Arc<dyn Fn() -> anyhow::Result<String> + Send + Sync + 'static>;
/// Structure to hold Prometheus registries and associated callbacks for a given hierarchy
pub struct MetricsRegistryEntry {
/// The Prometheus registry for this prefix
pub prometheus_registry: prometheus::Registry,
/// List of function callbacks that receive a reference to any MetricsRegistry
pub runtime_callbacks: Vec<RuntimeCallback>,
/// List of update callbacks invoked before metrics are scraped
pub prometheus_update_callbacks: Vec<PrometheusUpdateCallback>,
/// List of callbacks that return Prometheus exposition text to be appended to metrics output
pub prometheus_expfmt_callbacks: Vec<PrometheusExpositionFormatCallback>,
}
impl MetricsRegistryEntry {
......@@ -103,23 +109,50 @@ impl MetricsRegistryEntry {
pub fn new() -> Self {
Self {
prometheus_registry: prometheus::Registry::new(),
runtime_callbacks: Vec::new(),
prometheus_update_callbacks: Vec::new(),
prometheus_expfmt_callbacks: Vec::new(),
}
}
/// Add a callback function that receives a reference to any MetricsRegistry
pub fn add_callback(&mut self, callback: RuntimeCallback) {
self.runtime_callbacks.push(callback);
pub fn add_prometheus_update_callback(&mut self, callback: PrometheusUpdateCallback) {
self.prometheus_update_callbacks.push(callback);
}
/// Add an exposition text callback that returns Prometheus text
pub fn add_prometheus_expfmt_callback(&mut self, callback: PrometheusExpositionFormatCallback) {
self.prometheus_expfmt_callbacks.push(callback);
}
/// Execute all runtime callbacks and return their results
pub fn execute_callbacks(&self) -> Vec<anyhow::Result<()>> {
self.runtime_callbacks
/// Execute all update callbacks and return their results
pub fn execute_prometheus_update_callbacks(&self) -> Vec<anyhow::Result<()>> {
self.prometheus_update_callbacks
.iter()
.map(|callback| callback())
.collect()
}
/// Execute all exposition text callbacks and return their concatenated text
pub fn execute_prometheus_expfmt_callbacks(&self) -> String {
let mut result = String::new();
for callback in &self.prometheus_expfmt_callbacks {
match callback() {
Ok(text) => {
if !text.is_empty() {
if !result.is_empty() && !result.ends_with('\n') {
result.push('\n');
}
result.push_str(&text);
}
}
Err(e) => {
tracing::error!("Error executing exposition text callback: {}", e);
}
}
}
result
}
/// Returns true if a metric with the given name already exists in the Prometheus registry
pub fn has_metric_named(&self, metric_name: &str) -> bool {
self.prometheus_registry
......@@ -139,7 +172,8 @@ impl Clone for MetricsRegistryEntry {
fn clone(&self) -> Self {
Self {
prometheus_registry: self.prometheus_registry.clone(),
runtime_callbacks: Vec::new(), // Callbacks cannot be cloned, so we start with an empty list
prometheus_update_callbacks: Vec::new(), // Callbacks cannot be cloned, so we start with an empty list
prometheus_expfmt_callbacks: Vec::new(), // Callbacks cannot be cloned, so we start with an empty list
}
}
}
......
......@@ -554,9 +554,11 @@ pub trait MetricsRegistry: Send + Sync + DistributedRuntimeProvider {
}
/// Get metrics in Prometheus text format
fn prometheus_metrics_fmt(&self) -> anyhow::Result<String> {
fn prometheus_expfmt(&self) -> anyhow::Result<String> {
// Execute callbacks first to ensure any new metrics are added to the registry
let callback_results = self.drt().execute_metrics_callbacks(&self.hierarchy());
let callback_results = self
.drt()
.execute_prometheus_update_callbacks(&self.hierarchy());
// Log any callback errors but continue
for result in callback_results {
......@@ -565,20 +567,31 @@ pub trait MetricsRegistry: Send + Sync + DistributedRuntimeProvider {
}
}
// Get the Prometheus registry for this hierarchy
let prometheus_registry = {
// Get the Prometheus registry for this hierarchy and execute exposition text callbacks
let (prometheus_registry, expfmt) = {
let mut registry_entry = self.drt().hierarchy_to_metricsregistry.write().unwrap();
registry_entry
.entry(self.hierarchy())
.or_default()
.prometheus_registry
.clone()
let entry = registry_entry.entry(self.hierarchy()).or_default();
let registry = entry.prometheus_registry.clone();
let text = entry.execute_prometheus_expfmt_callbacks();
(registry, text)
};
// Encode metrics from the registry
let metric_families = prometheus_registry.gather();
let encoder = prometheus::TextEncoder::new();
let mut buffer = Vec::new();
encoder.encode(&metric_families, &mut buffer)?;
Ok(String::from_utf8(buffer)?)
let mut result = String::from_utf8(buffer)?;
// Append exposition text callback results if any
if !expfmt.is_empty() {
if !result.ends_with('\n') {
result.push('\n');
}
result.push_str(&expfmt);
}
Ok(result)
}
}
......@@ -769,7 +782,7 @@ mod test_metricsregistry_units {
// Add callbacks with different increment values
for increment in [1, 10, 100] {
let counter_clone = counter.clone();
entry.add_callback(Arc::new(move || {
entry.add_prometheus_update_callback(Arc::new(move || {
counter_clone.fetch_add(increment, Ordering::SeqCst);
Ok(())
}));
......@@ -779,23 +792,23 @@ mod test_metricsregistry_units {
assert_eq!(counter.load(Ordering::SeqCst), 0);
// First execution
let results = entry.execute_callbacks();
let results = entry.execute_prometheus_update_callbacks();
assert_eq!(results.len(), 3);
assert!(results.iter().all(|r| r.is_ok()));
assert_eq!(counter.load(Ordering::SeqCst), 111); // 1 + 10 + 100
// Second execution - callbacks should be reusable
let results = entry.execute_callbacks();
let results = entry.execute_prometheus_update_callbacks();
assert_eq!(results.len(), 3);
assert_eq!(counter.load(Ordering::SeqCst), 222); // 111 + 111
// Test cloning - cloned entry should have no callbacks
let cloned = entry.clone();
assert_eq!(cloned.execute_callbacks().len(), 0);
assert_eq!(cloned.execute_prometheus_update_callbacks().len(), 0);
assert_eq!(counter.load(Ordering::SeqCst), 222); // No change
// Original still has callbacks
entry.execute_callbacks();
entry.execute_prometheus_update_callbacks();
assert_eq!(counter.load(Ordering::SeqCst), 333); // 222 + 111
}
......@@ -806,23 +819,25 @@ mod test_metricsregistry_units {
// Successful callback
let counter_clone = counter.clone();
entry.add_callback(Arc::new(move || {
entry.add_prometheus_update_callback(Arc::new(move || {
counter_clone.fetch_add(1, Ordering::SeqCst);
Ok(())
}));
// Error callback
entry.add_callback(Arc::new(|| Err(anyhow::anyhow!("Simulated error"))));
entry.add_prometheus_update_callback(Arc::new(|| {
Err(anyhow::anyhow!("Simulated error"))
}));
// Another successful callback
let counter_clone = counter.clone();
entry.add_callback(Arc::new(move || {
entry.add_prometheus_update_callback(Arc::new(move || {
counter_clone.fetch_add(10, Ordering::SeqCst);
Ok(())
}));
// Execute and verify mixed results
let results = entry.execute_callbacks();
let results = entry.execute_prometheus_update_callbacks();
assert_eq!(results.len(), 3);
assert!(results[0].is_ok());
assert!(results[1].is_err());
......@@ -838,7 +853,7 @@ mod test_metricsregistry_units {
assert_eq!(counter.load(Ordering::SeqCst), 11); // 1 + 10
// Execute again - errors should be consistent
let results = entry.execute_callbacks();
let results = entry.execute_prometheus_update_callbacks();
assert!(results[1].is_err());
assert_eq!(counter.load(Ordering::SeqCst), 22); // 11 + 11
}
......@@ -846,7 +861,7 @@ mod test_metricsregistry_units {
// Test 3: Empty registry
{
let entry = MetricsRegistryEntry::new();
let results = entry.execute_callbacks();
let results = entry.execute_prometheus_update_callbacks();
assert_eq!(results.len(), 0);
}
}
......@@ -1023,7 +1038,7 @@ mod test_metricsregistry_prometheus_fmt_outputs {
let epsilon = 0.01;
assert!((counter.get() - 123.456789).abs() < epsilon);
let endpoint_output_raw = endpoint.prometheus_metrics_fmt().unwrap();
let endpoint_output_raw = endpoint.prometheus_expfmt().unwrap();
println!("Endpoint output:");
println!("{}", endpoint_output_raw);
......@@ -1052,7 +1067,7 @@ dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",
assert_eq!(gauge.get(), 50000.0);
// Test Prometheus format output for Component (gauge + histogram)
let component_output_raw = component.prometheus_metrics_fmt().unwrap();
let component_output_raw = component.prometheus_expfmt().unwrap();
println!("Component output:");
println!("{}", component_output_raw);
......@@ -1083,7 +1098,7 @@ dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"}
assert_eq!(intcounter.get(), 12345);
// Test Prometheus format output for Namespace (int_counter + gauge + histogram)
let namespace_output_raw = namespace.prometheus_metrics_fmt().unwrap();
let namespace_output_raw = namespace.prometheus_expfmt().unwrap();
println!("Namespace output:");
println!("{}", namespace_output_raw);
......@@ -1154,7 +1169,7 @@ dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#.to_string();
histogram.observe(4.0);
// Test Prometheus format output for DRT (all metrics combined)
let drt_output_raw = drt.prometheus_metrics_fmt().unwrap();
let drt_output_raw = drt.prometheus_expfmt().unwrap();
println!("DRT output:");
println!("{}", drt_output_raw);
......@@ -1270,7 +1285,7 @@ mod test_metricsregistry_nats {
let drt = create_test_drt_async().await;
// Get DRT output which should include NATS client metrics
let drt_output = drt.prometheus_metrics_fmt().unwrap();
let drt_output = drt.prometheus_expfmt().unwrap();
println!("DRT output with NATS metrics:");
println!("{}", drt_output);
......@@ -1342,7 +1357,7 @@ mod test_metricsregistry_nats {
// Get components output which should include NATS client metrics
// Additional checks for NATS client metrics (without checking specific values)
let component_nats_metrics =
super::test_helpers::extract_nats_lines(&components.prometheus_metrics_fmt().unwrap());
super::test_helpers::extract_nats_lines(&components.prometheus_expfmt().unwrap());
println!(
"Component NATS metrics count: {}",
component_nats_metrics.len()
......@@ -1356,7 +1371,7 @@ mod test_metricsregistry_nats {
// Check for specific NATS client metric names (without values)
let component_metrics =
super::test_helpers::extract_metrics(&components.prometheus_metrics_fmt().unwrap());
super::test_helpers::extract_metrics(&components.prometheus_expfmt().unwrap());
let actual_component_nats_metrics_sorted: Vec<&str> = component_metrics
.iter()
.map(|line| {
......@@ -1392,7 +1407,7 @@ mod test_metricsregistry_nats {
);
// Get both DRT and component output and filter for NATS metrics only
let drt_output = drt.prometheus_metrics_fmt().unwrap();
let drt_output = drt.prometheus_expfmt().unwrap();
let drt_nats_lines = super::test_helpers::extract_nats_lines(&drt_output);
let drt_and_component_nats_metrics =
super::test_helpers::extract_metrics(&drt_nats_lines.join("\n"));
......@@ -1454,7 +1469,7 @@ mod test_metricsregistry_nats {
sleep(Duration::from_millis(500)).await;
println!("✓ Launched endpoint service in background successfully");
let drt_output = drt.prometheus_metrics_fmt().unwrap();
let drt_output = drt.prometheus_expfmt().unwrap();
let parsed_metrics: Vec<_> = drt_output
.lines()
.filter_map(super::test_helpers::parse_prometheus_metric)
......@@ -1583,7 +1598,7 @@ mod test_metricsregistry_nats {
sleep(Duration::from_millis(500)).await;
println!("✓ Wait complete, getting final metrics...");
let final_drt_output = drt.prometheus_metrics_fmt().unwrap();
let final_drt_output = drt.prometheus_expfmt().unwrap();
println!("\n=== Final Prometheus DRT output ===");
println!("{}", final_drt_output);
......
......@@ -304,7 +304,7 @@ mod tests {
/// 3. Prometheus scrapes these Gauge values (snapshots, not live data)
///
/// Flow: NATS Service → NatsStatsMetrics (Counters) → Metrics Callback → Prometheus Gauge
/// Note: These are snapshots updated when execute_metrics_callbacks() is called.
/// Note: These are snapshots updated when execute_prometheus_update_callbacks() is called.
#[derive(Debug, Clone)]
/// Prometheus metrics for NATS server components.
/// Note: Metrics with `_total` names use IntGauge because we copy counter values
......
......@@ -200,7 +200,7 @@ async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
};
for hierarchy in &all_hierarchies {
let callback_results = state.drt().execute_metrics_callbacks(hierarchy);
let callback_results = state.drt().execute_prometheus_update_callbacks(hierarchy);
for result in callback_results {
if let Err(e) = result {
tracing::error!(
......@@ -213,16 +213,37 @@ async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
}
// Get all metrics from DistributedRuntime (top-level)
match state.drt().prometheus_metrics_fmt() {
Ok(response) => (StatusCode::OK, response),
let mut response = match state.drt().prometheus_expfmt() {
Ok(r) => r,
Err(e) => {
tracing::error!("Failed to get metrics from registry: {}", e);
(
return (
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to get metrics".to_string(),
)
);
}
};
// Collect and append Prometheus exposition text from all hierarchies
for hierarchy in &all_hierarchies {
let expfmt = {
let registries = state.drt().hierarchy_to_metricsregistry.read().unwrap();
if let Some(entry) = registries.get(hierarchy) {
entry.execute_prometheus_expfmt_callbacks()
} else {
String::new()
}
};
if !expfmt.is_empty() {
if !response.ends_with('\n') {
response.push('\n');
}
response.push_str(&expfmt);
}
}
(StatusCode::OK, response)
}
// Regular tests: cargo test system_status_server --lib
......@@ -301,7 +322,7 @@ mod integration_tests {
// so we don't need to create it again here
// The uptime_seconds metric should already be registered and available
let response = drt.prometheus_metrics_fmt().unwrap();
let response = drt.prometheus_expfmt().unwrap();
println!("Full metrics response:\n{}", response);
// Filter out NATS client metrics for comparison
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment