Unverified Commit a4746ab6 authored by Keiven C's avatar Keiven C Committed by GitHub
Browse files

fix: callback registration, fix metric name access, ensure ordered vec, etc... (#3541)


Signed-off-by: default avatarKeiven Chang <keivenchang@users.noreply.github.com>
Co-authored-by: default avatarKeiven Chang <keivenchang@users.noreply.github.com>
parent 2ba8bdb1
......@@ -216,7 +216,7 @@ Method 2 supports all standard Prometheus metric types:
- **CounterVec**: `CounterVec` (float with labels), `IntCounterVec` (integer with labels)
- **Histograms**: `Histogram`
All metrics are imported from `dynamo._prometheus_metrics`.
All metrics are imported from `dynamo.prometheus_metrics`.
#### Adding/Changing Metrics in Method 2
......
......@@ -10,6 +10,12 @@ This shows how Python code can:
2. Register them with an endpoint
3. Update their values using type-safe methods (set for gauges, inc for counters)
4. The metrics are automatically served via the /metrics endpoint
Usage:
DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=8081 ./server_with_callback.py
# In another terminal, query the metrics:
curl http://localhost:8081/metrics
"""
import asyncio
......@@ -18,8 +24,8 @@ import uvloop
# Note that these imports are for type hints only. They cannot be instantiated directly.
# You can instantiate them using the endpoint.metrics.create_*() methods.
from dynamo._prometheus_metrics import Gauge, IntCounter, IntGauge, IntGaugeVec
from dynamo.runtime import DistributedRuntime, dynamo_worker
from dynamo.prometheus_metrics import Gauge, IntCounter, IntGauge, IntGaugeVec
from dynamo.runtime import Component, DistributedRuntime, Endpoint, dynamo_worker
@dynamo_worker()
......@@ -29,10 +35,10 @@ async def worker(runtime: DistributedRuntime) -> None:
async def init(runtime: DistributedRuntime):
# Create component and endpoint
component = runtime.namespace("ns556").component("cp556")
component: Component = runtime.namespace("ns556").component("cp556")
await component.create_service()
endpoint = component.endpoint("ep556")
endpoint: Endpoint = component.endpoint("ep556")
# Step 1: Create metrics using the endpoint's metrics property
print("[python] Creating metrics...")
......@@ -59,11 +65,10 @@ async def init(runtime: DistributedRuntime):
[("update_method", "callback")],
)
print(f"[python] Created IntGauge: {request_total_slots.name}")
print(f"[python] Created Gauge: {gpu_cache_usage_perc.name}")
print(f"[python] Created IntGaugeVec: {worker_active_requests.name}")
print(f"[python] Created IntCounter with constant labels: {update_count.name}")
print(f"[python] Const labels: {update_count.const_labels}")
print(f"[python] Created IntGauge: {request_total_slots.name()}")
print(f"[python] Created Gauge: {gpu_cache_usage_perc.name()}")
print(f"[python] Created IntGaugeVec: {worker_active_requests.name()}")
print(f"[python] Created IntCounter: {update_count.name()}")
print("[python] Metrics automatically registered with endpoint!")
# Step 2: Register a callback to update metrics on-demand
......
......@@ -10,6 +10,12 @@ This shows an alternative approach where:
2. A background thread continuously updates metrics in a loop
3. No callback is used - metrics are updated directly by the thread
4. The metrics are automatically served via the /metrics endpoint
Usage:
DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=8081 ./server_with_loop.py
# In another terminal, query the metrics:
curl http://localhost:8081/metrics
"""
import asyncio
......@@ -18,8 +24,8 @@ import time
import uvloop
from dynamo._prometheus_metrics import Gauge, IntCounter, IntGauge, IntGaugeVec
from dynamo.runtime import DistributedRuntime, dynamo_worker
from dynamo.prometheus_metrics import Gauge, IntCounter, IntGauge, IntGaugeVec
from dynamo.runtime import Component, DistributedRuntime, Endpoint, dynamo_worker
def metrics_updater_thread(
......@@ -60,10 +66,10 @@ async def worker(runtime: DistributedRuntime) -> None:
async def init(runtime: DistributedRuntime):
# Create component and endpoint
component = runtime.namespace("ns557").component("cp557")
component: Component = runtime.namespace("ns557").component("cp557")
await component.create_service()
endpoint = component.endpoint("ep557")
endpoint: Endpoint = component.endpoint("ep557")
# Create metrics using the endpoint's metrics property
print("[python] Creating metrics...")
......@@ -87,10 +93,10 @@ async def init(runtime: DistributedRuntime):
[("update_method", "background_thread")],
)
print(f"[python] Created IntGauge: {request_total_slots.name}")
print(f"[python] Created Gauge: {gpu_cache_usage_perc.name}")
print(f"[python] Created IntGaugeVec: {worker_active_requests.name}")
print(f"[python] Created IntCounter: {update_count.name}")
print(f"[python] Created IntGauge: {request_total_slots.name()}")
print(f"[python] Created Gauge: {gpu_cache_usage_perc.name()}")
print(f"[python] Created IntGaugeVec: {worker_active_requests.name()}")
print(f"[python] Created IntCounter: {update_count.name()}")
print("[python] Metrics automatically registered with endpoint!")
# Set initial values
......
......@@ -185,7 +185,7 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
engine::add_to_module(m)?;
parsers::add_to_module(m)?;
m.add_class::<prometheus_metrics::PyRuntimeMetrics>()?;
m.add_class::<prometheus_metrics::RuntimeMetrics>()?;
let prometheus_metrics = PyModule::new(m.py(), "prometheus_metrics")?;
prometheus_metrics::add_to_module(&prometheus_metrics)?;
m.add_submodule(&prometheus_metrics)?;
......@@ -637,8 +637,8 @@ impl Component {
/// Get a RuntimeMetrics helper for creating Prometheus metrics
#[getter]
fn metrics(&self) -> prometheus_metrics::PyRuntimeMetrics {
prometheus_metrics::PyRuntimeMetrics::from_component(self.inner.clone())
fn metrics(&self) -> prometheus_metrics::RuntimeMetrics {
prometheus_metrics::RuntimeMetrics::from_component(self.inner.clone())
}
}
......@@ -726,8 +726,8 @@ impl Endpoint {
/// Get a RuntimeMetrics helper for creating Prometheus metrics
#[getter]
fn metrics(&self) -> prometheus_metrics::PyRuntimeMetrics {
prometheus_metrics::PyRuntimeMetrics::from_endpoint(self.inner.clone())
fn metrics(&self) -> prometheus_metrics::RuntimeMetrics {
prometheus_metrics::RuntimeMetrics::from_endpoint(self.inner.clone())
}
}
......@@ -743,8 +743,8 @@ impl Namespace {
/// Get a RuntimeMetrics helper for creating Prometheus metrics
#[getter]
fn metrics(&self) -> prometheus_metrics::PyRuntimeMetrics {
prometheus_metrics::PyRuntimeMetrics::from_namespace(self.inner.clone())
fn metrics(&self) -> prometheus_metrics::RuntimeMetrics {
prometheus_metrics::RuntimeMetrics::from_namespace(self.inner.clone())
}
}
......
......@@ -18,6 +18,37 @@ use std::sync::Arc;
use crate::rs;
/// Helper function to order label values according to variable_labels declaration.
/// This ensures labels are passed to with_label_values() in the correct order.
///
/// # Arguments
/// * `variable_labels` - The ordered list of label names as declared in the metric
/// * `labels` - The HashMap of label name-value pairs from Python
///
/// # Returns
/// * `Ok(Vec<&str>)` - Ordered vector of label values matching variable_labels order
/// * `Err(PyErr)` - If a required label is missing
fn collect_ordered_label_values<'a>(
variable_labels: &[String],
labels: &'a HashMap<String, String>,
) -> PyResult<Vec<&'a str>> {
let mut ordered_values = Vec::with_capacity(variable_labels.len());
for label_name in variable_labels {
match labels.get(label_name) {
Some(value) => ordered_values.push(value.as_str()),
None => {
return Err(pyo3::exceptions::PyValueError::new_err(format!(
"Missing required label '{}'. Expected labels: {:?}, Got: {:?}",
label_name,
variable_labels,
labels.keys().collect::<Vec<_>>()
)));
}
}
}
Ok(ordered_values)
}
// Python wrappers for Prometheus metric types.
//
// These wrapper structs are necessary because Prometheus types from the external `prometheus` crate
......@@ -205,21 +236,24 @@ impl CounterVec {
/// Increment counter by 1 with labels
fn inc(&self, labels: HashMap<String, String>) -> PyResult<()> {
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
let desc = self.counter.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
self.counter.with_label_values(&label_values).inc();
Ok(())
}
/// Increment counter by value with labels
fn inc_by(&self, labels: HashMap<String, String>, value: f64) -> PyResult<()> {
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
let desc = self.counter.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
self.counter.with_label_values(&label_values).inc_by(value);
Ok(())
}
/// Get counter value with labels
fn get(&self, labels: HashMap<String, String>) -> PyResult<f64> {
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
let desc = self.counter.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
Ok(self.counter.with_label_values(&label_values).get())
}
}
......@@ -257,21 +291,24 @@ impl IntCounterVec {
/// Increment counter by 1 with labels
fn inc(&self, labels: HashMap<String, String>) -> PyResult<()> {
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
let desc = self.counter.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
self.counter.with_label_values(&label_values).inc();
Ok(())
}
/// Increment counter by value with labels
fn inc_by(&self, labels: HashMap<String, String>, value: u64) -> PyResult<()> {
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
let desc = self.counter.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
self.counter.with_label_values(&label_values).inc_by(value);
Ok(())
}
/// Get counter value with labels
fn get(&self, labels: HashMap<String, String>) -> PyResult<u64> {
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
let desc = self.counter.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
Ok(self.counter.with_label_values(&label_values).get())
}
}
......@@ -443,41 +480,47 @@ impl GaugeVec {
/// Set gauge value with labels
fn set(&self, value: f64, labels: HashMap<String, String>) -> PyResult<()> {
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
let desc = self.gauge.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
self.gauge.with_label_values(&label_values).set(value);
Ok(())
}
/// Get gauge value with labels
fn get(&self, labels: HashMap<String, String>) -> PyResult<f64> {
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
let desc = self.gauge.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
Ok(self.gauge.with_label_values(&label_values).get())
}
/// Increment gauge by 1 with labels
fn inc(&self, labels: HashMap<String, String>) -> PyResult<()> {
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
let desc = self.gauge.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
self.gauge.with_label_values(&label_values).inc();
Ok(())
}
/// Decrement gauge by 1 with labels
fn dec(&self, labels: HashMap<String, String>) -> PyResult<()> {
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
let desc = self.gauge.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
self.gauge.with_label_values(&label_values).dec();
Ok(())
}
/// Add value to gauge with labels
fn add(&self, labels: HashMap<String, String>, value: f64) -> PyResult<()> {
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
let desc = self.gauge.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
self.gauge.with_label_values(&label_values).add(value);
Ok(())
}
/// Subtract value from gauge with labels
fn sub(&self, labels: HashMap<String, String>, value: f64) -> PyResult<()> {
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
let desc = self.gauge.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
self.gauge.with_label_values(&label_values).sub(value);
Ok(())
}
......@@ -516,41 +559,47 @@ impl IntGaugeVec {
/// Set gauge value with labels
fn set(&self, value: i64, labels: HashMap<String, String>) -> PyResult<()> {
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
let desc = self.gauge.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
self.gauge.with_label_values(&label_values).set(value);
Ok(())
}
/// Get gauge value with labels
fn get(&self, labels: HashMap<String, String>) -> PyResult<i64> {
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
let desc = self.gauge.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
Ok(self.gauge.with_label_values(&label_values).get())
}
/// Increment gauge by 1 with labels
fn inc(&self, labels: HashMap<String, String>) -> PyResult<()> {
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
let desc = self.gauge.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
self.gauge.with_label_values(&label_values).inc();
Ok(())
}
/// Decrement gauge by 1 with labels
fn dec(&self, labels: HashMap<String, String>) -> PyResult<()> {
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
let desc = self.gauge.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
self.gauge.with_label_values(&label_values).dec();
Ok(())
}
/// Add value to gauge with labels
fn add(&self, labels: HashMap<String, String>, value: i64) -> PyResult<()> {
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
let desc = self.gauge.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
self.gauge.with_label_values(&label_values).add(value);
Ok(())
}
/// Subtract value from gauge with labels
fn sub(&self, labels: HashMap<String, String>, value: i64) -> PyResult<()> {
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
let desc = self.gauge.desc();
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
self.gauge.with_label_values(&label_values).sub(value);
Ok(())
}
......@@ -598,15 +647,15 @@ impl Histogram {
/// and utilities for registering metrics callbacks.
/// Exposed as endpoint.metrics, component.metrics, and namespace.metrics in Python.
///
/// NOTE: The create_* methods in PyRuntimeMetrics must stay in sync with the MetricsRegistry trait
/// NOTE: The create_* methods in RuntimeMetrics must stay in sync with the MetricsRegistry trait
/// in lib/runtime/src/metrics.rs. When adding new metric types, update both locations.
#[pyclass]
#[derive(Clone)]
pub struct PyRuntimeMetrics {
pub struct RuntimeMetrics {
metricsregistry: Arc<dyn rs::metrics::MetricsRegistry>,
}
impl PyRuntimeMetrics {
impl RuntimeMetrics {
/// Create from Endpoint
pub fn from_endpoint(endpoint: dynamo_runtime::component::Endpoint) -> Self {
Self {
......@@ -668,7 +717,7 @@ impl PyRuntimeMetrics {
}
#[pymethods]
impl PyRuntimeMetrics {
impl RuntimeMetrics {
/// Register a Python callback to be invoked before metrics are scraped
/// This callback will be called for this endpoint's metrics hierarchy
fn register_update_callback(&self, callback: PyObject, _py: Python) -> PyResult<()> {
......
......@@ -12,10 +12,11 @@ from typing import (
Tuple,
)
# Import from specialized modules
from ._prometheus_metrics import RuntimeMetrics
from ._prometheus_names import prometheus_names
# Import from specialized modules
from .prometheus_metrics import RuntimeMetrics as PyRuntimeMetrics
def log_message(level: str, message: str, module: str, file: str, line: int) -> None:
"""
Log a message from Python with file and line info
......@@ -91,12 +92,12 @@ class Namespace:
...
@property
def metrics(self) -> RuntimeMetrics:
def metrics(self) -> PyRuntimeMetrics:
"""
Get a RuntimeMetrics helper for creating Prometheus metrics.
Get a PyRuntimeMetrics helper for creating Prometheus metrics.
Returns:
A RuntimeMetrics object that provides create_* methods for different metric types
A PyRuntimeMetrics object that provides create_* methods for different metric types
"""
...
......@@ -120,12 +121,12 @@ class Component:
...
@property
def metrics(self) -> RuntimeMetrics:
def metrics(self) -> PyRuntimeMetrics:
"""
Get a RuntimeMetrics helper for creating Prometheus metrics.
Get a PyRuntimeMetrics helper for creating Prometheus metrics.
Returns:
A RuntimeMetrics object that provides create_* methods for different metric types
A PyRuntimeMetrics object that provides create_* methods for different metric types
"""
...
......@@ -163,12 +164,12 @@ class Endpoint:
...
@property
def metrics(self) -> RuntimeMetrics:
def metrics(self) -> PyRuntimeMetrics:
"""
Get a RuntimeMetrics helper for creating Prometheus metrics.
Get a PyRuntimeMetrics helper for creating Prometheus metrics.
Returns:
A RuntimeMetrics object that provides create_* methods for different metric types
A PyRuntimeMetrics object that provides create_* methods for different metric types
"""
...
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Re-export prometheus_metrics types from _core for convenience.
"""Official public API for prometheus metrics types.
This module exists alongside _prometheus_metrics.pyi but serves a different purpose:
- This .py file: Executed at runtime to re-export metric types from _core.prometheus_metrics,
providing a clean public API (dynamo._prometheus_metrics.Counter) instead of exposing
internal implementation details (dynamo._core.prometheus_metrics.Counter).
- The .pyi file: Used only by type checkers/IDEs for type hints and autocomplete. Never executed.
Provides signatures and docstrings for the Rust-implemented classes.
Both files are needed: .py provides the runtime imports, .pyi provides static type information.
This module provides the official public API for Prometheus metrics in Dynamo.
The metric types are implemented in Rust and exposed via the _core extension module.
"""
# Import directly from the Rust extension module
# Note: IDEs/type checkers may complain about this import because _core is a compiled
# extension module (.so file), not a Python module. The type stub (_core.pyi) doesn't
# declare the prometheus_metrics submodule. However, this import is valid at runtime
# because the Rust code (lib.rs) creates and registers the prometheus_metrics submodule
# via PyModule::new() and add_submodule(). The type: ignore suppresses the false warning.
from dynamo._core import ( # type: ignore[attr-defined]
PyRuntimeMetrics,
prometheus_metrics,
)
# extension module (.so file). However, this import is valid at runtime because the
# Rust code (lib.rs) creates and registers the prometheus_metrics submodule.
from dynamo._core import PyRuntimeMetrics # type: ignore[attr-defined]
from dynamo._core import prometheus_metrics # type: ignore[attr-defined]
# Import metric type classes from the prometheus_metrics submodule
# Re-export metric type classes from the prometheus_metrics submodule
Counter = prometheus_metrics.Counter
CounterVec = prometheus_metrics.CounterVec
Gauge = prometheus_metrics.Gauge
......@@ -34,6 +25,9 @@ IntCounterVec = prometheus_metrics.IntCounterVec
IntGauge = prometheus_metrics.IntGauge
IntGaugeVec = prometheus_metrics.IntGaugeVec
# RuntimeMetrics is in the main _core module (as PyRuntimeMetrics), not the submodule
RuntimeMetrics = PyRuntimeMetrics
__all__ = [
"Counter",
"CounterVec",
......@@ -44,5 +38,5 @@ __all__ = [
"IntCounterVec",
"IntGauge",
"IntGaugeVec",
"PyRuntimeMetrics",
"RuntimeMetrics",
]
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# NOTE: This file defines Python type stubs for Prometheus metric types.
# It should be kept in sync with:
# - lib/bindings/python/rust/metrics.rs (Rust implementations)
# - lib/runtime/src/metrics.rs (MetricsRegistry trait and Prometheus types)
"""Type stubs for the official public API of prometheus metrics.
This file defines Python type stubs for Prometheus metric types.
It should be kept in sync with:
- lib/bindings/python/rust/metrics.rs (Rust implementations)
- lib/runtime/src/metrics.rs (MetricsRegistry trait and Prometheus types)
"""
from typing import Callable, Dict, List, Optional, Tuple
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment