Unverified Commit 65cc5337 authored by Keiven C's avatar Keiven C Committed by GitHub
Browse files

feat: implement custom backend metrics for NIM (#3266)


Signed-off-by: default avatarKeiven Chang <keivenchang@users.noreply.github.com>
parent 4224e57d
......@@ -41,6 +41,10 @@ from dynamo.runtime import DistributedRuntime
from . import __version__
DYN_NAMESPACE_ENV_VAR = "DYN_NAMESPACE"
CUSTOM_BACKEND_METRICS_POLLING_INTERVAL_ENV_VAR = (
"CUSTOM_BACKEND_METRICS_POLLING_INTERVAL"
)
CUSTOM_BACKEND_ENDPOINT_ENV_VAR = "CUSTOM_BACKEND_ENDPOINT"
logger = logging.getLogger(__name__)
......@@ -205,6 +209,22 @@ def parse_args():
help="Start KServe gRPC server.",
)
add_config_dump_args(parser)
parser.add_argument(
"--custom-backend-metrics-endpoint",
type=str,
default=os.environ.get(
CUSTOM_BACKEND_ENDPOINT_ENV_VAR, "nim.backend.runtime_stats"
),
help=f"Custom backend endpoint to poll for metrics in format 'namespace.component.endpoint' (default: 'nim.backend.runtime_stats'). Required if --custom-backend-metrics-polling-interval is specified. All metrics will be prefixed with 'dynamo_component_' in Prometheus. Can be set via {CUSTOM_BACKEND_ENDPOINT_ENV_VAR} env var.",
)
parser.add_argument(
"--custom-backend-metrics-polling-interval",
type=float,
default=float(
os.environ.get(CUSTOM_BACKEND_METRICS_POLLING_INTERVAL_ENV_VAR, "0")
),
help=f"Interval in seconds for polling custom backend metrics. Set to > 0 to enable polling (default: 0=disabled, suggested: 9.2s which is less than typical Prometheus scrape interval). Can be set via {CUSTOM_BACKEND_METRICS_POLLING_INTERVAL_ENV_VAR} env var.",
)
flags = parser.parse_args()
......@@ -212,6 +232,10 @@ def parse_args():
parser.error("--static-endpoint requires both --model-name and --model-path")
if bool(flags.tls_cert_path) ^ bool(flags.tls_key_path): # ^ is XOR
parser.error("--tls-cert-path and --tls-key-path must be provided together")
if flags.custom_backend_metrics_polling_interval < 0:
parser.error(
"--custom-backend-metrics-polling-interval must be >= 0 (0=disabled)"
)
return flags
......@@ -277,6 +301,14 @@ async def async_main():
kwargs["tls_key_path"] = flags.tls_key_path
if flags.namespace:
kwargs["namespace"] = flags.namespace
if flags.custom_backend_metrics_endpoint:
kwargs[
"custom_backend_metrics_endpoint"
] = flags.custom_backend_metrics_endpoint
if flags.custom_backend_metrics_polling_interval:
kwargs[
"custom_backend_metrics_polling_interval"
] = flags.custom_backend_metrics_polling_interval
if is_static:
# out=dyn://<static_endpoint>
......
# NIM Backend Metrics Mock Server
This directory contains a mock NIM (NVIDIA Inference Microservices) backend server for testing the frontend's on-demand metrics collection feature.
## Purpose
**NOTE: This is temporary code.** Once NIM starts using Dynamo backend components natively, this mock server and the associated NIM metrics polling code will be removed.
This example demonstrates:
- How the Dynamo frontend can poll external backends for metrics
- Dynamic metric generation and collection
- The `runtime_stats` endpoint pattern
- Integration between frontend metrics and backend services
## Running the Example
### 1. Start the Mock NIM Backend
**Static mode (default - NATS only, no etcd):**
```bash
python3 examples/custom_backend/nim/mock_nim_backend.py
```
**Dynamic mode (with etcd for service discovery):**
```bash
python3 examples/custom_backend/nim/mock_nim_backend.py --use-etcd
```
This starts a backend on `nim.backend.runtime_stats` (default) that returns incrementing metrics. You can customize with `--custom-backend-metrics-endpoint "namespace.component.endpoint"`.
### 2. Start the Frontend with Metrics Polling
```bash
python3 -m dynamo.frontend \
--model-name Qwen/Qwen2.5-0.5B-Instruct \
--custom-backend-metrics-endpoint nim.backend.runtime_stats \
--custom-backend-metrics-polling-interval 9.2
```
**Note:** The custom backend metrics polling works in both static (NATS-only) and dynamic (with etcd) modes. The frontend automatically detects and adapts to the backend's mode.
### 3. Query Metrics
```bash
curl http://localhost:8000/metrics
```
The frontend will periodically (every 9.2 seconds in this example):
1. Poll the mock NIM backend via the `runtime_stats` endpoint
2. Parse the returned metrics
3. Update Prometheus gauges
When you query the `/metrics` endpoint, you'll see the most recently polled metrics.
## Metrics Exposed
The mock server returns:
**Gauges:**
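- `kv_cache_usage_perc` - Cycles between 0.30 and 0.93
- `gpu_utilization_perc` - Cycles between 50 and 97.5
- `active_requests` - Cycles between 0 and 14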
**Note:** All metrics collected from custom backends are automatically prefixed with `dynamo_component_` when exposed via the frontend's `/metrics` endpoint. For example, the gauge `kv_cache_usage_perc` from the backend will appear as `dynamo_component_kv_cache_usage_perc` in Prometheus metrics.
## Implementation Details
The frontend's NIM metrics collection is implemented in:
- `lib/llm/src/http/service/custom_backend_metrics.rs` - Custom backend metrics collection (temporary)
- `lib/llm/src/http/service/metrics.rs` - Metrics router
- `components/src/dynamo/frontend/main.py` - `--custom-backend-metrics-polling-interval` flag
All NIM-specific code is marked with TODO comments for removal once NIM adopts Dynamo backend.
#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Mock NIM Backend Server for Metrics Testing
This server mocks a NIM (NVIDIA Inference Microservices) backend that exposes
runtime statistics via the runtime_stats endpoint.
NOTE: This is temporary code for testing purposes only. Once NIM starts using
Dynamo backend components natively, this mock server and the associated NIM
metrics polling code in the frontend will be removed. The NIM-specific metrics
collection exists only as a bridge until NIM adopts the Dynamo runtime.
The server demonstrates:
- Dynamic metric generation (gauges only)
- Proper async generator pattern for Dynamo endpoints
- JSON-encoded metric responses compatible with the frontend metrics collector
"""
import asyncio
import json
import time
from typing import Any, AsyncGenerator
import uvloop
from dynamo.runtime import DistributedRuntime, dynamo_worker
# Number of stats requests served so far; drives the cycling mock values.
request_count = 0


async def handle_stats_request(request: Any) -> AsyncGenerator[str, None]:
    """Serve one mock ``runtime_stats`` reply.

    Every call bumps the module-level ``request_count`` and derives three
    gauge values from it, so consecutive polls observe changing numbers.

    Args:
        request: JsonLike payload sent by the caller; it is only echoed to
            stdout and does not influence the reply.

    Yields:
        str: A single JSON document with ``schema_version``, ``worker_id``,
        ``backend``, ``ts`` and ``metrics.gauges``.
    """
    global request_count
    request_count += 1
    seq = request_count
    print(f"Received stats request #{seq}: {request!r}")

    # Each gauge walks through a fixed cycle keyed on the request number.
    gauges = {
        "kv_cache_usage_perc": round(0.3 + (seq % 10) * 0.07, 2),  # 0.3 .. 0.93
        "gpu_utilization_perc": round(50 + (seq % 20) * 2.5, 2),  # 50 .. 97.5
        "active_requests": seq % 15,  # 0 .. 14
    }
    payload = {
        "schema_version": 1,
        "worker_id": "mock-worker-1",
        "backend": "vllm",
        "ts": int(time.time()),
        "metrics": {"gauges": gauges},
    }

    # The consumer expects a JSON string (Rust Annotated<String>), not a dict.
    yield json.dumps(payload)
async def worker(runtime: DistributedRuntime):
    """Register the mock stats endpoint on ``runtime`` and serve it.

    Command-line flags are parsed here (``--custom-backend-metrics-endpoint``
    and ``--use-etcd``); the endpoint string is split into namespace,
    component and endpoint name, the component service is created, and
    ``handle_stats_request`` is served on the endpoint.

    Args:
        runtime: Runtime used to look up the namespace/component.

    Raises:
        ValueError: If the endpoint is not exactly three non-empty,
            dot-separated segments.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Mock NIM Backend Server")
    parser.add_argument(
        "--custom-backend-metrics-endpoint",
        type=str,
        default="nim.backend.runtime_stats",
        help="Custom backend metrics endpoint in format 'namespace.component.endpoint' (default: 'nim.backend.runtime_stats')",
    )
    # Accepted here as well so parse_args() does not reject the flag that
    # main() inspects beforehand.
    parser.add_argument(
        "--use-etcd",
        action="store_true",
        help="Use etcd for service discovery (dynamic mode). Default is static mode (no etcd).",
    )
    args = parser.parse_args()

    # Parse endpoint (namespace.component.endpoint); reject empty segments
    # such as "nim..runtime_stats" as well as a wrong segment count.
    parts = args.custom_backend_metrics_endpoint.split(".")
    if len(parts) != 3 or not all(parts):
        raise ValueError(
            f"Invalid endpoint format. Expected 'namespace.component.endpoint', got: {args.custom_backend_metrics_endpoint}"
        )
    namespace, comp_name, endpoint_name = parts

    component = runtime.namespace(namespace).component(comp_name)
    await component.create_service()
    stats_endpoint = component.endpoint(endpoint_name)

    print(
        f"Mock NIM stats server started on {namespace}/{comp_name}/{endpoint_name} endpoint"
    )
    # List only the gauges handle_stats_request actually returns.
    print(
        "Exposing cycling gauge metrics: kv_cache_usage_perc, gpu_utilization_perc, active_requests"
    )

    await stats_endpoint.serve_endpoint(handle_stats_request)  # type: ignore[arg-type]
def main():
    """Start the mock backend, in static mode unless ``--use-etcd`` is given.

    Only ``--use-etcd`` is examined here, because the mode has to be known
    before the worker is wrapped by ``dynamo_worker``; all remaining flags
    are left for ``worker`` to parse.
    """
    import argparse

    pre_parser = argparse.ArgumentParser(
        description="Mock NIM Backend Server", add_help=False
    )
    pre_parser.add_argument("--use-etcd", action="store_true")
    known_flags, _ignored = pre_parser.parse_known_args()

    # No --use-etcd means static mode (no etcd).
    decorated_worker = dynamo_worker(static=not known_flags.use_etcd)(worker)

    uvloop.install()
    asyncio.run(decorated_worker())  # type: ignore[arg-type]


if __name__ == "__main__":
    main()
#!/usr/bin/env python3
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
"""
Mock NIM Frontend - Polls the mock NIM backend for metrics
This script demonstrates how to poll a custom backend for metrics using
the Dynamo runtime in static mode (no etcd required, uses NATS only).
"""
import asyncio
import json
import signal
import uvloop
from dynamo.runtime import DistributedRuntime
async def poll_custom_backend_metrics(
    runtime, namespace_component_endpoint, interval_secs
):
    """Poll custom backend metrics and print the data.

    Args:
        runtime: Object providing ``namespace(...).component(...).endpoint(...)``.
        namespace_component_endpoint: Target in ``namespace.component.endpoint`` form.
        interval_secs: Seconds to wait before every poll.

    Returns:
        None. The coroutine returns early if the endpoint string is malformed
        or client setup fails, and otherwise loops until it is cancelled.
    """
    import traceback

    print(
        f"Starting custom backend metrics polling: endpoint={namespace_component_endpoint}, interval={interval_secs}s"
    )

    # Parse endpoint string (namespace.component.endpoint)
    parts = namespace_component_endpoint.split(".")
    if len(parts) != 3:
        print(f"ERROR: Invalid endpoint format: {namespace_component_endpoint}")
        return

    namespace, component_name, endpoint_name = parts
    print(f"Polling {namespace}/{component_name}/{endpoint_name}")

    try:
        # Get the component and endpoint
        ns = runtime.namespace(namespace)
        component = ns.component(component_name)
        endpoint = component.endpoint(endpoint_name)

        # Get client (in static mode, no need to wait for instances)
        client = await endpoint.client()
        print("Client created for static endpoint")
    except Exception as e:
        print(f"ERROR during polling setup: {e}")
        traceback.print_exc()
        return

    # Poll loop
    print(f"Starting polling loop (every {interval_secs}s)...")
    while True:
        try:
            await asyncio.sleep(interval_secs)

            print(f"\n{'='*60}")
            # We are inside a coroutine, so the running loop is the right clock.
            print(f"Polling tick at {asyncio.get_running_loop().time():.2f}")

            # Send request and collect every non-empty payload of the stream
            response_stream = await client.generate("")
            responses = []
            async for response in response_stream:
                data = response.data()
                if data:
                    responses.append(data)

            print(f"Received {len(responses)} responses")
            for idx, data in enumerate(responses):
                print(f"\nResponse #{idx+1}:")
                if isinstance(data, str):
                    # Pretty-print JSON payloads; fall back to the raw text.
                    try:
                        parsed = json.loads(data)
                        print(json.dumps(parsed, indent=2))
                    except json.JSONDecodeError:
                        print(data)
                else:
                    print(data)

            print(f"{'='*60}\n")

        except asyncio.CancelledError:
            print("Polling cancelled")
            break
        except Exception as e:
            print(f"ERROR polling backend: {e}")
            traceback.print_exc()
            # No extra sleep here: the next iteration already waits
            # interval_secs, so a failed poll is retried after one interval
            # instead of two.
async def graceful_shutdown(runtime):
    """Announce the shutdown on stdout, then call ``runtime.shutdown()``."""
    notice = "\nShutting down..."
    print(notice)
    runtime.shutdown()
async def async_main():
    """Main async function - similar to frontend/main.py

    Parses the command line, creates a static-mode ``DistributedRuntime`` and
    runs ``poll_custom_backend_metrics`` until it finishes or SIGTERM/SIGINT
    arrives. A signal cancels the polling task; the runtime is then shut down
    exactly once on the way out, whether polling ended by itself or was
    cancelled.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Mock NIM Frontend - Poll backend for metrics"
    )
    parser.add_argument(
        "--custom-backend-metrics-endpoint",
        type=str,
        default="nim.backend.runtime_stats",
        help="Custom backend metrics endpoint in format 'namespace.component.endpoint' (default: 'nim.backend.runtime_stats')",
    )
    parser.add_argument(
        "--polling-interval",
        type=float,
        default=3.0,
        help="Polling interval in seconds (default: 3.0)",
    )
    args = parser.parse_args()

    # Get the event loop
    loop = asyncio.get_running_loop()

    # Create DistributedRuntime - similar to frontend/main.py
    is_static = True  # Use static mode (no etcd)
    runtime = DistributedRuntime(loop, is_static)  # type: ignore[call-arg]

    print("Mock NIM Frontend starting...")
    print(f"Target endpoint: {args.custom_backend_metrics_endpoint}")
    print(f"Polling interval: {args.polling_interval}s")
    print("Static mode: No etcd required, using NATS only\n")

    # Run polling as a task so a signal can actually stop it. Shutting the
    # runtime down alone would not end the polling loop, because that loop
    # catches generic exceptions and keeps iterating.
    polling_task = asyncio.create_task(
        poll_custom_backend_metrics(
            runtime, args.custom_backend_metrics_endpoint, args.polling_interval
        )
    )

    def signal_handler():
        polling_task.cancel()

    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, signal_handler)

    try:
        await polling_task
    except asyncio.CancelledError:
        # Cancellation during client setup surfaces here; inside the polling
        # loop it is handled by the poller itself.
        pass
    finally:
        await graceful_shutdown(runtime)
def main():
    """Entry point: drive ``async_main`` to completion on a uvloop event loop."""
    entry_coroutine = async_main()
    uvloop.run(entry_coroutine)


if __name__ == "__main__":
    main()
......@@ -118,13 +118,15 @@ pub(crate) struct EntrypointArgs {
tls_key_path: Option<PathBuf>,
extra_engine_args: Option<PathBuf>,
namespace: Option<String>,
custom_backend_metrics_endpoint: Option<String>,
custom_backend_metrics_polling_interval: Option<f64>,
}
#[pymethods]
impl EntrypointArgs {
#[allow(clippy::too_many_arguments)]
#[new]
#[pyo3(signature = (engine_type, model_path=None, model_name=None, endpoint_id=None, context_length=None, template_file=None, router_config=None, kv_cache_block_size=None, http_host=None, http_port=None, tls_cert_path=None, tls_key_path=None, extra_engine_args=None, namespace=None))]
#[pyo3(signature = (engine_type, model_path=None, model_name=None, endpoint_id=None, context_length=None, template_file=None, router_config=None, kv_cache_block_size=None, http_host=None, http_port=None, tls_cert_path=None, tls_key_path=None, extra_engine_args=None, namespace=None, custom_backend_metrics_endpoint=None, custom_backend_metrics_polling_interval=None))]
pub fn new(
engine_type: EngineType,
model_path: Option<PathBuf>,
......@@ -140,6 +142,8 @@ impl EntrypointArgs {
tls_key_path: Option<PathBuf>,
extra_engine_args: Option<PathBuf>,
namespace: Option<String>,
custom_backend_metrics_endpoint: Option<String>,
custom_backend_metrics_polling_interval: Option<f64>,
) -> PyResult<Self> {
let endpoint_id_obj: Option<EndpointId> = endpoint_id.as_deref().map(EndpointId::from);
if (tls_cert_path.is_some() && tls_key_path.is_none())
......@@ -164,6 +168,8 @@ impl EntrypointArgs {
tls_key_path,
extra_engine_args,
namespace,
custom_backend_metrics_endpoint,
custom_backend_metrics_polling_interval,
})
}
}
......@@ -196,7 +202,9 @@ pub fn make_engine<'p>(
.tls_key_path(args.tls_key_path.clone())
.is_mocker(matches!(args.engine_type, EngineType::Mocker))
.extra_engine_args(args.extra_engine_args.clone())
.namespace(args.namespace.clone());
.namespace(args.namespace.clone())
.custom_backend_metrics_endpoint(args.custom_backend_metrics_endpoint.clone())
.custom_backend_metrics_polling_interval(args.custom_backend_metrics_polling_interval);
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let local_model = builder.build().await.map_err(to_pyerr)?;
let inner = select_engine(distributed_runtime, args, local_model)
......
......@@ -52,6 +52,15 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul
http_service_builder =
http_service_builder.with_request_template(engine_config.local_model().request_template());
// DEPRECATED: To be removed after custom backends migrate to Dynamo backend.
// Pass the custom backend metrics endpoint as-is (already in namespace.component.endpoint format)
http_service_builder = http_service_builder.with_custom_backend_config(
local_model
.custom_backend_metrics_endpoint()
.map(|s| s.to_string()),
local_model.custom_backend_metrics_polling_interval(),
);
let http_service = match engine_config {
EngineConfig::Dynamic(_) => {
let distributed_runtime = DistributedRuntime::from_settings(runtime.clone()).await?;
......@@ -213,7 +222,57 @@ pub async fn run(runtime: Runtime, engine_config: EngineConfig) -> anyhow::Resul
.map(|rd| rd.to_string())
.collect::<Vec<String>>()
);
// DEPRECATED: To be removed after custom backends migrate to Dynamo backend.
// Start custom backend metrics polling if configured
let polling_task =
if let (Some(namespace_component_endpoint), Some(polling_interval), Some(registry)) = (
http_service
.custom_backend_namespace_component_endpoint
.as_ref(),
http_service.custom_backend_metrics_polling_interval,
http_service.custom_backend_registry.as_ref(),
) {
// Create DistributedRuntime for polling, matching the engine's mode
// Check if we have etcd_client to determine if we're in dynamic or static mode
let drt = if http_service.state().etcd_client().is_some() {
// Dynamic mode: use from_settings() which respects environment (includes etcd)
DistributedRuntime::from_settings(runtime.clone()).await?
} else {
// Static mode: no etcd
let dst_config =
dynamo_runtime::distributed::DistributedConfig::from_settings(true);
DistributedRuntime::new(runtime.clone(), dst_config).await?
};
tracing::info!(
namespace_component_endpoint=%namespace_component_endpoint,
polling_interval_secs=polling_interval,
"Starting custom backend metrics polling task"
);
// Spawn the polling task and keep the JoinHandle alive so it can be aborted during
// shutdown. While graceful shutdown is not strictly necessary for this non-critical
// metrics polling, explicitly aborting it prevents the task from running during the
// shutdown phase.
Some(
crate::http::service::custom_backend_metrics::spawn_custom_backend_polling_task(
drt,
namespace_component_endpoint.clone(),
polling_interval,
registry.clone(),
),
)
} else {
None
};
http_service.run(runtime.primary_token()).await?;
// Abort the polling task if it was started
if let Some(task) = polling_task {
task.abort();
}
runtime.shutdown(); // Cancel primary token
Ok(())
}
......
......@@ -20,6 +20,7 @@
mod openai;
pub mod custom_backend_metrics;
pub mod disconnect;
pub mod error;
pub mod health;
......
// SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
// DEPRECATED: To be removed after custom backends migrate to Dynamo backend.
//
// Custom backend metrics polling and collection.
//
// This module provides a bridge to poll metrics from custom backends (like NIM) that expose
// their own metrics endpoints, and makes them available through Prometheus.
use std::{
collections::HashMap,
sync::{Arc, Mutex},
time::Duration,
};
use serde::Deserialize;
/// Maximum number of custom backend gauges that can be registered to prevent unbounded growth.
///
/// Once this many distinct gauge names are tracked, a metric with a new name is dropped with a
/// warning; gauges that already exist keep being updated.
pub const MAX_CUSTOM_BACKEND_GAUGES: usize = 100;
/// Registry for custom backend metrics discovered at runtime.
///
/// Metrics from custom backends are exposed as Prometheus gauges since we're setting
/// absolute values received from polling, not incrementing them locally.
///
/// All metrics are automatically prefixed when registered. For example, if the prefix is
/// `dynamo_component` and a backend reports a gauge named `kv_cache_usage_perc`, it will
/// be exposed as `dynamo_component_kv_cache_usage_perc` in Prometheus metrics.
pub struct CustomBackendMetricsRegistry {
// Gauges created so far, keyed by the backend's own (unprefixed) metric name.
gauges: Mutex<HashMap<String, prometheus::Gauge>>,
// Prefix joined to the metric name with `_` to form the exported gauge name.
prefix: String,
// Prometheus registry on which every newly created gauge is registered.
prometheus_registry: prometheus::Registry,
}
impl CustomBackendMetricsRegistry {
    /// Creates an empty registry whose gauges are exported as `<prefix>_<name>` and registered
    /// on `prometheus_registry`.
    pub fn new(prefix: String, prometheus_registry: prometheus::Registry) -> Self {
        Self {
            gauges: Mutex::new(HashMap::new()),
            prefix,
            prometheus_registry,
        }
    }

    /// Get or create a gauge for the given metric name, registering it with Prometheus if new.
    ///
    /// Returns `None` (after logging a warning) when the maximum number of gauges has been
    /// reached or when Prometheus rejects the gauge definition. Metric names come from the
    /// polled backend, so an unacceptable name must not take the polling task down.
    fn get_or_create_gauge(&self, name: &str) -> Option<prometheus::Gauge> {
        // Nothing in this critical section is expected to panic, but if the lock is ever
        // poisoned keep serving the map instead of panicking on every later update.
        let mut gauges = self
            .gauges
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);

        if let Some(gauge) = gauges.get(name) {
            return Some(gauge.clone());
        }

        // Cap the number of gauges to prevent unbounded growth
        if gauges.len() >= MAX_CUSTOM_BACKEND_GAUGES {
            tracing::warn!(
                "Maximum number of custom backend gauges ({}) reached, dropping metric: {}",
                MAX_CUSTOM_BACKEND_GAUGES,
                name
            );
            return None;
        }

        let full_name = format!("{}_{}", self.prefix, name);
        // The backend-provided name doubles as the gauge's help text.
        let gauge = match prometheus::Gauge::new(full_name.as_str(), name) {
            Ok(gauge) => gauge,
            Err(e) => {
                tracing::warn!(
                    "Failed to create custom backend gauge {}: {}",
                    full_name,
                    e
                );
                return None;
            }
        };

        // A registration failure is logged, but the gauge is still cached so the same failure
        // is not retried (and logged) on every poll.
        if let Err(e) = self.prometheus_registry.register(Box::new(gauge.clone())) {
            tracing::warn!(
                "Failed to register custom backend gauge {}: {}",
                full_name,
                e
            );
        }

        gauges.insert(name.to_string(), gauge.clone());
        Some(gauge)
    }

    /// Update a gauge metric with a new value.
    ///
    /// The gauge is created on first use; the update is silently skipped when the gauge cannot
    /// be obtained (see `get_or_create_gauge`).
    pub fn set_gauge(&self, name: &str, value: f64) {
        if let Some(gauge) = self.get_or_create_gauge(name) {
            gauge.set(value);
        }
    }
}
/// Response format from custom backend runtime_stats endpoint
///
/// Only the `metrics` object is read from the payload.
#[derive(Debug, Deserialize)]
struct CustomBackendStatsResponse {
// Container for the metric values reported by the backend.
metrics: CustomBackendMetrics,
}
/// The `metrics` object of a custom backend stats response.
#[derive(Debug, Deserialize)]
struct CustomBackendMetrics {
// Gauge name -> current value, as reported by the backend.
gauges: HashMap<String, f64>,
}
/// Spawn a background task that polls custom backend metrics periodically.
///
/// All metrics collected from the backend will be prefixed according to the registry's prefix
/// (typically `dynamo_component_`). For example, a backend gauge `kv_cache_usage_perc` will
/// appear as `dynamo_component_kv_cache_usage_perc` in Prometheus.
///
/// The task takes no CancellationToken: it loops until the returned `JoinHandle` is aborted or
/// the tokio runtime shuts down. That is acceptable because metrics polling is non-critical.
///
/// The task logs an error and exits without polling when:
/// - `polling_interval_secs` is not a finite, strictly positive number of seconds that fits in
///   a `Duration` (NaN, negative, zero and overflowing values are rejected instead of
///   panicking or busy-looping),
/// - `namespace_component_endpoint` is not of the form `namespace.component.endpoint`,
/// - the namespace, component or router cannot be created.
///
/// Client creation is retried every 5 seconds until it succeeds.
pub fn spawn_custom_backend_polling_task(
    drt: dynamo_runtime::DistributedRuntime,
    namespace_component_endpoint: String,
    polling_interval_secs: f64,
    registry: Arc<CustomBackendMetricsRegistry>,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        tracing::info!(
            namespace_component_endpoint=%namespace_component_endpoint,
            interval_secs=polling_interval_secs,
            "Starting custom backend metrics polling"
        );

        // Validate the interval up front: `Duration::from_secs_f64` would panic on NaN,
        // negative or overflowing input, and a zero interval would poll in a tight loop.
        let interval = match Duration::try_from_secs_f64(polling_interval_secs) {
            Ok(interval) if !interval.is_zero() => interval,
            _ => {
                tracing::error!(
                    interval_secs = polling_interval_secs,
                    "Invalid custom backend metrics polling interval, expected a finite value > 0"
                );
                return;
            }
        };

        // Parse namespace.component.endpoint format
        let parts: Vec<&str> = namespace_component_endpoint.split('.').collect();
        if parts.len() != 3 {
            tracing::error!(
                namespace_component_endpoint=%namespace_component_endpoint,
                "Invalid endpoint format, expected 'namespace.component.endpoint'"
            );
            return;
        }
        let (namespace, component_name, endpoint_name) = (parts[0], parts[1], parts[2]);

        // Get namespace, component, and endpoint from DRT
        let Ok(ns) = drt.namespace(namespace.to_string()) else {
            tracing::error!("Namespace not available: {}", namespace);
            return;
        };
        let Ok(component) = ns.component(component_name) else {
            tracing::error!("Component not available: {}", component_name);
            return;
        };
        let endpoint = component.endpoint(endpoint_name);

        // Wait for client to be ready (backend might not be available yet)
        let client = loop {
            match endpoint.client().await {
                Ok(client) => break client,
                Err(e) => {
                    tracing::warn!(
                        error=%e,
                        namespace=%namespace,
                        component=%component_name,
                        endpoint=%endpoint_name,
                        "Failed to create client for custom backend endpoint, retrying in 5s"
                    );
                    tokio::time::sleep(Duration::from_secs(5)).await;
                }
            }
        };

        // Create router for sending requests to the backend
        use dynamo_runtime::pipeline::{PushRouter, RouterMode};
        use dynamo_runtime::protocols::annotated::Annotated;

        let Ok(router) =
            PushRouter::<String, Annotated<String>>::from_client(client, RouterMode::Random).await
        else {
            tracing::error!(
                namespace=%namespace,
                component=%component_name,
                endpoint=%endpoint_name,
                "Failed to create router for custom backend endpoint"
            );
            return;
        };

        tracing::info!(
            namespace=%namespace,
            component=%component_name,
            endpoint=%endpoint_name,
            "Custom backend metrics polling started"
        );

        // Poll backend at regular intervals; a failed poll is logged and retried on the next
        // tick rather than ending the task.
        loop {
            tokio::time::sleep(interval).await;

            match poll_backend_once(&router, &registry).await {
                Ok(num_metrics) => {
                    tracing::debug!(
                        num_metrics=%num_metrics,
                        "Successfully polled custom backend metrics"
                    );
                }
                Err(e) => {
                    tracing::warn!(
                        error=%e,
                        "Failed to poll custom backend metrics"
                    );
                }
            }
        }
    })
}
/// Poll the backend once and update the registry.
async fn poll_backend_once(
router: &dynamo_runtime::pipeline::PushRouter<
String,
dynamo_runtime::protocols::annotated::Annotated<String>,
>,
registry: &Arc<CustomBackendMetricsRegistry>,
) -> anyhow::Result<usize> {
use dynamo_runtime::pipeline::Context;
// Send request to backend (try static mode first, fall back to dynamic mode)
let response_stream = match router.r#static(Context::new("".to_string())).await {
Ok(stream) => stream,
Err(_) => router.random(Context::new("".to_string())).await?,
};
// Collect responses from the stream
let mut responses = Vec::new();
{
use futures::StreamExt;
let mut stream = response_stream;
while let Some(response) = stream.next().await {
if let Some(data) = response.data {
responses.push(data);
}
}
}
if responses.is_empty() {
anyhow::bail!("No responses received from custom backend");
}
// Parse the first response as JSON
// Expected format from backend (as JSON string):
// {
// "schema_version": 1,
// "worker_id": "mock-worker-1",
// "backend": "vllm",
// "ts": 1759967807,
// "metrics": {
// "gauges": {
// "kv_cache_usage_perc": 0.3,
// "gpu_utilization_perc": 75.5,
// "active_requests": 5
// }
// }
// }
let stats: CustomBackendStatsResponse = serde_json::from_str(&responses[0])
.map_err(|e| anyhow::anyhow!("Failed to parse backend stats JSON: {}", e))?;
// Update gauges in the registry
for (name, value) in &stats.metrics.gauges {
registry.set_gauge(name, *value);
}
Ok(stats.metrics.gauges.len())
}
......@@ -26,6 +26,11 @@ pub use prometheus::Registry;
use super::RouteDoc;
/// State for metrics handler with custom backend support
struct MetricsHandlerState {
// Prometheus registry whose metric families the handler gathers for each request.
registry: Arc<Registry>,
}
pub struct Metrics {
request_counter: IntCounterVec,
inflight_gauge: IntGaugeVec,
......@@ -825,21 +830,28 @@ pub fn process_response_using_event_converter_and_observe_metrics<T: Serialize>(
Ok(event)
}
/// Create a new router with the given path
/// Create a new router with optional custom backend metrics support
pub fn router(registry: Registry, path: Option<String>) -> (Vec<RouteDoc>, Router) {
let registry = Arc::new(registry);
let path = path.unwrap_or_else(|| "/metrics".to_string());
let doc = RouteDoc::new(axum::http::Method::GET, &path);
let metrics_state = MetricsHandlerState {
registry: Arc::new(registry),
};
let route = Router::new()
.route(&path, get(handler_metrics))
.with_state(registry);
.with_state(Arc::new(metrics_state));
(vec![doc], route)
}
/// Metrics Handler
async fn handler_metrics(State(registry): State<Arc<Registry>>) -> impl IntoResponse {
/// Unified metrics handler
async fn handler_metrics(State(state): State<Arc<MetricsHandlerState>>) -> impl IntoResponse {
// Gather and encode metrics
// Note: custom backend gauges (when enabled) are refreshed by a background polling task, not
// on demand here; gather() reports whatever values were most recently set.
let encoder = prometheus::TextEncoder::new();
let metric_families = registry.gather();
let metric_families = state.registry.gather();
let mut buffer = vec![];
if encoder.encode(&metric_families, &mut buffer).is_err() {
return (
......
......@@ -19,6 +19,7 @@ use anyhow::Result;
use axum_server::tls_rustls::RustlsConfig;
use derive_builder::Builder;
use dynamo_runtime::logging::make_request_span;
use dynamo_runtime::metrics::prometheus_names::name_prefix;
use dynamo_runtime::storage::key_value_store::EtcdStore;
use dynamo_runtime::storage::key_value_store::KeyValueStore;
use dynamo_runtime::storage::key_value_store::MemoryStore;
......@@ -142,6 +143,12 @@ pub struct HttpService {
tls_cert_path: Option<PathBuf>,
tls_key_path: Option<PathBuf>,
route_docs: Vec<RouteDoc>,
// DEPRECATED: To be removed after custom backends migrate to Dynamo backend.
pub(crate) custom_backend_namespace_component_endpoint: Option<String>,
pub(crate) custom_backend_metrics_polling_interval: Option<f64>,
pub(crate) custom_backend_registry:
Option<Arc<super::custom_backend_metrics::CustomBackendMetricsRegistry>>,
}
#[derive(Clone, Builder)]
......@@ -181,6 +188,13 @@ pub struct HttpServiceConfig {
#[builder(default = "None")]
etcd_client: Option<etcd::Client>,
// DEPRECATED: To be removed after custom backends migrate to Dynamo backend.
#[builder(default = "None")]
custom_backend_namespace_component_endpoint: Option<String>,
#[builder(default = "None")]
custom_backend_metrics_polling_interval: Option<f64>,
}
impl HttpService {
......@@ -324,7 +338,21 @@ impl HttpServiceConfigBuilder {
let registry = metrics::Registry::new();
state.metrics_clone().register(&registry)?;
// Note: Metrics polling task will be started in run() method to have access to cancellation token
// DEPRECATED: To be removed after custom backends migrate to Dynamo backend.
// Setup custom backend metrics if configured
let custom_backend_registry =
if config.custom_backend_namespace_component_endpoint.is_some()
&& config.custom_backend_metrics_polling_interval.is_some()
{
Some(Arc::new(
super::custom_backend_metrics::CustomBackendMetricsRegistry::new(
name_prefix::COMPONENT.to_string(),
registry.clone(),
),
))
} else {
None
};
let mut router = axum::Router::new();
......@@ -364,6 +392,10 @@ impl HttpServiceConfigBuilder {
tls_cert_path: config.tls_cert_path,
tls_key_path: config.tls_key_path,
route_docs: all_docs,
custom_backend_namespace_component_endpoint: config
.custom_backend_namespace_component_endpoint,
custom_backend_metrics_polling_interval: config.custom_backend_metrics_polling_interval,
custom_backend_registry,
})
}
......@@ -377,6 +409,17 @@ impl HttpServiceConfigBuilder {
self
}
// DEPRECATED: To be removed after custom backends migrate to Dynamo backend.
/// Sets both custom backend metrics options on the builder in one call.
///
/// `namespace_component_endpoint` is the endpoint to poll and `polling_interval` the polling
/// interval; either may be `None`. Both builder fields are overwritten, including with `None`.
pub fn with_custom_backend_config(
mut self,
namespace_component_endpoint: Option<String>,
polling_interval: Option<f64>,
) -> Self {
self.custom_backend_namespace_component_endpoint = Some(namespace_component_endpoint);
self.custom_backend_metrics_polling_interval = Some(polling_interval);
self
}
fn get_endpoints_router(
state: Arc<State>,
request_template: &Option<RequestTemplate>,
......
......@@ -58,6 +58,8 @@ pub struct LocalModelBuilder {
user_data: Option<serde_json::Value>,
custom_template_path: Option<PathBuf>,
namespace: Option<String>,
custom_backend_metrics_endpoint: Option<String>,
custom_backend_metrics_polling_interval: Option<f64>,
}
impl Default for LocalModelBuilder {
......@@ -81,6 +83,8 @@ impl Default for LocalModelBuilder {
user_data: Default::default(),
custom_template_path: Default::default(),
namespace: Default::default(),
custom_backend_metrics_endpoint: Default::default(),
custom_backend_metrics_polling_interval: Default::default(),
}
}
}
......@@ -177,6 +181,16 @@ impl LocalModelBuilder {
self
}
/// Sets the custom backend metrics endpoint that `build` copies into the resulting model.
/// Passing `None` clears any previously set value.
pub fn custom_backend_metrics_endpoint(&mut self, endpoint: Option<String>) -> &mut Self {
self.custom_backend_metrics_endpoint = endpoint;
self
}
/// Sets the custom backend metrics polling interval that `build` copies into the resulting
/// model. Passing `None` clears any previously set value.
pub fn custom_backend_metrics_polling_interval(&mut self, interval: Option<f64>) -> &mut Self {
self.custom_backend_metrics_polling_interval = interval;
self
}
/// Make an LLM ready for use:
/// - Download it from Hugging Face (and NGC in future) if necessary
/// - Resolve the path
......@@ -221,6 +235,9 @@ impl LocalModelBuilder {
router_config: self.router_config.take().unwrap_or_default(),
runtime_config: self.runtime_config.clone(),
namespace: self.namespace.clone(),
custom_backend_metrics_endpoint: self.custom_backend_metrics_endpoint.clone(),
custom_backend_metrics_polling_interval: self
.custom_backend_metrics_polling_interval,
});
}
......@@ -296,6 +313,8 @@ impl LocalModelBuilder {
router_config: self.router_config.take().unwrap_or_default(),
runtime_config: self.runtime_config.clone(),
namespace: self.namespace.clone(),
custom_backend_metrics_endpoint: self.custom_backend_metrics_endpoint.clone(),
custom_backend_metrics_polling_interval: self.custom_backend_metrics_polling_interval,
})
}
}
......@@ -313,6 +332,8 @@ pub struct LocalModel {
router_config: RouterConfig,
runtime_config: ModelRuntimeConfig,
namespace: Option<String>,
custom_backend_metrics_endpoint: Option<String>,
custom_backend_metrics_polling_interval: Option<f64>,
}
impl LocalModel {
......@@ -367,6 +388,20 @@ impl LocalModel {
self.namespace.as_deref()
}
/// The configured custom backend metrics endpoint, if one was set.
pub fn custom_backend_metrics_endpoint(&self) -> Option<&str> {
self.custom_backend_metrics_endpoint.as_deref()
}
/// The configured custom backend metrics polling interval, if one was set.
pub fn custom_backend_metrics_polling_interval(&self) -> Option<f64> {
self.custom_backend_metrics_polling_interval
}
/// Reports whether the model is treated as GGUF, which here means `full_path` is a file.
pub fn is_gguf(&self) -> bool {
// GGUF is the only file (not-folder) we accept, so we don't need to check the extension
// We will error when we come to parse it
self.full_path.is_file()
}
/// An endpoint to identify this model by.
pub fn endpoint_id(&self) -> &EndpointId {
&self.endpoint_id
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment