Unverified Commit 073fb437 authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

feat(kv-router): add router queue depth Prometheus metric and nvext field (#6786)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent 75bf1e09
...@@ -107,6 +107,8 @@ class frontend_service: ...@@ -107,6 +107,8 @@ class frontend_service:
# Last observed inter-token latency per worker (in seconds) # Last observed inter-token latency per worker (in seconds)
# Gauge metric tracking the most recent ITL for each worker # Gauge metric tracking the most recent ITL for each worker
WORKER_LAST_INTER_TOKEN_LATENCY_SECONDS = "worker_last_inter_token_latency_seconds" WORKER_LAST_INTER_TOKEN_LATENCY_SECONDS = "worker_last_inter_token_latency_seconds"
# Number of requests pending in the router's scheduler queue (gauge per worker_type)
ROUTER_QUEUE_PENDING_REQUESTS = "router_queue_pending_requests"
# Label name for the type of migration # Label name for the type of migration
MIGRATION_TYPE_LABEL = "migration_type" MIGRATION_TYPE_LABEL = "migration_type"
# Label name for tokenizer operation # Label name for tokenizer operation
...@@ -206,25 +208,8 @@ class name_prefix: ...@@ -206,25 +208,8 @@ class name_prefix:
ROUTER = "dynamo_router" ROUTER = "dynamo_router"
class router_request:
"""Router per-request metrics (component-scoped via `MetricsHierarchy`)."""
# Prefix prepended to `frontend_service::*` names to form router metric names.
# e.g. `"router_"` + `frontend_service::REQUESTS_TOTAL` → `"router_requests_total"`.
METRIC_PREFIX = "router_"
class router: class router:
"""Router request metrics (dynamo_component_router_* with dynamo_namespace label). """Router request metrics (component-scoped aggregate histograms + counter)"""
These constants are the full suffix portions combined with name_prefix.COMPONENT
("dynamo_component") to form the complete metric name, e.g.
dynamo_component_router_requests_total.
Registered via MetricsHierarchy (from_component()) which auto-injects
dynamo_namespace (underscores) and dynamo_component labels and registers
with the component's registry (port 9090).
"""
# Total number of requests processed by the router # Total number of requests processed by the router
REQUESTS_TOTAL = "router_requests_total" REQUESTS_TOTAL = "router_requests_total"
...@@ -236,11 +221,14 @@ class router: ...@@ -236,11 +221,14 @@ class router:
INPUT_SEQUENCE_TOKENS = "router_input_sequence_tokens" INPUT_SEQUENCE_TOKENS = "router_input_sequence_tokens"
# Output sequence length in tokens observed at the router # Output sequence length in tokens observed at the router
OUTPUT_SEQUENCE_TOKENS = "router_output_sequence_tokens" OUTPUT_SEQUENCE_TOKENS = "router_output_sequence_tokens"
# TODO: Add REQUEST_DURATION_SECONDS = "router_request_duration_seconds" once
# RouterRequestMetrics in lib/llm/src/kv_router/metrics.rs registers a
# dynamo_component_router_request_duration_seconds histogram. Until then, class router_request:
# get_avg_request_duration (router path) falls back to the work_handler """Router per-request metrics (component-scoped via `MetricsHierarchy`)."""
# constant and queries a non-existent metric, silently returning 0.
# Prefix prepended to `frontend_service::*` names to form router metric names.
# e.g. `"router_"` + `frontend_service::REQUESTS_TOTAL` → `"router_requests_total"`.
METRIC_PREFIX = "router_"
class routing_overhead: class routing_overhead:
......
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
use std::cmp::Ordering; use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashMap}; use std::collections::{BinaryHeap, HashMap};
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio::sync::Mutex; use tokio::sync::Mutex;
...@@ -51,6 +52,9 @@ impl PartialOrd for QueueEntry { ...@@ -51,6 +52,9 @@ impl PartialOrd for QueueEntry {
/// If queueing is disabled (threshold_frac is None), requests are scheduled immediately. /// If queueing is disabled (threshold_frac is None), requests are scheduled immediately.
pub struct SchedulerQueue<P: SequencePublisher, C: WorkerConfigLike> { pub struct SchedulerQueue<P: SequencePublisher, C: WorkerConfigLike> {
pending: Mutex<BinaryHeap<QueueEntry>>, pending: Mutex<BinaryHeap<QueueEntry>>,
/// Number of requests currently parked in the pending queue.
/// Incremented after push, decremented after pop. Lock-free reads via `Relaxed` load.
pending_count: AtomicUsize,
slots: Arc<ActiveSequencesMultiWorker<P>>, slots: Arc<ActiveSequencesMultiWorker<P>>,
workers_with_configs: watch::Receiver<HashMap<WorkerId, C>>, workers_with_configs: watch::Receiver<HashMap<WorkerId, C>>,
/// Cached threshold fraction; None means queueing is disabled. /// Cached threshold fraction; None means queueing is disabled.
...@@ -74,6 +78,7 @@ impl<P: SequencePublisher + 'static, C: WorkerConfigLike> SchedulerQueue<P, C> { ...@@ -74,6 +78,7 @@ impl<P: SequencePublisher + 'static, C: WorkerConfigLike> SchedulerQueue<P, C> {
} }
Self { Self {
pending: Mutex::new(BinaryHeap::new()), pending: Mutex::new(BinaryHeap::new()),
pending_count: AtomicUsize::new(0),
slots, slots,
workers_with_configs, workers_with_configs,
threshold_frac, threshold_frac,
...@@ -107,6 +112,7 @@ impl<P: SequencePublisher + 'static, C: WorkerConfigLike> SchedulerQueue<P, C> { ...@@ -107,6 +112,7 @@ impl<P: SequencePublisher + 'static, C: WorkerConfigLike> SchedulerQueue<P, C> {
tracing::debug!("all workers busy, queueing request"); tracing::debug!("all workers busy, queueing request");
let entry = self.make_entry(request); let entry = self.make_entry(request);
self.pending.lock().await.push(entry); self.pending.lock().await.push(entry);
self.pending_count.fetch_add(1, AtomicOrdering::Relaxed);
} else { } else {
self.schedule(request).await; self.schedule(request).await;
} }
...@@ -127,6 +133,7 @@ impl<P: SequencePublisher + 'static, C: WorkerConfigLike> SchedulerQueue<P, C> { ...@@ -127,6 +133,7 @@ impl<P: SequencePublisher + 'static, C: WorkerConfigLike> SchedulerQueue<P, C> {
let Some(entry) = self.pending.lock().await.pop() else { let Some(entry) = self.pending.lock().await.pop() else {
break; break;
}; };
self.pending_count.fetch_sub(1, AtomicOrdering::Relaxed);
tracing::debug!("scheduling request from pending queue"); tracing::debug!("scheduling request from pending queue");
self.schedule(entry.request).await; self.schedule(entry.request).await;
} }
...@@ -189,6 +196,11 @@ impl<P: SequencePublisher + 'static, C: WorkerConfigLike> SchedulerQueue<P, C> { ...@@ -189,6 +196,11 @@ impl<P: SequencePublisher + 'static, C: WorkerConfigLike> SchedulerQueue<P, C> {
} }
} }
/// Number of requests currently parked in the pending queue (lock-free).
pub fn pending_count(&self) -> usize {
self.pending_count.load(AtomicOrdering::Relaxed)
}
/// Check if all workers are busy based on threshold. /// Check if all workers are busy based on threshold.
/// Returns true only if ALL workers exceed the threshold (no worker has capacity). /// Returns true only if ALL workers exceed the threshold (no worker has capacity).
fn all_workers_busy(&self, threshold: f64) -> bool { fn all_workers_busy(&self, threshold: f64) -> bool {
...@@ -378,6 +390,57 @@ mod tests { ...@@ -378,6 +390,57 @@ mod tests {
assert_eq!(ok_count, num_requests, "not all requests were scheduled"); assert_eq!(ok_count, num_requests, "not all requests were scheduled");
} }
#[tokio::test(flavor = "multi_thread")]
async fn test_pending_count() {
let block_size = 16;
let isl = 512;
let num_workers = 1;
// threshold_frac=0.0 means any active tokens trigger queueing
let (queue, slots) = make_queue(num_workers, block_size, isl, Some(0.0));
assert_eq!(queue.pending_count(), 0);
// First request goes through (worker is idle)
let (req1, rx1) = make_request("req-1", isl);
queue.enqueue(req1).await;
let _resp1 = rx1.await.unwrap().unwrap();
assert_eq!(queue.pending_count(), 0); // scheduled immediately
// Second and third requests should be queued (worker is now busy)
let (req2, _rx2) = make_request("req-2", isl);
queue.enqueue(req2).await;
assert_eq!(queue.pending_count(), 1);
let (req3, _rx3) = make_request("req-3", isl);
queue.enqueue(req3).await;
assert_eq!(queue.pending_count(), 2);
// Free the first request and update — should drain one from pending
slots
.mark_prefill_completed(&"req-1".to_string())
.await
.unwrap();
slots.free(&"req-1".to_string()).await.unwrap();
queue.update().await;
// After update, one pending request should have been scheduled
assert!(
queue.pending_count() < 2,
"pending_count should decrease after free+update, got {}",
queue.pending_count()
);
// Free req-2 and update to drain remaining
let _ = slots.mark_prefill_completed(&"req-2".to_string()).await;
let _ = slots.free(&"req-2".to_string()).await;
queue.update().await;
let _ = slots.mark_prefill_completed(&"req-3".to_string()).await;
let _ = slots.free(&"req-3".to_string()).await;
queue.update().await;
assert_eq!(queue.pending_count(), 0, "all requests should be drained");
}
#[tokio::test] #[tokio::test]
async fn test_no_workers_returns_error() { async fn test_no_workers_returns_error() {
let (queue, _slots) = make_queue(0, 16, 512, None); let (queue, _slots) = make_queue(0, 16, 512, None);
......
...@@ -18,7 +18,9 @@ use super::metrics; ...@@ -18,7 +18,9 @@ use super::metrics;
use super::metrics::register_worker_timing_metrics; use super::metrics::register_worker_timing_metrics;
use crate::discovery::ModelManager; use crate::discovery::ModelManager;
use crate::endpoint_type::EndpointType; use crate::endpoint_type::EndpointType;
use crate::kv_router::metrics::{RoutingOverheadMetrics, register_worker_load_metrics}; use crate::kv_router::metrics::{
RoutingOverheadMetrics, register_router_queue_metrics, register_worker_load_metrics,
};
use crate::request_template::RequestTemplate; use crate::request_template::RequestTemplate;
use anyhow::Result; use anyhow::Result;
use axum_server::tls_rustls::RustlsConfig; use axum_server::tls_rustls::RustlsConfig;
...@@ -426,6 +428,12 @@ impl HttpServiceConfigBuilder { ...@@ -426,6 +428,12 @@ impl HttpServiceConfigBuilder {
tracing::warn!("Failed to register worker timing metrics: {}", e); tracing::warn!("Failed to register worker timing metrics: {}", e);
} }
// Register router queue metrics (pending requests per worker_type)
// These are updated by KvScheduler on enqueue/update/free
if let Err(e) = register_router_queue_metrics(&registry) {
tracing::warn!("Failed to register router queue metrics: {}", e);
}
if let Some(ref discovery) = config.drt_discovery { if let Some(ref discovery) = config.drt_discovery {
let instance_id = discovery.instance_id(); let instance_id = discovery.instance_id();
if let Err(e) = RoutingOverheadMetrics::register(&registry, instance_id) { if let Err(e) = RoutingOverheadMetrics::register(&registry, instance_id) {
......
...@@ -493,6 +493,11 @@ impl KvRouter { ...@@ -493,6 +493,11 @@ impl KvRouter {
self.scheduler.free(request_id).await self.scheduler.free(request_id).await
} }
/// Number of requests currently parked in the scheduler queue.
pub fn pending_count(&self) -> usize {
self.scheduler.pending_count()
}
/// Get the worker type for this router ("prefill" or "decode"). /// Get the worker type for this router ("prefill" or "decode").
/// Used for Prometheus metric labeling. /// Used for Prometheus metric labeling.
pub fn worker_type(&self) -> &'static str { pub fn worker_type(&self) -> &'static str {
......
...@@ -132,6 +132,51 @@ pub fn register_worker_load_metrics( ...@@ -132,6 +132,51 @@ pub fn register_worker_load_metrics(
Ok(()) Ok(())
} }
// ---------------------------------------------------------------------------
// Router queue metrics (gauge)
// ---------------------------------------------------------------------------
/// Gauge tracking the number of requests pending in the router's scheduler queue.
/// Labeled by `worker_type` ("prefill" or "decode") to distinguish queues in
/// disaggregated mode. At most 2 label combinations.
pub struct RouterQueueMetrics {
pub pending_requests: IntGaugeVec,
}
pub static ROUTER_QUEUE_METRICS: LazyLock<RouterQueueMetrics> =
LazyLock::new(|| RouterQueueMetrics {
pending_requests: IntGaugeVec::new(
Opts::new(
format!(
"{}_{}",
name_prefix::FRONTEND,
frontend_service::ROUTER_QUEUE_PENDING_REQUESTS
),
"Number of requests pending in the router scheduler queue",
),
&[labels::WORKER_TYPE],
)
.expect("Failed to create router_queue_pending_requests gauge"),
});
impl RouterQueueMetrics {
pub fn set_pending(&self, worker_type: &str, count: usize) {
self.pending_requests
.with_label_values(&[worker_type])
.set(count as i64);
}
}
/// Register the router queue gauge with the given Prometheus registry.
/// Called during frontend HTTP service setup (`service_v2.rs`), served on port 8000.
pub fn register_router_queue_metrics(
registry: &prometheus::Registry,
) -> Result<(), prometheus::Error> {
let m = &*ROUTER_QUEUE_METRICS;
registry.register(Box::new(m.pending_requests.clone()))?;
Ok(())
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Routing overhead metrics (histograms) // Routing overhead metrics (histograms)
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
...@@ -426,6 +471,41 @@ dynamo_frontend_worker_active_prefill_tokens{dp_rank=\"0\",worker_id=\"123\",wor ...@@ -426,6 +471,41 @@ dynamo_frontend_worker_active_prefill_tokens{dp_rank=\"0\",worker_id=\"123\",wor
); );
} }
#[test]
fn test_router_queue_metrics_pef() {
let registry = prometheus::Registry::new();
let metrics = RouterQueueMetrics {
pending_requests: IntGaugeVec::new(
Opts::new(
format!(
"{}_{}",
name_prefix::FRONTEND,
frontend_service::ROUTER_QUEUE_PENDING_REQUESTS
),
"Number of requests pending in the router scheduler queue",
),
&[labels::WORKER_TYPE],
)
.unwrap(),
};
registry
.register(Box::new(metrics.pending_requests.clone()))
.unwrap();
metrics.set_pending("decode", 5);
let output = gather_pef(&registry);
let expected = "\
# HELP dynamo_frontend_router_queue_pending_requests Number of requests pending in the router scheduler queue
# TYPE dynamo_frontend_router_queue_pending_requests gauge
dynamo_frontend_router_queue_pending_requests{worker_type=\"decode\"} 5
";
assert_eq!(
output, expected,
"\nActual PEF:\n{output}\nExpected PEF:\n{expected}"
);
}
#[test] #[test]
fn test_routing_overhead_metric_names_pef() { fn test_routing_overhead_metric_names_pef() {
// Verify the overhead constants produce valid histogram names when // Verify the overhead constants produce valid histogram names when
......
...@@ -412,6 +412,7 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu ...@@ -412,6 +412,7 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
overlap_amount as usize * block_size, overlap_amount as usize * block_size,
); );
tracker.record_worker_full(instance_id, dp_rank, self.chooser.worker_type()); tracker.record_worker_full(instance_id, dp_rank, self.chooser.worker_type());
tracker.record_router_queue_depth(self.chooser.pending_count());
if let Some(hit_rate) = tracker.kv_hit_rate() { if let Some(hit_rate) = tracker.kv_hit_rate() {
request_metrics.kv_hit_rate.observe(hit_rate); request_metrics.kv_hit_rate.observe(hit_rate);
} }
......
...@@ -9,6 +9,7 @@ pub use dynamo_kv_router::selector::DefaultWorkerSelector; ...@@ -9,6 +9,7 @@ pub use dynamo_kv_router::selector::DefaultWorkerSelector;
use super::KvRouterConfig; use super::KvRouterConfig;
use super::RouterConfigOverride; use super::RouterConfigOverride;
use super::WorkerSelector; use super::WorkerSelector;
use super::metrics::ROUTER_QUEUE_METRICS;
use super::protocols::{OverlapScores, WorkerId}; use super::protocols::{OverlapScores, WorkerId};
use super::queue::SchedulerQueue; use super::queue::SchedulerQueue;
use super::sequence::{ use super::sequence::{
...@@ -127,9 +128,11 @@ impl KvScheduler { ...@@ -127,9 +128,11 @@ impl KvScheduler {
}; };
tracing::trace!("received request to be scheduled"); tracing::trace!("received request to be scheduled");
queue_clone.enqueue(request).await; queue_clone.enqueue(request).await;
ROUTER_QUEUE_METRICS.set_pending(worker_type, queue_clone.pending_count());
} }
_ = recheck_interval.tick() => { _ = recheck_interval.tick() => {
queue_clone.update().await; queue_clone.update().await;
ROUTER_QUEUE_METRICS.set_pending(worker_type, queue_clone.pending_count());
} }
} }
} }
...@@ -210,15 +213,22 @@ impl KvScheduler { ...@@ -210,15 +213,22 @@ impl KvScheduler {
.mark_prefill_completed(&request_id.to_string()) .mark_prefill_completed(&request_id.to_string())
.await?; .await?;
self.queue.update().await; self.queue.update().await;
ROUTER_QUEUE_METRICS.set_pending(self.worker_type(), self.queue.pending_count());
Ok(()) Ok(())
} }
pub async fn free(&self, request_id: &str) -> Result<(), SequenceError> { pub async fn free(&self, request_id: &str) -> Result<(), SequenceError> {
self.slots.free(&request_id.to_string()).await?; self.slots.free(&request_id.to_string()).await?;
self.queue.update().await; self.queue.update().await;
ROUTER_QUEUE_METRICS.set_pending(self.worker_type(), self.queue.pending_count());
Ok(()) Ok(())
} }
/// Number of requests currently parked in the scheduler queue.
pub fn pending_count(&self) -> usize {
self.queue.pending_count()
}
/// Get the worker type for this scheduler ("prefill" or "decode"). /// Get the worker type for this scheduler ("prefill" or "decode").
/// Used for Prometheus metric labeling. /// Used for Prometheus metric labeling.
pub fn worker_type(&self) -> &'static str { pub fn worker_type(&self) -> &'static str {
......
...@@ -160,6 +160,9 @@ pub struct RequestTracker { ...@@ -160,6 +160,9 @@ pub struct RequestTracker {
/// Number of detokenize samples accumulated for this request /// Number of detokenize samples accumulated for this request
detokenize_count: AtomicU64, detokenize_count: AtomicU64,
/// Router scheduler queue depth at routing time (how many requests were pending)
router_queue_depth: OnceLock<usize>,
} }
impl RequestTracker { impl RequestTracker {
...@@ -193,6 +196,7 @@ impl RequestTracker { ...@@ -193,6 +196,7 @@ impl RequestTracker {
tokenize_latency: OnceLock::new(), tokenize_latency: OnceLock::new(),
detokenize_total_ns: AtomicU64::new(0), detokenize_total_ns: AtomicU64::new(0),
detokenize_count: AtomicU64::new(0), detokenize_count: AtomicU64::new(0),
router_queue_depth: OnceLock::new(),
} }
} }
...@@ -382,6 +386,16 @@ impl RequestTracker { ...@@ -382,6 +386,16 @@ impl RequestTracker {
self.detokenize_count.load(Ordering::Relaxed) self.detokenize_count.load(Ordering::Relaxed)
} }
/// Record router scheduler queue depth at routing time.
pub fn record_router_queue_depth(&self, depth: usize) {
let _ = self.router_queue_depth.set(depth);
}
/// Get the router scheduler queue depth recorded at routing time.
pub fn router_queue_depth(&self) -> Option<usize> {
self.router_queue_depth.get().copied()
}
/// Get worker ID information if any worker IDs have been recorded. /// Get worker ID information if any worker IDs have been recorded.
pub fn get_worker_info(&self) -> Option<WorkerIdInfo> { pub fn get_worker_info(&self) -> Option<WorkerIdInfo> {
let prefill = self.prefill_worker_id(); let prefill = self.prefill_worker_id();
...@@ -486,6 +500,7 @@ impl RequestTracker { ...@@ -486,6 +500,7 @@ impl RequestTracker {
ttft_ms: self.ttft_ms(), ttft_ms: self.ttft_ms(),
total_time_ms: self.total_time_ms(), total_time_ms: self.total_time_ms(),
kv_hit_rate: self.kv_hit_rate(), kv_hit_rate: self.kv_hit_rate(),
router_queue_depth: self.router_queue_depth(),
} }
} }
} }
...@@ -524,6 +539,10 @@ pub struct TimingInfo { ...@@ -524,6 +539,10 @@ pub struct TimingInfo {
/// KV cache hit rate (0.0 to 1.0) - ratio of cached blocks to total input blocks /// KV cache hit rate (0.0 to 1.0) - ratio of cached blocks to total input blocks
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub kv_hit_rate: Option<f64>, pub kv_hit_rate: Option<f64>,
/// Number of requests pending in the router scheduler queue at routing time
#[serde(skip_serializing_if = "Option::is_none")]
pub router_queue_depth: Option<usize>,
} }
#[cfg(test)] #[cfg(test)]
...@@ -620,6 +639,22 @@ mod tests { ...@@ -620,6 +639,22 @@ mod tests {
); );
} }
#[test]
fn test_router_queue_depth() {
let tracker = RequestTracker::new();
assert!(tracker.router_queue_depth().is_none());
tracker.record_router_queue_depth(42);
assert_eq!(tracker.router_queue_depth(), Some(42));
// OnceLock: second write is ignored
tracker.record_router_queue_depth(99);
assert_eq!(tracker.router_queue_depth(), Some(42));
let timing = tracker.get_timing_info();
assert_eq!(timing.router_queue_depth, Some(42));
}
#[test] #[test]
fn test_observe_first_token_gauges_no_panic_without_worker() { fn test_observe_first_token_gauges_no_panic_without_worker() {
let tracker = RequestTracker::new(); let tracker = RequestTracker::new();
......
...@@ -229,6 +229,9 @@ pub mod frontend_service { ...@@ -229,6 +229,9 @@ pub mod frontend_service {
pub const WORKER_LAST_INTER_TOKEN_LATENCY_SECONDS: &str = pub const WORKER_LAST_INTER_TOKEN_LATENCY_SECONDS: &str =
"worker_last_inter_token_latency_seconds"; "worker_last_inter_token_latency_seconds";
/// Number of requests pending in the router's scheduler queue (gauge per worker_type)
pub const ROUTER_QUEUE_PENDING_REQUESTS: &str = "router_queue_pending_requests";
/// Label name for the type of migration /// Label name for the type of migration
pub const MIGRATION_TYPE_LABEL: &str = "migration_type"; pub const MIGRATION_TYPE_LABEL: &str = "migration_type";
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment