Unverified Commit 1df620b4 authored by Keiven C's avatar Keiven C Committed by GitHub
Browse files

feat: router metrics with dynamo_router_* {worker_id=...}. Update docs (#6227)


Signed-off-by: default avatarKeiven Chang <keivenchang@users.noreply.github.com>
Co-authored-by: default avatarKeiven Chang <keivenchang@users.noreply.github.com>
Co-authored-by: default avatarCursor <cursoragent@cursor.com>
parent f8908800
......@@ -262,6 +262,12 @@ The `router_temperature` parameter controls routing randomness:
- To reduce ITL: Decrease the weight
4. If you observe severe load imbalance, increase the temperature setting
## Prometheus Metrics
The router exposes Prometheus metrics on the frontend's HTTP port (default 8000) at `/metrics`. All router metrics require `--router-mode kv` and will not appear when using `round-robin` or `random` routing.
For the full list of router metrics (`dynamo_router_*`, `dynamo_router_overhead_*`, per-worker gauges), see the [Metrics reference](../../observability/metrics.md#router-metrics).
## Disaggregated Serving
Dynamo supports disaggregated serving where prefill (prompt processing) and decode (token generation) are handled by separate worker pools. When you register workers with `ModelType.Prefill` (see [Backend Guide](../../development/backend-guide.md)), the frontend automatically detects them and activates an internal prefill router.
......@@ -436,3 +442,5 @@ curl http://localhost:8000/busy_threshold
- **[KV Router Index Data Structures](../../../../lib/kv-router/README.md)**: `RadixTree`, `ConcurrentRadixTree`, and `PositionalIndexer` internals and concurrency model
- **[Router Design](../../design-docs/router-design.md)**: Architecture details and event transport modes
- **[KV Event Publishing for Custom Engines](../../integrations/kv-events-custom-engines.md)**: Integrate custom inference engines with KV-aware routing
- **[Prometheus and Grafana Setup](../../observability/prometheus-grafana.md)**: General Prometheus/Grafana configuration
- **[Metrics Developer Guide](../../observability/metrics-developer-guide.md)**: How the Dynamo metrics API works
......@@ -112,7 +112,7 @@ The core Dynamo backend system automatically exposes metrics on the system statu
- `dynamo_component_request_duration_seconds`: Request processing time (histogram)
- `dynamo_component_requests_total`: Total requests processed (counter)
- `dynamo_component_response_bytes_total`: Total bytes sent in responses (counter)
- `dynamo_component_uptime_seconds`: DistributedRuntime uptime (gauge)
- `dynamo_component_uptime_seconds`: DistributedRuntime uptime (gauge). Automatically updated before each Prometheus scrape on both the frontend (`/metrics` on port 8000) and system status server (`/metrics` on port 8081).
**Access backend component metrics:**
```bash
......@@ -219,35 +219,72 @@ Suppose the backend allows 3 concurrent requests and there are 10 clients contin
### Router Metrics
When using the KV cache router (`--router-mode kv`), the frontend exposes additional metrics for monitoring routing decisions and overhead. These metrics are defined in `lib/llm/src/kv_router/metrics.rs`.
When using the KV cache router (`--router-mode kv`), the frontend exposes additional metrics for monitoring routing decisions and overhead. These metrics are not registered when using `round-robin` or `random` routing, so they will not appear in `/metrics` output at all. Defined in `lib/llm/src/kv_router/metrics.rs`.
#### Per-Request Routing Overhead (`dynamo_routing_overhead_*`)
For router configuration and tuning, see the [Router Guide](../components/router/router-guide.md).
Histograms (in milliseconds) tracking the time spent in each phase of the routing decision for every request. Exposed on the frontend port (default 8000) at `/metrics`.
#### Router Request Metrics (`dynamo_router_*`)
- `dynamo_routing_overhead_block_hashing_ms`: Time computing block hashes from input tokens
- `dynamo_routing_overhead_indexer_find_matches_ms`: Time in the KV indexer finding prefix matches
- `dynamo_routing_overhead_seq_hashing_ms`: Time computing sequence hashes for active block tracking
- `dynamo_routing_overhead_scheduling_ms`: Time in the scheduler selecting a worker (includes channel round-trip and load-aware selection)
- `dynamo_routing_overhead_total_ms`: Total routing overhead per request (sum of all phases above)
Histograms and counters for aggregate request-level statistics. Only registered when `--router-mode kv` is used. If no requests have been routed yet, the metrics will exist but show zero values. Exposed on the frontend port (default 8000) at `/metrics`.
#### Per-Worker Load (`dynamo_frontend_worker_active_*`)
All metrics carry a `router_id` constant label (the frontend's discovery instance ID). Filter in Prometheus with:
Gauges tracking the active load on each worker, labeled by `worker_id`, `dp_rank`, and `worker_type`. Exposed on the frontend port at `/metrics`.
```promql
dynamo_router_requests_total{router_id="12345"}
```
| Metric | Type | Description |
|--------|------|-------------|
| `dynamo_router_requests_total` | Counter | Total requests processed by the router |
| `dynamo_router_time_to_first_token_seconds` | Histogram | Time to first token (seconds) |
| `dynamo_router_inter_token_latency_seconds` | Histogram | Average inter-token latency (seconds) |
| `dynamo_router_input_sequence_tokens` | Histogram | Input sequence length (tokens) |
| `dynamo_router_output_sequence_tokens` | Histogram | Output sequence length (tokens) |
| `dynamo_router_kv_hit_rate` | Histogram | Predicted KV cache hit rate at routing time (0.0-1.0) |
#### Per-Request Routing Overhead (`dynamo_router_overhead_*`)
Histograms (in milliseconds) tracking the time spent in each phase of the routing decision for every request. Created on first routing decision. Same `router_id` label as the request metrics above.
| Metric | Type | Description |
|--------|------|-------------|
| `dynamo_router_overhead_block_hashing_ms` | Histogram | Time computing block hashes |
| `dynamo_router_overhead_indexer_find_matches_ms` | Histogram | Time in indexer find_matches |
| `dynamo_router_overhead_seq_hashing_ms` | Histogram | Time computing sequence hashes |
| `dynamo_router_overhead_scheduling_ms` | Histogram | Time in scheduler worker selection |
| `dynamo_router_overhead_total_ms` | Histogram | Total routing overhead per request |
#### KV Indexer Metrics
Tracks KV cache events applied to the router's radix tree index. Only appears when `--router-kv-overlap-score-weight` is greater than 0 (default) and workers are publishing KV events. Will not appear if `--router-kv-overlap-score-weight 0` is set or no KV events have been received.
| Metric | Type | Description |
|--------|------|-------------|
| `dynamo_component_kv_cache_events_applied` | Counter | KV cache events applied to the index |
**Additional labels:** `status` (`ok` / `error`), `event_type` (`stored` / `removed` / `cleared`)
#### Per-Worker Load and Timing Gauges (`dynamo_frontend_worker_*`)
These appear once workers register and begin serving requests. They are registered on the frontend's local Prometheus registry (not component-scoped) and do not carry `dynamo_namespace` or `dynamo_component` labels.
- `dynamo_frontend_worker_active_decode_blocks`: Active KV cache decode blocks per worker
- `dynamo_frontend_worker_active_prefill_tokens`: Active prefill tokens queued per worker
| Metric | Type | Description |
|--------|------|-------------|
| `dynamo_frontend_worker_active_decode_blocks` | Gauge | Active KV cache decode blocks per worker |
| `dynamo_frontend_worker_active_prefill_tokens` | Gauge | Active prefill tokens queued per worker |
| `dynamo_frontend_worker_last_time_to_first_token_seconds` | Gauge | Last observed TTFT per worker (seconds) |
| `dynamo_frontend_worker_last_input_sequence_tokens` | Gauge | Last observed input sequence length per worker |
| `dynamo_frontend_worker_last_inter_token_latency_seconds` | Gauge | Last observed ITL per worker (seconds) |
#### Router Request Metrics (`dynamo_component_router_*`)
**Labels:**
Component-scoped histograms and counters for aggregate request-level statistics. These use the `dynamo_component_*` prefix with standard component labels (`dynamo_namespace`, `dynamo_component`, `dynamo_endpoint`). The `dynamo_component` label is set to the frontend component name. Exposed on the frontend port at `/metrics`.
| Label | Example Value | Description |
|-------|---------------|-------------|
| `worker_id` | `7890` | Worker instance ID (etcd lease ID) |
| `dp_rank` | `0` | Data-parallel rank |
| `worker_type` | `prefill` or `decode` | Worker role |
- `dynamo_component_router_requests_total`: Total requests processed by the router (counter)
- `dynamo_component_router_time_to_first_token_seconds`: Time to first token as observed at the router (histogram)
- `dynamo_component_router_inter_token_latency_seconds`: Average inter-token latency at the router (histogram)
- `dynamo_component_router_input_sequence_tokens`: Input sequence length in tokens (histogram)
- `dynamo_component_router_output_sequence_tokens`: Output sequence length in tokens (histogram)
- `dynamo_component_router_kv_hit_rate`: Predicted KV cache hit rate at routing time, 0.0-1.0 (histogram)
In disaggregated mode, the `worker_type` label shows both `"prefill"` and `"decode"` values; in aggregated mode, all workers report as `"decode"`.
## Related Documentation
......
......@@ -28,6 +28,13 @@ Usage (both patterns supported):
from __future__ import annotations
class component_names:
"""Well-known component names used as values for the `dynamo_component` label."""
# Component name for the KV router (frontend-side request routing).
ROUTER = "router"
class distributed_runtime:
"""DistributedRuntime core metrics"""
......@@ -55,6 +62,8 @@ class frontend_service:
INPUT_SEQUENCE_TOKENS = "input_sequence_tokens"
# Output sequence length in tokens
OUTPUT_SEQUENCE_TOKENS = "output_sequence_tokens"
# Predicted KV cache hit rate at routing time (0.0-1.0)
KV_HIT_RATE = "kv_hit_rate"
# Number of cached tokens (prefix cache hits) per request
CACHED_TOKENS = "cached_tokens"
# Tokenizer latency in milliseconds
......@@ -164,6 +173,8 @@ class labels:
# Note: this is not an auto-inserted label like `dynamo_namespace`/`dynamo_component`.
# It is used by worker/load-style metrics that need to disambiguate per-worker series.
DP_RANK = "dp_rank"
# Label for worker instance ID (etcd lease ID).
WORKER_ID = "worker_id"
# Label for model name/path (OpenAI API standard, injected by Dynamo)
# This is the standard label name injected by all backends in metrics_labels=[("model", ...)].
# Ensures compatibility with OpenAI-compatible tooling.
......@@ -191,6 +202,21 @@ class name_prefix:
FRONTEND = "dynamo_frontend"
class routing_overhead:
"""Routing overhead phase latency histogram names (component-scoped)."""
# Time spent computing block hashes
BLOCK_HASHING_MS = "overhead_block_hashing_ms"
# Time spent in indexer find_matches
INDEXER_FIND_MATCHES_MS = "overhead_indexer_find_matches_ms"
# Time spent computing sequence hashes
SEQ_HASHING_MS = "overhead_seq_hashing_ms"
# Time spent in scheduler worker selection
SCHEDULING_MS = "overhead_scheduling_ms"
# Total routing overhead per request
TOTAL_MS = "overhead_total_ms"
class task_tracker:
"""Task tracker Prometheus metric name suffixes"""
......
......@@ -183,6 +183,7 @@ impl KvbmMetrics {
let (_route_docs, app): (Vec<RouteDoc>, Router) = router(
(*registry).clone(), // take owned Registry (Clone) for router to wrap in Arc
None, // or Some("/metrics".to_string()) to override the path
None, // no DRT metrics for standalone KVBM server
);
let run_server = async move {
......
......@@ -16,6 +16,7 @@ use crate::{
},
};
use dynamo_runtime::DistributedRuntime;
use dynamo_runtime::metrics::MetricsHierarchy;
/// Build and run an HTTP service
pub async fn run(
......@@ -53,6 +54,16 @@ pub async fn run(
http_service_builder =
http_service_builder.with_request_template(engine_config.local_model().request_template());
// Inject the DRT's metrics registry so that component-scoped metrics
// (e.g. KvIndexerMetrics) are exposed (default port 8000 if not overridden).
http_service_builder =
http_service_builder.drt_metrics(Some(distributed_runtime.get_metrics_registry().clone()));
// Wire DRT discovery so that router metrics (dynamo_router_*) are registered
// with the instance_id as the router_id label.
http_service_builder =
http_service_builder.drt_discovery(Some(distributed_runtime.discovery()));
let http_service = match engine_config {
EngineConfig::Dynamic {
ref model,
......
......@@ -40,7 +40,8 @@ pub static WORKER_LAST_TIME_TO_FIRST_TOKEN_GAUGE: LazyLock<GaugeVec> = LazyLock:
GaugeVec::new(
Opts::new(
format!(
"dynamo_frontend_{}",
"{}_{}",
name_prefix::FRONTEND,
frontend_service::WORKER_LAST_TIME_TO_FIRST_TOKEN_SECONDS
),
"Last observed time to first token per worker (seconds)",
......@@ -57,7 +58,8 @@ pub static WORKER_LAST_INPUT_SEQUENCE_TOKENS_GAUGE: LazyLock<IntGaugeVec> = Lazy
IntGaugeVec::new(
Opts::new(
format!(
"dynamo_frontend_{}",
"{}_{}",
name_prefix::FRONTEND,
frontend_service::WORKER_LAST_INPUT_SEQUENCE_TOKENS
),
"Last observed input sequence tokens per worker",
......@@ -73,7 +75,8 @@ pub static WORKER_LAST_INTER_TOKEN_LATENCY_GAUGE: LazyLock<GaugeVec> = LazyLock:
GaugeVec::new(
Opts::new(
format!(
"dynamo_frontend_{}",
"{}_{}",
name_prefix::FRONTEND,
frontend_service::WORKER_LAST_INTER_TOKEN_LATENCY_SECONDS
),
"Last observed inter-token latency per worker (seconds)",
......@@ -220,9 +223,13 @@ fn parse_bucket_config(
(min, max, count)
}
/// State for metrics handler with custom backend support
/// State for metrics handler.
/// Optionally holds a reference to the DRT's [`MetricsRegistry`] so that
/// metrics created via `metrics().create*()` anywhere in the process are
/// automatically included in the `/metrics` response.
struct MetricsHandlerState {
registry: Arc<Registry>,
drt_metrics: Option<dynamo_runtime::metrics::MetricsRegistry>,
}
pub struct Metrics {
......@@ -425,6 +432,9 @@ impl Metrics {
/// Metrics are never removed to preserve historical data. Runtime config and MDC
/// metrics are updated when models are discovered and their configurations are available.
pub fn new() -> Self {
// TODO: Remove DYN_METRICS_PREFIX env-var override (added in PR #2432 for
// NIM compatibility with the old "nv_llm_http_service_" prefix). No longer
// needed — hardcode name_prefix::FRONTEND and drop the sanitize function.
let raw_prefix = std::env::var(env_metrics::DYN_METRICS_PREFIX)
.unwrap_or_else(|_| name_prefix::FRONTEND.to_string());
let prefix = sanitize_frontend_prometheus_prefix(&raw_prefix);
......@@ -1379,13 +1389,21 @@ pub fn process_response_using_event_converter_and_observe_metrics<T: Serialize>(
}
}
/// Create a new router with optional custom backend metrics support
pub fn router(registry: Registry, path: Option<String>) -> (Vec<RouteDoc>, Router) {
/// Create a new router with optional DRT metrics integration.
///
/// When `drt_metrics` is provided, the `/metrics` handler will also include
/// metrics from the DRT's registry tree (anything created via `metrics().create*()`).
pub fn router(
registry: Registry,
path: Option<String>,
drt_metrics: Option<dynamo_runtime::metrics::MetricsRegistry>,
) -> (Vec<RouteDoc>, Router) {
let path = path.unwrap_or_else(|| "/metrics".to_string());
let doc = RouteDoc::new(axum::http::Method::GET, &path);
let metrics_state = MetricsHandlerState {
registry: Arc::new(registry),
drt_metrics,
};
let route = Router::new()
......@@ -1394,7 +1412,10 @@ pub fn router(registry: Registry, path: Option<String>) -> (Vec<RouteDoc>, Route
(vec![doc], route)
}
/// Unified metrics handler
/// Unified metrics handler.
///
/// Gathers from the local HTTP-service registry first, then appends any
/// metrics from the DRT's registry tree (if configured).
async fn handler_metrics(State(state): State<Arc<MetricsHandlerState>>) -> impl IntoResponse {
let encoder = prometheus::TextEncoder::new();
let metric_families = state.registry.gather();
......@@ -1407,7 +1428,7 @@ async fn handler_metrics(State(state): State<Arc<MetricsHandlerState>>) -> impl
.into_response();
}
let metrics = match String::from_utf8(buffer) {
let mut metrics = match String::from_utf8(buffer) {
Ok(metrics) => metrics,
Err(_) => {
return (
......@@ -1418,6 +1439,23 @@ async fn handler_metrics(State(state): State<Arc<MetricsHandlerState>>) -> impl
}
};
// Append DRT registry tree metrics (anything created via metrics().create*()).
if let Some(ref drt_metrics) = state.drt_metrics {
match drt_metrics.prometheus_expfmt_combined() {
Ok(drt_text) => {
if !drt_text.is_empty() {
if !metrics.is_empty() && !metrics.ends_with('\n') {
metrics.push('\n');
}
metrics.push_str(&drt_text);
}
}
Err(e) => {
tracing::warn!("Failed to gather DRT metrics: {}", e);
}
}
}
(StatusCode::OK, metrics).into_response()
}
......
......@@ -18,7 +18,9 @@ use super::metrics;
use super::metrics::register_worker_timing_metrics;
use crate::discovery::ModelManager;
use crate::endpoint_type::EndpointType;
use crate::kv_router::metrics::{register_routing_overhead_metrics, register_worker_load_metrics};
use crate::kv_router::metrics::{
RouterRequestMetrics, RoutingOverheadMetrics, register_worker_load_metrics,
};
use crate::request_template::RequestTemplate;
use anyhow::Result;
use axum_server::tls_rustls::RustlsConfig;
......@@ -211,6 +213,16 @@ pub struct HttpServiceConfig {
#[builder(default = "None")]
cancel_token: Option<CancellationToken>,
/// When set, the `/metrics` endpoint will also expose metrics from the
/// DRT's registry tree (anything created via `metrics().create*()`).
#[builder(default = "None")]
drt_metrics: Option<dynamo_runtime::metrics::MetricsRegistry>,
/// When set (e.g. DRT discovery), router metrics (dynamo_router_* with router_id label)
/// are registered using discovery.instance_id() and exposed on /metrics.
#[builder(default = "None")]
drt_discovery: Option<Arc<dyn Discovery>>,
}
impl HttpService {
......@@ -416,10 +428,14 @@ impl HttpServiceConfigBuilder {
tracing::warn!("Failed to register worker timing metrics: {}", e);
}
// Register routing overhead metrics (block hashing, find matches, scheduling latencies)
// These are updated by KvRouter::find_best_match on every routing decision
if let Err(e) = register_routing_overhead_metrics(&registry) {
tracing::warn!("Failed to register routing overhead metrics: {}", e);
if let Some(ref discovery) = config.drt_discovery {
let instance_id = discovery.instance_id();
if let Err(e) = RouterRequestMetrics::register(&registry, instance_id) {
tracing::warn!("Failed to register router request metrics: {}", e);
}
if let Err(e) = RoutingOverheadMetrics::register(&registry, instance_id) {
tracing::warn!("Failed to register routing overhead metrics: {}", e);
}
}
let mut router = axum::Router::new();
......@@ -427,7 +443,11 @@ impl HttpServiceConfigBuilder {
let mut all_docs = Vec::new();
let mut routes = vec![
metrics::router(registry, var(HTTP_SVC_METRICS_PATH_ENV).ok()),
metrics::router(
registry,
var(HTTP_SVC_METRICS_PATH_ENV).ok(),
config.drt_metrics,
),
super::openai::list_models_router(state.clone(), var(HTTP_SVC_MODELS_PATH_ENV).ok()),
super::health::health_check_router(state.clone(), var(HTTP_SVC_HEALTH_PATH_ENV).ok()),
super::health::live_check_router(state.clone(), var(HTTP_SVC_LIVE_PATH_ENV).ok()),
......
......@@ -414,12 +414,14 @@ impl KvRouter {
.await?;
let total_elapsed = start.elapsed();
metrics::ROUTING_OVERHEAD_METRICS.observe(
hash_elapsed,
find_matches_elapsed,
seq_hash_elapsed,
total_elapsed,
);
if let Some(m) = metrics::RoutingOverheadMetrics::get() {
m.observe(
hash_elapsed,
find_matches_elapsed,
seq_hash_elapsed,
total_elapsed,
);
}
#[cfg(feature = "bench")]
tracing::info!(
......
......@@ -13,15 +13,19 @@
use std::sync::{Arc, LazyLock, OnceLock};
use std::time::Duration;
use dynamo_runtime::component::Component;
use dynamo_runtime::metrics::MetricsHierarchy;
use dynamo_runtime::metrics::prometheus_names::{
frontend_service, labels, name_prefix, routing_overhead,
};
use prometheus::{IntGaugeVec, Opts};
use prometheus::{HistogramOpts, IntCounter, IntGaugeVec, Opts};
use crate::http::service::metrics::generate_log_buckets;
/// Exponential buckets for routing overhead histograms:
/// from 0.0001 ms (0.1 µs) to ~13.1 ms, factor 2, 18 steps.
fn overhead_buckets() -> Vec<f64> {
prometheus::exponential_buckets(0.0001, 2.0, 18).expect("exponential buckets should not fail")
}
// ---------------------------------------------------------------------------
// Worker load metrics (gauges)
// ---------------------------------------------------------------------------
......@@ -104,42 +108,73 @@ pub struct RoutingOverheadMetrics {
pub total: prometheus::Histogram,
}
pub static ROUTING_OVERHEAD_METRICS: LazyLock<RoutingOverheadMetrics> = LazyLock::new(|| {
// Buckets from 0.0001ms (0.1μs) to ~10ms, exponential with factor 2
let buckets = prometheus::exponential_buckets(0.0001, 2.0, 18)
.expect("exponential buckets should not fail");
let make = |suffix: &str, help: &str| {
let name = format!("{}_{}", name_prefix::ROUTING_OVERHEAD, suffix);
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(name, help).buckets(buckets.clone()),
)
.expect("histogram creation should not fail")
};
RoutingOverheadMetrics {
block_hashing: make(
routing_overhead::BLOCK_HASHING_MS,
"Time spent computing block hashes in milliseconds",
),
indexer_find_matches: make(
routing_overhead::INDEXER_FIND_MATCHES_MS,
"Time spent in indexer find_matches in milliseconds",
),
seq_hashing: make(
routing_overhead::SEQ_HASHING_MS,
"Time spent computing sequence hashes in milliseconds",
),
scheduling: make(
routing_overhead::SCHEDULING_MS,
"Time spent in scheduler worker selection in milliseconds",
),
total: make(
routing_overhead::TOTAL_MS,
"Total routing overhead per request in milliseconds",
),
}
});
static ROUTING_OVERHEAD_METRICS: OnceLock<Arc<RoutingOverheadMetrics>> = OnceLock::new();
impl RoutingOverheadMetrics {
/// Register routing overhead histograms with the given registry and store for later use.
/// Metric names: `dynamo_router_overhead_*` with const label `router_id=instance_id`.
/// Call once during HTTP service setup when `--router-mode kv` is used.
pub fn register(
registry: &prometheus::Registry,
instance_id: u64,
) -> Result<(), prometheus::Error> {
let m = ROUTING_OVERHEAD_METRICS.get_or_init(|| {
let buckets = overhead_buckets();
let router_id = instance_id.to_string();
let make = |suffix: &str, help: &str| {
let name = format!("{}_{}", name_prefix::ROUTER, suffix);
prometheus::Histogram::with_opts(
HistogramOpts::new(name, help)
.const_label(labels::ROUTER_ID, &router_id)
.buckets(buckets.clone()),
)
};
let block_hashing = make(
routing_overhead::BLOCK_HASHING_MS,
"Time spent computing block hashes in milliseconds",
)
.expect("overhead_block_hashing_ms");
let indexer_find_matches = make(
routing_overhead::INDEXER_FIND_MATCHES_MS,
"Time spent in indexer find_matches in milliseconds",
)
.expect("overhead_indexer_find_matches_ms");
let seq_hashing = make(
routing_overhead::SEQ_HASHING_MS,
"Time spent computing sequence hashes in milliseconds",
)
.expect("overhead_seq_hashing_ms");
let scheduling = make(
routing_overhead::SCHEDULING_MS,
"Time spent in scheduler worker selection in milliseconds",
)
.expect("overhead_scheduling_ms");
let total = make(
routing_overhead::TOTAL_MS,
"Total routing overhead per request in milliseconds",
)
.expect("overhead_total_ms");
Arc::new(Self {
block_hashing,
indexer_find_matches,
seq_hashing,
scheduling,
total,
})
});
registry.register(Box::new(m.block_hashing.clone()))?;
registry.register(Box::new(m.indexer_find_matches.clone()))?;
registry.register(Box::new(m.seq_hashing.clone()))?;
registry.register(Box::new(m.scheduling.clone()))?;
registry.register(Box::new(m.total.clone()))?;
Ok(())
}
/// Returns the registered metrics if `register()` was called earlier.
pub fn get() -> Option<Arc<Self>> {
ROUTING_OVERHEAD_METRICS.get().cloned()
}
/// Observe routing overhead timings in milliseconds.
pub fn observe(
&self,
......@@ -168,28 +203,14 @@ impl RoutingOverheadMetrics {
}
}
/// Register the routing overhead histograms with the given Prometheus registry.
pub fn register_routing_overhead_metrics(
registry: &prometheus::Registry,
) -> Result<(), prometheus::Error> {
let m = &*ROUTING_OVERHEAD_METRICS;
registry.register(Box::new(m.block_hashing.clone()))?;
registry.register(Box::new(m.indexer_find_matches.clone()))?;
registry.register(Box::new(m.seq_hashing.clone()))?;
registry.register(Box::new(m.scheduling.clone()))?;
registry.register(Box::new(m.total.clone()))?;
Ok(())
}
// ---------------------------------------------------------------------------
// Router request metrics (component-scoped aggregate histograms + counter)
// Router request metrics (dynamo_router_* with router_id label)
// ---------------------------------------------------------------------------
/// Aggregate per-request metrics observed at the router level.
/// Component-scoped via `from_component()` to get automatic namespace/component labels.
/// Uses `router_` prefix to distinguish from `ResponseMetricCollector` metrics.
/// Registered via `register()` with `dynamo_router_*` names and `router_id` label.
pub struct RouterRequestMetrics {
pub requests_total: prometheus::IntCounter,
pub requests_total: IntCounter,
pub time_to_first_token_seconds: prometheus::Histogram,
pub inter_token_latency_seconds: prometheus::Histogram,
pub input_sequence_tokens: prometheus::Histogram,
......@@ -200,89 +221,109 @@ pub struct RouterRequestMetrics {
static ROUTER_REQUEST_METRICS: OnceLock<Arc<RouterRequestMetrics>> = OnceLock::new();
impl RouterRequestMetrics {
fn new(
requests_total: prometheus::IntCounter,
time_to_first_token_seconds: prometheus::Histogram,
inter_token_latency_seconds: prometheus::Histogram,
input_sequence_tokens: prometheus::Histogram,
output_sequence_tokens: prometheus::Histogram,
kv_hit_rate: prometheus::Histogram,
) -> Self {
Self {
requests_total,
time_to_first_token_seconds,
inter_token_latency_seconds,
input_sequence_tokens,
output_sequence_tokens,
kv_hit_rate,
}
/// Register router request metrics with the given registry and store for later use.
/// Metric names: `dynamo_router_*` with const label `router_id=instance_id`.
/// Call once during HTTP service setup when `--router-mode kv` is used.
pub fn register(
registry: &prometheus::Registry,
instance_id: u64,
) -> Result<(), prometheus::Error> {
let m = ROUTER_REQUEST_METRICS.get_or_init(|| {
let router_id = instance_id.to_string();
let requests_total = IntCounter::with_opts(
Opts::new(
format!(
"{}_{}",
name_prefix::ROUTER,
frontend_service::REQUESTS_TOTAL
),
"Total number of requests processed by the router",
)
.const_label(labels::ROUTER_ID, &router_id),
)
.expect("dynamo_router_requests_total");
let time_to_first_token_seconds = prometheus::Histogram::with_opts(
HistogramOpts::new(
format!(
"{}_{}",
name_prefix::ROUTER,
frontend_service::TIME_TO_FIRST_TOKEN_SECONDS
),
"Time to first token observed at the router",
)
.const_label(labels::ROUTER_ID, &router_id)
.buckets(generate_log_buckets(0.001, 480.0, 18)),
)
.expect("dynamo_router_time_to_first_token_seconds");
let inter_token_latency_seconds = prometheus::Histogram::with_opts(
HistogramOpts::new(
format!(
"{}_{}",
name_prefix::ROUTER,
frontend_service::INTER_TOKEN_LATENCY_SECONDS
),
"Average inter-token latency observed at the router",
)
.const_label(labels::ROUTER_ID, &router_id)
.buckets(generate_log_buckets(0.001, 2.0, 13)),
)
.expect("dynamo_router_inter_token_latency_seconds");
let input_sequence_tokens = prometheus::Histogram::with_opts(
HistogramOpts::new(
format!(
"{}_{}",
name_prefix::ROUTER,
frontend_service::INPUT_SEQUENCE_TOKENS
),
"Input sequence length in tokens observed at the router",
)
.const_label(labels::ROUTER_ID, &router_id)
.buckets(generate_log_buckets(50.0, 128000.0, 12)),
)
.expect("dynamo_router_input_sequence_tokens");
let output_sequence_tokens = prometheus::Histogram::with_opts(
HistogramOpts::new(
format!(
"{}_{}",
name_prefix::ROUTER,
frontend_service::OUTPUT_SEQUENCE_TOKENS
),
"Output sequence length in tokens observed at the router",
)
.const_label(labels::ROUTER_ID, &router_id)
.buckets(generate_log_buckets(50.0, 32000.0, 10)),
)
.expect("dynamo_router_output_sequence_tokens");
let kv_hit_rate = prometheus::Histogram::with_opts(
HistogramOpts::new(
format!("{}_{}", name_prefix::ROUTER, frontend_service::KV_HIT_RATE),
"Predicted KV cache hit rate at routing time (0.0-1.0)",
)
.const_label(labels::ROUTER_ID, &router_id)
.buckets(prometheus::linear_buckets(0.0, 0.05, 21).unwrap()),
)
.expect("dynamo_router_kv_hit_rate");
Arc::new(Self {
requests_total,
time_to_first_token_seconds,
inter_token_latency_seconds,
input_sequence_tokens,
output_sequence_tokens,
kv_hit_rate,
})
});
registry.register(Box::new(m.requests_total.clone()))?;
registry.register(Box::new(m.time_to_first_token_seconds.clone()))?;
registry.register(Box::new(m.inter_token_latency_seconds.clone()))?;
registry.register(Box::new(m.input_sequence_tokens.clone()))?;
registry.register(Box::new(m.output_sequence_tokens.clone()))?;
registry.register(Box::new(m.kv_hit_rate.clone()))?;
Ok(())
}
// TODO: move all `router_*` metric name strings to `prometheus_names.rs` constants
// for consistency with the other metric families (routing_overhead, frontend_service).
/// Create from a Component, memoized in a static OnceLock.
pub fn from_component(component: &Component) -> Arc<Self> {
ROUTER_REQUEST_METRICS
.get_or_init(|| {
let metrics = component.metrics();
let requests_total = metrics
.create_intcounter(
"router_requests_total",
"Total number of requests processed by the router",
&[],
)
.expect("failed to create router_requests_total");
let time_to_first_token_seconds = metrics
.create_histogram(
"router_time_to_first_token_seconds",
"Time to first token observed at the router",
&[],
Some(generate_log_buckets(0.001, 480.0, 18)),
)
.expect("failed to create router_time_to_first_token_seconds");
let inter_token_latency_seconds = metrics
.create_histogram(
"router_inter_token_latency_seconds",
"Average inter-token latency observed at the router",
&[],
Some(generate_log_buckets(0.001, 2.0, 13)),
)
.expect("failed to create router_inter_token_latency_seconds");
let input_sequence_tokens = metrics
.create_histogram(
"router_input_sequence_tokens",
"Input sequence length in tokens observed at the router",
&[],
Some(generate_log_buckets(50.0, 128000.0, 12)),
)
.expect("failed to create router_input_sequence_tokens");
let output_sequence_tokens = metrics
.create_histogram(
"router_output_sequence_tokens",
"Output sequence length in tokens observed at the router",
&[],
Some(generate_log_buckets(50.0, 32000.0, 10)),
)
.expect("failed to create router_output_sequence_tokens");
let kv_hit_rate = metrics
.create_histogram(
"router_kv_hit_rate",
"Predicted KV cache hit rate at routing time (0.0-1.0)",
&[],
Some(prometheus::linear_buckets(0.0, 0.05, 21).unwrap()),
)
.expect("failed to create router_kv_hit_rate");
Arc::new(Self::new(
requests_total,
time_to_first_token_seconds,
inter_token_latency_seconds,
input_sequence_tokens,
output_sequence_tokens,
kv_hit_rate,
))
})
.clone()
/// Returns the registered metrics if `register()` was called earlier.
pub fn get() -> Option<Arc<Self>> {
ROUTER_REQUEST_METRICS.get().cloned()
}
}
......@@ -353,34 +394,34 @@ dynamo_frontend_worker_active_prefill_tokens{dp_rank=\"0\",worker_id=\"123\",wor
#[test]
fn test_routing_overhead_metric_names_pef() {
// Verify the overhead constants produce valid histogram names when
// combined with dynamo_router_ prefix.
let registry = prometheus::Registry::new();
let buckets = prometheus::exponential_buckets(0.0001, 2.0, 18).unwrap();
let make = |suffix: &str, help: &str| {
let name = format!("{}_{}", name_prefix::ROUTING_OVERHEAD, suffix);
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(name, help).buckets(buckets.clone()),
let buckets = overhead_buckets();
let prefix = name_prefix::ROUTER;
let name = format!("{}_{}", prefix, routing_overhead::TOTAL_MS);
let total = prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(
name,
"Total routing overhead per request in milliseconds",
)
.unwrap()
};
let total = make(
routing_overhead::TOTAL_MS,
"Total routing overhead per request in milliseconds",
);
.buckets(buckets),
)
.unwrap();
registry.register(Box::new(total.clone())).unwrap();
total.observe(1.5);
let output = gather_pef(&registry);
assert!(
output.contains("# HELP dynamo_routing_overhead_total_ms"),
output.contains("# HELP dynamo_router_overhead_total_ms"),
"PEF missing HELP for routing overhead metric"
);
assert!(
output.contains("# TYPE dynamo_routing_overhead_total_ms histogram"),
output.contains("# TYPE dynamo_router_overhead_total_ms histogram"),
"PEF missing TYPE for routing overhead metric"
);
assert!(
output.contains("dynamo_routing_overhead_total_ms_count 1"),
output.contains("dynamo_router_overhead_total_ms_count 1"),
"PEF missing observation count"
);
}
......
......@@ -50,7 +50,7 @@ struct RequestGuard {
chooser: Arc<KvRouter>,
context_id: String,
tracker: Option<Arc<RequestTracker>>,
request_metrics: Arc<RouterRequestMetrics>,
request_metrics: Option<Arc<RouterRequestMetrics>>,
cumulative_osl: usize,
metrics_recorded: bool,
freed: bool,
......@@ -87,10 +87,8 @@ impl RequestGuard {
if !self.first_token_recorded && new_tokens > 0 {
if let Some(ref tracker) = self.tracker {
tracker.record_first_token();
if let Some(ttft) = tracker.ttft_ms() {
self.request_metrics
.time_to_first_token_seconds
.observe(ttft / 1000.0);
if let (Some(m), Some(ttft)) = (&self.request_metrics, tracker.ttft_ms()) {
m.time_to_first_token_seconds.observe(ttft / 1000.0);
}
}
self.first_token_recorded = true;
......@@ -118,10 +116,9 @@ impl RequestGuard {
if let Some(ref tracker) = self.tracker {
tracker.record_osl(self.cumulative_osl);
tracker.record_finish();
if let Some(avg_itl) = tracker.avg_itl_ms() {
self.request_metrics
.inter_token_latency_seconds
.observe(avg_itl / 1000.0);
if let (Some(m), Some(avg_itl)) = (&self.request_metrics, tracker.avg_itl_ms())
{
m.inter_token_latency_seconds.observe(avg_itl / 1000.0);
}
}
......@@ -146,11 +143,11 @@ impl RequestGuard {
if let Some(ref tracker) = self.tracker {
tracker.record_finish();
tracker.record_osl(self.cumulative_osl);
self.request_metrics
.output_sequence_tokens
.observe(self.cumulative_osl as f64);
}
self.request_metrics.requests_total.inc();
if let Some(ref m) = self.request_metrics {
m.output_sequence_tokens.observe(self.cumulative_osl as f64);
m.requests_total.inc();
}
}
}
......@@ -368,8 +365,7 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
}
// Record routing metrics on tracker and observe ISL + prefill start.
let request_metrics =
RouterRequestMetrics::from_component(self.chooser.client().endpoint.component());
let request_metrics = RouterRequestMetrics::get();
if let Some(ref tracker) = request.tracker {
let (routing_token_ids, _) = request.block_mm_routing_info();
let isl_blocks = routing_token_ids.len().div_ceil(block_size);
......@@ -379,13 +375,14 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
overlap_amount as usize * block_size,
);
tracker.record_worker_full(instance_id, dp_rank, self.chooser.worker_type());
if let Some(hit_rate) = tracker.kv_hit_rate() {
request_metrics.kv_hit_rate.observe(hit_rate);
if let (Some(m), Some(hit_rate)) = (&request_metrics, tracker.kv_hit_rate()) {
m.kv_hit_rate.observe(hit_rate);
}
}
request_metrics
.input_sequence_tokens
.observe(request.token_ids.len() as f64);
if let Some(ref m) = request_metrics {
m.input_sequence_tokens
.observe(request.token_ids.len() as f64);
}
// Handle query-only requests: early return with worker info
if is_query_only {
......
......@@ -198,6 +198,18 @@ impl DistributedRuntime {
.lock()
.initialize_uptime_gauge(&distributed_runtime)?;
// Register an update callback so the uptime gauge is refreshed before
// every Prometheus scrape (both system status server and frontend).
{
let system_health = distributed_runtime.system_health.clone();
distributed_runtime
.metrics_registry
.add_update_callback(std::sync::Arc::new(move || {
system_health.lock().update_uptime_gauge();
Ok(())
}));
}
// Handle system status server initialization
if let Some(cancel_token) = cancel_token {
// System server is enabled - start both the state and HTTP server
......
......@@ -1491,7 +1491,9 @@ dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#.to_string();
println!("DRT output:");
println!("{}", drt_output_raw);
let expected_drt_output = r#"# HELP dynamo_component_testcounter A test counter
// The uptime_seconds value is dynamic (depends on elapsed wall-clock time),
// so we check all other lines exactly and validate uptime separately.
let expected_drt_output_without_uptime = r#"# HELP dynamo_component_testcounter A test counter
# TYPE dynamo_component_testcounter counter
dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345",dynamo_namespace="ns345"} 123.456789
# HELP dynamo_component_testcountervec A test counter vector
......@@ -1526,20 +1528,53 @@ dynamo_component_testintgauge{dynamo_namespace="ns345"} 42
# HELP dynamo_component_testintgaugevec A test int gauge vector
# TYPE dynamo_component_testintgaugevec gauge
dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server1",service="api",status="active"} 10
dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server2",service="api",status="inactive"} 0
# HELP dynamo_component_uptime_seconds Total uptime of the DistributedRuntime in seconds
# TYPE dynamo_component_uptime_seconds gauge
dynamo_component_uptime_seconds 0"#.to_string();
dynamo_component_testintgaugevec{dynamo_namespace="ns345",instance="server2",service="api",status="inactive"} 0"#;
// Split actual output into non-uptime lines and the uptime value line.
let mut non_uptime_lines = Vec::new();
let mut uptime_value: Option<f64> = None;
for line in drt_output_raw.trim_end_matches('\n').lines() {
if line.starts_with("dynamo_component_uptime_seconds ") {
let val_str = line
.strip_prefix("dynamo_component_uptime_seconds ")
.unwrap();
uptime_value = Some(val_str.parse::<f64>().expect("uptime should be a float"));
} else if line.starts_with("# HELP dynamo_component_uptime_seconds")
|| line.starts_with("# TYPE dynamo_component_uptime_seconds")
{
// Skip HELP/TYPE lines for uptime (we just verify it exists via the value)
} else {
non_uptime_lines.push(line);
}
}
let actual_without_uptime = non_uptime_lines.join("\n");
assert_eq!(
drt_output_raw.trim_end_matches('\n'),
expected_drt_output.trim_end_matches('\n'),
"\n=== DRT COMPARISON FAILED ===\n\
actual_without_uptime,
expected_drt_output_without_uptime.trim_end_matches('\n'),
"\n=== DRT COMPARISON FAILED (excluding uptime) ===\n\
Expected:\n{}\n\
Actual (filtered):\n{}\n\
Actual:\n{}\n\
==============================",
expected_drt_output,
drt_output_raw
expected_drt_output_without_uptime,
actual_without_uptime
);
// Wait briefly so the uptime gauge is clearly positive on the next scrape.
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
let drt_output_after = drt.metrics().prometheus_expfmt().unwrap();
let uptime_after: f64 = drt_output_after
.lines()
.find(|l| l.starts_with("dynamo_component_uptime_seconds "))
.expect("uptime_seconds metric should be present after sleep")
.strip_prefix("dynamo_component_uptime_seconds ")
.unwrap()
.parse()
.expect("uptime should be a float");
assert!(
uptime_after > 0.0,
"uptime_seconds should be > 0 after 10ms sleep, got {}",
uptime_after
);
println!("✓ All Prometheus format outputs verified successfully!");
......
......@@ -69,8 +69,8 @@ pub mod name_prefix {
/// Prefix for frontend service metrics
pub const FRONTEND: &str = "dynamo_frontend";
/// Prefix for routing overhead metrics (raw Prometheus, not component-scoped)
pub const ROUTING_OVERHEAD: &str = "dynamo_routing_overhead";
/// Prefix for KV router metrics (used with router_id label)
pub const ROUTER: &str = "dynamo_router";
}
/// Automatically inserted Prometheus label names used across the metrics system
......@@ -112,14 +112,32 @@ pub mod labels {
/// Label for worker type (e.g., "aggregated", "prefill", "decode", "encoder", etc.)
pub const WORKER_TYPE: &str = "worker_type";
/// Label for router instance (discovery.instance_id() of the frontend)
pub const ROUTER_ID: &str = "router_id";
}
/// Well-known component names used as values for the `dynamo_component` label.
///
/// These are the canonical names passed to `namespace.component(name)` to create
/// `Component` instances whose metrics carry `dynamo_component=<name>`.
///
/// Python codegen: These constants are exported to lib/bindings/python/src/dynamo/prometheus_names.py
pub mod component_names {
/// Component name for the KV router (frontend-side request routing).
pub const ROUTER: &str = "router";
// TODO: add PREFILL = "prefill" and DECODE = "decode" component names
// and migrate backend worker component creation to use these constants.
}
/// Frontend service metrics (LLM HTTP service)
///
/// ⚠️ Python codegen: Run gen-python-prometheus-names after changes
pub mod frontend_service {
// TODO: Move DYN_METRICS_PREFIX and other environment variable names to environment_names.rs
// for centralized environment variable constant management across the codebase
// TODO: Remove DYN_METRICS_PREFIX — the custom prefix override was added for NIM
// compatibility (PR #2432) but is no longer needed. All frontend metrics should
// use the fixed `dynamo_frontend_` prefix from `name_prefix::FRONTEND`.
/// Environment variable that overrides the default metric prefix
pub const METRICS_PREFIX_ENV: &str = "DYN_METRICS_PREFIX";
......@@ -145,6 +163,9 @@ pub mod frontend_service {
/// Output sequence length in tokens
pub const OUTPUT_SEQUENCE_TOKENS: &str = "output_sequence_tokens";
/// Predicted KV cache hit rate at routing time (0.0-1.0)
pub const KV_HIT_RATE: &str = "kv_hit_rate";
/// Number of cached tokens (prefix cache hits) per request
pub const CACHED_TOKENS: &str = "cached_tokens";
......@@ -396,25 +417,26 @@ pub mod kvbm {
pub const OBJECT_WRITE_FAILURES: &str = "object_write_failures";
}
/// Routing overhead phase latency histogram names (raw Prometheus, not component-scoped).
/// Routing overhead phase latency histogram suffixes.
///
/// These are combined with [`name_prefix::ROUTING_OVERHEAD`] to form full metric names,
/// e.g. `dynamo_routing_overhead_block_hashing_ms`.
/// Combined with `name_prefix::ROUTER` ("dynamo_router") in `RoutingOverheadMetrics::register()`,
/// yielding e.g. `dynamo_router_overhead_block_hashing_ms{router_id="..."}`.
/// See `lib/llm/src/kv_router/metrics.rs`.
pub mod routing_overhead {
/// Time spent computing block hashes
pub const BLOCK_HASHING_MS: &str = "block_hashing_ms";
pub const BLOCK_HASHING_MS: &str = "overhead_block_hashing_ms";
/// Time spent in indexer find_matches
pub const INDEXER_FIND_MATCHES_MS: &str = "indexer_find_matches_ms";
pub const INDEXER_FIND_MATCHES_MS: &str = "overhead_indexer_find_matches_ms";
/// Time spent computing sequence hashes
pub const SEQ_HASHING_MS: &str = "seq_hashing_ms";
pub const SEQ_HASHING_MS: &str = "overhead_seq_hashing_ms";
/// Time spent in scheduler worker selection
pub const SCHEDULING_MS: &str = "scheduling_ms";
pub const SCHEDULING_MS: &str = "overhead_scheduling_ms";
/// Total routing overhead per request
pub const TOTAL_MS: &str = "total_ms";
pub const TOTAL_MS: &str = "overhead_total_ms";
}
// KvRouter (including KvInexer) Prometheus metric names
......
......@@ -288,10 +288,9 @@ async fn health_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
/// Metrics handler with DistributedRuntime uptime
#[tracing::instrument(skip_all, level = "trace")]
async fn metrics_handler(state: Arc<SystemStatusState>) -> impl IntoResponse {
// Update the uptime gauge with current value
state.drt().system_health().lock().update_uptime_gauge();
// Get all metrics from the DistributedRuntime.
// The uptime gauge is updated automatically via a PrometheusUpdateCallback
// registered in DistributedRuntime::new(), so it is always fresh before scrape.
//
// NOTE: We use a multi-registry model (e.g. one registry per endpoint) and merge at scrape time,
// so /metrics traverses registered child registries and produces a single combined output.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment