Unverified Commit f46720c9 authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

feat: add router-level Prometheus metrics and centralize request tracking (#6146)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
Co-authored-by: default avatarCursor <cursoragent@cursor.com>
parent 2cee89a0
......@@ -947,6 +947,7 @@ impl KvPushRouter {
tokio::spawn(async move {
let mut stream = stream;
let mut first_item = true;
let mut first_token_gauges_observed = false;
while let Some(mut response) = stream.next().await {
// Inject worker_id into first response if tracker is available
......@@ -957,6 +958,21 @@ impl KvPushRouter {
}
}
// Observe per-worker TTFT/ISL gauges on first response with actual tokens
if !first_token_gauges_observed {
let has_tokens = response
.data
.as_ref()
.map(|d| !d.token_ids.is_empty())
.unwrap_or(false);
if has_tokens {
if let Some(ref tracker) = tracker {
tracker.observe_first_token_gauges();
}
first_token_gauges_observed = true;
}
}
// Convert LLMEngineOutput to PyObject
let py_response = Python::with_gil(|py| {
pythonize(py, &response.data)
......@@ -976,6 +992,11 @@ impl KvPushRouter {
}
}
}
// Observe per-worker ITL gauge at stream end
if let Some(ref tracker) = tracker {
tracker.observe_finish_gauges();
}
});
// Return a Python async generator wrapper
......
......@@ -12,7 +12,5 @@ pub use watcher::{ModelUpdate, ModelWatcher};
mod worker_monitor;
pub use worker_monitor::{
KvWorkerMonitor, LoadThresholdConfig, WORKER_ACTIVE_DECODE_BLOCKS_GAUGE,
WORKER_ACTIVE_PREFILL_TOKENS_GAUGE, WORKER_TYPE_DECODE, WORKER_TYPE_PREFILL, WorkerLoadState,
register_worker_load_metrics,
KvWorkerMonitor, LoadThresholdConfig, WORKER_TYPE_DECODE, WORKER_TYPE_PREFILL, WorkerLoadState,
};
......@@ -3,13 +3,12 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::sync::{LazyLock, RwLock};
use tokio::sync::Notify;
use dashmap::DashMap;
use prometheus::{IntGaugeVec, Opts, Registry};
use serde::{Deserialize, Serialize};
use crate::http::service::metrics::{
......@@ -17,11 +16,11 @@ use crate::http::service::metrics::{
WORKER_LAST_TIME_TO_FIRST_TOKEN_GAUGE,
};
use crate::kv_router::KV_METRICS_SUBJECT;
use crate::kv_router::metrics::WORKER_LOAD_METRICS;
use crate::kv_router::protocols::ActiveLoad;
use crate::model_card::ModelDeploymentCard;
use dynamo_runtime::component::Client;
use dynamo_runtime::discovery::{DiscoveryQuery, watch_and_extract_field};
use dynamo_runtime::metrics::prometheus_names::frontend_service;
use dynamo_runtime::pipeline::{WorkerLoadMonitor, async_trait};
use dynamo_runtime::traits::DistributedRuntimeProvider;
use dynamo_runtime::transports::event_plane::EventSubscriber;
......@@ -29,62 +28,18 @@ use dynamo_runtime::transports::event_plane::EventSubscriber;
// Re-export worker type constants from timing.rs (single source of truth)
pub use crate::protocols::common::timing::{WORKER_TYPE_DECODE, WORKER_TYPE_PREFILL};
/// Global Prometheus gauge for active decode blocks per worker (labels: worker_id, dp_rank, worker_type)
/// This is shared across all KvWorkerMonitor instances.
pub static WORKER_ACTIVE_DECODE_BLOCKS_GAUGE: LazyLock<IntGaugeVec> = LazyLock::new(|| {
IntGaugeVec::new(
Opts::new(
format!(
"dynamo_frontend_{}",
frontend_service::WORKER_ACTIVE_DECODE_BLOCKS
),
"Active KV cache decode blocks per worker",
),
&["worker_id", "dp_rank", "worker_type"],
)
.expect("Failed to create worker_active_decode_blocks gauge")
});
/// Global Prometheus gauge for active prefill tokens per worker (labels: worker_id, dp_rank, worker_type)
/// This is shared across all KvWorkerMonitor instances.
pub static WORKER_ACTIVE_PREFILL_TOKENS_GAUGE: LazyLock<IntGaugeVec> = LazyLock::new(|| {
IntGaugeVec::new(
Opts::new(
format!(
"dynamo_frontend_{}",
frontend_service::WORKER_ACTIVE_PREFILL_TOKENS
),
"Active prefill tokens queued per worker",
),
&["worker_id", "dp_rank", "worker_type"],
)
.expect("Failed to create worker_active_prefill_tokens gauge")
});
/// Register the global worker load Prometheus metrics with the given registry.
///
/// This should be called once during HTTP service setup to expose the worker load
/// metrics via the `/metrics` endpoint.
///
/// # Errors
/// Returns an error if the metrics are already registered with the registry.
pub fn register_worker_load_metrics(registry: &Registry) -> Result<(), prometheus::Error> {
registry.register(Box::new(WORKER_ACTIVE_DECODE_BLOCKS_GAUGE.clone()))?;
registry.register(Box::new(WORKER_ACTIVE_PREFILL_TOKENS_GAUGE.clone()))?;
Ok(())
}
/// Clean up all Prometheus metrics for a worker across the specified dp_ranks.
///
/// This removes metrics with the given worker_id, dp_rank, and worker_type label combination.
/// Called when workers are removed to prevent stale metrics from accumulating.
fn cleanup_worker_metrics(worker_id: u64, dp_ranks: &[u32], worker_type: &str) {
let worker_id_str = worker_id.to_string();
let m = &*WORKER_LOAD_METRICS;
for dp_rank in dp_ranks {
let dp_rank_str = dp_rank.to_string();
let labels = &[worker_id_str.as_str(), dp_rank_str.as_str(), worker_type];
let _ = WORKER_ACTIVE_DECODE_BLOCKS_GAUGE.remove_label_values(labels);
let _ = WORKER_ACTIVE_PREFILL_TOKENS_GAUGE.remove_label_values(labels);
let _ = m.active_decode_blocks.remove_label_values(labels);
let _ = m.active_prefill_tokens.remove_label_values(labels);
let _ = WORKER_LAST_TIME_TO_FIRST_TOKEN_GAUGE.remove_label_values(labels);
let _ = WORKER_LAST_INPUT_SEQUENCE_TOKENS_GAUGE.remove_label_values(labels);
let _ = WORKER_LAST_INTER_TOKEN_LATENCY_GAUGE.remove_label_values(labels);
......@@ -215,9 +170,9 @@ impl WorkerLoadState {
/// Cloning shares state via internal Arc-wrapped fields. This allows multiple pipelines
/// (e.g., chat and completions) to share the same monitor instance.
///
/// Prometheus metrics are exposed via the global gauges [`WORKER_ACTIVE_DECODE_BLOCKS_GAUGE`]
/// and [`WORKER_ACTIVE_PREFILL_TOKENS_GAUGE`], which should be registered with the HTTP
/// service's Prometheus registry using [`register_worker_load_metrics`].
/// Prometheus metrics are exposed via [`WORKER_LOAD_METRICS`] (defined in `kv_router::sequence`),
/// which should be registered with the HTTP service's Prometheus registry using
/// [`register_worker_load_metrics`](crate::kv_router::metrics::register_worker_load_metrics).
///
/// In disaggregated mode, use `set_prefill_client` to register the prefill endpoint for
/// proper TTFT metric cleanup when prefill workers are removed.
......@@ -251,8 +206,9 @@ impl KvWorkerMonitor {
/// - `active_prefill_tokens_threshold`: DEFAULT_MAX_TOKENS (effectively disabled)
/// - `active_prefill_tokens_threshold_frac`: 1.5 (effectively disabled)
///
/// Prometheus metrics are exposed via the global gauges and should be registered
/// using [`register_worker_load_metrics`] during HTTP service setup.
/// Prometheus metrics are exposed via [`WORKER_LOAD_METRICS`] and should be registered
/// using [`register_worker_load_metrics`](crate::kv_router::metrics::register_worker_load_metrics)
/// during HTTP service setup.
///
/// For disaggregated mode, call `set_prefill_client` after creation to enable
/// proper TTFT metric cleanup when prefill workers are removed.
......
......@@ -111,7 +111,7 @@ pub fn register_worker_timing_metrics(registry: &Registry) -> Result<(), prometh
/// # Note
/// With 2 significant figures, there are roughly 90 unique values per order of magnitude.
/// Requesting more buckets than can be uniquely represented will result in deduplication.
fn generate_log_buckets(min: f64, max: f64, count: usize) -> Vec<f64> {
pub fn generate_log_buckets(min: f64, max: f64, count: usize) -> Vec<f64> {
if count == 0 {
return vec![];
}
......@@ -153,7 +153,7 @@ fn generate_log_buckets(min: f64, max: f64, count: usize) -> Vec<f64> {
}
/// Round a number to a specified number of significant figures
fn round_to_sig_figs(value: f64, sig_figs: u32) -> f64 {
pub fn round_to_sig_figs(value: f64, sig_figs: u32) -> f64 {
if value == 0.0 {
return 0.0;
}
......
......@@ -16,8 +16,9 @@ use super::Metrics;
use super::RouteDoc;
use super::metrics;
use super::metrics::register_worker_timing_metrics;
use crate::discovery::{ModelManager, register_worker_load_metrics};
use crate::discovery::ModelManager;
use crate::endpoint_type::EndpointType;
use crate::kv_router::metrics::{register_routing_overhead_metrics, register_worker_load_metrics};
use crate::request_template::RequestTemplate;
use anyhow::Result;
use axum_server::tls_rustls::RustlsConfig;
......@@ -391,6 +392,12 @@ impl HttpServiceConfigBuilder {
tracing::warn!("Failed to register worker timing metrics: {}", e);
}
// Register routing overhead metrics (block hashing, find matches, scheduling latencies)
// These are updated by KvRouter::find_best_match on every routing decision
if let Err(e) = register_routing_overhead_metrics(&registry) {
tracing::warn!("Failed to register routing overhead metrics: {}", e);
}
let mut router = axum::Router::new();
let mut all_docs = Vec::new();
......
......@@ -2,9 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
use std::collections::HashMap;
use std::time::Duration;
#[cfg(feature = "bench")]
use std::time::Instant;
use std::time::{Duration, Instant};
use anyhow::Result;
use dynamo_runtime::{
......@@ -27,6 +25,7 @@ pub use dynamo_kv_router::indexer;
pub use dynamo_kv_router::protocols;
pub mod config;
pub mod metrics;
pub mod prefill_router;
pub mod publisher;
pub mod push_router;
......@@ -63,7 +62,6 @@ pub const KV_METRICS_ENDPOINT: &str = "load_metrics";
// for metric publishing (push-based)
pub const KV_EVENT_SUBJECT: &str = "kv-events";
pub const KV_HIT_RATE_SUBJECT: &str = "kv-hit-rate";
pub const KV_METRICS_SUBJECT: &str = "kv_metrics";
// for inter-router comms
......@@ -316,7 +314,6 @@ impl KvRouter {
update_states: bool,
lora_name: Option<String>,
) -> anyhow::Result<(WorkerWithDpRank, u32)> {
#[cfg(feature = "bench")]
let start = Instant::now();
if update_states && context_id.is_none() {
......@@ -326,16 +323,16 @@ impl KvRouter {
let isl_tokens = tokens.len();
let block_hashes = compute_block_hash_for_seq(tokens, self.block_size, None);
#[cfg(feature = "bench")]
let hash_elapsed = start.elapsed();
let overlap_scores = self.indexer.find_matches(block_hashes).await?;
#[cfg(feature = "bench")]
let find_matches_elapsed = start.elapsed();
// Compute seq_hashes only if scheduler needs it for active blocks tracking
let maybe_seq_hashes = self
.kv_router_config
.compute_seq_hashes_for_tracking(tokens, self.block_size);
let seq_hash_elapsed = start.elapsed();
let best_worker = self
.scheduler
......@@ -349,19 +346,25 @@ impl KvRouter {
lora_name,
)
.await?;
let total_elapsed = start.elapsed();
metrics::ROUTING_OVERHEAD_METRICS.observe(
hash_elapsed,
find_matches_elapsed,
seq_hash_elapsed,
total_elapsed,
);
#[cfg(feature = "bench")]
{
let total_elapsed = start.elapsed();
tracing::info!(
isl_tokens,
hash_us = hash_elapsed.as_micros() as u64,
find_matches_us = (find_matches_elapsed - hash_elapsed).as_micros() as u64,
schedule_us = (total_elapsed - find_matches_elapsed).as_micros() as u64,
seq_hash_us = (seq_hash_elapsed - find_matches_elapsed).as_micros() as u64,
schedule_us = (total_elapsed - seq_hash_elapsed).as_micros() as u64,
total_us = total_elapsed.as_micros() as u64,
"find_best_match completed"
);
}
// Note: Routing decision recording (for approximate mode) is now handled
// by KvPushRouter::generate after select_worker returns.
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Prometheus metrics for the KV router.
//!
//! This module centralizes all router-side Prometheus metric definitions:
//! - [`WorkerLoadMetrics`]: Per-worker active decode blocks and prefill tokens gauges.
//! - [`RoutingOverheadMetrics`]: Per-request routing phase latency histograms.
use std::sync::{Arc, LazyLock, OnceLock};
use std::time::Duration;
use dynamo_runtime::component::Component;
use dynamo_runtime::metrics::MetricsHierarchy;
use dynamo_runtime::metrics::prometheus_names::{
frontend_service, labels, name_prefix, routing_overhead,
};
use prometheus::{IntGaugeVec, Opts};
use crate::http::service::metrics::generate_log_buckets;
// ---------------------------------------------------------------------------
// Worker load metrics (gauges)
// ---------------------------------------------------------------------------
/// Per-worker active load gauges, published by `ActiveSequencesMultiWorker`
/// and cleaned up by `KvWorkerMonitor` when workers disappear.
pub struct WorkerLoadMetrics {
pub active_decode_blocks: IntGaugeVec,
pub active_prefill_tokens: IntGaugeVec,
}
impl WorkerLoadMetrics {
pub fn observe(
&self,
worker_id: u64,
dp_rank: u32,
worker_type: &str,
active_blocks: usize,
active_tokens: usize,
) {
let worker_id_str = worker_id.to_string();
let dp_rank_str = dp_rank.to_string();
let labels = &[worker_id_str.as_str(), dp_rank_str.as_str(), worker_type];
self.active_decode_blocks
.with_label_values(labels)
.set(active_blocks as i64);
self.active_prefill_tokens
.with_label_values(labels)
.set(active_tokens as i64);
}
}
pub static WORKER_LOAD_METRICS: LazyLock<WorkerLoadMetrics> = LazyLock::new(|| WorkerLoadMetrics {
active_decode_blocks: IntGaugeVec::new(
Opts::new(
format!(
"{}_{}",
name_prefix::FRONTEND,
frontend_service::WORKER_ACTIVE_DECODE_BLOCKS
),
"Active KV cache decode blocks per worker",
),
&[labels::WORKER_ID, labels::DP_RANK, labels::WORKER_TYPE],
)
.expect("Failed to create worker_active_decode_blocks gauge"),
active_prefill_tokens: IntGaugeVec::new(
Opts::new(
format!(
"{}_{}",
name_prefix::FRONTEND,
frontend_service::WORKER_ACTIVE_PREFILL_TOKENS
),
"Active prefill tokens queued per worker",
),
&[labels::WORKER_ID, labels::DP_RANK, labels::WORKER_TYPE],
)
.expect("Failed to create worker_active_prefill_tokens gauge"),
});
/// Register the worker load gauges with the given Prometheus registry.
pub fn register_worker_load_metrics(
registry: &prometheus::Registry,
) -> Result<(), prometheus::Error> {
let m = &*WORKER_LOAD_METRICS;
registry.register(Box::new(m.active_decode_blocks.clone()))?;
registry.register(Box::new(m.active_prefill_tokens.clone()))?;
Ok(())
}
// ---------------------------------------------------------------------------
// Routing overhead metrics (histograms)
// ---------------------------------------------------------------------------
/// Per-request routing phase latency histograms (milliseconds).
pub struct RoutingOverheadMetrics {
pub block_hashing: prometheus::Histogram,
pub indexer_find_matches: prometheus::Histogram,
pub seq_hashing: prometheus::Histogram,
pub scheduling: prometheus::Histogram,
pub total: prometheus::Histogram,
}
pub static ROUTING_OVERHEAD_METRICS: LazyLock<RoutingOverheadMetrics> = LazyLock::new(|| {
// Buckets from 0.0001ms (0.1μs) to ~10ms, exponential with factor 2
let buckets = prometheus::exponential_buckets(0.0001, 2.0, 18)
.expect("exponential buckets should not fail");
let make = |suffix: &str, help: &str| {
let name = format!("{}_{}", name_prefix::ROUTING_OVERHEAD, suffix);
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(name, help).buckets(buckets.clone()),
)
.expect("histogram creation should not fail")
};
RoutingOverheadMetrics {
block_hashing: make(
routing_overhead::BLOCK_HASHING_MS,
"Time spent computing block hashes in milliseconds",
),
indexer_find_matches: make(
routing_overhead::INDEXER_FIND_MATCHES_MS,
"Time spent in indexer find_matches in milliseconds",
),
seq_hashing: make(
routing_overhead::SEQ_HASHING_MS,
"Time spent computing sequence hashes in milliseconds",
),
scheduling: make(
routing_overhead::SCHEDULING_MS,
"Time spent in scheduler worker selection in milliseconds",
),
total: make(
routing_overhead::TOTAL_MS,
"Total routing overhead per request in milliseconds",
),
}
});
impl RoutingOverheadMetrics {
/// Observe routing overhead timings in milliseconds.
pub fn observe(
&self,
hash_elapsed: Duration,
find_matches_elapsed: Duration,
seq_hash_elapsed: Duration,
total_elapsed: Duration,
) {
self.block_hashing
.observe(hash_elapsed.as_secs_f64() * 1000.0);
self.indexer_find_matches.observe(
find_matches_elapsed
.saturating_sub(hash_elapsed)
.as_secs_f64()
* 1000.0,
);
self.seq_hashing.observe(
seq_hash_elapsed
.saturating_sub(find_matches_elapsed)
.as_secs_f64()
* 1000.0,
);
self.scheduling
.observe(total_elapsed.saturating_sub(seq_hash_elapsed).as_secs_f64() * 1000.0);
self.total.observe(total_elapsed.as_secs_f64() * 1000.0);
}
}
/// Register the routing overhead histograms with the given Prometheus registry.
pub fn register_routing_overhead_metrics(
registry: &prometheus::Registry,
) -> Result<(), prometheus::Error> {
let m = &*ROUTING_OVERHEAD_METRICS;
registry.register(Box::new(m.block_hashing.clone()))?;
registry.register(Box::new(m.indexer_find_matches.clone()))?;
registry.register(Box::new(m.seq_hashing.clone()))?;
registry.register(Box::new(m.scheduling.clone()))?;
registry.register(Box::new(m.total.clone()))?;
Ok(())
}
// ---------------------------------------------------------------------------
// Router request metrics (component-scoped aggregate histograms + counter)
// ---------------------------------------------------------------------------
/// Aggregate per-request metrics observed at the router level.
/// Component-scoped via `from_component()` to get automatic namespace/component labels.
/// Uses `router_` prefix to distinguish from `ResponseMetricCollector` metrics.
pub struct RouterRequestMetrics {
pub requests_total: prometheus::IntCounter,
pub time_to_first_token_seconds: prometheus::Histogram,
pub inter_token_latency_seconds: prometheus::Histogram,
pub input_sequence_tokens: prometheus::Histogram,
pub output_sequence_tokens: prometheus::Histogram,
}
static ROUTER_REQUEST_METRICS: OnceLock<Arc<RouterRequestMetrics>> = OnceLock::new();
impl RouterRequestMetrics {
fn new(
requests_total: prometheus::IntCounter,
time_to_first_token_seconds: prometheus::Histogram,
inter_token_latency_seconds: prometheus::Histogram,
input_sequence_tokens: prometheus::Histogram,
output_sequence_tokens: prometheus::Histogram,
) -> Self {
Self {
requests_total,
time_to_first_token_seconds,
inter_token_latency_seconds,
input_sequence_tokens,
output_sequence_tokens,
}
}
/// Create from a Component, memoized in a static OnceLock.
pub fn from_component(component: &Component) -> Arc<Self> {
ROUTER_REQUEST_METRICS
.get_or_init(|| {
let metrics = component.metrics();
let requests_total = metrics
.create_intcounter(
"router_requests_total",
"Total number of requests processed by the router",
&[],
)
.expect("failed to create router_requests_total");
let time_to_first_token_seconds = metrics
.create_histogram(
"router_time_to_first_token_seconds",
"Time to first token observed at the router",
&[],
Some(generate_log_buckets(0.001, 480.0, 18)),
)
.expect("failed to create router_time_to_first_token_seconds");
let inter_token_latency_seconds = metrics
.create_histogram(
"router_inter_token_latency_seconds",
"Average inter-token latency observed at the router",
&[],
Some(generate_log_buckets(0.001, 2.0, 13)),
)
.expect("failed to create router_inter_token_latency_seconds");
let input_sequence_tokens = metrics
.create_histogram(
"router_input_sequence_tokens",
"Input sequence length in tokens observed at the router",
&[],
Some(generate_log_buckets(50.0, 128000.0, 12)),
)
.expect("failed to create router_input_sequence_tokens");
let output_sequence_tokens = metrics
.create_histogram(
"router_output_sequence_tokens",
"Output sequence length in tokens observed at the router",
&[],
Some(generate_log_buckets(50.0, 32000.0, 10)),
)
.expect("failed to create router_output_sequence_tokens");
Arc::new(Self::new(
requests_total,
time_to_first_token_seconds,
inter_token_latency_seconds,
input_sequence_tokens,
output_sequence_tokens,
))
})
.clone()
}
}
#[cfg(test)]
mod tests {
use super::*;
use prometheus::{Encoder, TextEncoder};
fn gather_pef(registry: &prometheus::Registry) -> String {
let encoder = TextEncoder::new();
let mut buffer = Vec::new();
encoder.encode(&registry.gather(), &mut buffer).unwrap();
String::from_utf8(buffer).unwrap()
}
#[test]
fn test_worker_load_metrics_pef() {
let registry = prometheus::Registry::new();
let metrics = WorkerLoadMetrics {
active_decode_blocks: IntGaugeVec::new(
Opts::new(
format!(
"{}_{}",
name_prefix::FRONTEND,
frontend_service::WORKER_ACTIVE_DECODE_BLOCKS
),
"Active KV cache decode blocks per worker",
),
&[labels::WORKER_ID, labels::DP_RANK, labels::WORKER_TYPE],
)
.unwrap(),
active_prefill_tokens: IntGaugeVec::new(
Opts::new(
format!(
"{}_{}",
name_prefix::FRONTEND,
frontend_service::WORKER_ACTIVE_PREFILL_TOKENS
),
"Active prefill tokens queued per worker",
),
&[labels::WORKER_ID, labels::DP_RANK, labels::WORKER_TYPE],
)
.unwrap(),
};
registry
.register(Box::new(metrics.active_decode_blocks.clone()))
.unwrap();
registry
.register(Box::new(metrics.active_prefill_tokens.clone()))
.unwrap();
metrics.observe(123, 0, "decode", 42, 100);
let output = gather_pef(&registry);
let expected = "\
# HELP dynamo_frontend_worker_active_decode_blocks Active KV cache decode blocks per worker
# TYPE dynamo_frontend_worker_active_decode_blocks gauge
dynamo_frontend_worker_active_decode_blocks{dp_rank=\"0\",worker_id=\"123\",worker_type=\"decode\"} 42
# HELP dynamo_frontend_worker_active_prefill_tokens Active prefill tokens queued per worker
# TYPE dynamo_frontend_worker_active_prefill_tokens gauge
dynamo_frontend_worker_active_prefill_tokens{dp_rank=\"0\",worker_id=\"123\",worker_type=\"decode\"} 100
";
assert_eq!(
output, expected,
"\nActual PEF:\n{output}\nExpected PEF:\n{expected}"
);
}
#[test]
fn test_routing_overhead_metric_names_pef() {
let registry = prometheus::Registry::new();
let buckets = prometheus::exponential_buckets(0.0001, 2.0, 18).unwrap();
let make = |suffix: &str, help: &str| {
let name = format!("{}_{}", name_prefix::ROUTING_OVERHEAD, suffix);
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(name, help).buckets(buckets.clone()),
)
.unwrap()
};
let total = make(
routing_overhead::TOTAL_MS,
"Total routing overhead per request in milliseconds",
);
registry.register(Box::new(total.clone())).unwrap();
total.observe(1.5);
let output = gather_pef(&registry);
assert!(
output.contains("# HELP dynamo_routing_overhead_total_ms"),
"PEF missing HELP for routing overhead metric"
);
assert!(
output.contains("# TYPE dynamo_routing_overhead_total_ms histogram"),
"PEF missing TYPE for routing overhead metric"
);
assert!(
output.contains("dynamo_routing_overhead_total_ms_count 1"),
"PEF missing observation count"
);
}
#[test]
fn test_routing_overhead_saturating_sub() {
let buckets = prometheus::exponential_buckets(0.0001, 2.0, 18).unwrap();
let make = |name: &str| {
prometheus::Histogram::with_opts(
prometheus::HistogramOpts::new(name, "test").buckets(buckets.clone()),
)
.unwrap()
};
let metrics = RoutingOverheadMetrics {
block_hashing: make("test_block_hashing_ms"),
indexer_find_matches: make("test_find_matches_ms"),
seq_hashing: make("test_seq_hashing_ms"),
scheduling: make("test_scheduling_ms"),
total: make("test_total_ms"),
};
// Out-of-order durations: each phase < previous (would panic without saturating_sub)
metrics.observe(
Duration::from_millis(10),
Duration::from_millis(5),
Duration::from_millis(3),
Duration::from_millis(1),
);
// Reaching here without panic confirms saturating_sub works
}
}
......@@ -348,7 +348,7 @@ impl PrefillRouter {
///
/// If `phase_permit` is provided, it is dropped after the first output is received,
/// allowing subsequent `set_phase` calls to proceed. This is used in the bootstrap
/// optimization path to ensure `record_worker` completes before the phase changes.
/// optimization path to ensure `record_worker_full` completes before the phase changes.
///
/// Returns (PrefillResult, Option<(worker_id, dp_rank)>).
async fn execute_prefill(
......@@ -363,7 +363,7 @@ impl PrefillRouter {
.await
.map_err(|e| PrefillError::PrefillError(e.to_string()))?;
// Drop phase permit now - routing is complete, record_worker was called in select_worker.
// Drop phase permit now - routing is complete, record_worker_full was called in select_worker.
// This unblocks set_phase(Decode) in the main task without waiting for prefill output.
drop(phase_permit);
......@@ -530,7 +530,6 @@ impl
}
let tracker = req.tracker.as_ref().unwrap();
let prefill_phase_permit = tracker.set_phase(RequestPhase::Prefill).await;
tracker.record_prefill_start();
// Prepare prefill request with max_tokens = 1 (clone after tracker is set)
let mut prefill_req = req.clone();
......@@ -557,14 +556,6 @@ impl
router.select_next_worker();
}
// Record prefill worker on the main request's tracker for metrics.
// (The cloned prefill_req has its own tracker, so we need to record here)
// Worker type is stored at routing time to avoid expensive MDC lookups when
// updating Prometheus TTFT metrics later in the response stream.
if let Some(ref tracker) = req.tracker {
tracker.record_prefill_worker_full(worker_id, dp_rank, WORKER_TYPE_PREFILL);
}
let routing = prefill_req.routing_mut();
routing.prefill_worker_id = Some(worker_id);
routing.dp_rank = Some(dp_rank);
......@@ -573,7 +564,7 @@ impl
let prefill_context = Context::with_id(prefill_req, request_id.clone());
engine_ctx.link_child(prefill_context.context());
// Pass phase permit to spawned task - it drops after first output (record_worker complete)
// Pass phase permit to spawned task - it drops after first output (record_worker_full complete)
// This allows set_phase(Decode) below to proceed only after prefill routing is done
self.spawn_prefill_task(prefill_context, Some(worker_id), prefill_phase_permit);
......@@ -591,16 +582,6 @@ impl
let result = self.call_prefill(prefill_context).await;
// Record prefill worker on the main request's tracker for metrics.
// (call_prefill returns the worker_id and dp_rank from the prefill routing)
// Worker type is stored at routing time to avoid expensive MDC lookups when
// updating Prometheus TTFT metrics later in the response stream.
if let Ok((_, Some((worker_id, dp_rank)))) = &result
&& let Some(ref tracker) = req.tracker
{
tracker.record_prefill_worker_full(*worker_id, *dp_rank, WORKER_TYPE_PREFILL);
}
result.map(|(result, worker_info)| {
(Some(result), worker_info.map(|(id, _)| id), None)
})
......@@ -625,7 +606,7 @@ impl
// Set phase to Decode for the decode request.
// In bootstrap path, this blocks until the spawned prefill task drops its permit
// (after first output / record_worker completes), ensuring correct phase for routing.
// (after first output / record_worker_full completes), ensuring correct phase for routing.
if let Some(ref tracker) = req.tracker {
let _decode_permit = tracker.set_phase(RequestPhase::Decode).await;
// Permit is dropped immediately - decode proceeds, no need to hold it
......
......@@ -17,6 +17,7 @@ use serde_json::json;
use crate::{
kv_router::{
KvRouter,
metrics::RouterRequestMetrics,
protocols::{TokensWithHashes, WorkerWithDpRank},
},
preprocessor::PreprocessedRequest,
......@@ -219,20 +220,25 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
}
}
// Record metrics in tracker: KV hit rate, worker ID, and worker type based on phase.
// Worker type is stored at routing time to avoid expensive MDC lookups when
// updating Prometheus metrics (TTFT/ITL) later in the response stream.
// Record routing metrics on tracker and observe ISL + prefill start.
let request_metrics =
RouterRequestMetrics::from_component(self.chooser.client().endpoint.component());
if let Some(ref tracker) = request.tracker {
let isl_blocks = request.token_ids.len().div_ceil(block_size);
tracker.record_kv_hit(overlap_amount, isl_blocks);
tracker.record_isl(
request.token_ids.len(),
overlap_amount as usize * block_size,
);
tracker.record_worker_full(instance_id, dp_rank, self.chooser.worker_type());
}
request_metrics
.input_sequence_tokens
.observe(request.token_ids.len() as f64);
// Handle query-only requests: early return with worker info
if is_query_only {
let stream_context = request.context().clone();
// Tracker is always created for query-only requests (delta generator enables tracking
// when query_instance_id annotation is present)
let worker_id_info = request.tracker.as_ref().and_then(|t| t.get_worker_info());
tracing::trace!(
......@@ -262,11 +268,17 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
.and_then(|r| r.expected_output_tokens);
let track_output_blocks =
self.chooser.kv_router_config().router_track_output_blocks && handle_local_updates;
let tracker = request.tracker.clone();
let (mut backend_input, context) = request.into_parts();
backend_input.routing_mut().dp_rank = Some(dp_rank);
let updated_request = context.map(|_| backend_input);
// Record prefill start right before pushing to backend (OnceLock: first call wins).
if let Some(ref tracker) = tracker {
tracker.record_prefill_start();
}
let chooser = self.chooser.clone();
let mut response_stream = self.inner.direct(updated_request, instance_id).await?;
let stream_context = response_stream.context();
......@@ -277,6 +289,7 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
// When false, an external caller (e.g., GAIE sidecar) handles bookkeeping via C FFI.
let wrapped_stream = Box::pin(async_stream::stream! {
let mut prefill_marked = false;
let mut first_token_recorded = false;
// Output block tracking state
let mut cumulative_osl: usize = 0;
......@@ -310,13 +323,27 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
}
}
// Track output blocks if enabled
if track_output_blocks {
let new_tokens = item.data.as_ref()
.map(|d| d.token_ids.len())
.unwrap_or(0);
// Record first token time on tracker when actual tokens arrive
if !first_token_recorded && new_tokens > 0 {
if let Some(ref tracker) = tracker {
tracker.record_first_token();
if let Some(ttft) = tracker.ttft_ms() {
request_metrics
.time_to_first_token_seconds
.observe(ttft / 1000.0);
}
}
first_token_recorded = true;
}
cumulative_osl += new_tokens;
// Track output blocks if enabled
if track_output_blocks {
let new_total_blocks = (isl_tokens + cumulative_osl).div_ceil(block_size);
if new_total_blocks > current_total_blocks {
// New block boundary crossed - add output block with decay
......@@ -329,6 +356,18 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
"Failed to add output block for request {context_id}: {e}"
);
}
// Update tracker and observe avg ITL at each block boundary
if let Some(ref tracker) = tracker {
tracker.record_osl(cumulative_osl);
tracker.record_finish();
if let Some(avg_itl) = tracker.avg_itl_ms() {
request_metrics
.inter_token_latency_seconds
.observe(avg_itl / 1000.0);
}
}
current_total_blocks = new_total_blocks;
}
}
......@@ -338,6 +377,17 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
}
}
// Record final aggregate metrics (histograms sampled once per request)
if let Some(ref tracker) = tracker {
tracker.record_finish();
tracker.record_osl(cumulative_osl);
request_metrics
.output_sequence_tokens
.observe(cumulative_osl as f64);
}
request_metrics.requests_total.inc();
// Only call free() if we handle local updates.
// When handle_local_updates=false, external caller handles cleanup via C FFI.
if handle_local_updates
......
......@@ -6,7 +6,6 @@ use crate::local_model::runtime_config::ModelRuntimeConfig;
use anyhow::Result;
use dynamo_runtime::component::Component;
use dynamo_runtime::traits::DistributedRuntimeProvider;
use dynamo_runtime::transports::event_plane::EventPublisher;
use rand::Rng;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
......@@ -15,7 +14,6 @@ use std::time::Duration;
#[cfg(feature = "bench")]
use std::time::Instant;
use super::KV_HIT_RATE_SUBJECT;
use super::KvRouterConfig;
use super::RouterConfigOverride;
use super::WorkerSelector;
......@@ -24,15 +22,6 @@ use super::sequence::{ActiveSequencesMultiWorker, SequenceError};
use dynamo_tokens::SequenceHash;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KVHitRateEvent {
pub worker_id: WorkerId,
#[serde(default)]
pub dp_rank: DpRank,
pub isl_blocks: usize,
pub overlap_blocks: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PotentialLoad {
pub worker_id: WorkerId,
......@@ -160,10 +149,6 @@ impl KvScheduler {
let scheduler_rx = workers_with_configs.clone();
let (request_tx, request_rx) = tokio::sync::mpsc::channel::<SchedulingRequest>(1024);
let scheduler_cancel_token = component.drt().primary_token();
let hit_rate_publisher =
EventPublisher::for_namespace(component.namespace(), KV_HIT_RATE_SUBJECT)
.await
.map_err(|e| KvSchedulerError::InitFailed(e.to_string()))?;
// Background task to handle scheduling requests
tokio::spawn(async move {
......@@ -199,16 +184,6 @@ impl KvScheduler {
match selector.select_worker(&workers, &request, block_size) {
Ok(selection) => {
let event = KVHitRateEvent {
worker_id: selection.worker.worker_id,
dp_rank: selection.worker.dp_rank,
isl_blocks: selection.required_blocks as usize,
overlap_blocks: selection.overlap_blocks,
};
if let Err(e) = hit_rate_publisher.publish(&event).await {
tracing::warn!("Failed to publish KV hit rate event: {:?}", e);
}
let response = SchedulingResponse {
best_worker: selection.worker,
overlap_blocks: selection.overlap_blocks,
......
......@@ -37,10 +37,10 @@ use std::time::Duration;
use tokio::time::Instant;
use uuid::Uuid;
use super::metrics::WORKER_LOAD_METRICS;
use super::protocols::{
ActiveLoad, ActiveSequenceEvent, ActiveSequenceEventData, WorkerWithDpRank,
};
use crate::discovery::{WORKER_ACTIVE_DECODE_BLOCKS_GAUGE, WORKER_ACTIVE_PREFILL_TOKENS_GAUGE};
use crate::kv_router::{ACTIVE_SEQUENCES_SUBJECT, KV_METRICS_SUBJECT};
use crate::local_model::runtime_config::ModelRuntimeConfig;
use dynamo_runtime::CancellationToken;
......@@ -1043,22 +1043,13 @@ impl ActiveSequencesMultiWorker {
};
// Update Prometheus gauges directly (router's own bookkeeping)
let worker_id_str = worker.worker_id.to_string();
let dp_rank_str = worker.dp_rank.to_string();
WORKER_ACTIVE_DECODE_BLOCKS_GAUGE
.with_label_values(&[
worker_id_str.as_str(),
dp_rank_str.as_str(),
WORKER_LOAD_METRICS.observe(
worker.worker_id,
worker.dp_rank,
self.worker_type,
])
.set(active_blocks as i64);
WORKER_ACTIVE_PREFILL_TOKENS_GAUGE
.with_label_values(&[
worker_id_str.as_str(),
dp_rank_str.as_str(),
self.worker_type,
])
.set(active_tokens as i64);
active_blocks,
active_tokens,
);
// Also publish ActiveLoad to NATS for other subscribers (if NATS is available)
let active_load = ActiveLoad {
......
This diff is collapsed.
......@@ -429,11 +429,6 @@ impl crate::protocols::openai::DeltaGeneratorExt<NvCreateChatCompletionStreamRes
delta.stop_reason,
);
// Record first token time (only succeeds on first call due to OnceLock)
if let Some(ref tracker) = self.tracker {
tracker.record_first_token();
}
// Get worker_id info from tracker (set by KvPushRouter based on phase)
let worker_id_info = self.tracker.as_ref().and_then(|t| t.get_worker_info());
......
......@@ -323,11 +323,6 @@ impl crate::protocols::openai::DeltaGeneratorExt<NvCreateCompletionResponse> for
let index = delta.index.unwrap_or(0);
let mut response = self.create_choice(index, delta.text.clone(), finish_reason, logprobs);
// Record first token time (only succeeds on first call due to OnceLock)
if let Some(ref tracker) = self.tracker {
tracker.record_first_token();
}
// Get worker_id info from tracker (set by KvPushRouter based on phase)
let worker_id_info = self.tracker.as_ref().and_then(|t| t.get_worker_info());
......
......@@ -68,6 +68,9 @@ pub mod name_prefix {
/// Prefix for frontend service metrics
pub const FRONTEND: &str = "dynamo_frontend";
/// Prefix for routing overhead metrics (raw Prometheus, not component-scoped)
pub const ROUTING_OVERHEAD: &str = "dynamo_routing_overhead";
}
/// Automatically inserted Prometheus label names used across the metrics system
......@@ -93,6 +96,9 @@ pub mod labels {
/// It is used by worker/load-style metrics that need to disambiguate per-worker series.
pub const DP_RANK: &str = "dp_rank";
/// Label for worker instance ID (etcd lease ID).
pub const WORKER_ID: &str = "worker_id";
/// Label for model name/path (OpenAI API standard, injected by Dynamo)
/// This is the standard label name injected by all backends in metrics_labels=[("model", ...)].
/// Ensures compatibility with OpenAI-compatible tooling.
......@@ -367,6 +373,27 @@ pub mod kvbm {
pub const OBJECT_WRITE_FAILURES: &str = "object_write_failures";
}
/// Routing overhead phase latency histogram names (raw Prometheus, not component-scoped).
///
/// These are combined with [`name_prefix::ROUTING_OVERHEAD`] to form full metric names,
/// e.g. `dynamo_routing_overhead_block_hashing_ms`.
pub mod routing_overhead {
/// Time spent computing block hashes
pub const BLOCK_HASHING_MS: &str = "block_hashing_ms";
/// Time spent in indexer find_matches
pub const INDEXER_FIND_MATCHES_MS: &str = "indexer_find_matches_ms";
/// Time spent computing sequence hashes
pub const SEQ_HASHING_MS: &str = "seq_hashing_ms";
/// Time spent in scheduler worker selection
pub const SCHEDULING_MS: &str = "scheduling_ms";
/// Total routing overhead per request
pub const TOTAL_MS: &str = "total_ms";
}
// KvRouter (including KvInexer) Prometheus metric names
pub mod kvrouter {
/// Number of KV cache events applied to the index (including status)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment