Commit 0c4c4d1d authored by Keiven C, committed by GitHub

feat: add Python MetricsRegistry Python metrics registration (#3341)


Signed-off-by: Keiven Chang <keivenchang@users.noreply.github.com>
parent 8c9f3616
<!-- SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. -->
<!-- SPDX-License-Identifier: Apache-2.0 -->
# Dynamo MetricsRegistry for Python
Python MetricsRegistry allows you to create and manage Prometheus metrics from Python:
- **Metric Types**: Counter, IntCounter, Gauge, IntGauge, Histogram, and their Vec variants (CounterVec, IntCounterVec, GaugeVec, IntGaugeVec)
- **Metric Introspection**: Access metric names, constant labels, and variable label names
- **Automatic Registration**: Metrics are automatically registered with the component hierarchy (namespace/component/endpoint) and available on the HTTP system status server
- **Optional Callback Support**: Register Python callbacks to update metrics before scraping
Example:
```python
from dynamo.runtime import DistributedRuntime
async def main():
    drt = DistributedRuntime()
    endpoint = drt.namespace("ns").component("comp").endpoint("ep")

    # Create metrics
    counter = endpoint.metrics.create_intcounter("requests_total", "Total requests")
    gauge_vec = endpoint.metrics.create_intgaugevec(
        "active_connections",
        "Active connections by status",
        ["status"],  # variable labels
        [("region", "us-west")]  # constant labels
    )

    # Introspect metrics
    print(counter.name())  # "ns_comp_ep_requests_total"
    print(counter.const_labels())  # {"dynamo_namespace": "ns", ...}
    print(gauge_vec.variable_labels())  # ["status"]

    # Use metrics
    counter.inc()
    gauge_vec.set(5, {"status": "active"})
```
## Python-Rust Metrics Integration
This directory demonstrates two methods for passing metrics between Python and Rust in the Dynamo runtime.
### Method 1: ForwardPassMetrics Pub/Sub via NATS (Legacy method for passing metrics)
Python maintains its own metrics dictionary, serializes it, and publishes to NATS. Rust subscribes to NATS, deserializes the metrics, and updates Prometheus gauges.
**Communication pattern**: Unidirectional (Python → NATS → Rust). Python publishes metrics; no feedback from Rust to Python.
**Example**: Used by `WorkerMetricsPublisher` in production code
```python
from dynamo.llm import WorkerMetricsPublisher, ForwardPassMetrics
# Create publisher
publisher = WorkerMetricsPublisher()
await publisher.create_endpoint(component, metrics_labels)
# Python maintains its own metrics dict
metrics_dict = {
"num_running_reqs": 5,
"num_waiting_reqs": 10,
"gpu_cache_usage": 0.75,
}
# Serialize and publish to NATS
metrics = ForwardPassMetrics(metrics_dict)
publisher.publish(metrics)
# Rust subscribes to NATS, deserializes, and updates Prometheus
```
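The publish/subscribe flow above can be illustrated without NATS or the Dynamo runtime. The following is a minimal sketch, assuming JSON as the wire format and an in-process `queue.Queue` standing in for the NATS subject. The names `subject`, `gauges`, `publish`, and `consume` are hypothetical and are not Dynamo APIs.

```python
import json
import queue
from typing import Dict

# Stand-in for a NATS subject: an in-process FIFO queue (assumption for illustration).
subject: "queue.Queue[bytes]" = queue.Queue()

# Stand-in for the Prometheus gauges that the subscriber side would maintain.
gauges: Dict[str, float] = {}


def publish(metrics: Dict[str, float]) -> None:
    """Publisher side: serialize the metrics dict and push it onto the subject."""
    subject.put(json.dumps(metrics).encode("utf-8"))


def consume() -> None:
    """Subscriber side: drain pending messages and copy each value into a gauge."""
    while not subject.empty():
        payload = json.loads(subject.get().decode("utf-8"))
        for name, value in payload.items():
            gauges[name] = float(value)


publish({"num_running_reqs": 5, "num_waiting_reqs": 10, "gpu_cache_usage": 0.75})
consume()
print(gauges)
```

Note that the publisher and the subscriber must agree on the field names in the payload, which is why a new metric in Method 1 touches both sides.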
### Adding/Changing Metrics in Method 1
When you need to add or modify metrics in Method 1 (ForwardPassMetrics Pub/Sub via NATS), you must update **multiple files**:
1. **`lib/llm/src/kv_router/protocols.rs`** - Add field to struct (WorkerStats is part of ForwardPassMetrics):
```rust
pub struct WorkerStats {
    pub request_active_slots: u64,
    pub request_total_slots: u64,
    pub num_requests_waiting: u64,
    pub new_metric_field: u64, // ADD THIS
}
```
2. **`lib/llm/src/kv_router/publisher.rs`** - Manually create Prometheus gauge using DRT:
```rust
fn new(component: &Component) -> Result<Self> {
    use dynamo_runtime::metrics::MetricsRegistry;
    // ... existing gauges ...
    // Manually create and register new Prometheus gauge
    let new_metric_gauge = component.create_gauge(
        "new_metric_name",
        "Description of new metric",
        &[], // labels
    )?;
    // Store in struct
    Ok(KvStatsPrometheusGauges {
        kv_active_blocks_gauge,
        kv_total_blocks_gauge,
        gpu_cache_usage_gauge,
        gpu_prefix_cache_hit_rate_gauge,
        new_metric_gauge, // ADD THIS
    })
}
```
3. **`lib/llm/src/kv_router/publisher.rs`** - Update gauge in `update_from_kvstats()`:
```rust
fn update_from_kvstats(&self, kv_stats: &KvStats) {
    // ... existing updates ...
    self.new_metric_gauge.set(worker_stats.new_metric_field as f64);
}
```
4. **`components/backends/sglang/.../publisher.py`** - Update Python code to compute new metric:
```python
def collect_metrics():
    worker_stats = WorkerStats(
        request_active_slots=...,
        new_metric_field=compute_new_metric(),  # ADD THIS
    )
```
**Result**: Changes require touching 3-4 files across Rust and Python codebases.
### Method 2: Dynamo MetricsRegistry in Python
Python creates typed metric objects using `endpoint.metrics.create_*()` methods, which automatically register with the endpoint. Python updates values through these objects with methods that have type hints (via `.pyi` files). Rust creates the underlying Prometheus metrics and calls Python callbacks before scraping.
**Communication pattern**: Metric values flow one way, from Python to Rust; Rust calls back into Python only to trigger the registered update callbacks before a scrape. The same mechanism could be extended in the future to carry other notifications from Rust to Python (e.g., scrape events or configuration changes) without major architectural changes.
**Key advantage:** No Rust code modifications needed - metrics are defined and updated entirely in Python.
This method supports two update patterns:
#### Example A: Background Thread Updates (server_with_loop.py)
Update metrics continuously from a background thread, independent of scraping:
```python
# Create metric objects (automatically registered)
# Note: the Dynamo runtime prefixes these with "dynamo_component_", so they appear as:
# - dynamo_component_request_total_slots
# - dynamo_component_gpu_cache_usage_percent
request_slots: IntGauge = endpoint.metrics.create_intgauge(
    "request_total_slots", "Total request slots available"
)
gpu_usage: Gauge = endpoint.metrics.create_gauge(
    "gpu_cache_usage_percent", "GPU cache usage percentage"
)

# Background thread continuously updates metrics
def update_metrics_in_loop():
    count = 0
    while True:
        count += 1
        request_slots.set(1024 + count)
        gpu_usage.set(0.01 + (count * 0.01))
        time.sleep(2)

updater = threading.Thread(target=update_metrics_in_loop, daemon=True)
updater.start()
```
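The loop above runs until the process exits. If a clean shutdown matters, one option is to use a `threading.Event` as the stop signal. The sketch below uses a hypothetical `FakeGauge` stand-in so that it runs without the Dynamo runtime; with real metrics you would pass the objects returned by `endpoint.metrics.create_*()` instead. `run_updater` and `interval_s` are illustrative names.

```python
import threading


class FakeGauge:
    """Hypothetical stand-in exposing the same set()/get() shape as a gauge."""

    def __init__(self) -> None:
        self._value = 0.0

    def set(self, value: float) -> None:
        self._value = value

    def get(self) -> float:
        return self._value


def run_updater(gauge: FakeGauge, stop: threading.Event, interval_s: float) -> None:
    """Update the gauge once per interval until the stop event is set."""
    count = 0
    while not stop.is_set():
        count += 1
        gauge.set(1024 + count)
        # wait() doubles as an interruptible sleep: it returns early once stop is set.
        stop.wait(interval_s)


gauge = FakeGauge()
stop = threading.Event()
updater = threading.Thread(target=run_updater, args=(gauge, stop, 0.01), daemon=True)
updater.start()

stop.set()         # request shutdown
updater.join(1.0)  # wait for the loop to exit
print(updater.is_alive(), gauge.get())
```

Using `stop.wait()` instead of `time.sleep()` means the thread reacts to the stop request immediately rather than after the remainder of the sleep interval.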
#### Example B: Callback-based Updates (server_with_callback.py)
Register a callback that updates metrics on-demand when Prometheus scrapes the `/metrics` endpoint:
```python
# Create metric objects (automatically registered)
# Note: the Dynamo runtime prefixes these with "dynamo_component_", so they appear as:
# - dynamo_component_request_total_slots
# - dynamo_component_gpu_cache_usage_percent
request_slots: IntGauge = endpoint.metrics.create_intgauge(
    "request_total_slots", "Total request slots available"
)
gpu_usage: Gauge = endpoint.metrics.create_gauge(
    "gpu_cache_usage_percent", "GPU cache usage percentage"
)

# Register callback for dynamic updates before scraping
def update_metrics():
    request_slots.set(compute_current_slots())
    gpu_usage.set(get_gpu_usage())

endpoint.metrics.register_update_callback(update_metrics)
```
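Conceptually, a scrape first runs every registered callback and only then reads the metric values for the response. The following sketch mimics that ordering in plain Python. `CallbackRegistry`, its `values` dict, and `scrape()` are hypothetical illustrations and do not reflect the Rust implementation.

```python
from typing import Callable, Dict, List


class CallbackRegistry:
    """Hypothetical model of scrape-time callbacks: run callbacks, then read values."""

    def __init__(self) -> None:
        self._callbacks: List[Callable[[], None]] = []
        self.values: Dict[str, float] = {}

    def register_update_callback(self, callback: Callable[[], None]) -> None:
        self._callbacks.append(callback)

    def scrape(self) -> Dict[str, float]:
        # Callbacks run synchronously before the snapshot is taken,
        # so the response reflects values computed at scrape time.
        for callback in self._callbacks:
            callback()
        return dict(self.values)


registry = CallbackRegistry()
scrapes = {"count": 0}


def update_metrics() -> None:
    scrapes["count"] += 1
    registry.values["request_total_slots"] = 1024 + scrapes["count"]


registry.register_update_callback(update_metrics)
first = registry.scrape()
second = registry.scrape()
print(first, second)
```

Because the callback sits on the scrape path, slow work inside it delays the `/metrics` response, so it is worth keeping the callback cheap.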
Both examples support vector metrics with labels:
```python
# Create vector metrics with labels
worker_requests: IntGaugeVec = endpoint.metrics.create_intgaugevec(
    "worker_active_requests",
    "Active requests per worker",
    ["worker_id", "model"]
)
# Update vector metrics with specific label values
worker_requests.set(5, {"worker_id": "worker_1", "model": "llama-3"})
worker_requests.set(3, {"worker_id": "worker_2", "model": "llama-3"})
```
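Each distinct combination of label values identifies its own time series. A minimal in-memory model of that behaviour follows, assuming a hypothetical `FakeIntGaugeVec` that mirrors the `set(value, labels)` and `get(labels)` call shapes used above. How the real binding reacts to a missing or unexpected label is not specified here; raising `ValueError` is an assumption made for the sketch.

```python
from typing import Dict, List, Tuple


class FakeIntGaugeVec:
    """Hypothetical in-memory model of a labelled gauge; not the Dynamo class."""

    def __init__(self, label_names: List[str]) -> None:
        self._label_names = list(label_names)
        self._series: Dict[Tuple[str, ...], int] = {}

    def _key(self, labels: Dict[str, str]) -> Tuple[str, ...]:
        # Reject missing or unexpected label names, then order values by declaration.
        if set(labels) != set(self._label_names):
            raise ValueError(f"expected labels {self._label_names}, got {sorted(labels)}")
        return tuple(labels[name] for name in self._label_names)

    def set(self, value: int, labels: Dict[str, str]) -> None:
        self._series[self._key(labels)] = value

    def get(self, labels: Dict[str, str]) -> int:
        return self._series.get(self._key(labels), 0)


vec = FakeIntGaugeVec(["worker_id", "model"])
vec.set(5, {"worker_id": "worker_1", "model": "llama-3"})
vec.set(3, {"worker_id": "worker_2", "model": "llama-3"})
# Dict ordering does not matter; the key is built from the declared label order.
print(vec.get({"model": "llama-3", "worker_id": "worker_1"}))
```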
#### Available Metric Types
Method 2 supports the following Prometheus metric types:
- **Gauges**: `Gauge` (float), `IntGauge` (integer)
- **GaugeVec**: `GaugeVec` (float with labels), `IntGaugeVec` (integer with labels)
- **Counters**: `Counter` (float), `IntCounter` (integer)
- **CounterVec**: `CounterVec` (float with labels), `IntCounterVec` (integer with labels)
- **Histograms**: `Histogram`
All metrics are imported from `dynamo._prometheus_metrics`.
#### Adding/Changing Metrics in Method 2
When you need to add or modify metrics in Method 2 (Dynamo MetricsRegistry in Python), you only update **Python code**:
1. **Create new metric** - Just add one line in Python (automatically registered):
```python
new_metric: IntGauge = endpoint.metrics.create_intgauge(
    "new_metric_name", "Description of the metric"
)
```
2. **Update in callback** - Add update logic:
```python
def update_metrics():
    request_slots.set(compute_slots())
    gpu_usage.set(compute_gpu_usage())
    new_metric.set(compute_new_metric())  # ADD THIS
```
3. **For vector metrics with labels** - Create with label names, update with label values:
```python
# Create vector metric
new_vec: IntGaugeVec = endpoint.metrics.create_intgaugevec(
    "new_metric_vec", "Description", ["label1", "label2"]
)
# Update with specific label values
new_vec.set(100, {"label1": "value1", "label2": "value2"})
```
**Result**: Changes only require modifying Python code. No Rust changes needed. Metrics are automatically created and registered with Prometheus by the Rust runtime when you call `create_*()`.
#### Type-Hinted Methods
Method 2 provides type hints (via `.pyi` stub files) for the typed metric classes:
- **Gauges** use `.set()`, `.get()`, `.inc()`, `.dec()`, `.add()`, `.sub()`
- **Counters** use `.inc()`, `.inc_by()`, `.get()` (counters only increase)
- **Histograms** use `.observe()`
- **Vec metrics** take a `labels: Dict[str, str]` parameter for operations
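
Because the metric classes can only be created through `endpoint.metrics.create_*()`, code that updates them can be awkward to unit-test. One possible approach, sketched here under the assumption that your update logic only needs `set()`, is to type that logic against a `typing.Protocol` and pass a fake object in tests. `SupportsSet`, `FakeGauge`, and `record_gpu_usage` are illustrative names, not part of Dynamo.

```python
from typing import Protocol


class SupportsSet(Protocol):
    """Structural type: anything with a float-accepting set() method."""

    def set(self, value: float) -> None: ...


class FakeGauge:
    """Test double that remembers the last value it was given."""

    def __init__(self) -> None:
        self.value = 0.0

    def set(self, value: float) -> None:
        self.value = value


def record_gpu_usage(gauge: SupportsSet, used_blocks: int, total_blocks: int) -> None:
    """Store cache usage as a fraction in [0, 1]; report 0 when there are no blocks."""
    fraction = used_blocks / total_blocks if total_blocks else 0.0
    gauge.set(fraction)


fake = FakeGauge()
record_gpu_usage(fake, used_blocks=3, total_blocks=4)
print(fake.value)
```

A type checker accepts any object with a matching `set()` method for the `SupportsSet` parameter, so the same function should work with a real `Gauge` at runtime and with `FakeGauge` in tests.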
### Architecture Diagrams
#### Component Architecture
##### Method 1: ForwardPassMetrics Pub/Sub via NATS - Component View
```mermaid
graph TB
subgraph "Python Layer"
PY[Python Application<br/>components/backends/sglang/main.py]
style PY fill:#3776ab,color:#fff
end
subgraph "Python/Rust Interface (PyO3)"
WMPB[WorkerMetricsPublisher Bindings<br/>bindings/python/rust/llm/kv.rs]
FPM[ForwardPassMetrics Struct<br/>bindings/python/rust/llm/kv.rs]
style WMPB fill:#f4a261,color:#000
style FPM fill:#f4a261,color:#000
end
subgraph "Rust Core"
subgraph "Worker Process Components"
WMP[WorkerMetricsPublisher<br/>llm/src/kv_router/publisher.rs]
WATCH[Watch Channel<br/>tokio::sync::watch]
PROM1[Local Prometheus Gauges<br/>prometheus::Gauge]
end
subgraph "NATS Infrastructure"
NATS[NATS Server<br/>KV_METRICS_SUBJECT]
end
subgraph "Aggregator Process Components"
AGG[KvMetricsAggregator<br/>llm/src/kv_router/metrics_aggregator.rs]
SUB[NATS Subscriber<br/>component/namespace.rs]
end
subgraph "System Status Servers"
SS[System Status Server<br/>runtime/src/system_status_server.rs<br/>Started by DistributedRuntime]
end
style WMP fill:#ce422b,color:#fff
style WATCH fill:#ce422b,color:#fff
style PROM1 fill:#ce422b,color:#fff
style NATS fill:#27aae1,color:#fff
style AGG fill:#ce422b,color:#fff
style SUB fill:#ce422b,color:#fff
style SS fill:#6c757d,color:#fff
end
PY -->|"WorkerMetricsPublisher()"| WMPB
PY -->|"ForwardPassMetrics(worker_stats, kv_stats, spec_decode_stats)"| FPM
PY -->|"publish(metrics)"| WMPB
WMPB -->|"FFI: publish(Arc ForwardPassMetrics)"| WMP
WMP -->|"update_from_kvstats(kv_stats)"| PROM1
WMP -->|"tx.send(metrics)"| WATCH
WATCH -->|"publish(KV_METRICS_SUBJECT, LoadEvent)"| NATS
NATS -->|"subscribe_with_type LoadEvent"| SUB
SUB -->|"discover endpoints"| AGG
SS -->|"Worker: gather() from PROM1"| PROM1
SS -->|"Aggregator: scrape_stats()"| AGG
```
##### Method 2: Dynamo MetricsRegistry in Python - Component View
```mermaid
graph TD
subgraph Python["Python Layer"]
PY[Python Application<br/>main.py]
style PY fill:#3776ab,color:#fff
end
subgraph PyO3["Python/Rust Interface - PyO3"]
PM[PrometheusMetricsUtils<br/>endpoint.metrics<br/>prometheus_metrics.rs]
MT[Metric Type Objects<br/>IntGauge/Gauge/Counter/etc.<br/>prometheus_metrics.rs]
style PM fill:#f4a261,color:#000
style MT fill:#f4a261,color:#000
end
subgraph Rust["Rust Core"]
EP[Endpoint<br/>component/endpoint.rs]
DRT[DistributedRuntime<br/>distributed.rs]
PROM["Prometheus Registry<br/>prometheus::IntGauge/Gauge/etc."]
SS[System Status Server<br/>system_status_server.rs]
style EP fill:#ce422b,color:#fff
style DRT fill:#ce422b,color:#fff
style PROM fill:#ce422b,color:#fff
style SS fill:#6c757d,color:#fff
end
PY -->|endpoint.metrics.create_intgauge| PM
PM -->|endpoint.create_intgauge| EP
EP -->|create & register| PROM
PM -->|wrap & return| MT
MT -->|return to Python| PY
PY -->|metric.set/get| MT
MT -->|direct FFI call| PROM
PY -.->|endpoint.metrics.register_update_callback| PM
PM -.->|drt.register_metrics_callback| DRT
SS ==>|execute_metrics_callbacks| DRT
DRT -.->|invoke Python callback| PY
SS -->|gather| PROM
linkStyle 7 stroke:#ff6b6b,stroke-width:2px
linkStyle 8 stroke:#ff6b6b,stroke-width:2px
linkStyle 9 stroke:#ff6b6b,stroke-width:2px
linkStyle 10 stroke:#ff6b6b,stroke-width:2px
```
### Running the Examples
The examples demonstrate Method 2 (Dynamo MetricsRegistry in Python) with two different update patterns.
#### Prerequisites
Update Python bindings if needed:
```bash
cd ~/dynamo/lib/bindings/python
maturin develop
```
#### Run Example A: Background Thread Updates
```bash
cd ~/dynamo/lib/bindings/python/examples/metrics
DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=8081 ./server_with_loop.py
```
#### Run Example B: Callback-based Updates
```bash
cd ~/dynamo/lib/bindings/python/examples/metrics
DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=8081 ./server_with_callback.py
```
**Note:** The environment variables are required:
- `DYN_SYSTEM_ENABLED=true` - Enables the system status server
- `DYN_SYSTEM_PORT=8081` - Sets the port for the metrics endpoint
#### Check the Metrics
The metrics are served via the system status server at:
```bash
curl http://localhost:8081/metrics
```
Expected output includes:
```
# HELP request_total_slots Total request slots available
# TYPE request_total_slots gauge
request_total_slots{dynamo_namespace="ns556",dynamo_component="cp556",dynamo_endpoint="ep556"} 1024
# HELP gpu_cache_usage_percent GPU cache usage percentage
# TYPE gpu_cache_usage_percent gauge
gpu_cache_usage_percent{dynamo_namespace="ns556",dynamo_component="cp556",dynamo_endpoint="ep556"} 0.00
# HELP worker_active_requests Active requests per worker
# TYPE worker_active_requests gauge
worker_active_requests{dynamo_namespace="ns556",dynamo_component="cp556",dynamo_endpoint="ep556",worker_id="worker_1",model="llama-3"} 5
worker_active_requests{dynamo_namespace="ns556",dynamo_component="cp556",dynamo_endpoint="ep556",worker_id="worker_2",model="llama-3"} 3
# HELP internal_update_count Number of times metrics callback was invoked
# TYPE internal_update_count counter
internal_update_count{dynamo_namespace="ns556",dynamo_component="cp556",dynamo_endpoint="ep556",type="internal"} 1
```
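In Example B, each request to the `/metrics` endpoint invokes the `update_metrics()` callback, which refreshes the metric values before they are served. In Example A, the values reflect the most recent update made by the background thread.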
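To check a value programmatically rather than by eye, the text output can be parsed. The sketch below handles only simple `name{labels} value` sample lines like those shown above (no escaping, timestamps, or exemplars). `parse_samples` is a hypothetical helper, not part of Dynamo, and the sample text is an abbreviated copy of the expected output.

```python
import re
from typing import Dict, List, Tuple

# name{label="value",...} number   (the label block is optional)
SAMPLE_RE = re.compile(r'^([A-Za-z_:][A-Za-z0-9_:]*)(?:\{(.*)\})?\s+(\S+)$')
LABEL_RE = re.compile(r'([A-Za-z_][A-Za-z0-9_]*)="([^"]*)"')

Sample = Tuple[str, Dict[str, str], float]


def parse_samples(text: str) -> List[Sample]:
    """Return (metric name, labels, value) for each sample line; skip comments."""
    samples: List[Sample] = []
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # HELP/TYPE lines and blanks carry no sample value
        match = SAMPLE_RE.match(line)
        if match is None:
            continue
        name, raw_labels, value = match.groups()
        labels = dict(LABEL_RE.findall(raw_labels or ""))
        samples.append((name, labels, float(value)))
    return samples


text = """\
# HELP worker_active_requests Active requests per worker
# TYPE worker_active_requests gauge
worker_active_requests{dynamo_namespace="ns556",worker_id="worker_1",model="llama-3"} 5
worker_active_requests{dynamo_namespace="ns556",worker_id="worker_2",model="llama-3"} 3
"""
by_worker = {labels["worker_id"]: value for _, labels, value in parse_samples(text)}
print(by_worker)
```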
#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Example demonstrating the new typed Prometheus metrics API for declarative metrics registration.
This shows how Python code can:
1. Create typed metric objects directly (Gauge, IntGauge, GaugeVec, IntGaugeVec, etc.)
2. Register them with an endpoint
3. Update their values using type-safe methods (set for gauges, inc for counters)
4. The metrics are automatically served via the /metrics endpoint
"""
import asyncio
import uvloop
# Note that these imports are for type hints only. They cannot be instantiated directly.
# You can instantiate them using the endpoint.metrics.create_*() methods.
from dynamo._prometheus_metrics import Gauge, IntCounter, IntGauge, IntGaugeVec
from dynamo.runtime import DistributedRuntime, dynamo_worker
@dynamo_worker()
async def worker(runtime: DistributedRuntime) -> None:
    await init(runtime)


async def init(runtime: DistributedRuntime):
    # Create component and endpoint
    component = runtime.namespace("ns556").component("cp556")
    await component.create_service()
    endpoint = component.endpoint("ep556")

    # Step 1: Create metrics using the endpoint's metrics property
    print("[python] Creating metrics...")
    # Simple metrics (Gauge and IntGauge) - automatically registered
    request_total_slots: IntGauge = endpoint.metrics.create_intgauge(
        "request_total_slots", "Total request slots available"
    )
    gpu_cache_usage_perc: Gauge = endpoint.metrics.create_gauge(
        "gpu_cache_usage_percent", "GPU cache usage percentage"
    )
    # Vector metrics (IntGaugeVec with labels)
    worker_active_requests: IntGaugeVec = endpoint.metrics.create_intgaugevec(
        "worker_active_requests",
        "Active requests per worker",
        ["worker_id", "model"],
    )
    # Counter metric to track updates (with constant label values)
    update_count: IntCounter = endpoint.metrics.create_intcounter(
        "update_count",
        "Number of times metrics were updated",
        [("update_method", "callback")],
    )
    # name() and const_labels() are declared as methods in the .pyi stubs, so call them
    print(f"[python] Created IntGauge: {request_total_slots.name()}")
    print(f"[python] Created Gauge: {gpu_cache_usage_perc.name()}")
    print(f"[python] Created IntGaugeVec: {worker_active_requests.name()}")
    print(f"[python] Created IntCounter with constant labels: {update_count.name()}")
    print(f"[python] Const labels: {update_count.const_labels()}")
    print("[python] Metrics automatically registered with endpoint!")

    # Step 2: Register a callback to update metrics on-demand
    print("[python] Registering metrics callback...")

    def update_metrics():
        """Called automatically before /metrics endpoint is scraped"""
        update_count.inc()
        # Update metrics with fresh values
        count = update_count.get()
        request_total_slots.set(1024 + count)
        gpu_cache_usage_perc.set(0.01 + (count * 0.01))
        print(f"[python] Updated metrics (call #{count})")

    endpoint.metrics.register_update_callback(update_metrics)
    print("[python] update (metrics) callback registered!")

    # Step 3: Set initial values and test vector metrics
    print("[python] Setting initial metric values...")
    request_total_slots.set(1024)
    gpu_cache_usage_perc.set(0.00)
    print(f"[python] request_total_slots = {request_total_slots.get()}")
    print(f"[python] gpu_cache_usage_perc = {gpu_cache_usage_perc.get()}")
    print("[python] Updating vector metric with labels...")
    worker_active_requests.set(5, {"worker_id": "worker_1", "model": "llama-3"})
    worker_active_requests.set(3, {"worker_id": "worker_2", "model": "llama-3"})
    print("[python] worker_active_requests set for worker_1 and worker_2")

    # The metrics are now available at:
    # http://localhost:<system_status_port>/metrics
    print("[python] ✅ Metrics are now registered and served via /metrics endpoint")
    print(
        "[python] Check the system status server port to see them in Prometheus format"
    )
    print(
        "[python] Supported types: Counter, IntCounter, Gauge, IntGauge, Histogram, and their Vec variants"
    )
    # Note: This example does not call serve_endpoint() to keep it simple.
    # In a real service, you would call: await endpoint.serve_endpoint(handler, ...)
    # Keep running so metrics endpoint stays up
    _ = await asyncio.Event().wait()


if __name__ == "__main__":
    uvloop.install()
    asyncio.run(worker())
#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Example demonstrating metrics updates via background loop instead of callback.
This shows an alternative approach where:
1. Metrics are created and registered with an endpoint
2. A background thread continuously updates metrics in a loop
3. No callback is used - metrics are updated directly by the thread
4. The metrics are automatically served via the /metrics endpoint
"""
import asyncio
import threading
import time
import uvloop
from dynamo._prometheus_metrics import Gauge, IntCounter, IntGauge, IntGaugeVec
from dynamo.runtime import DistributedRuntime, dynamo_worker
def metrics_updater_thread(
    request_total_slots: IntGauge,
    gpu_cache_usage_perc: Gauge,
    worker_active_requests: IntGaugeVec,
    update_count: IntCounter,
):
    """Background thread that continuously updates metrics."""
    print("[python] Metrics updater thread started")
    while True:
        update_count.inc()
        count = update_count.get()
        # Update simple metrics
        request_total_slots.set(1024 + count)
        gpu_cache_usage_perc.set(0.01 + (count * 0.01))
        # Update vector metrics with varying values
        worker_active_requests.set(
            5 + (count % 10), {"worker_id": "worker_1", "model": "llama-3"}
        )
        worker_active_requests.set(
            3 + (count % 5), {"worker_id": "worker_2", "model": "llama-3"}
        )
        print(f"[python] Updated metrics in loop (iteration #{count})")
        # Update every 2 seconds
        time.sleep(2)
@dynamo_worker()
async def worker(runtime: DistributedRuntime) -> None:
    await init(runtime)


async def init(runtime: DistributedRuntime):
    # Create component and endpoint
    component = runtime.namespace("ns557").component("cp557")
    await component.create_service()
    endpoint = component.endpoint("ep557")

    # Create metrics using the endpoint's metrics property
    print("[python] Creating metrics...")
    request_total_slots: IntGauge = endpoint.metrics.create_intgauge(
        "request_total_slots", "Total request slots available"
    )
    gpu_cache_usage_perc: Gauge = endpoint.metrics.create_gauge(
        "gpu_cache_usage_percent", "GPU cache usage percentage"
    )
    worker_active_requests: IntGaugeVec = endpoint.metrics.create_intgaugevec(
        "worker_active_requests",
        "Active requests per worker",
        ["worker_id", "model"],
    )
    update_count: IntCounter = endpoint.metrics.create_intcounter(
        "update_count",
        "Number of times metrics were updated",
        [("update_method", "background_thread")],
    )
    # name() is declared as a method in the .pyi stubs, so call it
    print(f"[python] Created IntGauge: {request_total_slots.name()}")
    print(f"[python] Created Gauge: {gpu_cache_usage_perc.name()}")
    print(f"[python] Created IntGaugeVec: {worker_active_requests.name()}")
    print(f"[python] Created IntCounter: {update_count.name()}")
    print("[python] Metrics automatically registered with endpoint!")

    # Set initial values
    print("[python] Setting initial metric values...")
    request_total_slots.set(1024)
    gpu_cache_usage_perc.set(0.00)
    worker_active_requests.set(5, {"worker_id": "worker_1", "model": "llama-3"})
    worker_active_requests.set(3, {"worker_id": "worker_2", "model": "llama-3"})

    # Start background thread to update metrics
    print("[python] Starting background thread to update metrics...")
    updater = threading.Thread(
        target=metrics_updater_thread,
        args=(
            request_total_slots,
            gpu_cache_usage_perc,
            worker_active_requests,
            update_count,
        ),
        daemon=True,
    )
    updater.start()
    print("[python] ✅ Metrics are now registered and served via /metrics endpoint")
    print("[python] Metrics are being updated every 2 seconds by background thread")
    print(
        "[python] Check the system status server port to see them in Prometheus format"
    )
    # Note: This example does not call serve_endpoint() to keep it simple.
    # In a real service, you would call: await endpoint.serve_endpoint(handler, ...)
    # Keep running so metrics endpoint stays up
    _ = await asyncio.Event().wait()


if __name__ == "__main__":
    uvloop.install()
    asyncio.run(worker())
......@@ -55,6 +55,7 @@ mod http;
mod llm;
mod parsers;
mod planner;
mod prometheus_metrics;
mod prometheus_names;
type JsonServerStreamingIngress =
......@@ -186,6 +187,11 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
parsers::add_to_module(m)?;
prometheus_names::add_to_module(m)?;
m.add_class::<prometheus_metrics::PyRuntimeMetrics>()?;
let prometheus_metrics = PyModule::new(m.py(), "prometheus_metrics")?;
prometheus_metrics::add_to_module(&prometheus_metrics)?;
m.add_submodule(&prometheus_metrics)?;
#[cfg(feature = "block-manager")]
llm::block_manager::add_to_module(m)?;
......@@ -639,6 +645,12 @@ impl Component {
Ok(())
})
}
/// Get a RuntimeMetrics helper for creating Prometheus metrics
#[getter]
fn metrics(&self) -> prometheus_metrics::PyRuntimeMetrics {
prometheus_metrics::PyRuntimeMetrics::from_component(self.inner.clone())
}
}
#[pymethods]
......@@ -722,6 +734,12 @@ impl Endpoint {
.map(|l| l.id())
.unwrap_or(0)
}
/// Get a RuntimeMetrics helper for creating Prometheus metrics
#[getter]
fn metrics(&self) -> prometheus_metrics::PyRuntimeMetrics {
prometheus_metrics::PyRuntimeMetrics::from_endpoint(self.inner.clone())
}
}
#[pymethods]
......@@ -733,6 +751,12 @@ impl Namespace {
event_loop: self.event_loop.clone(),
})
}
/// Get a RuntimeMetrics helper for creating Prometheus metrics
#[getter]
fn metrics(&self) -> prometheus_metrics::PyRuntimeMetrics {
prometheus_metrics::PyRuntimeMetrics::from_namespace(self.inner.clone())
}
}
#[pymethods]
......
This diff is collapsed.
......@@ -12,7 +12,8 @@ from typing import (
Tuple,
)
# Prometheus metric names are defined in a separate module
# Import from specialized modules
from ._prometheus_metrics import RuntimeMetrics
from ._prometheus_names import prometheus_names
def log_message(level: str, message: str, module: str, file: str, line: int) -> None:
......@@ -89,6 +90,16 @@ class Namespace:
"""
...
@property
def metrics(self) -> RuntimeMetrics:
"""
Get a RuntimeMetrics helper for creating Prometheus metrics.
Returns:
A RuntimeMetrics object that provides create_* methods for different metric types
"""
...
class Component:
"""
A component is a collection of endpoints
......@@ -96,7 +107,7 @@ class Component:
...
def create_service(self) -> None:
async def create_service(self) -> None:
"""
Create a service
"""
......@@ -108,6 +119,16 @@ class Component:
"""
...
@property
def metrics(self) -> RuntimeMetrics:
"""
Get a RuntimeMetrics helper for creating Prometheus metrics.
Returns:
A RuntimeMetrics object that provides create_* methods for different metric types
"""
...
class Endpoint:
"""
An Endpoint is a single API endpoint
......@@ -141,6 +162,17 @@ class Endpoint:
"""
...
@property
def metrics(self) -> RuntimeMetrics:
"""
Get a RuntimeMetrics helper for creating Prometheus metrics.
Returns:
A RuntimeMetrics object that provides create_* methods for different metric types
"""
...
class Client:
"""
A client capable of calling served instances of an endpoint
......@@ -1283,6 +1315,11 @@ class VirtualConnectorClient:
...
__all__ = [
# ... existing exports ...
"prometheus_names"
"Backend",
"Client",
"Component",
"Context",
"ModelDeploymentCard",
"OAIChatPreprocessor",
"prometheus_names",
]
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Re-export prometheus_metrics types from _core for convenience.
This module exists alongside _prometheus_metrics.pyi but serves a different purpose:
- This .py file: Executed at runtime to re-export metric types from _core.prometheus_metrics,
providing a clean public API (dynamo._prometheus_metrics.Counter) instead of exposing
internal implementation details (dynamo._core.prometheus_metrics.Counter).
- The .pyi file: Used only by type checkers/IDEs for type hints and autocomplete. Never executed.
Provides signatures and docstrings for the Rust-implemented classes.
Both files are needed: .py provides the runtime imports, .pyi provides static type information.
"""
# Note: IDEs/type checkers may complain about this import because _core is a compiled
# extension module (.so file), not a Python module. The type stub (_core.pyi) doesn't
# declare the prometheus_metrics submodule. However, this import is valid at runtime
# because the Rust code (lib.rs) creates and registers the prometheus_metrics submodule
# via PyModule::new() and add_submodule(). The type: ignore suppresses the false warning.
from dynamo._core import ( # type: ignore[attr-defined]
    PyRuntimeMetrics,
    prometheus_metrics,
)
# Import metric type classes from the prometheus_metrics submodule
Counter = prometheus_metrics.Counter
CounterVec = prometheus_metrics.CounterVec
Gauge = prometheus_metrics.Gauge
GaugeVec = prometheus_metrics.GaugeVec
Histogram = prometheus_metrics.Histogram
IntCounter = prometheus_metrics.IntCounter
IntCounterVec = prometheus_metrics.IntCounterVec
IntGauge = prometheus_metrics.IntGauge
IntGaugeVec = prometheus_metrics.IntGaugeVec
__all__ = [
    "Counter",
    "CounterVec",
    "Gauge",
    "GaugeVec",
    "Histogram",
    "IntCounter",
    "IntCounterVec",
    "IntGauge",
    "IntGaugeVec",
    "PyRuntimeMetrics",
]
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# NOTE: This file defines Python type stubs for Prometheus metric types.
# It should be kept in sync with:
# - lib/bindings/python/rust/prometheus_metrics.rs (Rust implementations)
# - lib/runtime/src/metrics.rs (MetricsRegistry trait and Prometheus types)
from typing import Callable, Dict, List, Optional, Tuple
# Specific metric type classes
class Counter:
    """Prometheus Counter metric (float)"""
    def name(self) -> str:
        """Get the metric name"""
        ...
    def const_labels(self) -> Dict[str, str]:
        """Get the constant labels"""
        ...
    def inc(self) -> None:
        """Increment counter by 1"""
        ...
    def inc_by(self, value: float) -> None:
        """Increment counter by value"""
        ...
    def get(self) -> float:
        """Get counter value"""
        ...
class CounterVec:
    """Prometheus CounterVec metric with labels (float)"""
    def name(self) -> str:
        """Get the metric name"""
        ...
    def const_labels(self) -> Dict[str, str]:
        """Get the constant labels"""
        ...
    def variable_labels(self) -> List[str]:
        """Get the variable label names"""
        ...
    def inc(self, labels: Dict[str, str]) -> None:
        """Increment counter by 1 with labels"""
        ...
    def inc_by(self, labels: Dict[str, str], value: float) -> None:
        """Increment counter by value with labels"""
        ...
    def get(self, labels: Dict[str, str]) -> float:
        """Get counter value with labels"""
        ...
class Gauge:
    """Prometheus Gauge metric (float)"""
    def name(self) -> str:
        """Get the metric name"""
        ...
    def const_labels(self) -> Dict[str, str]:
        """Get the constant labels"""
        ...
    def set(self, value: float) -> None:
        """Set gauge value"""
        ...
    def get(self) -> float:
        """Get gauge value"""
        ...
    def inc(self) -> None:
        """Increment gauge by 1"""
        ...
    def inc_by(self, value: float) -> None:
        """Increment gauge by value"""
        ...
    def dec(self) -> None:
        """Decrement gauge by 1"""
        ...
    def dec_by(self, value: float) -> None:
        """Decrement gauge by value"""
        ...
    def add(self, value: float) -> None:
        """Add value to gauge"""
        ...
    def sub(self, value: float) -> None:
        """Subtract value from gauge"""
        ...
class GaugeVec:
    """Prometheus GaugeVec metric with labels (float)"""
    def name(self) -> str:
        """Get the metric name"""
        ...
    def const_labels(self) -> Dict[str, str]:
        """Get the constant labels"""
        ...
    def variable_labels(self) -> List[str]:
        """Get the variable label names"""
        ...
    def set(self, value: float, labels: Dict[str, str]) -> None:
        """Set gauge value with labels"""
        ...
    def get(self, labels: Dict[str, str]) -> float:
        """Get gauge value with labels"""
        ...
    def inc(self, labels: Dict[str, str]) -> None:
        """Increment gauge by 1 with labels"""
        ...
    def dec(self, labels: Dict[str, str]) -> None:
        """Decrement gauge by 1 with labels"""
        ...
    def add(self, labels: Dict[str, str], value: float) -> None:
        """Add value to gauge with labels"""
        ...
    def sub(self, labels: Dict[str, str], value: float) -> None:
        """Subtract value from gauge with labels"""
        ...
class Histogram:
    """Prometheus Histogram metric"""
    def name(self) -> str:
        """Get the metric name"""
        ...
    def const_labels(self) -> Dict[str, str]:
        """Get the constant labels"""
        ...
    def observe(self, value: float) -> None:
        """Observe a value"""
        ...
class IntCounter:
    """Prometheus IntCounter metric (integer)"""
    def name(self) -> str:
        """Get the metric name"""
        ...
    def const_labels(self) -> Dict[str, str]:
        """Get the constant labels"""
        ...
    def inc(self) -> None:
        """Increment counter by 1"""
        ...
    def inc_by(self, value: int) -> None:
        """Increment counter by value"""
        ...
    def get(self) -> int:
        """Get counter value"""
        ...
class IntCounterVec:
    """Prometheus IntCounterVec metric with labels (integer)"""
    def name(self) -> str:
        """Get the metric name"""
        ...
    def const_labels(self) -> Dict[str, str]:
        """Get the constant labels"""
        ...
    def variable_labels(self) -> List[str]:
        """Get the variable label names"""
        ...
    def inc(self, labels: Dict[str, str]) -> None:
        """Increment counter by 1 with labels"""
        ...
    def inc_by(self, labels: Dict[str, str], value: int) -> None:
        """Increment counter by value with labels"""
        ...
    def get(self, labels: Dict[str, str]) -> int:
        """Get counter value with labels"""
        ...
class IntGauge:
    """Prometheus IntGauge metric (integer)"""
    def name(self) -> str:
        """Get the metric name"""
        ...
    def const_labels(self) -> Dict[str, str]:
        """Get the constant labels"""
        ...
    def set(self, value: int) -> None:
        """Set gauge value"""
        ...
    def get(self) -> int:
        """Get gauge value"""
        ...
    def inc(self) -> None:
        """Increment gauge by 1"""
        ...
    def dec(self) -> None:
        """Decrement gauge by 1"""
        ...
    def add(self, value: int) -> None:
        """Add value to gauge"""
        ...
    def sub(self, value: int) -> None:
        """Subtract value from gauge"""
        ...
class IntGaugeVec:
"""Prometheus IntGaugeVec metric with labels (integer)"""
def name(self) -> str:
"""Get the metric name"""
...
def const_labels(self) -> Dict[str, str]:
"""Get the constant labels"""
...
def variable_labels(self) -> List[str]:
"""Get the variable label names"""
...
def set(self, value: int, labels: Dict[str, str]) -> None:
"""Set gauge value with labels"""
...
def get(self, labels: Dict[str, str]) -> int:
"""Get gauge value with labels"""
...
def inc(self, labels: Dict[str, str]) -> None:
"""Increment gauge by 1 with labels"""
...
def dec(self, labels: Dict[str, str]) -> None:
"""Decrement gauge by 1 with labels"""
...
def add(self, labels: Dict[str, str], value: int) -> None:
"""Add value to gauge with labels"""
...
def sub(self, labels: Dict[str, str], value: int) -> None:
"""Subtract value from gauge with labels"""
...
class RuntimeMetrics:
"""
Helper class for creating Prometheus metrics on an Endpoint.
Provides factory methods to create various Prometheus metric types
that are automatically registered with the endpoint's Prometheus registry.
Also provides utilities for registering metrics callbacks.
"""
def register_update_callback(self, callback: Callable[[], None]) -> None:
"""
Register a Python callback to be invoked before metrics are scraped.
This allows you to update metric values dynamically when the /metrics endpoint
is accessed. The callback will be executed synchronously before serving metrics.
Args:
callback: A callable that takes no arguments and returns None.
This function will be called each time metrics are scraped.
Example:
```python
metrics = endpoint.metrics
counter = metrics.create_intcounter("request_count", "Total requests")
def update_metrics():
counter.inc()
metrics.register_update_callback(update_metrics)
```
"""
...
def create_counter(self, name: str, description: str, const_labels: Optional[List[Tuple[str, str]]] = None) -> Counter:
"""Create a Counter metric (float) with optional static labels"""
...
def create_countervec(self, name: str, description: str, label_names: List[str], const_labels: Optional[List[Tuple[str, str]]] = None) -> CounterVec:
"""Create a CounterVec metric with labels (float)"""
...
def create_gauge(self, name: str, description: str, const_labels: Optional[List[Tuple[str, str]]] = None) -> Gauge:
"""Create a Gauge metric (float) with optional static labels"""
...
def create_gaugevec(self, name: str, description: str, label_names: List[str], const_labels: Optional[List[Tuple[str, str]]] = None) -> GaugeVec:
"""Create a GaugeVec metric with labels (float)"""
...
def create_histogram(self, name: str, description: str, const_labels: Optional[List[Tuple[str, str]]] = None) -> Histogram:
"""Create a Histogram metric with optional static labels"""
...
def create_intcounter(self, name: str, description: str, const_labels: Optional[List[Tuple[str, str]]] = None) -> IntCounter:
"""Create an IntCounter metric (integer) with optional static labels"""
...
def create_intcountervec(self, name: str, description: str, label_names: List[str], const_labels: Optional[List[Tuple[str, str]]] = None) -> IntCounterVec:
"""Create an IntCounterVec metric with labels (integer)"""
...
def create_intgauge(self, name: str, description: str, const_labels: Optional[List[Tuple[str, str]]] = None) -> IntGauge:
"""Create an IntGauge metric (integer) with optional static labels"""
...
def create_intgaugevec(self, name: str, description: str, label_names: List[str], const_labels: Optional[List[Tuple[str, str]]] = None) -> IntGaugeVec:
"""Create an IntGaugeVec metric with labels (integer)"""
...
__all__ = [
"Counter",
"CounterVec",
"Gauge",
"GaugeVec",
"Histogram",
"IntCounter",
"IntCounterVec",
"IntGauge",
"IntGaugeVec",
"RuntimeMetrics",
]
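The `register_update_callback` docstring above says callbacks run synchronously before metrics are served. The following is a minimal sketch of that ordering, assuming a hypothetical pure-Python stand-in (`FakeMetricsRegistry` and its `scrape` method are invented for illustration and are not part of the Dynamo bindings):

```python
from typing import Callable, Dict, List


class FakeMetricsRegistry:
    """Hypothetical stand-in for illustration; not the Dynamo implementation."""

    def __init__(self) -> None:
        self._callbacks: List[Callable[[], None]] = []
        self.values: Dict[str, int] = {}

    def register_update_callback(self, callback: Callable[[], None]) -> None:
        self._callbacks.append(callback)

    def scrape(self) -> str:
        # Run every registered callback synchronously before rendering.
        for callback in self._callbacks:
            callback()
        return "\n".join(
            f"{name} {value}" for name, value in sorted(self.values.items())
        )


registry = FakeMetricsRegistry()
queue = ["job-a", "job-b", "job-c"]


def update_queue_depth() -> None:
    # Refresh the value from application state at scrape time.
    registry.values["queue_depth"] = len(queue)


registry.register_update_callback(update_queue_depth)
first = registry.scrape()
queue.pop()
second = registry.scrape()
```

Because the callback runs on every scrape, a gauge-style value (set to the current state) tends to be a safer fit than incrementing a counter inside the callback, which would count scrapes instead of events.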
......@@ -233,5 +233,3 @@ class KvStatsMetrics:
# Module-level singleton instance for convenient access
prometheus_names: PrometheusNames
......@@ -13,40 +13,422 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test configuration and fixtures for Dynamo Python bindings tests.
TWO MODES OF OPERATION:
1. Isolated Mode (ENABLE_ISOLATED_ETCD_AND_NATS=1):
- Each test gets fresh NATS/ETCD on random ports
- Requires: pytest-forked (uv pip install pytest-forked)
- Tests using 'runtime' fixture MUST have @pytest.mark.forked
- Safer, enables parallel execution
- Run: ENABLE_ISOLATED_ETCD_AND_NATS=1 pytest tests/test_metrics_registry.py -n auto
2. Default Ports Mode (ENABLE_ISOLATED_ETCD_AND_NATS=0, default):
- All tests share NATS/ETCD on default ports (4222, 2379)
- No pytest-forked required
- No @pytest.mark.forked required
- Faster for sequential runs, but NO parallel execution
- Run: pytest tests/test_metrics_registry.py
Performance comparison (32-core machine, 13 tests):
Default ports (ENABLE_ISOLATED_ETCD_AND_NATS=0, default): 4.06s (sequential only)
Isolated sequential (ENABLE_ISOLATED_ETCD_AND_NATS=1): 8.58s (2.1x slower, but safer)
Isolated parallel -n 8: 2.82s (1.4x faster than default)
Isolated parallel -n 16: 2.28s (1.8x faster than default, optimal)
Isolated parallel -n 32: 2.74s (overhead dominates)
Recommendation: Default mode for simplicity. Use ENABLE_ISOLATED_ETCD_AND_NATS=1
with -n 8 to -n 16 when you need parallel execution and maximum test isolation.
"""
import asyncio
import json
import os
import re
import shutil
import socket
import subprocess
from time import sleep
import tempfile
import time
import pytest
from dynamo.runtime import DistributedRuntime
# Configuration constants
# ENABLE_ISOLATED_ETCD_AND_NATS: When True, each test gets isolated NATS/ETCD instances
# on random ports with unique data directories. This enables parallel test execution.
# Set to False to use default ports (4222, 2379) for sequential execution.
# Can be overridden by environment variable: ENABLE_ISOLATED_ETCD_AND_NATS=0 or =1
ENABLE_ISOLATED_ETCD_AND_NATS = (
os.environ.get("ENABLE_ISOLATED_ETCD_AND_NATS", "0") == "1"
)
@pytest.fixture(scope="module", autouse=True)
def nats_and_etcd():
# Setup code
# Check if pytest-forked is installed (only when using isolated NATS/ETCD)
# This is REQUIRED when ENABLE_ISOLATED_ETCD_AND_NATS=1 because each test gets
# fresh services and the DistributedRuntime singleton needs process isolation
if ENABLE_ISOLATED_ETCD_AND_NATS:
try:
import pytest_forked # noqa: F401
except ImportError:
pytest.exit(
"""
pytest-forked is required when ENABLE_ISOLATED_ETCD_AND_NATS=1.
Install it with: uv pip install pytest-forked
This is needed because DistributedRuntime is a process-level singleton
and tests must run in separate processes to avoid 'Worker already initialized' errors.
Alternatively, set ENABLE_ISOLATED_ETCD_AND_NATS=0 to use default ports (sequential only, no parallel execution).
""",
returncode=1,
)
# Timeout constants
SERVICE_STARTUP_TIMEOUT = 5
SERVICE_SHUTDOWN_TIMEOUT = 5
def get_free_port():
"""Find and return an available port."""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(("", 0))
port = sock.getsockname()[1]
sock.close()
return port
def wait_for_port(host, port, timeout: float = SERVICE_STARTUP_TIMEOUT):
"""Wait until a TCP connection to host:port succeeds (the service is listening)."""
start = time.time()
while time.time() - start < timeout:
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
sock.connect((host, port))
sock.close()
return True
except (socket.error, ConnectionRefusedError):
time.sleep(0.1)
return False
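The two helpers above can be exercised without NATS or etcd. This is a sketch under stated assumptions: it swaps in `socket.create_connection` and `time.monotonic` as a variant of the fixture's helpers, and a plain listening socket stands in for a started service.

```python
import socket
import time


def get_free_port() -> int:
    # Bind to port 0 so the OS picks a free port, then release it.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(("127.0.0.1", 0))
        return sock.getsockname()[1]


def wait_for_port(host: str, port: int, timeout: float = 5.0) -> bool:
    # Poll until a TCP connection succeeds or the deadline passes.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=1):
                return True
        except OSError:
            time.sleep(0.1)
    return False


# A listening socket stands in for a started service.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))
server.listen(1)
listening_port = server.getsockname()[1]

ready = wait_for_port("127.0.0.1", listening_port, timeout=2.0)
server.close()

# Nothing listens on a freshly released port, so the wait times out.
unused = wait_for_port("127.0.0.1", get_free_port(), timeout=0.3)
```

The gap between releasing the port in `get_free_port` and a service binding it is the race the NATS retry loop later in this file is meant to absorb.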
def start_nats_and_etcd_default_ports():
"""
Start NATS and ETCD on default ports (4222, 2379).
Use this for sequential test execution or when running tests alone.
Faster startup if services are already running.
"""
# Use default ports
nats_port = 4222
etcd_client_port = 2379
# No data directories needed - use defaults
nats_data_dir = None
etcd_data_dir = None
# Check if ports are already in use (reuse them if so)
# TODO: In the future, error out to ensure proper test isolation
nats_already_running = wait_for_port("localhost", nats_port, timeout=0.1)
etcd_already_running = wait_for_port("localhost", etcd_client_port, timeout=0.1)
if nats_already_running and etcd_already_running:
print(
f"Reusing existing NATS on port {nats_port} and ETCD on port {etcd_client_port}"
)
# Set environment variables for the runtime to use
os.environ["NATS_SERVER"] = f"nats://localhost:{nats_port}"
os.environ["ETCD_ENDPOINTS"] = f"http://localhost:{etcd_client_port}"
# Return None for processes since we're reusing existing services
return None, None, nats_port, etcd_client_port, None, None
# Set environment variables for the runtime to use
os.environ["NATS_SERVER"] = f"nats://localhost:{nats_port}"
os.environ["ETCD_ENDPOINTS"] = f"http://localhost:{etcd_client_port}"
print(f"Using NATS on default port {nats_port}")
print(f"Using ETCD on default client port {etcd_client_port}")
# Start services with default ports
nats_server = subprocess.Popen(["nats-server", "-js"])
etcd = subprocess.Popen(["etcd"])
print("Setting up resources")
sleep(5) # wait for nats-server and etcd to start
yield
return nats_server, etcd, nats_port, etcd_client_port, nats_data_dir, etcd_data_dir
def start_nats_and_etcd_random_ports():
"""
Start NATS and ETCD with random ports and unique data directories.
This ensures test isolation by giving each test module (or parallel worker)
its own NATS/ETCD instances on different ports with separate data directories.
This allows tests to run in parallel without port or filesystem conflicts.
Note: etcd uses port 0 (OS-assigned port) to eliminate race conditions.
NATS uses get_free_port() with retry logic since it doesn't support port 0.
Port collision probability per NATS attempt: ~1% (heavy parallel testing), ~0.05% (normal load).
With 5 attempts at ~1% each, the probability that every NATS attempt fails is about 1e-10 (essentially never).
"""
# Create unique temporary data directories
nats_data_dir = tempfile.mkdtemp(prefix="nats_data_")
etcd_data_dir = tempfile.mkdtemp(prefix="etcd_data_")
# Start etcd first with port 0 (no race condition, no retries needed)
print(f"Starting ETCD with port 0 (OS-assigned), data dir: {etcd_data_dir}")
etcd = subprocess.Popen(
[
"etcd",
"--data-dir",
str(etcd_data_dir),
"--listen-client-urls",
"http://localhost:0",
"--advertise-client-urls",
"http://localhost:0",
"--listen-peer-urls",
"http://localhost:0",
"--initial-advertise-peer-urls",
"http://localhost:0",
"--initial-cluster",
"default=http://localhost:0",
],
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
text=True,
bufsize=1,
)
# Parse etcd's stderr to discover the actual client port it bound to
etcd_client_port = None
timeout_at = time.time() + 5.0
# Teardown code
print("Tearing down resources")
nats_server.terminate()
nats_server.wait()
etcd.terminate()
etcd.wait()
while time.time() < timeout_at:
if etcd.poll() is not None:
stderr = etcd.stderr.read() if etcd.stderr else ""
shutil.rmtree(nats_data_dir, ignore_errors=True)
shutil.rmtree(etcd_data_dir, ignore_errors=True)
raise RuntimeError(f"ETCD failed to start: {stderr}")
line = etcd.stderr.readline() if etcd.stderr else ""
if not line:
time.sleep(0.01)
continue
try:
log = json.loads(line)
msg = log.get("msg", "")
# Look for the client port ("serving client" also matches "serving client traffic")
if "serving client" in msg:
address = log.get("address", "")
match = re.search(r":(\d+)$", address)
if match:
etcd_client_port = int(match.group(1))
print(f"ETCD bound to client port: {etcd_client_port}")
break
except (json.JSONDecodeError, ValueError):
continue
if etcd_client_port is None:
etcd.terminate()
etcd.wait()
shutil.rmtree(nats_data_dir, ignore_errors=True)
shutil.rmtree(etcd_data_dir, ignore_errors=True)
raise RuntimeError("Failed to discover ETCD client port from logs")
# Now start NATS with retry logic (up to 5 attempts due to race condition)
max_nats_retries = 5
nats_server = None
nats_port = None
last_error = None
for attempt in range(max_nats_retries):
try:
nats_port = get_free_port()
print(
f"Attempt {attempt + 1}: Starting NATS on port {nats_port}, data dir: {nats_data_dir}"
)
nats_server = subprocess.Popen(
["nats-server", "-js", "-p", str(nats_port), "-sd", str(nats_data_dir)],
stderr=subprocess.PIPE,
)
# Give NATS a moment to bind to the port
time.sleep(0.1)
# Check if NATS failed to start
if nats_server.poll() is not None:
stderr = (
nats_server.stderr.read().decode() if nats_server.stderr else ""
)
if "address already in use" in stderr.lower():
print(f"NATS port {nats_port} already in use, retrying...")
time.sleep(0.1)
continue
etcd.terminate()
etcd.wait()
shutil.rmtree(nats_data_dir, ignore_errors=True)
shutil.rmtree(etcd_data_dir, ignore_errors=True)
raise RuntimeError(f"NATS failed to start: {stderr}")
# Success - NATS started
break
except Exception as e:
last_error = e
print(f"Attempt {attempt + 1} failed: {e}")
if attempt < max_nats_retries - 1:
time.sleep(0.2)
else:
etcd.terminate()
etcd.wait()
shutil.rmtree(nats_data_dir, ignore_errors=True)
shutil.rmtree(etcd_data_dir, ignore_errors=True)
raise RuntimeError(
f"Failed to start NATS after {max_nats_retries} attempts: {last_error}"
)
# Set environment variables for the runtime to use
os.environ["NATS_SERVER"] = f"nats://localhost:{nats_port}"
os.environ["ETCD_ENDPOINTS"] = f"http://localhost:{etcd_client_port}"
return nats_server, etcd, nats_port, etcd_client_port, nats_data_dir, etcd_data_dir
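The port-discovery loop in the function above reads JSON log lines and pulls the port out of the `address` field. Below is a small standalone sketch of just that parsing step; `parse_client_port` is a hypothetical helper, and the sample lines are invented to match the fields the fixture reads, not captured etcd output.

```python
import json
import re
from typing import Optional


def parse_client_port(line: str) -> Optional[int]:
    """Return the client port from one JSON log line, or None if absent."""
    try:
        log = json.loads(line)
    except json.JSONDecodeError:
        return None
    if not isinstance(log, dict):
        return None
    if "serving client" not in log.get("msg", ""):
        return None
    # The port is whatever follows the last colon in the address.
    match = re.search(r":(\d+)$", log.get("address", ""))
    return int(match.group(1)) if match else None


# Hypothetical log lines shaped like the fields the fixture reads.
sample_lines = [
    "not json at all",
    '{"level": "info", "msg": "starting server"}',
    '{"level": "info", "msg": "serving client traffic insecurely", '
    '"address": "127.0.0.1:41873"}',
]
ports = [parse_client_port(line) for line in sample_lines]
```

Keeping the parsing in a pure function like this would make it possible to unit-test the log handling without launching etcd.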
@pytest.fixture(scope="module", autouse=True)
def nats_and_etcd():
"""
Start NATS and ETCD for testing.
Scope is "module", so all tests in a module share the same NATS/ETCD instance.
Behavior is controlled by the ENABLE_ISOLATED_ETCD_AND_NATS constant:
- True: Random ports + unique data dirs for parallel execution
- False (default): Default ports (4222, 2379) for sequential execution
"""
if ENABLE_ISOLATED_ETCD_AND_NATS:
(
nats_server,
etcd,
nats_port,
etcd_client_port,
nats_data_dir,
etcd_data_dir,
) = start_nats_and_etcd_random_ports()
else:
(
nats_server,
etcd,
nats_port,
etcd_client_port,
nats_data_dir,
etcd_data_dir,
) = start_nats_and_etcd_default_ports()
try:
# Wait for services to be ready
if not wait_for_port("localhost", nats_port, timeout=SERVICE_STARTUP_TIMEOUT):
raise RuntimeError(f"NATS server failed to start on port {nats_port}")
if not wait_for_port(
"localhost", etcd_client_port, timeout=SERVICE_STARTUP_TIMEOUT
):
raise RuntimeError(f"ETCD failed to start on port {etcd_client_port}")
print(f"NATS ({nats_port}) and ETCD ({etcd_client_port}) services ready")
yield
finally:
# Teardown code - always runs, even if the readiness checks fail or tests error
print("Tearing down resources")
# Only terminate services if we started them (not reusing existing)
if nats_server is None and etcd is None:
print("Reused existing services, not stopping them")
else:
# Terminate both processes first (parallel shutdown)
try:
if nats_server:
nats_server.terminate()
except Exception as e:
print(f"Error terminating NATS: {e}")
try:
if etcd:
etcd.terminate()
except Exception as e:
print(f"Error terminating ETCD: {e}")
# Wait for both processes to finish
try:
if nats_server:
nats_server.wait(timeout=SERVICE_SHUTDOWN_TIMEOUT)
except subprocess.TimeoutExpired:
print("NATS did not terminate gracefully, killing")
try:
nats_server.kill()
except Exception:
pass
except Exception as e:
print(f"Error waiting for NATS: {e}")
try:
if etcd:
etcd.wait(timeout=SERVICE_SHUTDOWN_TIMEOUT)
except subprocess.TimeoutExpired:
print("ETCD did not terminate gracefully, killing")
try:
etcd.kill()
except Exception:
pass
except Exception as e:
print(f"Error waiting for ETCD: {e}")
# Clean up temporary data directories (if created)
if nats_data_dir:
try:
shutil.rmtree(nats_data_dir, ignore_errors=True)
except Exception as e:
print(f"Error removing NATS data dir: {e}")
if etcd_data_dir:
try:
shutil.rmtree(etcd_data_dir, ignore_errors=True)
except Exception as e:
print(f"Error removing ETCD data dir: {e}")
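The teardown above follows a terminate, wait with timeout, then kill sequence. A compact sketch of that pattern for a single process, assuming a hypothetical helper name `stop_process` and a short-lived Python child standing in for NATS or etcd:

```python
import subprocess
import sys


def stop_process(proc: subprocess.Popen, timeout: float = 5.0) -> int:
    """Ask the process to exit, and force-kill it if it does not comply."""
    proc.terminate()
    try:
        return proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        # Graceful shutdown took too long; escalate.
        proc.kill()
        return proc.wait()


# A sleeping Python child stands in for a long-running service.
child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
exit_code = stop_process(child)
```

The fixture terminates both services before waiting on either, so the two shutdowns overlap; calling a helper like this once per process would instead stop them one after the other.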
@pytest.fixture(scope="function", autouse=False)
async def runtime():
async def runtime(request):
"""
Create a DistributedRuntime for testing.
DistributedRuntime has singleton requirements, so tests using this fixture should be
IMPORTANT: DistributedRuntime is a process-level singleton. When using isolated
NATS/ETCD (ENABLE_ISOLATED_ETCD_AND_NATS=1), tests using this fixture MUST be
marked with `@pytest.mark.forked` to run in a separate process for isolation.
Without @pytest.mark.forked in isolated mode, you will get "Worker already initialized"
errors when multiple tests try to create runtimes in the same process.
"""
# Check if the test is marked with @pytest.mark.forked (only in isolated mode)
if ENABLE_ISOLATED_ETCD_AND_NATS:
forked_marker = request.node.get_closest_marker("forked")
if forked_marker is None:
pytest.fail(
f"""
Test '{request.node.name}' uses the 'runtime' fixture but is not marked with @pytest.mark.forked.
This is required when ENABLE_ISOLATED_ETCD_AND_NATS=1.
Add @pytest.mark.forked decorator to run this test in a separate process:
@pytest.mark.forked
async def test_my_test(runtime):
...
Or set ENABLE_ISOLATED_ETCD_AND_NATS=0 to use default ports (no forking needed).
This is required because DistributedRuntime is a process-level singleton.
"""
)
loop = asyncio.get_running_loop()
runtime = DistributedRuntime(loop, True)
yield runtime
......
......@@ -35,16 +35,20 @@ from dynamo.runtime import Component, DistributedRuntime
pytestmark = pytest.mark.pre_merge
@pytest.fixture(scope="module")
@pytest.fixture
async def distributed_runtime():
"""TODO: This should not use scope='module' as DistributedRuntime has singleton requirements.
and blocks any tests with DistributedRuntime(loop, True) from running in the same process, or any forked process.
"""Function-scoped runtime fixture for use with @pytest.mark.forked tests.
Each test gets its own runtime in a forked process to avoid singleton conflicts.
"""
loop = asyncio.get_running_loop()
return DistributedRuntime(loop, False)
runtime = DistributedRuntime(loop, False)
yield runtime
runtime.shutdown()
# TODO: enable pytest.mark.forked + scope='function' runtime.
@pytest.mark.asyncio
@pytest.mark.forked
async def test_radix_tree_binding(distributed_runtime):
"""Test RadixTree binding directly with store event and find matches"""
import json
......@@ -101,6 +105,8 @@ async def test_radix_tree_binding(distributed_runtime):
# OnceCell initializations not being reset.
# The test works individually if I run it with 32, then 11, then 64.
# @pytest.mark.parametrize("kv_block_size", [11, 32, 64])
@pytest.mark.asyncio
@pytest.mark.forked
@pytest.mark.skip(reason="Flaky in CI. Likely a race condition.")
async def test_event_handler(distributed_runtime):
kv_block_size = 32
......@@ -157,7 +163,8 @@ async def test_event_handler(distributed_runtime):
), f"Scores still present after {(retry+1)*0.5}s: {scores.scores}"
# TODO: enable pytest.mark.forked + scope='function' runtime.
@pytest.mark.asyncio
@pytest.mark.forked
async def test_approx_kv_indexer(distributed_runtime):
kv_block_size = 32
namespace = "kv_test"
......@@ -215,7 +222,8 @@ class EventPublisher:
self.event_id_counter += 1
# TODO: enable pytest.mark.forked + scope='function' runtime.
@pytest.mark.asyncio
@pytest.mark.forked
async def test_metrics_aggregator(distributed_runtime):
namespace = "kv_test"
component = "metrics"
......
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""Tests for Python MetricsRegistry bindings.
This test suite verifies that Python can create, introspect, and use Prometheus
metrics through the Dynamo MetricsRegistry interface.
"""
import pytest
async def get_metrics_runtime(runtime, endpoint_name):
"""Helper to create a unique metrics runtime for each test."""
namespace = runtime.namespace("test_metrics_ns")
component = namespace.component("test_metrics_comp")
await component.create_service()
endpoint = component.endpoint(endpoint_name)
return endpoint.metrics
pytestmark = pytest.mark.pre_merge
@pytest.mark.asyncio
@pytest.mark.forked
async def test_counter_introspection(runtime):
"""Test Counter metric introspection methods."""
metrics_runtime = await get_metrics_runtime(runtime, "ep_counter_introspection")
counter = metrics_runtime.create_counter(
"test_counter", "A test counter", [("env", "test")] # constant labels
)
# Test name() method
name = counter.name()
assert isinstance(name, str)
assert "test_counter" in name
assert name.endswith("test_counter")
# Test const_labels() method
labels = counter.const_labels()
assert isinstance(labels, dict)
assert "env" in labels
assert labels["env"] == "test"
assert "dynamo_namespace" in labels
assert labels["dynamo_namespace"] == "test_metrics_ns"
@pytest.mark.asyncio
@pytest.mark.forked
async def test_intcounter_introspection(runtime):
"""Test IntCounter metric introspection methods."""
metrics_runtime = await get_metrics_runtime(runtime, "ep_intcounter_introspection")
counter = metrics_runtime.create_intcounter(
"test_int_counter", "A test int counter", [("type", "integer")]
)
name = counter.name()
assert isinstance(name, str)
assert "test_int_counter" in name
labels = counter.const_labels()
assert isinstance(labels, dict)
assert labels["type"] == "integer"
@pytest.mark.asyncio
@pytest.mark.forked
async def test_gauge_introspection(runtime):
"""Test Gauge metric introspection methods."""
metrics_runtime = await get_metrics_runtime(runtime, "ep_gauge_introspection")
gauge = metrics_runtime.create_gauge(
"test_gauge", "A test gauge", [("unit", "bytes")]
)
name = gauge.name()
assert isinstance(name, str)
assert "test_gauge" in name
labels = gauge.const_labels()
assert isinstance(labels, dict)
assert labels["unit"] == "bytes"
@pytest.mark.asyncio
@pytest.mark.forked
async def test_intgauge_introspection(runtime):
"""Test IntGauge metric introspection methods."""
metrics_runtime = await get_metrics_runtime(runtime, "ep_intgauge_introspection")
gauge = metrics_runtime.create_intgauge(
"test_int_gauge", "A test int gauge", [] # no constant labels
)
name = gauge.name()
assert isinstance(name, str)
assert "test_int_gauge" in name
labels = gauge.const_labels()
assert isinstance(labels, dict)
# Should still have hierarchy labels
assert "dynamo_namespace" in labels
@pytest.mark.asyncio
@pytest.mark.forked
async def test_histogram_introspection(runtime):
"""Test Histogram metric introspection methods."""
metrics_runtime = await get_metrics_runtime(runtime, "ep_histogram_introspection")
histogram = metrics_runtime.create_histogram(
"test_histogram", "A test histogram", [("method", "POST")]
)
name = histogram.name()
assert isinstance(name, str)
assert "test_histogram" in name
labels = histogram.const_labels()
assert isinstance(labels, dict)
assert labels["method"] == "POST"
@pytest.mark.asyncio
@pytest.mark.forked
async def test_countervec_introspection(runtime):
"""Test CounterVec metric introspection methods."""
metrics_runtime = await get_metrics_runtime(runtime, "ep_countervec_introspection")
counter_vec = metrics_runtime.create_countervec(
"test_counter_vec",
"A test counter vec",
["worker_id", "status"], # variable labels
[("cluster", "prod")], # constant labels
)
# Test name()
name = counter_vec.name()
assert isinstance(name, str)
assert "test_counter_vec" in name
# Test const_labels()
const_labels = counter_vec.const_labels()
assert isinstance(const_labels, dict)
assert const_labels["cluster"] == "prod"
assert "dynamo_namespace" in const_labels
# Test variable_labels()
var_labels = counter_vec.variable_labels()
assert isinstance(var_labels, list)
assert len(var_labels) == 2
assert "worker_id" in var_labels
assert "status" in var_labels
@pytest.mark.asyncio
@pytest.mark.forked
async def test_intcountervec_introspection(runtime):
"""Test IntCounterVec metric introspection methods."""
metrics_runtime = await get_metrics_runtime(
runtime, "ep_intcountervec_introspection"
)
counter_vec = metrics_runtime.create_intcountervec(
"test_int_counter_vec",
"A test int counter vec",
["region", "zone"],
[], # no constant labels
)
name = counter_vec.name()
assert "test_int_counter_vec" in name
const_labels = counter_vec.const_labels()
assert isinstance(const_labels, dict)
var_labels = counter_vec.variable_labels()
assert len(var_labels) == 2
assert "region" in var_labels
assert "zone" in var_labels
@pytest.mark.asyncio
@pytest.mark.forked
async def test_gaugevec_introspection(runtime):
"""Test GaugeVec metric introspection methods."""
metrics_runtime = await get_metrics_runtime(runtime, "ep_gaugevec_introspection")
gauge_vec = metrics_runtime.create_gaugevec(
"test_gauge_vec", "A test gauge vec", ["instance", "job"], [("env", "staging")]
)
name = gauge_vec.name()
assert "test_gauge_vec" in name
const_labels = gauge_vec.const_labels()
assert const_labels["env"] == "staging"
var_labels = gauge_vec.variable_labels()
assert len(var_labels) == 2
assert "instance" in var_labels
assert "job" in var_labels
@pytest.mark.asyncio
@pytest.mark.forked
async def test_intgaugevec_introspection(runtime):
"""Test IntGaugeVec metric introspection methods."""
metrics_runtime = await get_metrics_runtime(runtime, "ep_intgaugevec_introspection")
gauge_vec = metrics_runtime.create_intgaugevec(
"test_int_gauge_vec",
"A test int gauge vec",
["device", "partition"],
[("datacenter", "us-west")],
)
name = gauge_vec.name()
assert "test_int_gauge_vec" in name
const_labels = gauge_vec.const_labels()
assert const_labels["datacenter"] == "us-west"
var_labels = gauge_vec.variable_labels()
assert len(var_labels) == 2
assert "device" in var_labels
assert "partition" in var_labels
@pytest.mark.asyncio
@pytest.mark.forked
async def test_metric_operations(runtime):
"""Test that metrics can be used after introspection."""
metrics_runtime = await get_metrics_runtime(runtime, "ep_metric_operations")
# Counter operations
counter = metrics_runtime.create_intcounter("ops_counter", "Operations counter", [])
counter.inc()
counter.inc_by(5)
assert counter.get() == 6
# Gauge operations
gauge = metrics_runtime.create_intgauge(
"connections_gauge", "Connections gauge", []
)
gauge.set(10)
assert gauge.get() == 10
gauge.inc()
assert gauge.get() == 11
gauge.dec()
assert gauge.get() == 10
# Vec operations
gauge_vec = metrics_runtime.create_intgaugevec(
"worker_gauge_vec", "Worker gauge vec", ["worker_id"], []
)
gauge_vec.set(5, {"worker_id": "w1"})
assert gauge_vec.get({"worker_id": "w1"}) == 5
gauge_vec.inc({"worker_id": "w1"})
assert gauge_vec.get({"worker_id": "w1"}) == 6
@pytest.mark.asyncio
@pytest.mark.forked
async def test_multiple_metrics_same_runtime(runtime):
"""Test creating multiple metrics in the same runtime."""
metrics_runtime = await get_metrics_runtime(
runtime, "ep_multiple_metrics_same_runtime"
)
counter1 = metrics_runtime.create_intcounter("counter1", "Counter 1", [])
counter2 = metrics_runtime.create_intcounter("counter2", "Counter 2", [])
gauge1 = metrics_runtime.create_gauge("gauge1", "Gauge 1", [])
# All should have unique names
names = {counter1.name(), counter2.name(), gauge1.name()}
assert len(names) == 3
# All should share the same hierarchy labels
for metric in [counter1, counter2, gauge1]:
labels = metric.const_labels()
assert labels["dynamo_namespace"] == "test_metrics_ns"
assert "dynamo_component" in labels # Component name is test-specific
assert labels["dynamo_endpoint"] == "ep_multiple_metrics_same_runtime"
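The assertions in these tests imply that `const_labels()` returns the user-supplied constant labels together with hierarchy labels (`dynamo_namespace`, `dynamo_component`, `dynamo_endpoint`). The sketch below models that result in plain Python. The label keys come from the tests; the function name `build_const_labels` and the choice to reject colliding keys are assumptions, since the actual merge happens in Rust code not shown in this diff.

```python
from typing import Dict, List, Tuple


def build_const_labels(
    namespace: str,
    component: str,
    endpoint: str,
    user_labels: List[Tuple[str, str]],
) -> Dict[str, str]:
    """Hypothetical model of the label set the tests expect to observe."""
    labels = {
        "dynamo_namespace": namespace,
        "dynamo_component": component,
        "dynamo_endpoint": endpoint,
    }
    for key, value in user_labels:
        # Assumption: a user label may not shadow a hierarchy label.
        if key in labels:
            raise ValueError(f"label {key!r} collides with a hierarchy label")
        labels[key] = value
    return labels


labels = build_const_labels(
    "test_metrics_ns",
    "test_metrics_comp",
    "ep_counter_introspection",
    [("env", "test")],
)
```

With an empty `user_labels` list the result still carries the three hierarchy labels, which matches what `test_intgauge_introspection` checks for `dynamo_namespace`.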
......@@ -300,11 +300,13 @@ impl DistributedRuntime {
}
/// Add a callback function to metrics registries for the given hierarchies
// TODO: Rename to register_metrics_update_callback for consistency with Python API.
// Do this after we move the MetricsRegistry trait to composition pattern.
pub fn register_metrics_callback(&self, hierarchies: Vec<String>, callback: RuntimeCallback) {
let mut registries = self.hierarchy_to_metricsregistry.write().unwrap();
for hierarchy in hierarchies {
for hierarchy in &hierarchies {
registries
.entry(hierarchy)
.entry(hierarchy.clone())
.or_default()
.add_callback(callback.clone());
}
......
......@@ -113,6 +113,21 @@ impl PrometheusMetric for prometheus::IntGauge {
}
}
impl PrometheusMetric for prometheus::GaugeVec {
fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
Err(prometheus::Error::Msg(
"GaugeVec requires label names, use with_opts_and_label_names instead".to_string(),
))
}
fn with_opts_and_label_names(
opts: prometheus::Opts,
label_names: &[&str],
) -> Result<Self, prometheus::Error> {
prometheus::GaugeVec::new(opts, label_names)
}
}
impl PrometheusMetric for prometheus::IntGaugeVec {
fn with_opts(_opts: prometheus::Opts) -> Result<Self, prometheus::Error> {
Err(prometheus::Error::Msg(
......@@ -252,21 +267,8 @@ fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>(
// Handle different metric types
let prometheus_metric = if std::any::TypeId::of::<T>()
== std::any::TypeId::of::<prometheus::Histogram>()
== std::any::TypeId::of::<prometheus::CounterVec>()
{
// Special handling for Histogram with custom buckets
// buckets parameter is valid for Histogram, const_labels is not used
if const_labels.is_some() {
return Err(anyhow::anyhow!(
"const_labels parameter is not valid for Histogram"
));
}
let mut opts = prometheus::HistogramOpts::new(&metric_name, metric_desc);
for (key, value) in &updated_labels {
opts = opts.const_label(key.clone(), value.clone());
}
T::with_histogram_opts_and_buckets(opts, buckets)?
} else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::CounterVec>() {
// Special handling for CounterVec with label names
// const_labels parameter is required for CounterVec
if buckets.is_some() {
......@@ -281,12 +283,12 @@ fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>(
let label_names = const_labels
.ok_or_else(|| anyhow::anyhow!("CounterVec requires const_labels parameter"))?;
T::with_opts_and_label_names(opts, label_names)?
} else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntGaugeVec>() {
// Special handling for IntGaugeVec with label names
// const_labels parameter is required for IntGaugeVec
} else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::GaugeVec>() {
// Special handling for GaugeVec with label names
// const_labels parameter is required for GaugeVec
if buckets.is_some() {
return Err(anyhow::anyhow!(
"buckets parameter is not valid for IntGaugeVec"
"buckets parameter is not valid for GaugeVec"
));
}
let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
......@@ -294,8 +296,21 @@ fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>(
opts = opts.const_label(key.clone(), value.clone());
}
let label_names = const_labels
.ok_or_else(|| anyhow::anyhow!("IntGaugeVec requires const_labels parameter"))?;
.ok_or_else(|| anyhow::anyhow!("GaugeVec requires const_labels parameter"))?;
T::with_opts_and_label_names(opts, label_names)?
} else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::Histogram>() {
// Special handling for Histogram with custom buckets
// buckets parameter is valid for Histogram, const_labels is not used
if const_labels.is_some() {
return Err(anyhow::anyhow!(
"const_labels parameter is not valid for Histogram"
));
}
let mut opts = prometheus::HistogramOpts::new(&metric_name, metric_desc);
for (key, value) in &updated_labels {
opts = opts.const_label(key.clone(), value.clone());
}
T::with_histogram_opts_and_buckets(opts, buckets)?
} else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntCounterVec>() {
// Special handling for IntCounterVec with label names
// const_labels parameter is required for IntCounterVec
......@@ -311,6 +326,21 @@ fn create_metric<T: PrometheusMetric, R: MetricsRegistry + ?Sized>(
let label_names = const_labels
.ok_or_else(|| anyhow::anyhow!("IntCounterVec requires const_labels parameter"))?;
T::with_opts_and_label_names(opts, label_names)?
} else if std::any::TypeId::of::<T>() == std::any::TypeId::of::<prometheus::IntGaugeVec>() {
// Special handling for IntGaugeVec with label names
// const_labels parameter is required for IntGaugeVec
if buckets.is_some() {
return Err(anyhow::anyhow!(
"buckets parameter is not valid for IntGaugeVec"
));
}
let mut opts = prometheus::Opts::new(&metric_name, metric_desc);
for (key, value) in &updated_labels {
opts = opts.const_label(key.clone(), value.clone());
}
let label_names = const_labels
.ok_or_else(|| anyhow::anyhow!("IntGaugeVec requires const_labels parameter"))?;
T::with_opts_and_label_names(opts, label_names)?
} else {
// Standard handling for Counter, IntCounter, Gauge, IntGauge
// buckets and const_labels parameters are not valid for these types
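The branches above apply one argument rule per metric type. Note that the parameter the Rust code calls const_labels carries the variable label names for the Vec types (create_gaugevec passes its label names in that slot). The following is a minimal Python model of those rules, not the Rust implementation: validate_metric_args and error_for are hypothetical helpers, and the message for the scalar types is an assumption because the diff does not show it.

```python
from typing import Optional, Sequence

VEC_TYPES = {"CounterVec", "IntCounterVec", "GaugeVec", "IntGaugeVec"}
SCALAR_TYPES = {"Counter", "IntCounter", "Gauge", "IntGauge"}


def validate_metric_args(
    metric_type: str,
    label_names: Optional[Sequence[str]] = None,
    buckets: Optional[Sequence[float]] = None,
) -> None:
    """Hypothetical model of the per-type argument checks in create_metric."""
    if metric_type in VEC_TYPES:
        # Vec types reject buckets first, then require label names.
        if buckets is not None:
            raise ValueError(f"buckets parameter is not valid for {metric_type}")
        if label_names is None:
            raise ValueError(f"{metric_type} requires const_labels parameter")
    elif metric_type == "Histogram":
        # Histogram accepts optional buckets but no label names.
        if label_names is not None:
            raise ValueError("const_labels parameter is not valid for Histogram")
    elif metric_type in SCALAR_TYPES:
        # Assumed wording: the real error text is outside the shown hunk.
        if buckets is not None or label_names is not None:
            raise ValueError(
                f"buckets and const_labels parameters are not valid for {metric_type}"
            )
    else:
        raise ValueError(f"unknown metric type: {metric_type}")


def error_for(metric_type, label_names=None, buckets=None) -> Optional[str]:
    """Return the validation error message, or None when the arguments pass."""
    try:
        validate_metric_args(metric_type, label_names, buckets)
    except ValueError as exc:
        return str(exc)
    return None
```

For example, error_for("GaugeVec", buckets=[0.1]) reports the buckets error, while error_for("GaugeVec", label_names=["status"]) returns None.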
@@ -396,6 +426,9 @@ pub trait MetricsRegistry: Send + Sync + DistributedRuntimeProvider {
// - Summary: create_summary() - for quantiles and sum/count metrics
// - SummaryVec: create_summary_vec() - for labeled summaries
// - Untyped: create_untyped() - for untyped metrics
//
// NOTE: The order of create_* methods below is mirrored in lib/bindings/python/rust/lib.rs::Metrics
// Keep them synchronized when adding new metric types
/// Create a Counter metric
fn create_counter(
@@ -435,6 +468,24 @@ pub trait MetricsRegistry: Send + Sync + DistributedRuntimeProvider {
create_metric(self, name, description, labels, None, None)
}
/// Create a GaugeVec metric with label names (for dynamic labels)
fn create_gaugevec(
&self,
name: &str,
description: &str,
const_labels: &[&str],
const_label_values: &[(&str, &str)],
) -> anyhow::Result<prometheus::GaugeVec> {
create_metric(
self,
name,
description,
const_label_values,
None,
Some(const_labels),
)
}
/// Create a Histogram metric with custom buckets
fn create_histogram(
&self,
......
@@ -193,14 +193,22 @@ async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
.unwrap()
.update_uptime_gauge();
- // Execute all the callbacks starting at the DistributedRuntime level
- assert!(state.drt().basename() == "");
- let callback_results = state
- .drt()
- .execute_metrics_callbacks(&state.drt().hierarchy());
- for result in callback_results {
- if let Err(e) = result {
- tracing::error!("Error executing metrics callback: {}", e);
+ // Execute all the callbacks for all registered hierarchies
+ let all_hierarchies: Vec<String> = {
+ let registries = state.drt().hierarchy_to_metricsregistry.read().unwrap();
+ registries.keys().cloned().collect()
+ };
+ for hierarchy in &all_hierarchies {
+ let callback_results = state.drt().execute_metrics_callbacks(hierarchy);
+ for result in callback_results {
+ if let Err(e) = result {
+ tracing::error!(
+ "Error executing metrics callback for hierarchy '{}': {}",
+ hierarchy,
+ e
+ );
}
}
}
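The new handler logic above collects the hierarchy keys inside a short scope, so the read lock is released before any callback runs, and it logs a failing callback instead of aborting the scrape. The following is a minimal Python model of that pattern, not the Rust code: register_callback, run_all_callbacks, and the hierarchy names are hypothetical, and the model snapshots the callbacks themselves where the Rust version snapshots only the keys.

```python
import threading
from typing import Callable, Dict, List

# Hypothetical stand-in for hierarchy_to_metricsregistry: hierarchy -> callbacks.
_registry_lock = threading.Lock()
_callbacks_by_hierarchy: Dict[str, List[Callable[[], None]]] = {}


def register_callback(hierarchy: str, callback: Callable[[], None]) -> None:
    """Attach a callback to a hierarchy, creating the entry on first use."""
    with _registry_lock:
        _callbacks_by_hierarchy.setdefault(hierarchy, []).append(callback)


def run_all_callbacks() -> List[str]:
    """Run every callback of every hierarchy; collect errors instead of raising."""
    # Snapshot under the lock, then release it before running user code.
    with _registry_lock:
        snapshot = {h: list(cbs) for h, cbs in _callbacks_by_hierarchy.items()}

    errors: List[str] = []
    for hierarchy, callbacks in snapshot.items():
        for callback in callbacks:
            try:
                callback()
            except Exception as exc:
                # One failing callback must not stop the remaining ones.
                errors.append(
                    f"Error executing metrics callback for hierarchy '{hierarchy}': {exc}"
                )
    return errors


def _failing_callback() -> None:
    raise ValueError("boom")


calls: List[str] = []
register_callback("", lambda: calls.append("drt"))
register_callback("ns_comp", _failing_callback)
register_callback("ns_comp_ep", lambda: calls.append("endpoint"))
errors = run_all_callbacks()
```

Releasing the lock before invoking callbacks means a callback can register further metrics without deadlocking on the registry.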
......
@@ -219,7 +219,13 @@ def get_runtime():
except Exception as e:
# If no existing runtime, create a new one
logger.info(f"Creating new runtime (detached failed: {e})")
- loop = asyncio.get_running_loop()
+ try:
+ # Try to get running loop (works in async context)
+ loop = asyncio.get_running_loop()
+ except RuntimeError:
+ # No running loop, create a new one (sync context)
+ loop = asyncio.new_event_loop()
+ asyncio.set_event_loop(loop)
_runtime_instance = DistributedRuntime(loop, False)
return _runtime_instance
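The change to get_runtime() makes it usable from synchronous code: asyncio.get_running_loop() raises RuntimeError when no loop is running, and the fallback creates and installs a new loop. A minimal standalone sketch of that fallback follows, using only the standard library; get_or_create_event_loop is a hypothetical helper name, not part of the patch.

```python
import asyncio


def get_or_create_event_loop() -> asyncio.AbstractEventLoop:
    """Return the running loop if there is one, else create and install a new loop."""
    try:
        # Succeeds only when called from inside a coroutine or callback.
        return asyncio.get_running_loop()
    except RuntimeError:
        # Sync context: no loop is running, so create one and make it current.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        return loop


async def _inside_async_context() -> bool:
    # In async code the helper must hand back the loop that is already running.
    return get_or_create_event_loop() is asyncio.get_running_loop()


# Sync context: a fresh loop is created and is not yet running.
sync_loop = get_or_create_event_loop()
created_in_sync_context = not sync_loop.is_running()

# Async context: the running loop is reused rather than replaced.
reused_in_async_context = sync_loop.run_until_complete(_inside_async_context())
sync_loop.close()
```

A loop created in the sync branch is not running yet, so the caller is responsible for driving it (for example with run_until_complete) and closing it when done.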