"tests/vscode:/vscode.git/clone" did not exist on "ddbb4f50ef8b69bb2e81a47905652f1946255cf7"
Unverified Commit 0c98d9a1 authored by Jie Hao's avatar Jie Hao Committed by GitHub
Browse files

feat: add staged frontend gauges (#8162)

parent 7f58231b
......@@ -47,6 +47,14 @@ class frontend_perf:
# Per-stage latency histogram (label: stage = preprocess|route|transport_roundtrip|postprocess)
STAGE_DURATION_SECONDS = "stage_duration_seconds"
# Per-stage inflight request gauge (labels: stage, phase)
# Tracks how many requests are currently in each pipeline stage.
# Phase values: "prefill", "decode", "aggregated" (for route/dispatch); empty for preprocess.
STAGE_REQUESTS = "stage_requests"
# Stage label values for STAGE_REQUESTS and STAGE_DURATION_SECONDS.
STAGE_PREPROCESS = "preprocess"
STAGE_ROUTE = "route"
STAGE_DISPATCH = "dispatch"
# Tokenization time in preprocessor
TOKENIZE_SECONDS = "tokenize_seconds"
# Template application time in preprocessor
......@@ -73,6 +81,9 @@ class frontend_service:
# Number of inflight/concurrent requests going to the engine (vLLM, SGLang, ...)
# Note: This is a gauge metric (current state) that can go up and down, so no _total suffix
INFLIGHT_REQUESTS = "inflight_requests"
# Number of requests currently being handled by the frontend, from HTTP handler
# entry to response completion. Clearer name for what inflight_requests measures.
ACTIVE_REQUESTS = "active_requests"
# Number of disconnected clients (gauge that can go up and down)
DISCONNECTED_CLIENTS = "disconnected_clients"
# Duration of LLM requests
......@@ -112,6 +123,11 @@ class frontend_service:
MODEL_MIGRATION_LIMIT = "model_migration_limit"
# Total number of request migrations due to worker unavailability
MODEL_MIGRATION_TOTAL = "model_migration_total"
# Total number of times migration was disabled because the sequence length
# exceeded the configured max_seq_len limit
MODEL_MIGRATION_MAX_SEQ_LEN_EXCEEDED_TOTAL = (
"model_migration_max_seq_len_exceeded_total"
)
# Total number of request cancellations
MODEL_CANCELLATION_TOTAL = "model_cancellation_total"
# Total number of requests rejected due to resource exhaustion
......
......@@ -251,7 +251,9 @@ struct MetricsHandlerState {
pub struct Metrics {
request_counter: IntCounterVec,
/// Deprecated: use `active_requests_gauge`. Kept for backwards compatibility until Phase 3.
inflight_gauge: IntGaugeVec,
active_requests_gauge: IntGaugeVec,
client_disconnect_gauge: prometheus::IntGauge,
http_queue_gauge: IntGaugeVec,
request_duration: HistogramVec,
......@@ -504,6 +506,15 @@ impl Metrics {
)
.unwrap();
let active_requests_gauge = IntGaugeVec::new(
Opts::new(
frontend_metric_name(frontend_service::ACTIVE_REQUESTS),
"Number of requests currently being handled by the frontend, from HTTP handler entry to response completion",
),
&["model"],
)
.unwrap();
let client_disconnect_gauge = prometheus::IntGauge::new(
frontend_metric_name(frontend_service::DISCONNECTED_CLIENTS),
"Number of disconnected clients",
......@@ -722,6 +733,7 @@ impl Metrics {
Metrics {
request_counter,
inflight_gauge,
active_requests_gauge,
client_disconnect_gauge,
http_queue_gauge,
request_duration,
......@@ -799,11 +811,13 @@ impl Metrics {
}
fn inc_inflight_gauge(&self, model: &str) {
self.inflight_gauge.with_label_values(&[model]).inc()
self.inflight_gauge.with_label_values(&[model]).inc();
self.active_requests_gauge.with_label_values(&[model]).inc();
}
fn dec_inflight_gauge(&self, model: &str) {
self.inflight_gauge.with_label_values(&[model]).dec()
self.inflight_gauge.with_label_values(&[model]).dec();
self.active_requests_gauge.with_label_values(&[model]).dec();
}
/// Increment the gauge for client disconnections
......@@ -827,6 +841,7 @@ impl Metrics {
pub fn register(&self, registry: &Registry) -> Result<(), prometheus::Error> {
registry.register(Box::new(self.request_counter.clone()))?;
registry.register(Box::new(self.inflight_gauge.clone()))?;
registry.register(Box::new(self.active_requests_gauge.clone()))?;
registry.register(Box::new(self.client_disconnect_gauge.clone()))?;
registry.register(Box::new(self.http_queue_gauge.clone()))?;
registry.register(Box::new(self.request_duration.clone()))?;
......@@ -2355,6 +2370,79 @@ mod tests {
assert_eq!(counter_value, 1);
}
#[test]
fn test_active_requests_tracks_inflight_guard() {
    // Verifies that the deprecated inflight gauge and the active-requests gauge
    // report the same value while inflight guards are created and dropped.
    let metrics = Arc::new(Metrics::new());
    let registry = prometheus::Registry::new();
    metrics.register(&registry).unwrap();

    let model = "test-model-active";

    // Checks the inflight gauge first, then the active-requests gauge, against
    // the same expected count for `model`.
    let assert_both_gauges = |expected: i64| {
        let inflight = metrics.inflight_gauge.with_label_values(&[model]).get();
        assert_eq!(inflight, expected);
        let active = metrics
            .active_requests_gauge
            .with_label_values(&[model])
            .get();
        assert_eq!(active, expected);
    };

    // Nothing is in flight before any guard exists.
    assert_both_gauges(0);

    let first_guard =
        metrics
            .clone()
            .create_inflight_guard(model, Endpoint::ChatCompletions, true, "req-1");
    // One live guard: both gauges move together.
    assert_both_gauges(1);

    let second_guard =
        metrics
            .clone()
            .create_inflight_guard(model, Endpoint::ChatCompletions, true, "req-2");
    assert_both_gauges(2);

    // Release the guards innermost-first, checking the count after each drop.
    drop(second_guard);
    assert_both_gauges(1);

    drop(first_guard);
    assert_both_gauges(0);
}
#[test]
fn test_all_error_types_recorded_correctly() {
let metrics = Arc::new(Metrics::new());
......
......@@ -7,6 +7,7 @@ use anyhow::Result;
use dynamo_kv_router::protocols::{TokensWithHashes, WorkerWithDpRank};
use dynamo_runtime::{
dynamo_nvtx_range,
metrics::frontend_perf::{STAGE_DISPATCH, STAGE_ROUTE, StageGuard},
pipeline::{
AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, PushRouter, ResponseStream,
SingleIn, async_trait,
......@@ -83,6 +84,8 @@ struct RequestGuard {
freed: bool,
prefill_marked: bool,
first_token_recorded: bool,
first_response_received: bool,
dispatch_guard: Option<StageGuard>,
track_output_blocks: bool,
current_total_blocks: usize,
isl_tokens: usize,
......@@ -99,6 +102,12 @@ struct RequestGuard {
impl RequestGuard {
async fn on_item(&mut self, item: &Annotated<LLMEngineOutput>) {
// End dispatch stage on first response from backend (any item, not just tokens).
if !self.first_response_received {
self.first_response_received = true;
self.dispatch_guard.take();
}
if !self.prefill_marked {
let has_tokens = item
.data
......@@ -503,6 +512,8 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
.as_ref()
.map(|t| t.phase())
.unwrap_or(RequestPhase::Aggregated);
let phase_label = phase.to_string();
let route_guard = StageGuard::new(STAGE_ROUTE, &phase_label);
let block_size = self.chooser.block_size() as usize;
let selection = self
......@@ -603,7 +614,12 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
return Ok(ResponseStream::new(Box::pin(stream), stream_context));
}
// Route to worker
// End route stage — worker has been selected and routing metrics recorded.
// Dispatch stage starts immediately so there is no gap between stages.
drop(route_guard);
let stage_dispatch_guard = StageGuard::new(STAGE_DISPATCH, &phase_label);
// Dispatch to worker
let isl_tokens = request.token_ids.len();
let expected_output_tokens = request
.routing
......@@ -657,6 +673,8 @@ impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutpu
freed: false,
prefill_marked: false,
first_token_recorded: false,
first_response_received: false,
dispatch_guard: Some(stage_dispatch_guard),
track_output_blocks: scheduler_tracked && track_output_blocks,
current_total_blocks: isl_tokens.div_ceil(block_size),
isl_tokens,
......
......@@ -30,8 +30,8 @@ use std::time::{Duration, Instant};
use dynamo_runtime::dynamo_nvtx_range;
use dynamo_runtime::metrics::frontend_perf::{
DETOKENIZE_TOKEN_COUNT, DETOKENIZE_TOTAL_US, STAGE_DURATION_SECONDS, TEMPLATE_SECONDS,
TOKENIZE_SECONDS,
DETOKENIZE_TOKEN_COUNT, DETOKENIZE_TOTAL_US, STAGE_DURATION_SECONDS, STAGE_PREPROCESS,
StageGuard, TEMPLATE_SECONDS, TOKENIZE_SECONDS,
};
use std::borrow::Cow;
use std::{collections::HashMap, pin::Pin, sync::Arc};
......@@ -235,6 +235,7 @@ impl OpenAIPreprocessor {
request: &R,
tracker: Option<&RequestTracker>,
) -> Result<(PreprocessedRequest, HashMap<String, String>, bool)> {
let _stage_guard = StageGuard::new(STAGE_PREPROCESS, "");
let preprocess_start = Instant::now();
let mut builder = self.builder(request)?;
......@@ -267,7 +268,7 @@ impl OpenAIPreprocessor {
.with_context(|| "Failed to gather multimodal data")?;
STAGE_DURATION_SECONDS
.with_label_values(&["preprocess"])
.with_label_values(&[STAGE_PREPROCESS])
.observe(preprocess_start.elapsed().as_secs_f64());
Ok((builder.build()?, annotations, prompt_injected_reasoning))
......@@ -683,6 +684,7 @@ impl OpenAIPreprocessor {
&self,
request: &NvCreateEmbeddingRequest,
) -> Result<(PreprocessedEmbeddingRequest, HashMap<String, String>)> {
let _stage_guard = StageGuard::new(STAGE_PREPROCESS, "");
let mut annotations = HashMap::new();
let mut builder = PreprocessedEmbeddingRequest::builder();
......@@ -1462,6 +1464,8 @@ impl
dyn AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<BackendOutput>>, Error>,
>,
) -> Result<ManyOut<Annotated<NvCreateCompletionResponse>>, Error> {
let _stage_guard = StageGuard::new(STAGE_PREPROCESS, "");
// unpack the request
let (mut request, context) = request.into_parts();
......@@ -1519,6 +1523,9 @@ impl
.collect();
let annotations_stream = stream::iter(annotations);
// End preprocess stage before handing off to downstream (route/dispatch).
drop(_stage_guard);
// forward the common completion request to the next operator
let response_stream = next.generate(common_request).await?;
......
......@@ -5,15 +5,59 @@
//! Used by both runtime (route, transport_roundtrip) and llm (preprocess, postprocess, tokenize, template, detokenize).
use once_cell::sync::{Lazy, OnceCell};
use prometheus::{Counter, Histogram, HistogramOpts, HistogramVec, Opts, Registry};
use prometheus::{Counter, Histogram, HistogramOpts, HistogramVec, IntGaugeVec, Opts, Registry};
use super::prometheus_names::{frontend_perf, name_prefix};
use crate::MetricsRegistry;
pub use super::prometheus_names::frontend_perf::{STAGE_DISPATCH, STAGE_PREPROCESS, STAGE_ROUTE};
/// Builds a frontend metric name by joining the frontend name prefix and `suffix` with `_`.
fn frontend_metric_name(suffix: &str) -> String {
    let prefix = name_prefix::FRONTEND;
    format!("{prefix}_{suffix}")
}
/// Gauge counting the requests currently inside each frontend pipeline stage
/// (preprocess, route, dispatch).
///
/// Label names are `stage` (the pipeline stage) and `phase`
/// (prefill/decode/aggregated, or empty for preprocess).
pub static STAGE_REQUESTS: Lazy<IntGaugeVec> = Lazy::new(|| {
    let opts = Opts::new(
        frontend_metric_name(frontend_perf::STAGE_REQUESTS),
        "Number of requests currently in the given pipeline stage",
    );
    let label_names = ["stage", "phase"];

    IntGaugeVec::new(opts, &label_names)
        .expect("failed to create dynamo_frontend_stage_requests gauge")
});
/// RAII guard that increments a per-stage gauge on creation and decrements on drop.
///
/// Used to track how many requests are in each frontend pipeline stage at any given time.
/// Create with [`StageGuard::new`] at stage entry; the gauge decrements automatically when
/// the guard is dropped (end of scope, explicit drop, or stream completion).
///
/// The guard must be bound to a variable (or stored in a field) for as long as the
/// request is in the stage: a guard that is created and discarded in the same statement
/// decrements the gauge again right away, so the request is never visible in the metric.
#[must_use = "dropping a StageGuard immediately decrements the stage gauge; bind it to a variable"]
pub struct StageGuard {
    /// Gauge child for this guard's `(stage, phase)` label pair.
    gauge: prometheus::IntGauge,
}
impl StageGuard {
    /// Enters a pipeline stage: bumps the matching `STAGE_REQUESTS` gauge by one and
    /// hands back a guard whose drop undoes the increment.
    ///
    /// * `stage` — pipeline stage label; callers pass the `STAGE_PREPROCESS`,
    ///   `STAGE_ROUTE` or `STAGE_DISPATCH` constants re-exported by this module.
    /// * `phase` — request phase label (`"prefill"`, `"decode"` or `"aggregated"`, i.e. the
    ///   string form of the request phase), or `""` for stages without a phase.
    pub fn new(stage: &str, phase: &str) -> Self {
        let guard = Self {
            gauge: STAGE_REQUESTS.with_label_values(&[stage, phase]),
        };
        guard.gauge.inc();
        guard
    }
}
impl Drop for StageGuard {
    /// Leaves the stage: decrements the gauge held by this guard.
    fn drop(&mut self) {
        let Self { gauge } = self;
        gauge.dec();
    }
}
/// Per-stage latency: preprocess, route, transport_roundtrip, postprocess.
pub static STAGE_DURATION_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
HistogramVec::new(
......@@ -87,6 +131,7 @@ static PROMETHEUS_REGISTERED: OnceCell<()> = OnceCell::new();
/// Register frontend perf metrics with the given registry. Idempotent.
pub fn ensure_frontend_perf_metrics_registered(registry: &MetricsRegistry) {
let _ = REGISTERED.get_or_init(|| {
registry.add_metric(Box::new(STAGE_REQUESTS.clone())).ok();
registry
.add_metric(Box::new(STAGE_DURATION_SECONDS.clone()))
.ok();
......@@ -109,6 +154,7 @@ pub fn ensure_frontend_perf_metrics_registered_prometheus(
if PROMETHEUS_REGISTERED.get().is_some() {
return Ok(());
}
registry.register(Box::new(STAGE_REQUESTS.clone()))?;
registry.register(Box::new(STAGE_DURATION_SECONDS.clone()))?;
registry.register(Box::new(TOKENIZE_SECONDS.clone()))?;
registry.register(Box::new(TEMPLATE_SECONDS.clone()))?;
......@@ -117,3 +163,48 @@ pub fn ensure_frontend_perf_metrics_registered_prometheus(
let _ = PROMETHEUS_REGISTERED.set(());
Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Two guards on the same label pair raise the gauge once each, and each drop
    /// lowers it by one again.
    #[test]
    fn test_stage_guard_inc_dec() {
        const STAGE: &str = "test_stage";
        const PHASE: &str = "test_phase";

        let gauge = STAGE_REQUESTS.with_label_values(&[STAGE, PHASE]);
        assert_eq!(gauge.get(), 0);

        let outer = StageGuard::new(STAGE, PHASE);
        assert_eq!(gauge.get(), 1);

        let inner = StageGuard::new(STAGE, PHASE);
        assert_eq!(gauge.get(), 2);

        // Inner guard released first.
        drop(inner);
        assert_eq!(gauge.get(), 1);

        // Outer guard released last; the gauge is back at its starting value.
        drop(outer);
        assert_eq!(gauge.get(), 0);
    }

    /// Guards with different `(stage, phase)` labels are counted independently.
    #[test]
    fn test_stage_guard_different_labels() {
        let preprocess = STAGE_REQUESTS.with_label_values(&["preprocess_t", ""]);
        let route_prefill = STAGE_REQUESTS.with_label_values(&["route_t", "prefill"]);
        let route_decode = STAGE_REQUESTS.with_label_values(&["route_t", "decode"]);

        // Reads all three gauges in a fixed order: preprocess, route/prefill, route/decode.
        let snapshot = || (preprocess.get(), route_prefill.get(), route_decode.get());

        let _preprocess_guard = StageGuard::new("preprocess_t", "");
        let route_prefill_guard = StageGuard::new("route_t", "prefill");
        let _route_decode_guard = StageGuard::new("route_t", "decode");
        assert_eq!(snapshot(), (1, 1, 1));

        // Dropping one guard only affects the gauge for its own label pair.
        drop(route_prefill_guard);
        assert_eq!(snapshot(), (1, 0, 1));
    }
}
......@@ -177,6 +177,10 @@ pub mod frontend_service {
/// Note: This is a gauge metric (current state) that can go up and down, so no _total suffix
pub const INFLIGHT_REQUESTS: &str = "inflight_requests";
/// Number of requests currently being handled by the frontend, from HTTP handler
/// entry to response completion. Clearer name for what inflight_requests measures.
pub const ACTIVE_REQUESTS: &str = "active_requests";
/// Number of disconnected clients (gauge that can go up and down)
pub const DISCONNECTED_CLIENTS: &str = "disconnected_clients";
......@@ -542,6 +546,15 @@ pub mod router {
pub mod frontend_perf {
/// Per-stage latency histogram (label: stage = preprocess|route|transport_roundtrip|postprocess)
pub const STAGE_DURATION_SECONDS: &str = "stage_duration_seconds";
/// Per-stage inflight request gauge (labels: stage, phase)
/// Tracks how many requests are currently in each pipeline stage.
/// Phase values: "prefill", "decode", "aggregated" (for route/dispatch); empty for preprocess.
pub const STAGE_REQUESTS: &str = "stage_requests";
/// Stage label values for STAGE_REQUESTS and STAGE_DURATION_SECONDS.
pub const STAGE_PREPROCESS: &str = "preprocess";
pub const STAGE_ROUTE: &str = "route";
pub const STAGE_DISPATCH: &str = "dispatch";
/// Tokenization time in preprocessor
pub const TOKENIZE_SECONDS: &str = "tokenize_seconds";
/// Template application time in preprocessor
......
......@@ -9,7 +9,7 @@ use crate::{
},
dynamo_nvtx_range,
engine::{AsyncEngine, AsyncEngineContext, Data},
metrics::frontend_perf::STAGE_DURATION_SECONDS,
metrics::frontend_perf::{STAGE_DURATION_SECONDS, STAGE_ROUTE},
pipeline::{
AddressedPushRouter, AddressedRequest, Error, ManyOut, SingleIn,
error::{PipelineError, PipelineErrorExt},
......@@ -771,7 +771,7 @@ where
let request = request.map(|req| AddressedRequest::new(req, address));
STAGE_DURATION_SECONDS
.with_label_values(&["route"])
.with_label_values(&[STAGE_ROUTE])
.observe(route_start.elapsed().as_secs_f64());
let _nvtx_transport = dynamo_nvtx_range!(_transport_kind);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment