Unverified Commit 7361de42 authored by Keiven C's avatar Keiven C Committed by GitHub
Browse files

refactor: reduce Python API to only Prometheus Exposition Format callback (#5594)


Signed-off-by: Keiven Chang <keivenchang@users.noreply.github.com>
Co-authored-by: Keiven Chang <keivenchang@users.noreply.github.com>
parent b273eda2
......@@ -129,141 +129,9 @@ let counter = endpoint.metrics().create_counter(
---
## Metrics API in Python
Python components can create and manage Prometheus metrics using the same metrics API through Python bindings.
### Available Methods
- `endpoint.metrics.create_counter()` / `create_intcounter()`: Create a counter metric
- `endpoint.metrics.create_gauge()` / `create_intgauge()`: Create a gauge metric
- `endpoint.metrics.create_histogram()`: Create a histogram metric
- `endpoint.metrics.create_countervec()` / `create_intcountervec()`: Create a counter with labels
- `endpoint.metrics.create_gaugevec()` / `create_intgaugevec()`: Create a gauge with labels
- `endpoint.metrics.create_histogramvec()`: Create a histogram with labels
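All metric types can be imported from `dynamo.prometheus_metrics`. These imports are for type hints only; metric instances are created through the `endpoint.metrics.create_*()` methods listed above.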
### Creating Metrics
```python
from dynamo.runtime import DistributedRuntime
drt = DistributedRuntime()
endpoint = drt.namespace("my_namespace").component("my_component").endpoint("my_endpoint")
# Simple metrics
requests_total = endpoint.metrics.create_intcounter(
"requests_total",
"Total requests"
)
active_connections = endpoint.metrics.create_intgauge(
"active_connections",
"Active connections"
)
latency = endpoint.metrics.create_histogram(
"latency_seconds",
"Request latency",
buckets=[0.001, 0.01, 0.1, 1.0, 10.0]
)
```
### Using Metrics
```python
# Counters
requests_total.inc()
requests_total.inc_by(5)
# Gauges
active_connections.set(42)
active_connections.inc()
active_connections.dec()
# Histograms
latency.observe(0.023) # 23ms
```
### Vector Metrics with Labels
```python
# Create vector metrics with label names
requests_by_model = endpoint.metrics.create_intcountervec(
"requests_by_model",
"Requests by model",
["model_type", "model_size"]
)
memory_by_gpu = endpoint.metrics.create_intgaugevec(
"gpu_memory_bytes",
"GPU memory by device",
["gpu_id", "memory_type"]
)
# Use with specific label values
requests_by_model.inc({"model_type": "llama", "model_size": "7b"})
memory_by_gpu.set(8192, {"gpu_id": "0", "memory_type": "allocated"})
```
### Advanced Features
**Constant labels:**
```python
counter = endpoint.metrics.create_intcounter(
"requests_total",
"Total requests",
[("region", "us-west"), ("env", "prod")]
)
```
**Metric introspection:**
```python
print(counter.name()) # "my_namespace_my_component_my_endpoint_requests_total"
print(counter.const_labels()) # {"dynamo_namespace": "my_namespace", ...}
print(requests_by_model.variable_labels()) # ["model_type", "model_size"]
```
**Update patterns:**
Background thread updates:
```python
import threading
import time
def update_loop():
while True:
active_connections.set(compute_current_connections())
time.sleep(2)
threading.Thread(target=update_loop, daemon=True).start()
```
Callback-based updates (called before each `/metrics` scrape):
```python
def update_metrics():
active_connections.set(compute_current_connections())
endpoint.metrics.register_callback(update_metrics)
```
### Examples
Example scripts: [lib/bindings/python/examples/metrics/](../../lib/bindings/python/examples/metrics/)
```bash
cd ~/dynamo/lib/bindings/python/examples/metrics
DYN_SYSTEM_PORT=8081 ./server_with_loop.py
DYN_SYSTEM_PORT=8081 ./server_with_callback.py
```
---
## Related Documentation
- [Metrics Overview](metrics.md)
- [Prometheus and Grafana Setup](prometheus-grafana.md)
- [Distributed Runtime Architecture](../design_docs/distributed_runtime.md)
- [Python Metrics Examples](../../lib/bindings/python/examples/metrics/)
......@@ -4180,6 +4180,8 @@ version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e3d48272133186f64c82a89faff4405b342478a2f015bb35d78550eff2a9961"
dependencies = [
"anyhow",
"bincode 1.3.3",
"bindgen 0.71.1",
"cc",
"libc",
......
#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Example demonstrating the new typed Prometheus metrics API, with metric values refreshed by a scrape callback.
This shows how Python code can:
1. Create typed metric objects (Gauge, IntGauge, GaugeVec, IntGaugeVec, etc.) through the
   endpoint.metrics.create_*() methods, which register them with the endpoint automatically
2. Register a callback that is called before the /metrics endpoint is scraped
3. Update metric values using type-safe methods (set for gauges, inc for counters)
4. Serve the metrics automatically via the /metrics endpoint
Usage:
DYN_SYSTEM_PORT=8081 ./server_with_callback.py
# In another terminal, query the metrics:
curl http://localhost:8081/metrics
"""
import asyncio
import uvloop
# Note that these imports are for type hints only. They cannot be instantiated directly.
# You can instantiate them using the endpoint.metrics.create_*() methods.
from dynamo.prometheus_metrics import Gauge, IntCounter, IntGauge, IntGaugeVec
from dynamo.runtime import Component, DistributedRuntime, Endpoint, dynamo_worker
@dynamo_worker()
async def worker(runtime: DistributedRuntime) -> None:
    """Script entry point: hand the runtime straight to ``init()``.

    NOTE(review): the script calls ``worker()`` with no arguments, so the
    ``dynamo_worker()`` decorator presumably supplies ``runtime`` — confirm
    against ``dynamo.runtime``.
    """
    await init(runtime)
async def init(runtime: DistributedRuntime):
    """Create the example metrics, register a scrape callback, then block forever.

    The metrics are created through ``endpoint.metrics.create_*()``, a callback
    that refreshes them is registered with ``endpoint.metrics.register_callback()``,
    initial values are written, and finally the coroutine waits on an event
    that is never set so the process keeps running.
    """
    # Resolve the endpoint that owns the example's metrics.
    endpoint: Endpoint = (
        runtime.namespace("ns556").component("cp556").endpoint("ep556")
    )

    # Step 1: create the metrics through the endpoint's metrics property.
    print("[python] Creating metrics...")

    # Plain gauges (integer and float valued).
    request_total_slots: IntGauge = endpoint.metrics.create_intgauge(
        "request_total_slots", "Total request slots available"
    )
    gpu_cache_usage_perc: Gauge = endpoint.metrics.create_gauge(
        "gpu_cache_usage_percent", "GPU cache usage percentage"
    )

    # Gauge vector keyed by the "worker_id" and "model" labels.
    worker_active_requests: IntGaugeVec = endpoint.metrics.create_intgaugevec(
        "worker_active_requests",
        "Active requests per worker",
        ["worker_id", "model"],
    )

    # Counter of refreshes, created with one constant label pair.
    update_count: IntCounter = endpoint.metrics.create_intcounter(
        "update_count",
        "Number of times metrics were updated",
        [("update_method", "callback")],
    )

    print(f"[python] Created IntGauge: {request_total_slots.name()}")
    print(f"[python] Created Gauge: {gpu_cache_usage_perc.name()}")
    print(f"[python] Created IntGaugeVec: {worker_active_requests.name()}")
    print(f"[python] Created IntCounter: {update_count.name()}")
    print("[python] Metrics automatically registered with endpoint!")

    # Step 2: register the callback that refreshes the metrics on demand.
    print("[python] Registering metrics callback...")

    def update_metrics():
        """Called automatically before /metrics endpoint is scraped"""
        update_count.inc()

        # Derive the fresh gauge values from the number of calls so far.
        call_number = update_count.get()
        request_total_slots.set(1024 + call_number)
        gpu_cache_usage_perc.set(0.01 + (call_number * 0.01))

        print(f"[python] Updated metrics (call #{call_number})")

    endpoint.metrics.register_callback(update_metrics)
    print("[python] update (metrics) callback registered!")

    # Step 3: write initial values and exercise the labelled gauge.
    print("[python] Setting initial metric values...")
    request_total_slots.set(1024)
    gpu_cache_usage_perc.set(0.00)

    print(f"[python] request_total_slots = {request_total_slots.get()}")
    print(f"[python] gpu_cache_usage_perc = {gpu_cache_usage_perc.get()}")

    print("[python] Updating vector metric with labels...")
    for worker_id, active in (("worker_1", 5), ("worker_2", 3)):
        worker_active_requests.set(
            active, {"worker_id": worker_id, "model": "llama-3"}
        )
    print("[python] worker_active_requests set for worker_1 and worker_2")

    # The metrics are now available at:
    # http://localhost:<system_status_port>/metrics
    print("[python] ✅ Metrics are now registered and served via /metrics endpoint")
    print(
        "[python] Check the system status server port to see them in Prometheus format"
    )
    print(
        "[python] Supported types: Counter, IntCounter, Gauge, IntGauge, Histogram, and their Vec variants"
    )

    # Note: This example does not call serve_endpoint() to keep it simple.
    # In a real service, you would call: await endpoint.serve_endpoint(handler, ...)

    # Wait on an event nobody sets, so the metrics endpoint stays up.
    await asyncio.Event().wait()
if __name__ == "__main__":
    # Switch asyncio to uvloop before asyncio.run() creates the event loop.
    uvloop.install()
    # worker() is called without arguments; the dynamo_worker() decorator is
    # presumably what provides the runtime — confirm against dynamo.runtime.
    asyncio.run(worker())
#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Example demonstrating metrics updates via background loop instead of callback.
This shows an alternative approach where:
1. Metrics are created and registered with an endpoint
2. A background thread continuously updates metrics in a loop
3. No callback is used - metrics are updated directly by the thread
4. The metrics are automatically served via the /metrics endpoint
Usage:
DYN_SYSTEM_PORT=8081 ./server_with_loop.py
# In another terminal, query the metrics:
curl http://localhost:8081/metrics
"""
import asyncio
import threading
import time
import uvloop
from dynamo.prometheus_metrics import Gauge, IntCounter, IntGauge, IntGaugeVec
from dynamo.runtime import Component, DistributedRuntime, Endpoint, dynamo_worker
def metrics_updater_thread(
    request_total_slots: IntGauge,
    gpu_cache_usage_perc: Gauge,
    worker_active_requests: IntGaugeVec,
    update_count: IntCounter,
):
    """Refresh the example metrics every two seconds; never returns.

    Each pass increments ``update_count``, reads it back, and derives every
    other metric value from that count before sleeping.
    """
    print("[python] Metrics updater thread started")

    # (worker_id, base value, modulus) for each labelled series of the gauge vector.
    worker_series = (("worker_1", 5, 10), ("worker_2", 3, 5))

    while True:
        update_count.inc()
        iteration = update_count.get()

        # Plain gauges grow with the iteration number.
        request_total_slots.set(1024 + iteration)
        gpu_cache_usage_perc.set(0.01 + (iteration * 0.01))

        # Labelled series cycle within a small range.
        for worker_id, base, modulus in worker_series:
            worker_active_requests.set(
                base + (iteration % modulus),
                {"worker_id": worker_id, "model": "llama-3"},
            )

        print(f"[python] Updated metrics in loop (iteration #{iteration})")

        # Pause two seconds between refreshes.
        time.sleep(2)
@dynamo_worker()
async def worker(runtime: DistributedRuntime) -> None:
    """Script entry point: hand the runtime straight to ``init()``.

    NOTE(review): the script calls ``worker()`` with no arguments, so the
    ``dynamo_worker()`` decorator presumably supplies ``runtime`` — confirm
    against ``dynamo.runtime``.
    """
    await init(runtime)
async def init(runtime: DistributedRuntime):
    """Create the example metrics, start the updater thread, then block forever.

    The metrics are created through ``endpoint.metrics.create_*()``, given
    initial values, and handed to a daemon thread running
    ``metrics_updater_thread``. The coroutine then waits on an event that is
    never set so the process keeps running.
    """
    # Resolve the endpoint that owns the example's metrics.
    endpoint: Endpoint = (
        runtime.namespace("ns557").component("cp557").endpoint("ep557")
    )

    # Create the metrics through the endpoint's metrics property.
    print("[python] Creating metrics...")

    request_total_slots: IntGauge = endpoint.metrics.create_intgauge(
        "request_total_slots", "Total request slots available"
    )
    gpu_cache_usage_perc: Gauge = endpoint.metrics.create_gauge(
        "gpu_cache_usage_percent", "GPU cache usage percentage"
    )
    worker_active_requests: IntGaugeVec = endpoint.metrics.create_intgaugevec(
        "worker_active_requests",
        "Active requests per worker",
        ["worker_id", "model"],
    )
    update_count: IntCounter = endpoint.metrics.create_intcounter(
        "update_count",
        "Number of times metrics were updated",
        [("update_method", "background_thread")],
    )

    print(f"[python] Created IntGauge: {request_total_slots.name()}")
    print(f"[python] Created Gauge: {gpu_cache_usage_perc.name()}")
    print(f"[python] Created IntGaugeVec: {worker_active_requests.name()}")
    print(f"[python] Created IntCounter: {update_count.name()}")
    print("[python] Metrics automatically registered with endpoint!")

    # Seed every metric before the updater thread takes over.
    print("[python] Setting initial metric values...")
    request_total_slots.set(1024)
    gpu_cache_usage_perc.set(0.00)
    for worker_id, active in (("worker_1", 5), ("worker_2", 3)):
        worker_active_requests.set(
            active, {"worker_id": worker_id, "model": "llama-3"}
        )

    # Hand the metric objects to a daemon thread that keeps them fresh.
    print("[python] Starting background thread to update metrics...")
    thread_args = (
        request_total_slots,
        gpu_cache_usage_perc,
        worker_active_requests,
        update_count,
    )
    updater = threading.Thread(
        target=metrics_updater_thread, args=thread_args, daemon=True
    )
    updater.start()

    print("[python] ✅ Metrics are now registered and served via /metrics endpoint")
    print("[python] Metrics are being updated every 2 seconds by background thread")
    print(
        "[python] Check the system status server port to see them in Prometheus format"
    )

    # Note: This example does not call serve_endpoint() to keep it simple.
    # In a real service, you would call: await endpoint.serve_endpoint(handler, ...)

    # Wait on an event nobody sets, so the metrics endpoint stays up.
    await asyncio.Event().wait()
if __name__ == "__main__":
    # Switch asyncio to uvloop before asyncio.run() creates the event loop.
    uvloop.install()
    # worker() is called without arguments; the dynamo_worker() decorator is
    # presumably what provides the runtime — confirm against dynamo.runtime.
    asyncio.run(worker())
......@@ -754,12 +754,6 @@ impl Component {
event_loop: self.event_loop.clone(),
})
}
/// Python getter `metrics`: returns a `RuntimeMetrics` helper for this component.
///
/// Each access clones `self.inner` and passes the clone to
/// `RuntimeMetrics::from_component`, so a new helper value is returned every time.
#[getter]
fn metrics(&self) -> prometheus_metrics::RuntimeMetrics {
prometheus_metrics::RuntimeMetrics::from_component(self.inner.clone())
}
}
#[pymethods]
......@@ -889,12 +883,6 @@ impl Namespace {
event_loop: self.event_loop.clone(),
})
}
/// Python getter `metrics`: returns a `RuntimeMetrics` helper for this namespace.
///
/// Each access clones `self.inner` and passes the clone to
/// `RuntimeMetrics::from_namespace`, so a new helper value is returned every time.
#[getter]
fn metrics(&self) -> prometheus_metrics::RuntimeMetrics {
prometheus_metrics::RuntimeMetrics::from_namespace(self.inner.clone())
}
}
#[pymethods]
......
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
// NOTE: This file implements Python bindings for Prometheus metric types.
// It should be kept in sync with:
// - lib/bindings/python/src/dynamo/_metrics.pyi (Python type stubs - method signatures must match)
// - lib/runtime/src/metrics.rs (MetricsRegistry trait - metric types should align)
//
// When adding/modifying metric methods:
// 1. Update the Rust implementation here (#[pymethods])
// 2. Update the Python type stub in _metrics.pyi
// 3. Follow standard Prometheus API conventions (e.g., Counter.inc(), Gauge.set(), etc.)
//! Python bindings for Prometheus metrics callbacks.
//!
//! This module provides minimal bindings for registering callbacks to integrate
//! external Prometheus metrics (e.g., from vLLM, SGLang, TensorRT-LLM) into
//! Dynamo's metrics endpoint.
use prometheus::core::Collector;
use pyo3::prelude::*;
use std::collections::HashMap;
use std::sync::Arc;
use crate::rs;
/// Arrange Python-supplied label values in the order a metric declares its variable labels.
///
/// The result is meant to be handed to `with_label_values()`, which needs the
/// values in declaration order rather than as a name -> value map.
///
/// # Arguments
/// * `variable_labels` - Label names in the order declared on the metric
/// * `labels` - Label name -> value pairs received from Python
///
/// # Returns
/// * `Ok(Vec<&str>)` - One value per entry of `variable_labels`, in the same order
/// * `Err(PyErr)` - A `ValueError` naming the first declared label that is absent from `labels`
///
/// Entries of `labels` whose names are not in `variable_labels` are ignored.
fn collect_ordered_label_values<'a>(
    variable_labels: &[String],
    labels: &'a HashMap<String, String>,
) -> PyResult<Vec<&'a str>> {
    variable_labels
        .iter()
        .map(|wanted| {
            // Collecting into a `PyResult` stops at the first missing label.
            labels.get(wanted).map(String::as_str).ok_or_else(|| {
                pyo3::exceptions::PyValueError::new_err(format!(
                    "Missing required label '{}'. Expected labels: {:?}, Got: {:?}",
                    wanted,
                    variable_labels,
                    labels.keys().collect::<Vec<_>>()
                ))
            })
        })
        .collect()
}
// Python wrappers for Prometheus metric types.
//
// These wrapper structs are necessary because Prometheus types from the external `prometheus` crate
// cannot be directly exposed to Python via PyO3's #[pyclass] attribute. This is due to:
//
// 1. **Orphan Rule**: PyO3 requires implementing traits on types, but Rust's orphan rule prevents
// implementing foreign traits (like PyClass) on foreign types (prometheus::Counter, etc.).
//
// 2. **Ownership**: #[pyclass] can only be applied to structs defined in your crate, not external types.
//
// 3. **PyO3 Requirements**: Types exposed to Python must satisfy specific trait bounds (Send, Sync)
// and implement PyO3's internal traits, which we cannot add to external crate types.
//
// The solution is the newtype wrapper pattern: wrap each Prometheus type in our own struct,
// apply #[pyclass] to our wrapper, and delegate method calls to the inner Prometheus type.
/// Python-visible wrapper around a `prometheus::Counter` (values are `f64`).
///
/// `#[pyclass]` exposes the type to Python; its methods delegate to the
/// wrapped counter.
#[pyclass]
pub struct Counter {
// The wrapped Prometheus counter.
counter: prometheus::Counter,
}
/// Python-visible wrapper around a `prometheus::IntCounter` (values are `u64`).
///
/// `#[pyclass]` exposes the type to Python; its methods delegate to the
/// wrapped counter.
#[pyclass]
pub struct IntCounter {
// The wrapped Prometheus integer counter.
counter: prometheus::IntCounter,
}
/// Python-visible wrapper around a `prometheus::CounterVec`, a family of
/// `f64` counters selected by variable label values.
///
/// `#[pyclass]` exposes the type to Python; its methods delegate to the
/// wrapped vector.
#[pyclass]
pub struct CounterVec {
// The wrapped Prometheus counter vector.
counter: prometheus::CounterVec,
}
/// Python-visible wrapper around a `prometheus::IntCounterVec`, a family of
/// `u64` counters selected by variable label values.
///
/// `#[pyclass]` exposes the type to Python; its methods delegate to the
/// wrapped vector.
#[pyclass]
pub struct IntCounterVec {
// The wrapped Prometheus integer counter vector.
counter: prometheus::IntCounterVec,
}
/// Python-visible wrapper around a `prometheus::Gauge` (values are `f64`).
///
/// `#[pyclass]` exposes the type to Python; its methods delegate to the
/// wrapped gauge.
#[pyclass]
pub struct Gauge {
// The wrapped Prometheus gauge.
gauge: prometheus::Gauge,
}
/// Python-visible wrapper around a `prometheus::IntGauge` (values are `i64`).
///
/// `#[pyclass]` exposes the type to Python; its methods delegate to the
/// wrapped gauge.
#[pyclass]
pub struct IntGauge {
// The wrapped Prometheus integer gauge.
gauge: prometheus::IntGauge,
}
/// Python-visible wrapper around a `prometheus::GaugeVec`, a family of
/// `f64` gauges selected by variable label values.
///
/// `#[pyclass]` exposes the type to Python; its methods delegate to the
/// wrapped vector.
#[pyclass]
pub struct GaugeVec {
// The wrapped Prometheus gauge vector.
gauge: prometheus::GaugeVec,
}
/// Python-visible wrapper around a `prometheus::IntGaugeVec`, a family of
/// `i64` gauges selected by variable label values.
///
/// `#[pyclass]` exposes the type to Python; its methods delegate to the
/// wrapped vector.
#[pyclass]
pub struct IntGaugeVec {
// The wrapped Prometheus integer gauge vector.
gauge: prometheus::IntGaugeVec,
}
/// Python-visible wrapper around a `prometheus::Histogram`.
///
/// `#[pyclass]` exposes the type to Python; its methods delegate to the
/// wrapped histogram.
#[pyclass]
pub struct Histogram {
// The wrapped Prometheus histogram.
histogram: prometheus::Histogram,
}
// ============================================================================
// Various PyMethod implementations below.
// ============================================================================
#[pymethods]
impl Counter {
/// Get the metric name
fn name(&self) -> PyResult<String> {
let desc = self.counter.desc();
Ok(desc[0].fq_name.clone())
}
/// Get the constant labels
fn const_labels(&self) -> PyResult<HashMap<String, String>> {
let desc = self.counter.desc();
let labels: HashMap<String, String> = desc[0]
.const_label_pairs
.iter()
.map(|pair| (pair.name().to_string(), pair.value().to_string()))
.collect();
Ok(labels)
}
/// Increment counter by 1
fn inc(&self) -> PyResult<()> {
self.counter.inc();
Ok(())
}
/// Increment counter by value
fn inc_by(&self, value: f64) -> PyResult<()> {
self.counter.inc_by(value);
Ok(())
}
/// Get counter value
fn get(&self) -> PyResult<f64> {
Ok(self.counter.get())
}
}
impl Counter {
fn from_prometheus(counter: prometheus::Counter) -> Self {
Self { counter }
}
}
#[pymethods]
impl IntCounter {
/// Get the metric name
fn name(&self) -> PyResult<String> {
let desc = self.counter.desc();
Ok(desc[0].fq_name.clone())
}
/// Get the constant labels
fn const_labels(&self) -> PyResult<HashMap<String, String>> {
let desc = self.counter.desc();
let labels: HashMap<String, String> = desc[0]
.const_label_pairs
.iter()
.map(|pair| (pair.name().to_string(), pair.value().to_string()))
.collect();
Ok(labels)
}
/// Increment counter by 1
fn inc(&self) -> PyResult<()> {
self.counter.inc();
Ok(())
}
/// Increment counter by value
fn inc_by(&self, value: u64) -> PyResult<()> {
self.counter.inc_by(value);
Ok(())
}
/// Get counter value
fn get(&self) -> PyResult<u64> {
Ok(self.counter.get())
}
}
impl IntCounter {
fn from_prometheus(counter: prometheus::IntCounter) -> Self {
Self { counter }
}
}
#[pymethods]
impl CounterVec {
/// Get the metric name
fn name(&self) -> PyResult<String> {
let desc = self.counter.desc();
Ok(desc[0].fq_name.clone())
}
/// Get the constant labels
fn const_labels(&self) -> PyResult<HashMap<String, String>> {
let desc = self.counter.desc();
let labels: HashMap<String, String> = desc[0]
.const_label_pairs
.iter()
.map(|pair| (pair.name().to_string(), pair.value().to_string()))
.collect();
Ok(labels)
}
/// Get the variable label names
fn variable_labels(&self) -> PyResult<Vec<String>> {
let desc = self.counter.desc();
Ok(desc[0].variable_labels.clone())
}
/// Increment counter by 1 with labels
fn inc(&self, labels: HashMap<String, String>) -> PyResult<()> {
let desc = self.counter.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
self.counter.with_label_values(&label_values).inc();
Ok(())
}
/// Increment counter by value with labels
fn inc_by(&self, labels: HashMap<String, String>, value: f64) -> PyResult<()> {
let desc = self.counter.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
self.counter.with_label_values(&label_values).inc_by(value);
Ok(())
}
/// Get counter value with labels
fn get(&self, labels: HashMap<String, String>) -> PyResult<f64> {
let desc = self.counter.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
Ok(self.counter.with_label_values(&label_values).get())
}
}
impl CounterVec {
fn from_prometheus(counter: prometheus::CounterVec) -> Self {
Self { counter }
}
}
#[pymethods]
impl IntCounterVec {
/// Get the metric name
fn name(&self) -> PyResult<String> {
let desc = self.counter.desc();
Ok(desc[0].fq_name.clone())
}
/// Get the constant labels
fn const_labels(&self) -> PyResult<HashMap<String, String>> {
let desc = self.counter.desc();
let labels: HashMap<String, String> = desc[0]
.const_label_pairs
.iter()
.map(|pair| (pair.name().to_string(), pair.value().to_string()))
.collect();
Ok(labels)
}
/// Get the variable label names
fn variable_labels(&self) -> PyResult<Vec<String>> {
let desc = self.counter.desc();
Ok(desc[0].variable_labels.clone())
}
/// Increment counter by 1 with labels
fn inc(&self, labels: HashMap<String, String>) -> PyResult<()> {
let desc = self.counter.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
self.counter.with_label_values(&label_values).inc();
Ok(())
}
/// Increment counter by value with labels
fn inc_by(&self, labels: HashMap<String, String>, value: u64) -> PyResult<()> {
let desc = self.counter.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
self.counter.with_label_values(&label_values).inc_by(value);
Ok(())
}
/// Get counter value with labels
fn get(&self, labels: HashMap<String, String>) -> PyResult<u64> {
let desc = self.counter.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
Ok(self.counter.with_label_values(&label_values).get())
}
}
impl IntCounterVec {
fn from_prometheus(counter: prometheus::IntCounterVec) -> Self {
Self { counter }
}
}
#[pymethods]
impl Gauge {
/// Get the metric name
fn name(&self) -> PyResult<String> {
let desc = self.gauge.desc();
Ok(desc[0].fq_name.clone())
}
/// Get the constant labels
fn const_labels(&self) -> PyResult<HashMap<String, String>> {
let desc = self.gauge.desc();
let labels: HashMap<String, String> = desc[0]
.const_label_pairs
.iter()
.map(|pair| (pair.name().to_string(), pair.value().to_string()))
.collect();
Ok(labels)
}
/// Set gauge value
fn set(&self, value: f64) -> PyResult<()> {
self.gauge.set(value);
Ok(())
}
/// Get gauge value
fn get(&self) -> PyResult<f64> {
Ok(self.gauge.get())
}
/// Increment gauge by 1
fn inc(&self) -> PyResult<()> {
self.gauge.inc();
Ok(())
}
/// Increment gauge by value
fn inc_by(&self, value: f64) -> PyResult<()> {
self.gauge.add(value);
Ok(())
}
/// Decrement gauge by 1
fn dec(&self) -> PyResult<()> {
self.gauge.dec();
Ok(())
}
/// Decrement gauge by value
fn dec_by(&self, value: f64) -> PyResult<()> {
self.gauge.sub(value);
Ok(())
}
/// Add value to gauge
fn add(&self, value: f64) -> PyResult<()> {
self.gauge.add(value);
Ok(())
}
/// Subtract value from gauge
fn sub(&self, value: f64) -> PyResult<()> {
self.gauge.sub(value);
Ok(())
}
}
impl Gauge {
fn from_prometheus(gauge: prometheus::Gauge) -> Self {
Self { gauge }
}
}
#[pymethods]
impl IntGauge {
/// Get the metric name
fn name(&self) -> PyResult<String> {
let desc = self.gauge.desc();
Ok(desc[0].fq_name.clone())
}
/// Get the constant labels
fn const_labels(&self) -> PyResult<HashMap<String, String>> {
let desc = self.gauge.desc();
let labels: HashMap<String, String> = desc[0]
.const_label_pairs
.iter()
.map(|pair| (pair.name().to_string(), pair.value().to_string()))
.collect();
Ok(labels)
}
/// Set gauge value
fn set(&self, value: i64) -> PyResult<()> {
self.gauge.set(value);
Ok(())
}
/// Get gauge value
fn get(&self) -> PyResult<i64> {
Ok(self.gauge.get())
}
/// Increment gauge by 1
fn inc(&self) -> PyResult<()> {
self.gauge.inc();
Ok(())
}
/// Decrement gauge by 1
fn dec(&self) -> PyResult<()> {
self.gauge.dec();
Ok(())
}
/// Add value to gauge
fn add(&self, value: i64) -> PyResult<()> {
self.gauge.add(value);
Ok(())
}
/// Subtract value from gauge
fn sub(&self, value: i64) -> PyResult<()> {
self.gauge.sub(value);
Ok(())
}
}
impl IntGauge {
fn from_prometheus(gauge: prometheus::IntGauge) -> Self {
Self { gauge }
}
}
#[pymethods]
impl GaugeVec {
/// Get the metric name
fn name(&self) -> PyResult<String> {
let desc = self.gauge.desc();
Ok(desc[0].fq_name.clone())
}
/// Get the constant labels
fn const_labels(&self) -> PyResult<HashMap<String, String>> {
let desc = self.gauge.desc();
let labels: HashMap<String, String> = desc[0]
.const_label_pairs
.iter()
.map(|pair| (pair.name().to_string(), pair.value().to_string()))
.collect();
Ok(labels)
}
/// Get the variable label names
fn variable_labels(&self) -> PyResult<Vec<String>> {
let desc = self.gauge.desc();
Ok(desc[0].variable_labels.clone())
}
/// Set gauge value with labels
fn set(&self, value: f64, labels: HashMap<String, String>) -> PyResult<()> {
let desc = self.gauge.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
self.gauge.with_label_values(&label_values).set(value);
Ok(())
}
/// Get gauge value with labels
fn get(&self, labels: HashMap<String, String>) -> PyResult<f64> {
let desc = self.gauge.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
Ok(self.gauge.with_label_values(&label_values).get())
}
/// Increment gauge by 1 with labels
fn inc(&self, labels: HashMap<String, String>) -> PyResult<()> {
let desc = self.gauge.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
self.gauge.with_label_values(&label_values).inc();
Ok(())
}
/// Decrement gauge by 1 with labels
fn dec(&self, labels: HashMap<String, String>) -> PyResult<()> {
let desc = self.gauge.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
self.gauge.with_label_values(&label_values).dec();
Ok(())
}
/// Add value to gauge with labels
fn add(&self, labels: HashMap<String, String>, value: f64) -> PyResult<()> {
let desc = self.gauge.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
self.gauge.with_label_values(&label_values).add(value);
Ok(())
}
/// Subtract value from gauge with labels
fn sub(&self, labels: HashMap<String, String>, value: f64) -> PyResult<()> {
let desc = self.gauge.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
self.gauge.with_label_values(&label_values).sub(value);
Ok(())
}
}
impl GaugeVec {
fn from_prometheus(gauge: prometheus::GaugeVec) -> Self {
Self { gauge }
}
}
#[pymethods]
impl IntGaugeVec {
/// Get the metric name
fn name(&self) -> PyResult<String> {
let desc = self.gauge.desc();
Ok(desc[0].fq_name.clone())
}
/// Get the constant labels
fn const_labels(&self) -> PyResult<HashMap<String, String>> {
let desc = self.gauge.desc();
let labels: HashMap<String, String> = desc[0]
.const_label_pairs
.iter()
.map(|pair| (pair.name().to_string(), pair.value().to_string()))
.collect();
Ok(labels)
}
/// Get the variable label names
fn variable_labels(&self) -> PyResult<Vec<String>> {
let desc = self.gauge.desc();
Ok(desc[0].variable_labels.clone())
}
/// Set gauge value with labels
fn set(&self, value: i64, labels: HashMap<String, String>) -> PyResult<()> {
let desc = self.gauge.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
self.gauge.with_label_values(&label_values).set(value);
Ok(())
}
/// Get gauge value with labels
fn get(&self, labels: HashMap<String, String>) -> PyResult<i64> {
let desc = self.gauge.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
Ok(self.gauge.with_label_values(&label_values).get())
}
/// Increment gauge by 1 with labels
fn inc(&self, labels: HashMap<String, String>) -> PyResult<()> {
let desc = self.gauge.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
self.gauge.with_label_values(&label_values).inc();
Ok(())
}
/// Decrement gauge by 1 with labels
fn dec(&self, labels: HashMap<String, String>) -> PyResult<()> {
let desc = self.gauge.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
self.gauge.with_label_values(&label_values).dec();
Ok(())
}
/// Add value to gauge with labels
fn add(&self, labels: HashMap<String, String>, value: i64) -> PyResult<()> {
let desc = self.gauge.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
self.gauge.with_label_values(&label_values).add(value);
Ok(())
}
/// Subtract value from gauge with labels
fn sub(&self, labels: HashMap<String, String>, value: i64) -> PyResult<()> {
let desc = self.gauge.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
self.gauge.with_label_values(&label_values).sub(value);
Ok(())
}
}
impl IntGaugeVec {
fn from_prometheus(gauge: prometheus::IntGaugeVec) -> Self {
Self { gauge }
}
}
#[pymethods]
impl Histogram {
/// Get the metric name
fn name(&self) -> PyResult<String> {
let desc = self.histogram.desc();
Ok(desc[0].fq_name.clone())
}
/// Get the constant labels
fn const_labels(&self) -> PyResult<HashMap<String, String>> {
let desc = self.histogram.desc();
let labels: HashMap<String, String> = desc[0]
.const_label_pairs
.iter()
.map(|pair| (pair.name().to_string(), pair.value().to_string()))
.collect();
Ok(labels)
}
/// Observe a value
fn observe(&self, value: f64) -> PyResult<()> {
self.histogram.observe(value);
Ok(())
}
}
impl Histogram {
fn from_prometheus(histogram: prometheus::Histogram) -> Self {
Self { histogram }
}
}
/// RuntimeMetrics provides factory methods for creating typed Prometheus metrics
/// and utilities for registering metrics callbacks.
/// Exposed as endpoint.metrics, component.metrics, and namespace.metrics in Python.
///
/// NOTE: The create_* methods in RuntimeMetrics must stay in sync with the MetricsRegistry trait
/// in lib/runtime/src/metrics.rs. When adding new metric types, update both locations.
/// Note: Metric creation methods have been removed from the public API.
/// This class only provides callback registration for integrating external metrics.
#[pyclass]
#[derive(Clone)]
pub struct RuntimeMetrics {
......@@ -662,64 +30,10 @@ impl RuntimeMetrics {
hierarchy: Arc::new(endpoint),
}
}
/// Build a `RuntimeMetrics` whose hierarchy is the given `Component`.
///
/// The component is moved into a new `Arc` and stored in the `hierarchy` field.
pub fn from_component(component: dynamo_runtime::component::Component) -> Self {
Self {
hierarchy: Arc::new(component),
}
}
/// Build a `RuntimeMetrics` whose hierarchy is the given `Namespace`.
///
/// The namespace is moved into a new `Arc` and stored in the `hierarchy` field.
pub fn from_namespace(namespace: dynamo_runtime::component::Namespace) -> Self {
Self {
hierarchy: Arc::new(namespace),
}
}
/// Borrow optional owned `(name, value)` label pairs as `(&str, &str)` pairs.
///
/// Returns an empty vector when `labels` is `None`; otherwise the pairs are
/// returned in their original order, borrowing from `labels`.
fn convert_py_to_rust_labels(labels: &Option<Vec<(String, String)>>) -> Vec<(&str, &str)> {
    match labels {
        Some(pairs) => pairs
            .iter()
            .map(|(key, value)| (key.as_str(), value.as_str()))
            .collect(),
        None => Vec::new(),
    }
}
/// Borrow a slice of owned label names as `&str`s, preserving their order.
fn convert_py_to_rust_label_names(names: &[String]) -> Vec<&str> {
    let mut borrowed = Vec::with_capacity(names.len());
    for name in names {
        borrowed.push(name.as_str());
    }
    borrowed
}
/// Register `callback` as an update callback on the metrics registry of `registry_item`.
///
/// Generic over any type implementing `MetricsHierarchy`, which allows Endpoint,
/// Component, and Namespace to share the same callback registration logic.
///
/// The Python callable is invoked with no arguments while the GIL is held.
/// If it raises, the error is logged with `tracing::error!` and the wrapping
/// closure still returns `Ok(())`, so a failing callback is never reported back
/// to the registry. This function itself always returns `Ok(())`.
pub fn register_callback_for<T>(registry_item: &T, callback: PyObject) -> PyResult<()>
where
T: rs::metrics::MetricsHierarchy + ?Sized,
{
// Get the metrics registry from the hierarchy and register the callback directly
let metrics_registry = registry_item.get_metrics_registry();
metrics_registry.add_update_callback(Arc::new(move || {
// Acquire the GIL and call the Python callable directly; nothing is
// scheduled on an asyncio event loop here.
Python::with_gil(|py| {
if let Err(e) = callback.call0(py) {
// Best-effort: log the Python exception instead of propagating it.
tracing::error!("Metrics callback failed: {}", e);
}
});
Ok(())
}));
Ok(())
}
}
#[pymethods]
impl RuntimeMetrics {
/// Register a Python callback on this object's metrics hierarchy.
///
/// The hierarchy is whatever this `RuntimeMetrics` was built from (the
/// constructors accept an endpoint, a component, or a namespace). The work is
/// delegated to `register_callback_for`; the `_py` argument is unused.
/// NOTE(review): the callback is presumably invoked before metrics are scraped;
/// the trigger is decided by the registry and is not visible here.
fn register_callback(&self, callback: PyObject, _py: Python) -> PyResult<()> {
Self::register_callback_for(self.hierarchy.as_ref(), callback)
}
/// Register a Python callback that returns Prometheus exposition text
/// The returned text will be appended to the /metrics endpoint output
/// The callback should return a string in Prometheus text exposition format
......@@ -766,250 +80,9 @@ impl RuntimeMetrics {
Ok(())
}
// NOTE: The order of create_* methods below matches lib/runtime/src/metrics.rs::MetricsRegistry trait
// Keep them synchronized when adding new metric types
/// Create a `Counter` on this hierarchy and return it as a Python object.
///
/// `labels` are optional constant `(key, value)` pairs. A failure reported by
/// `create_metric` is raised in Python as a `RuntimeError` carrying its message.
#[pyo3(signature = (name, description, labels=None))]
fn create_counter(
    &self,
    name: String,
    description: String,
    labels: Option<Vec<(String, String)>>,
    py: Python,
) -> PyResult<Py<Counter>> {
    let fixed_labels = Self::convert_py_to_rust_labels(&labels);
    let inner: prometheus::Counter = match rs::metrics::create_metric(
        self.hierarchy.as_ref(),
        &name,
        &description,
        &fixed_labels,
        None,
        None,
    ) {
        Ok(metric) => metric,
        Err(err) => {
            return Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(err.to_string()));
        }
    };
    Py::new(py, Counter::from_prometheus(inner))
}
/// Create a `CounterVec` on this hierarchy and return it as a Python object.
///
/// `label_names` are the variable label names; `const_labels` are optional
/// constant `(key, value)` pairs. A failure reported by `create_metric` is
/// raised in Python as a `RuntimeError` carrying its message.
#[pyo3(signature = (name, description, label_names, const_labels=None))]
fn create_countervec(
    &self,
    name: String,
    description: String,
    label_names: Vec<String>,
    const_labels: Option<Vec<(String, String)>>,
    py: Python,
) -> PyResult<Py<CounterVec>> {
    let variable_names = Self::convert_py_to_rust_label_names(&label_names);
    let fixed_labels = Self::convert_py_to_rust_labels(&const_labels);
    let inner: prometheus::CounterVec = match rs::metrics::create_metric(
        self.hierarchy.as_ref(),
        &name,
        &description,
        &fixed_labels,
        None,
        Some(&variable_names),
    ) {
        Ok(metric) => metric,
        Err(err) => {
            return Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(err.to_string()));
        }
    };
    Py::new(py, CounterVec::from_prometheus(inner))
}
/// Create a `Gauge` on this hierarchy and return it as a Python object.
///
/// `labels` are optional constant `(key, value)` pairs. A failure reported by
/// `create_metric` is raised in Python as a `RuntimeError` carrying its message.
#[pyo3(signature = (name, description, labels=None))]
fn create_gauge(
    &self,
    name: String,
    description: String,
    labels: Option<Vec<(String, String)>>,
    py: Python,
) -> PyResult<Py<Gauge>> {
    let fixed_labels = Self::convert_py_to_rust_labels(&labels);
    let inner: prometheus::Gauge = match rs::metrics::create_metric(
        self.hierarchy.as_ref(),
        &name,
        &description,
        &fixed_labels,
        None,
        None,
    ) {
        Ok(metric) => metric,
        Err(err) => {
            return Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(err.to_string()));
        }
    };
    Py::new(py, Gauge::from_prometheus(inner))
}
/// Create a `GaugeVec` on this hierarchy and return it as a Python object.
///
/// `label_names` are the variable label names; `const_labels` are optional
/// constant `(key, value)` pairs. A failure reported by `create_metric` is
/// raised in Python as a `RuntimeError` carrying its message.
#[pyo3(signature = (name, description, label_names, const_labels=None))]
fn create_gaugevec(
    &self,
    name: String,
    description: String,
    label_names: Vec<String>,
    const_labels: Option<Vec<(String, String)>>,
    py: Python,
) -> PyResult<Py<GaugeVec>> {
    let variable_names = Self::convert_py_to_rust_label_names(&label_names);
    let fixed_labels = Self::convert_py_to_rust_labels(&const_labels);
    let inner: prometheus::GaugeVec = match rs::metrics::create_metric(
        self.hierarchy.as_ref(),
        &name,
        &description,
        &fixed_labels,
        None,
        Some(&variable_names),
    ) {
        Ok(metric) => metric,
        Err(err) => {
            return Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(err.to_string()));
        }
    };
    Py::new(py, GaugeVec::from_prometheus(inner))
}
/// Create a `Histogram` on this hierarchy and return it as a Python object.
///
/// `labels` are optional constant `(key, value)` pairs. A failure reported by
/// `create_metric` is raised in Python as a `RuntimeError` carrying its message.
///
/// NOTE(review): this signature accepts no bucket boundaries and passes `None`
/// for both trailing `create_metric` arguments; presumably default buckets are
/// used. Confirm against `create_metric`.
#[pyo3(signature = (name, description, labels=None))]
fn create_histogram(
    &self,
    name: String,
    description: String,
    labels: Option<Vec<(String, String)>>,
    py: Python,
) -> PyResult<Py<Histogram>> {
    let fixed_labels = Self::convert_py_to_rust_labels(&labels);
    let inner: prometheus::Histogram = match rs::metrics::create_metric(
        self.hierarchy.as_ref(),
        &name,
        &description,
        &fixed_labels,
        None,
        None,
    ) {
        Ok(metric) => metric,
        Err(err) => {
            return Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(err.to_string()));
        }
    };
    Py::new(py, Histogram::from_prometheus(inner))
}
/// Create an `IntCounter` on this hierarchy and return it as a Python object.
///
/// `labels` are optional constant `(key, value)` pairs. A failure reported by
/// `create_metric` is raised in Python as a `RuntimeError` carrying its message.
#[pyo3(signature = (name, description, labels=None))]
fn create_intcounter(
    &self,
    name: String,
    description: String,
    labels: Option<Vec<(String, String)>>,
    py: Python,
) -> PyResult<Py<IntCounter>> {
    let fixed_labels = Self::convert_py_to_rust_labels(&labels);
    let inner: prometheus::IntCounter = match rs::metrics::create_metric(
        self.hierarchy.as_ref(),
        &name,
        &description,
        &fixed_labels,
        None,
        None,
    ) {
        Ok(metric) => metric,
        Err(err) => {
            return Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(err.to_string()));
        }
    };
    Py::new(py, IntCounter::from_prometheus(inner))
}
/// Create an `IntCounterVec` on this hierarchy and return it as a Python object.
///
/// `label_names` are the variable label names; `const_labels` are optional
/// constant `(key, value)` pairs. A failure reported by `create_metric` is
/// raised in Python as a `RuntimeError` carrying its message.
#[pyo3(signature = (name, description, label_names, const_labels=None))]
fn create_intcountervec(
    &self,
    name: String,
    description: String,
    label_names: Vec<String>,
    const_labels: Option<Vec<(String, String)>>,
    py: Python,
) -> PyResult<Py<IntCounterVec>> {
    let variable_names = Self::convert_py_to_rust_label_names(&label_names);
    let fixed_labels = Self::convert_py_to_rust_labels(&const_labels);
    let inner: prometheus::IntCounterVec = match rs::metrics::create_metric(
        self.hierarchy.as_ref(),
        &name,
        &description,
        &fixed_labels,
        None,
        Some(&variable_names),
    ) {
        Ok(metric) => metric,
        Err(err) => {
            return Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(err.to_string()));
        }
    };
    Py::new(py, IntCounterVec::from_prometheus(inner))
}
/// Create an `IntGauge` on this hierarchy and return it as a Python object.
///
/// `labels` are optional constant `(key, value)` pairs. A failure reported by
/// `create_metric` is raised in Python as a `RuntimeError` carrying its message.
#[pyo3(signature = (name, description, labels=None))]
fn create_intgauge(
    &self,
    name: String,
    description: String,
    labels: Option<Vec<(String, String)>>,
    py: Python,
) -> PyResult<Py<IntGauge>> {
    let fixed_labels = Self::convert_py_to_rust_labels(&labels);
    let inner: prometheus::IntGauge = match rs::metrics::create_metric(
        self.hierarchy.as_ref(),
        &name,
        &description,
        &fixed_labels,
        None,
        None,
    ) {
        Ok(metric) => metric,
        Err(err) => {
            return Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(err.to_string()));
        }
    };
    Py::new(py, IntGauge::from_prometheus(inner))
}
/// Create an `IntGaugeVec` on this hierarchy and return it as a Python object.
///
/// `label_names` are the variable label names; `const_labels` are optional
/// constant `(key, value)` pairs. A failure reported by `create_metric` is
/// raised in Python as a `RuntimeError` carrying its message.
#[pyo3(signature = (name, description, label_names, const_labels=None))]
fn create_intgaugevec(
    &self,
    name: String,
    description: String,
    label_names: Vec<String>,
    const_labels: Option<Vec<(String, String)>>,
    py: Python,
) -> PyResult<Py<IntGaugeVec>> {
    let variable_names = Self::convert_py_to_rust_label_names(&label_names);
    let fixed_labels = Self::convert_py_to_rust_labels(&const_labels);
    let inner: prometheus::IntGaugeVec = match rs::metrics::create_metric(
        self.hierarchy.as_ref(),
        &name,
        &description,
        &fixed_labels,
        None,
        Some(&variable_names),
    ) {
        Ok(metric) => metric,
        Err(err) => {
            return Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(err.to_string()));
        }
    };
    Py::new(py, IntGaugeVec::from_prometheus(inner))
}
}
pub fn add_to_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
// Add specific metric type classes
m.add_class::<Counter>()?;
m.add_class::<IntCounter>()?;
m.add_class::<CounterVec>()?;
m.add_class::<IntCounterVec>()?;
m.add_class::<Gauge>()?;
m.add_class::<IntGauge>()?;
m.add_class::<GaugeVec>()?;
m.add_class::<IntGaugeVec>()?;
m.add_class::<Histogram>()?;
/// Module registration hook that currently registers nothing.
///
/// The module argument is unused (hence `_m`) and the function always returns `Ok(())`.
pub fn add_to_module(_m: &Bound<'_, PyModule>) -> PyResult<()> {
// No metric type classes to add - only RuntimeMetrics is exposed
Ok(())
}
......@@ -129,16 +129,6 @@ class Namespace:
"""
...
@property
def metrics(self) -> PyRuntimeMetrics:
    """
    Return the PyRuntimeMetrics helper attached to this namespace.

    Returns:
        A PyRuntimeMetrics object exposing create_* factory methods for the
        supported Prometheus metric types
    """
    ...
class Component:
"""
A component is a collection of endpoints
......@@ -152,15 +142,6 @@ class Component:
"""
...
@property
def metrics(self) -> PyRuntimeMetrics:
    """
    Return the PyRuntimeMetrics helper attached to this component.

    Returns:
        A PyRuntimeMetrics object exposing create_* factory methods for the
        supported Prometheus metric types
    """
    ...
class Endpoint:
"""
......@@ -198,10 +179,10 @@ class Endpoint:
@property
def metrics(self) -> PyRuntimeMetrics:
"""
Get a PyRuntimeMetrics helper for creating Prometheus metrics.
Get a PyRuntimeMetrics helper for registering Prometheus metrics callbacks.
Returns:
A PyRuntimeMetrics object that provides create_* methods for different metric types
A PyRuntimeMetrics object for callback registration
"""
...
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Official public API for prometheus metrics types.
This module provides the official public API for Prometheus metrics in Dynamo.
The metric types are implemented in Rust and exposed via the _core extension module.
"""
# Import directly from the Rust extension module
# Note: IDEs/type checkers may complain about this import because _core is a compiled
# extension module (.so file). However, this import is valid at runtime because the
# Rust code (lib.rs) creates and registers the prometheus_metrics submodule.
from dynamo._core import PyRuntimeMetrics # type: ignore[attr-defined]
from dynamo._core import prometheus_metrics # type: ignore[attr-defined]
# Re-export metric type classes from the prometheus_metrics submodule
# so callers can import them from this module instead of dynamo._core.
Counter = prometheus_metrics.Counter
CounterVec = prometheus_metrics.CounterVec
Gauge = prometheus_metrics.Gauge
GaugeVec = prometheus_metrics.GaugeVec
Histogram = prometheus_metrics.Histogram
IntCounter = prometheus_metrics.IntCounter
IntCounterVec = prometheus_metrics.IntCounterVec
IntGauge = prometheus_metrics.IntGauge
IntGaugeVec = prometheus_metrics.IntGaugeVec
# RuntimeMetrics is in the main _core module (as PyRuntimeMetrics), not the submodule
RuntimeMetrics = PyRuntimeMetrics
# Explicit public API: the nine metric type classes plus RuntimeMetrics.
__all__ = [
"Counter",
"CounterVec",
"Gauge",
"GaugeVec",
"Histogram",
"IntCounter",
"IntCounterVec",
"IntGauge",
"IntGaugeVec",
"RuntimeMetrics",
]
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Type stubs for the official public API of prometheus metrics.
"""Type stubs for prometheus metrics callbacks.
This file defines Python type stubs for Prometheus metric types.
It should be kept in sync with:
- lib/bindings/python/rust/metrics.rs (Rust implementations)
- lib/runtime/src/metrics.rs (MetricsRegistry trait and Prometheus types)
This file defines Python type stubs for the RuntimeMetrics class.
Only register_prometheus_expfmt_callback is exposed for integrating external metrics.
"""
from typing import Callable, Dict, List, Optional, Tuple
# Specific metric type classes
class Counter:
    """Float-valued Prometheus Counter."""

    def name(self) -> str:
        """Return the name of this metric."""
        ...

    def const_labels(self) -> Dict[str, str]:
        """Return the constant labels attached to this metric."""
        ...

    def inc(self) -> None:
        """Add 1 to the counter."""
        ...

    def inc_by(self, value: float) -> None:
        """Add ``value`` to the counter."""
        ...

    def get(self) -> float:
        """Return the current counter value."""
        ...
class CounterVec:
    """Float-valued Prometheus CounterVec (a counter partitioned by labels)."""

    def name(self) -> str:
        """Return the name of this metric."""
        ...

    def const_labels(self) -> Dict[str, str]:
        """Return the constant labels attached to this metric."""
        ...

    def variable_labels(self) -> List[str]:
        """Return the names of the variable labels."""
        ...

    def inc(self, labels: Dict[str, str]) -> None:
        """Add 1 to the counter selected by ``labels``."""
        ...

    def inc_by(self, labels: Dict[str, str], value: float) -> None:
        """Add ``value`` to the counter selected by ``labels``."""
        ...

    def get(self, labels: Dict[str, str]) -> float:
        """Return the value of the counter selected by ``labels``."""
        ...
class Gauge:
    """Float-valued Prometheus Gauge."""

    def name(self) -> str:
        """Return the name of this metric."""
        ...

    def const_labels(self) -> Dict[str, str]:
        """Return the constant labels attached to this metric."""
        ...

    def set(self, value: float) -> None:
        """Set the gauge to ``value``."""
        ...

    def get(self) -> float:
        """Return the current gauge value."""
        ...

    def inc(self) -> None:
        """Add 1 to the gauge."""
        ...

    def inc_by(self, value: float) -> None:
        """Increase the gauge by ``value``."""
        ...

    def dec(self) -> None:
        """Subtract 1 from the gauge."""
        ...

    def dec_by(self, value: float) -> None:
        """Decrease the gauge by ``value``."""
        ...

    def add(self, value: float) -> None:
        """Add ``value`` to the gauge."""
        ...

    def sub(self, value: float) -> None:
        """Subtract ``value`` from the gauge."""
        ...
class GaugeVec:
    """Float-valued Prometheus GaugeVec (a gauge partitioned by labels)."""

    def name(self) -> str:
        """Return the name of this metric."""
        ...

    def const_labels(self) -> Dict[str, str]:
        """Return the constant labels attached to this metric."""
        ...

    def variable_labels(self) -> List[str]:
        """Return the names of the variable labels."""
        ...

    # Note the argument order: ``set`` takes the value first, while ``add`` and
    # ``sub`` take the labels first.
    def set(self, value: float, labels: Dict[str, str]) -> None:
        """Set the gauge selected by ``labels`` to ``value``."""
        ...

    def get(self, labels: Dict[str, str]) -> float:
        """Return the value of the gauge selected by ``labels``."""
        ...

    def inc(self, labels: Dict[str, str]) -> None:
        """Add 1 to the gauge selected by ``labels``."""
        ...

    def dec(self, labels: Dict[str, str]) -> None:
        """Subtract 1 from the gauge selected by ``labels``."""
        ...

    def add(self, labels: Dict[str, str], value: float) -> None:
        """Add ``value`` to the gauge selected by ``labels``."""
        ...

    def sub(self, labels: Dict[str, str], value: float) -> None:
        """Subtract ``value`` from the gauge selected by ``labels``."""
        ...
class Histogram:
    """Prometheus Histogram."""

    def name(self) -> str:
        """Return the name of this metric."""
        ...

    def const_labels(self) -> Dict[str, str]:
        """Return the constant labels attached to this metric."""
        ...

    def observe(self, value: float) -> None:
        """Record one observation of ``value``."""
        ...
class IntCounter:
    """Integer-valued Prometheus IntCounter."""

    def name(self) -> str:
        """Return the name of this metric."""
        ...

    def const_labels(self) -> Dict[str, str]:
        """Return the constant labels attached to this metric."""
        ...

    def inc(self) -> None:
        """Add 1 to the counter."""
        ...

    def inc_by(self, value: int) -> None:
        """Add ``value`` to the counter."""
        ...

    def get(self) -> int:
        """Return the current counter value."""
        ...
class IntCounterVec:
    """Integer-valued Prometheus IntCounterVec (a counter partitioned by labels)."""

    def name(self) -> str:
        """Return the name of this metric."""
        ...

    def const_labels(self) -> Dict[str, str]:
        """Return the constant labels attached to this metric."""
        ...

    def variable_labels(self) -> List[str]:
        """Return the names of the variable labels."""
        ...

    def inc(self, labels: Dict[str, str]) -> None:
        """Add 1 to the counter selected by ``labels``."""
        ...

    def inc_by(self, labels: Dict[str, str], value: int) -> None:
        """Add ``value`` to the counter selected by ``labels``."""
        ...

    def get(self, labels: Dict[str, str]) -> int:
        """Return the value of the counter selected by ``labels``."""
        ...
class IntGauge:
    """Integer-valued Prometheus IntGauge."""

    def name(self) -> str:
        """Return the name of this metric."""
        ...

    def const_labels(self) -> Dict[str, str]:
        """Return the constant labels attached to this metric."""
        ...

    def set(self, value: int) -> None:
        """Set the gauge to ``value``."""
        ...

    def get(self) -> int:
        """Return the current gauge value."""
        ...

    def inc(self) -> None:
        """Add 1 to the gauge."""
        ...

    def dec(self) -> None:
        """Subtract 1 from the gauge."""
        ...

    def add(self, value: int) -> None:
        """Add ``value`` to the gauge."""
        ...

    def sub(self, value: int) -> None:
        """Subtract ``value`` from the gauge."""
        ...
class IntGaugeVec:
    """Integer-valued Prometheus IntGaugeVec (a gauge partitioned by labels)."""

    def name(self) -> str:
        """Return the name of this metric."""
        ...

    def const_labels(self) -> Dict[str, str]:
        """Return the constant labels attached to this metric."""
        ...

    def variable_labels(self) -> List[str]:
        """Return the names of the variable labels."""
        ...

    # Note the argument order: ``set`` takes the value first, while ``add`` and
    # ``sub`` take the labels first.
    def set(self, value: int, labels: Dict[str, str]) -> None:
        """Set the gauge selected by ``labels`` to ``value``."""
        ...

    def get(self, labels: Dict[str, str]) -> int:
        """Return the value of the gauge selected by ``labels``."""
        ...

    def inc(self, labels: Dict[str, str]) -> None:
        """Add 1 to the gauge selected by ``labels``."""
        ...

    def dec(self, labels: Dict[str, str]) -> None:
        """Subtract 1 from the gauge selected by ``labels``."""
        ...

    def add(self, labels: Dict[str, str], value: int) -> None:
        """Add ``value`` to the gauge selected by ``labels``."""
        ...

    def sub(self, labels: Dict[str, str], value: int) -> None:
        """Subtract ``value`` from the gauge selected by ``labels``."""
        ...
from typing import Callable
class RuntimeMetrics:
"""
Helper class for creating Prometheus metrics on an Endpoint.
Provides factory methods to create various Prometheus metric types
that are automatically registered with the endpoint's Prometheus registry.
Also provides utilities for registering metrics callbacks.
"""
def register_callback(self, callback: Callable[[], None]) -> None:
"""
Register a Python callback to be invoked before metrics are scraped.
This allows you to update metric values dynamically when the /metrics endpoint
is accessed. The callback will be executed synchronously before serving metrics.
Helper class for registering Prometheus metrics callbacks on an Endpoint.
Args:
callback: A callable that takes no arguments and returns None.
This function will be called each time metrics are scraped.
Example:
```python
metrics = endpoint.metrics
counter = metrics.create_intcounter("request_count", "Total requests")
def update_metrics():
counter.inc()
metrics.register_callback(update_metrics)
```
Provides utilities for integrating external metrics (e.g., from vLLM, SGLang, TensorRT-LLM).
"""
...
def register_prometheus_expfmt_callback(self, callback: Callable[[], str]) -> None:
"""
......@@ -266,63 +27,9 @@ class RuntimeMetrics:
Args:
callback: A callable that takes no arguments and returns a string
in Prometheus text exposition format
Example:
```python
def get_external_metrics():
# Fetch metrics from external source
return "# HELP external_metric Some metric\\nexternal_metric 42.0\\n"
metrics.register_prometheus_expfmt_callback(get_external_metrics)
```
"""
...
# The optional parameter of the scalar factories is named ``labels`` to match
# the keyword accepted by the Rust binding (signature: name, description,
# labels=None). The *vec factories take ``label_names`` plus ``const_labels``.
def create_counter(
    self,
    name: str,
    description: str,
    labels: Optional[List[Tuple[str, str]]] = None,
) -> Counter:
    """Create a Counter metric (float) with optional constant (key, value) labels"""
    ...

def create_countervec(
    self,
    name: str,
    description: str,
    label_names: List[str],
    const_labels: Optional[List[Tuple[str, str]]] = None,
) -> CounterVec:
    """Create a CounterVec metric (float) with variable label names and optional constant labels"""
    ...

def create_gauge(
    self,
    name: str,
    description: str,
    labels: Optional[List[Tuple[str, str]]] = None,
) -> Gauge:
    """Create a Gauge metric (float) with optional constant (key, value) labels"""
    ...

def create_gaugevec(
    self,
    name: str,
    description: str,
    label_names: List[str],
    const_labels: Optional[List[Tuple[str, str]]] = None,
) -> GaugeVec:
    """Create a GaugeVec metric (float) with variable label names and optional constant labels"""
    ...

def create_histogram(
    self,
    name: str,
    description: str,
    labels: Optional[List[Tuple[str, str]]] = None,
) -> Histogram:
    """Create a Histogram metric with optional constant (key, value) labels"""
    ...

def create_intcounter(
    self,
    name: str,
    description: str,
    labels: Optional[List[Tuple[str, str]]] = None,
) -> IntCounter:
    """Create an IntCounter metric (integer) with optional constant (key, value) labels"""
    ...

def create_intcountervec(
    self,
    name: str,
    description: str,
    label_names: List[str],
    const_labels: Optional[List[Tuple[str, str]]] = None,
) -> IntCounterVec:
    """Create an IntCounterVec metric (integer) with variable label names and optional constant labels"""
    ...

def create_intgauge(
    self,
    name: str,
    description: str,
    labels: Optional[List[Tuple[str, str]]] = None,
) -> IntGauge:
    """Create an IntGauge metric (integer) with optional constant (key, value) labels"""
    ...

def create_intgaugevec(
    self,
    name: str,
    description: str,
    label_names: List[str],
    const_labels: Optional[List[Tuple[str, str]]] = None,
) -> IntGaugeVec:
    """Create an IntGaugeVec metric (integer) with variable label names and optional constant labels"""
    ...
__all__ = [
"Counter",
"CounterVec",
"Gauge",
"GaugeVec",
"Histogram",
"IntCounter",
"IntCounterVec",
"IntGauge",
"IntGaugeVec",
"RuntimeMetrics",
]
......@@ -11,14 +11,14 @@ TWO MODES OF OPERATION:
- Requires: pytest-forked (uv pip install pytest-forked)
- Tests using 'runtime' fixture MUST have @pytest.mark.forked
- Safer, enables parallel execution
- Run: ENABLE_ISOLATED_ETCD_AND_NATS=1 pytest tests/test_metrics_registry.py -n auto
- Run: ENABLE_ISOLATED_ETCD_AND_NATS=1 pytest tests/ -n auto
2. Default Ports Mode (ENABLE_ISOLATED_ETCD_AND_NATS=0, default):
- All tests share NATS/ETCD on default ports (4222, 2379)
- No pytest-forked required
- No @pytest.mark.forked required
- Faster for sequential runs, but NO parallel execution
- Run: pytest tests/test_metrics_registry.py
- Run: pytest tests/
Performance comparison (32-core machine, 13 tests):
Default ports (ENABLE_ISOLATED_ETCD_AND_NATS=0, default): 4.06s (sequential only)
......
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Tests for Python MetricsRegistry bindings.
This test suite verifies that Python can create, introspect, and use Prometheus
metrics through the Dynamo MetricsRegistry interface.
"""
import pytest
async def get_metrics_runtime(runtime, endpoint_name):
    """Return the metrics helper of the test endpoint named ``endpoint_name``.

    The endpoint is looked up under the fixed ``test_metrics_ns`` namespace and
    ``test_metrics_comp`` component, so passing a distinct endpoint name per
    test yields a distinct endpoint.
    """
    return (
        runtime.namespace("test_metrics_ns")
        .component("test_metrics_comp")
        .endpoint(endpoint_name)
        .metrics
    )
pytestmark = pytest.mark.pre_merge
@pytest.mark.asyncio
@pytest.mark.forked
async def test_counter_introspection(runtime):
    """Counter reports its name and its constant labels."""
    metrics = await get_metrics_runtime(runtime, "ep_counter_introspection")
    constant_pairs = [("env", "test")]  # constant labels
    metric = metrics.create_counter("test_counter", "A test counter", constant_pairs)

    # name(): a string that contains, and ends with, the requested name
    metric_name = metric.name()
    assert isinstance(metric_name, str)
    assert "test_counter" in metric_name
    assert metric_name.endswith("test_counter")

    # const_labels(): the supplied label plus the namespace hierarchy label
    reported = metric.const_labels()
    assert isinstance(reported, dict)
    assert "env" in reported
    assert reported["env"] == "test"
    assert "dynamo_namespace" in reported
    assert reported["dynamo_namespace"] == "test_metrics_ns"
@pytest.mark.asyncio
@pytest.mark.forked
async def test_intcounter_introspection(runtime):
    """IntCounter reports its name and its constant labels."""
    metrics = await get_metrics_runtime(runtime, "ep_intcounter_introspection")
    constant_pairs = [("type", "integer")]
    metric = metrics.create_intcounter(
        "test_int_counter", "A test int counter", constant_pairs
    )

    metric_name = metric.name()
    assert isinstance(metric_name, str)
    assert "test_int_counter" in metric_name

    reported = metric.const_labels()
    assert isinstance(reported, dict)
    assert reported["type"] == "integer"
@pytest.mark.asyncio
@pytest.mark.forked
async def test_gauge_introspection(runtime):
    """Gauge reports its name and its constant labels."""
    metrics = await get_metrics_runtime(runtime, "ep_gauge_introspection")
    constant_pairs = [("unit", "bytes")]
    metric = metrics.create_gauge("test_gauge", "A test gauge", constant_pairs)

    metric_name = metric.name()
    assert isinstance(metric_name, str)
    assert "test_gauge" in metric_name

    reported = metric.const_labels()
    assert isinstance(reported, dict)
    assert reported["unit"] == "bytes"
@pytest.mark.asyncio
@pytest.mark.forked
async def test_intgauge_introspection(runtime):
    """IntGauge created without constant labels still reports hierarchy labels."""
    metrics = await get_metrics_runtime(runtime, "ep_intgauge_introspection")
    no_constant_labels = []  # no constant labels
    metric = metrics.create_intgauge(
        "test_int_gauge", "A test int gauge", no_constant_labels
    )

    metric_name = metric.name()
    assert isinstance(metric_name, str)
    assert "test_int_gauge" in metric_name

    reported = metric.const_labels()
    assert isinstance(reported, dict)
    # Hierarchy labels are expected even when no constant labels were passed
    assert "dynamo_namespace" in reported
@pytest.mark.asyncio
@pytest.mark.forked
async def test_histogram_introspection(runtime):
    """Histogram reports its name and its constant labels."""
    metrics = await get_metrics_runtime(runtime, "ep_histogram_introspection")
    constant_pairs = [("method", "POST")]
    metric = metrics.create_histogram(
        "test_histogram", "A test histogram", constant_pairs
    )

    metric_name = metric.name()
    assert isinstance(metric_name, str)
    assert "test_histogram" in metric_name

    reported = metric.const_labels()
    assert isinstance(reported, dict)
    assert reported["method"] == "POST"
@pytest.mark.asyncio
@pytest.mark.forked
async def test_countervec_introspection(runtime):
    """CounterVec reports its name, constant labels and variable label names."""
    metrics = await get_metrics_runtime(runtime, "ep_countervec_introspection")
    variable_names = ["worker_id", "status"]  # variable labels
    constant_pairs = [("cluster", "prod")]  # constant labels
    metric = metrics.create_countervec(
        "test_counter_vec", "A test counter vec", variable_names, constant_pairs
    )

    # name()
    metric_name = metric.name()
    assert isinstance(metric_name, str)
    assert "test_counter_vec" in metric_name

    # const_labels()
    reported_const = metric.const_labels()
    assert isinstance(reported_const, dict)
    assert reported_const["cluster"] == "prod"
    assert "dynamo_namespace" in reported_const

    # variable_labels()
    reported_vars = metric.variable_labels()
    assert isinstance(reported_vars, list)
    assert len(reported_vars) == 2
    assert "worker_id" in reported_vars
    assert "status" in reported_vars
@pytest.mark.asyncio
@pytest.mark.forked
async def test_intcountervec_introspection(runtime):
    """IntCounterVec reports its name, constant labels and variable label names."""
    metrics = await get_metrics_runtime(runtime, "ep_intcountervec_introspection")
    variable_names = ["region", "zone"]
    no_constant_labels = []  # no constant labels
    metric = metrics.create_intcountervec(
        "test_int_counter_vec",
        "A test int counter vec",
        variable_names,
        no_constant_labels,
    )

    metric_name = metric.name()
    assert "test_int_counter_vec" in metric_name

    reported_const = metric.const_labels()
    assert isinstance(reported_const, dict)

    reported_vars = metric.variable_labels()
    assert len(reported_vars) == 2
    assert "region" in reported_vars
    assert "zone" in reported_vars
@pytest.mark.asyncio
@pytest.mark.forked
async def test_gaugevec_introspection(runtime):
    """GaugeVec reports its name, constant labels and variable label names."""
    metrics = await get_metrics_runtime(runtime, "ep_gaugevec_introspection")
    variable_names = ["instance", "job"]
    constant_pairs = [("env", "staging")]
    metric = metrics.create_gaugevec(
        "test_gauge_vec", "A test gauge vec", variable_names, constant_pairs
    )

    metric_name = metric.name()
    assert "test_gauge_vec" in metric_name

    reported_const = metric.const_labels()
    assert reported_const["env"] == "staging"

    reported_vars = metric.variable_labels()
    assert len(reported_vars) == 2
    assert "instance" in reported_vars
    assert "job" in reported_vars
@pytest.mark.asyncio
@pytest.mark.forked
async def test_intgaugevec_introspection(runtime):
    """IntGaugeVec reports its name, constant labels and variable label names."""
    metrics = await get_metrics_runtime(runtime, "ep_intgaugevec_introspection")
    variable_names = ["device", "partition"]
    constant_pairs = [("datacenter", "us-west")]
    metric = metrics.create_intgaugevec(
        "test_int_gauge_vec", "A test int gauge vec", variable_names, constant_pairs
    )

    metric_name = metric.name()
    assert "test_int_gauge_vec" in metric_name

    reported_const = metric.const_labels()
    assert reported_const["datacenter"] == "us-west"

    reported_vars = metric.variable_labels()
    assert len(reported_vars) == 2
    assert "device" in reported_vars
    assert "partition" in reported_vars
@pytest.mark.asyncio
@pytest.mark.forked
async def test_metric_operations(runtime):
    """Created metrics accept updates and report the resulting values."""
    metrics = await get_metrics_runtime(runtime, "ep_metric_operations")

    # IntCounter: 1 + 5 == 6
    ops_counter = metrics.create_intcounter("ops_counter", "Operations counter", [])
    ops_counter.inc()
    ops_counter.inc_by(5)
    assert ops_counter.get() == 6

    # IntGauge: set, then inc and dec return to the set value
    connections = metrics.create_intgauge("connections_gauge", "Connections gauge", [])
    connections.set(10)
    assert connections.get() == 10
    connections.inc()
    assert connections.get() == 11
    connections.dec()
    assert connections.get() == 10

    # IntGaugeVec: the same operations addressed by a label set
    worker_labels = {"worker_id": "w1"}
    workers = metrics.create_intgaugevec(
        "worker_gauge_vec", "Worker gauge vec", ["worker_id"], []
    )
    workers.set(5, {"worker_id": "w1"})
    assert workers.get({"worker_id": "w1"}) == 5
    workers.inc(dict(worker_labels))
    assert workers.get(dict(worker_labels)) == 6
@pytest.mark.asyncio
@pytest.mark.forked
async def test_multiple_metrics_same_runtime(runtime):
    """Several metrics created on one endpoint get distinct names and shared labels."""
    metrics = await get_metrics_runtime(runtime, "ep_multiple_metrics_same_runtime")

    first_counter = metrics.create_intcounter("counter1", "Counter 1", [])
    second_counter = metrics.create_intcounter("counter2", "Counter 2", [])
    first_gauge = metrics.create_gauge("gauge1", "Gauge 1", [])
    created = [first_counter, second_counter, first_gauge]

    # Every metric must report a different name
    distinct_names = {first_counter.name(), second_counter.name(), first_gauge.name()}
    assert len(distinct_names) == 3

    # Every metric must carry the same hierarchy labels
    for metric in created:
        hierarchy_labels = metric.const_labels()
        assert hierarchy_labels["dynamo_namespace"] == "test_metrics_ns"
        assert "dynamo_component" in hierarchy_labels  # Component name is test-specific
        assert hierarchy_labels["dynamo_endpoint"] == "ep_multiple_metrics_same_runtime"
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment