Unverified Commit 3205e7db authored by Jacky's avatar Jacky Committed by GitHub
Browse files

feat: Request Rejection Frontend metrics (#7644)


Signed-off-by: default avatarJacky <18255193+kthui@users.noreply.github.com>
parent 91700375
...@@ -110,6 +110,10 @@ class frontend_service: ...@@ -110,6 +110,10 @@ class frontend_service:
MODEL_MIGRATION_LIMIT = "model_migration_limit" MODEL_MIGRATION_LIMIT = "model_migration_limit"
# Total number of request migrations due to worker unavailability # Total number of request migrations due to worker unavailability
MODEL_MIGRATION_TOTAL = "model_migration_total" MODEL_MIGRATION_TOTAL = "model_migration_total"
# Total number of request cancellations
MODEL_CANCELLATION_TOTAL = "model_cancellation_total"
# Total number of requests rejected due to resource exhaustion
MODEL_REJECTION_TOTAL = "model_rejection_total"
# Active decode blocks (KV cache blocks) per worker # Active decode blocks (KV cache blocks) per worker
# Gauge metric tracking current KV cache block utilization for each worker # Gauge metric tracking current KV cache block utilization for each worker
WORKER_ACTIVE_DECODE_BLOCKS = "worker_active_decode_blocks" WORKER_ACTIVE_DECODE_BLOCKS = "worker_active_decode_blocks"
...@@ -239,25 +243,29 @@ class model_info: ...@@ -239,25 +243,29 @@ class model_info:
class name_prefix: class name_prefix:
"""Metric name prefixes used across the metrics system""" """Metric name prefixes used across the metrics system."""
# Prefix for all Prometheus metric names. # Prefix for component-scoped metrics, auto-labeled with namespace/endpoint.
COMPONENT = "dynamo_component" COMPONENT = "dynamo_component"
# Prefix for frontend service metrics # Prefix for frontend HTTP service metrics (requests, TTFT, ITL, disconnects).
FRONTEND = "dynamo_frontend" FRONTEND = "dynamo_frontend"
# Prefix for KV router metrics (used with router_id label) # Prefix for KV router instance metrics (carries `router_id` label).
ROUTER = "dynamo_router" ROUTER = "dynamo_router"
# Prefix for request-plane (transport-agnostic) metrics at AddressedPushRouter
REQUEST_PLANE = "dynamo_request_plane"
# Prefix for tokio runtime metrics
TOKIO = "dynamo_tokio"
# Prefix for standalone KV indexer metrics # Prefix for standalone KV indexer metrics
KVINDEXER = "dynamo_kvindexer" KVINDEXER = "dynamo_kvindexer"
# Prefix for transport-layer metrics (TCP / NATS) # Prefix for request-plane metrics at AddressedPushRouter.
# Transport-agnostic: measures request lifecycle latency and concurrency
# (queue → send → roundtrip TTFT, inflight gauge).
REQUEST_PLANE = "dynamo_request_plane"
# Prefix for transport-layer metrics (TCP / NATS).
# Protocol-specific: measures wire-level health (bytes sent/received, error counts).
TRANSPORT = "dynamo_transport" TRANSPORT = "dynamo_transport"
# Prefix for work-handler transport breakdown metrics (backend side) # Prefix for work-handler transport breakdown metrics (backend side)
WORK_HANDLER = "dynamo_work_handler" WORK_HANDLER = "dynamo_work_handler"
# Prefix for routing overhead metrics (raw Prometheus, not component-scoped) # Prefix for tokio runtime metrics (poll times, queue depths, stalls).
TOKIO = "dynamo_tokio"
# Prefix for per-phase routing overhead latency (hashing, scheduling).
# Raw Prometheus, not component-scoped.
ROUTING_OVERHEAD = "dynamo_routing_overhead" ROUTING_OVERHEAD = "dynamo_routing_overhead"
...@@ -401,6 +409,8 @@ class work_handler: ...@@ -401,6 +409,8 @@ class work_handler:
REQUEST_DURATION_SECONDS = "request_duration_seconds" REQUEST_DURATION_SECONDS = "request_duration_seconds"
# Total number of errors in work handler processing # Total number of errors in work handler processing
ERRORS_TOTAL = "errors_total" ERRORS_TOTAL = "errors_total"
# Total number of requests cancelled by work handler (client stop/kill or disconnect)
CANCELLATION_TOTAL = "cancellation_total"
# Network transit: frontend send to backend receive (wall-clock, cross-process) # Network transit: frontend send to backend receive (wall-clock, cross-process)
NETWORK_TRANSIT_SECONDS = "network_transit_seconds" NETWORK_TRANSIT_SECONDS = "network_transit_seconds"
# Backend processing: handle_payload entry to first response sent # Backend processing: handle_payload entry to first response sent
......
...@@ -55,8 +55,9 @@ pub async fn completion_response_stream( ...@@ -55,8 +55,9 @@ pub async fn completion_response_stream(
// [WIP] from request id. // [WIP] from request id.
let request_id = get_or_create_request_id(request.inner.user.as_deref()); let request_id = get_or_create_request_id(request.inner.user.as_deref());
let streaming = request.inner.stream.unwrap_or(false); let streaming = request.inner.stream.unwrap_or(false);
let model_name = request.inner.model.clone();
let cancellation_labels = CancellationLabels { let cancellation_labels = CancellationLabels {
model: request.inner.model.clone(), model: model_name.clone(),
endpoint: "grpc_completions".to_string(), endpoint: "grpc_completions".to_string(),
request_type: if streaming { "stream" } else { "unary" }.to_string(), request_type: if streaming { "stream" } else { "unary" }.to_string(),
}; };
...@@ -101,10 +102,16 @@ pub async fn completion_response_stream( ...@@ -101,10 +102,16 @@ pub async fn completion_response_stream(
let annotations = request.annotations(); let annotations = request.annotations();
// issue the generate call on the engine // issue the generate call on the engine
let stream = engine let stream = engine.generate(request).await.map_err(|e| {
.generate(request) if crate::http::service::metrics::request_was_rejected(e.as_ref()) {
.await state.metrics_clone().inc_rejection(
.map_err(|e| Status::internal(format!("Failed to generate completions: {}", e)))?; &model_name,
crate::http::service::metrics::Endpoint::Completions,
);
return Status::resource_exhausted(e.to_string());
}
Status::internal(format!("Failed to generate completions: {}", e))
})?;
// capture the context to cancel the stream if the client disconnects // capture the context to cancel the stream if the client disconnects
let ctx = stream.context(); let ctx = stream.context();
......
...@@ -60,8 +60,9 @@ pub async fn tensor_response_stream( ...@@ -60,8 +60,9 @@ pub async fn tensor_response_stream(
) -> Result<impl Stream<Item = Annotated<NvCreateTensorResponse>>, Status> { ) -> Result<impl Stream<Item = Annotated<NvCreateTensorResponse>>, Status> {
// create the context for the request // create the context for the request
let request_id = get_or_create_request_id(request.id.as_deref()); let request_id = get_or_create_request_id(request.id.as_deref());
let model_name = request.model.clone();
let cancellation_labels = CancellationLabels { let cancellation_labels = CancellationLabels {
model: request.model.clone(), model: model_name.clone(),
endpoint: Endpoint::Tensor.to_string(), endpoint: Endpoint::Tensor.to_string(),
request_type: if streaming { "stream" } else { "unary" }.to_string(), request_type: if streaming { "stream" } else { "unary" }.to_string(),
}; };
...@@ -103,6 +104,12 @@ pub async fn tensor_response_stream( ...@@ -103,6 +104,12 @@ pub async fn tensor_response_stream(
// issue the generate call on the engine // issue the generate call on the engine
let stream = engine.generate(request).await.map_err(|e| { let stream = engine.generate(request).await.map_err(|e| {
if crate::http::service::metrics::request_was_rejected(e.as_ref()) {
state
.metrics_clone()
.inc_rejection(&model_name, crate::http::service::metrics::Endpoint::Tensor);
return Status::resource_exhausted(e.to_string());
}
Status::internal(format!("Failed to generate tensor response stream: {}", e)) Status::internal(format!("Failed to generate tensor response stream: {}", e))
})?; })?;
......
...@@ -284,6 +284,11 @@ async fn anthropic_messages( ...@@ -284,6 +284,11 @@ async fn anthropic_messages(
tracing::trace!("Issuing generate call for Anthropic messages"); tracing::trace!("Issuing generate call for Anthropic messages");
let engine_stream = engine.generate(request).await.map_err(|e| { let engine_stream = engine.generate(request).await.map_err(|e| {
if super::metrics::request_was_rejected(e.as_ref()) {
state
.metrics_clone()
.inc_rejection(&model, super::metrics::Endpoint::AnthropicMessages);
}
anthropic_error( anthropic_error(
StatusCode::INTERNAL_SERVER_ERROR, StatusCode::INTERNAL_SERVER_ERROR,
"api_error", "api_error",
......
...@@ -27,6 +27,15 @@ use crate::local_model::runtime_config::ModelRuntimeConfig; ...@@ -27,6 +27,15 @@ use crate::local_model::runtime_config::ModelRuntimeConfig;
use crate::model_card::ModelDeploymentCard; use crate::model_card::ModelDeploymentCard;
use dynamo_runtime::metrics::prometheus_names::clamp_u64_to_i64; use dynamo_runtime::metrics::prometheus_names::clamp_u64_to_i64;
use dynamo_runtime::error::ErrorType as DynamoErrorType;
/// Check whether an error chain indicates the request was rejected.
pub fn request_was_rejected(err: &(dyn std::error::Error + 'static)) -> bool {
const REJECTION: &[DynamoErrorType] = &[DynamoErrorType::ResourceExhausted];
const NON_REJECTION: &[DynamoErrorType] = &[];
dynamo_runtime::error::match_error_chain(err, REJECTION, NON_REJECTION)
}
pub use prometheus::Registry; pub use prometheus::Registry;
use super::RouteDoc; use super::RouteDoc;
...@@ -257,6 +266,7 @@ pub struct Metrics { ...@@ -257,6 +266,7 @@ pub struct Metrics {
model_migration_limit: IntGaugeVec, model_migration_limit: IntGaugeVec,
model_migration_total: IntCounterVec, model_migration_total: IntCounterVec,
model_cancellation_total: IntCounterVec, model_cancellation_total: IntCounterVec,
model_rejection_total: IntCounterVec,
} }
// Inflight tracks requests from HTTP handler start until complete response is finished. // Inflight tracks requests from HTTP handler start until complete response is finished.
...@@ -679,6 +689,15 @@ impl Metrics { ...@@ -679,6 +689,15 @@ impl Metrics {
) )
.unwrap(); .unwrap();
let model_rejection_total = IntCounterVec::new(
Opts::new(
frontend_metric_name(frontend_service::MODEL_REJECTION_TOTAL),
"Total number of requests rejected due to resource exhaustion",
),
&["model", "endpoint"],
)
.unwrap();
Metrics { Metrics {
request_counter, request_counter,
inflight_gauge, inflight_gauge,
...@@ -700,6 +719,7 @@ impl Metrics { ...@@ -700,6 +719,7 @@ impl Metrics {
model_migration_limit, model_migration_limit,
model_migration_total, model_migration_total,
model_cancellation_total, model_cancellation_total,
model_rejection_total,
} }
} }
...@@ -805,6 +825,7 @@ impl Metrics { ...@@ -805,6 +825,7 @@ impl Metrics {
registry.register(Box::new(self.model_migration_limit.clone()))?; registry.register(Box::new(self.model_migration_limit.clone()))?;
registry.register(Box::new(self.model_migration_total.clone()))?; registry.register(Box::new(self.model_migration_total.clone()))?;
registry.register(Box::new(self.model_cancellation_total.clone()))?; registry.register(Box::new(self.model_cancellation_total.clone()))?;
registry.register(Box::new(self.model_rejection_total.clone()))?;
Ok(()) Ok(())
} }
...@@ -902,6 +923,20 @@ impl Metrics { ...@@ -902,6 +923,20 @@ impl Metrics {
.get() .get()
} }
/// Add one to the `model_rejection_total` counter for a rejected request.
///
/// The counter series is selected by two label values, in this order:
/// `model` and the string form of `endpoint`.
pub fn inc_rejection(&self, model: &str, endpoint: Endpoint) {
    let endpoint_label = endpoint.to_string();
    let counter = self
        .model_rejection_total
        .with_label_values(&[model, endpoint_label.as_str()]);
    counter.inc();
}
/// Read the current value of the `model_rejection_total` counter for the
/// series labelled with `model` and the string form of `endpoint`.
pub fn get_rejection_count(&self, model: &str, endpoint: Endpoint) -> u64 {
    let endpoint_label = endpoint.to_string();
    let counter = self
        .model_rejection_total
        .with_label_values(&[model, endpoint_label.as_str()]);
    counter.get()
}
/// Create a new [`InflightGuard`] for the given model and annotate if its a streaming request, /// Create a new [`InflightGuard`] for the given model and annotate if its a streaming request,
/// and the kind of endpoint that was hit /// and the kind of endpoint that was hit
/// ///
......
...@@ -194,18 +194,12 @@ impl ErrorMessage { ...@@ -194,18 +194,12 @@ impl ErrorMessage {
/// If successful, it will return the [`HttpError`] as an [`ErrorMessage::internal_server_error`] /// If successful, it will return the [`HttpError`] as an [`ErrorMessage::internal_server_error`]
/// with the details of the error. /// with the details of the error.
pub fn from_anyhow(err: anyhow::Error, alt_msg: &str) -> ErrorResponse { pub fn from_anyhow(err: anyhow::Error, alt_msg: &str) -> ErrorResponse {
// First check for PipelineError::ServiceOverloaded // Check for ResourceExhausted anywhere in the error chain → HTTP 503
if let Some(pipeline_err) = if super::metrics::request_was_rejected(err.as_ref()) {
err.downcast_ref::<dynamo_runtime::pipeline::error::PipelineError>()
&& matches!(
pipeline_err,
dynamo_runtime::pipeline::error::PipelineError::ServiceOverloaded(_)
)
{
return ( return (
StatusCode::SERVICE_UNAVAILABLE, StatusCode::SERVICE_UNAVAILABLE,
Json(ErrorMessage { Json(ErrorMessage {
message: pipeline_err.to_string(), message: err.to_string(),
error_type: map_error_code_to_error_type(StatusCode::SERVICE_UNAVAILABLE), error_type: map_error_code_to_error_type(StatusCode::SERVICE_UNAVAILABLE),
code: StatusCode::SERVICE_UNAVAILABLE.as_u16(), code: StatusCode::SERVICE_UNAVAILABLE.as_u16(),
}), }),
...@@ -470,6 +464,11 @@ async fn completions_single( ...@@ -470,6 +464,11 @@ async fn completions_single(
// issue the generate call on the engine // issue the generate call on the engine
let stream = engine.generate(request).await.map_err(|e| { let stream = engine.generate(request).await.map_err(|e| {
if super::metrics::request_was_rejected(e.as_ref()) {
state
.metrics_clone()
.inc_rejection(&model, super::metrics::Endpoint::Completions);
}
let err_response = ErrorMessage::from_anyhow(e, "Failed to generate completions"); let err_response = ErrorMessage::from_anyhow(e, "Failed to generate completions");
inflight_guard.mark_error(extract_error_type_from_response(&err_response)); inflight_guard.mark_error(extract_error_type_from_response(&err_response));
err_response err_response
...@@ -621,6 +620,11 @@ async fn completions_batch( ...@@ -621,6 +620,11 @@ async fn completions_batch(
// Generate stream for this prompt // Generate stream for this prompt
let stream = engine.generate(single_request_context).await.map_err(|e| { let stream = engine.generate(single_request_context).await.map_err(|e| {
if super::metrics::request_was_rejected(e.as_ref()) {
state
.metrics_clone()
.inc_rejection(&model, super::metrics::Endpoint::Completions);
}
let err_response = ErrorMessage::from_anyhow(e, "Failed to generate completions"); let err_response = ErrorMessage::from_anyhow(e, "Failed to generate completions");
inflight_guard.mark_error(extract_error_type_from_response(&err_response)); inflight_guard.mark_error(extract_error_type_from_response(&err_response));
err_response err_response
...@@ -775,9 +779,15 @@ async fn embeddings( ...@@ -775,9 +779,15 @@ async fn embeddings(
})?; })?;
let mut response_collector = state.metrics_clone().create_response_collector(model); let mut response_collector = state.metrics_clone().create_response_collector(model);
let model_name = model.to_string();
// issue the generate call on the engine // issue the generate call on the engine
let stream = engine.generate(request).await.map_err(|e| { let stream = engine.generate(request).await.map_err(|e| {
if super::metrics::request_was_rejected(e.as_ref()) {
state
.metrics_clone()
.inc_rejection(&model_name, super::metrics::Endpoint::Embeddings);
}
let err_response = ErrorMessage::from_anyhow(e, "Failed to generate embeddings"); let err_response = ErrorMessage::from_anyhow(e, "Failed to generate embeddings");
inflight.mark_error(extract_error_type_from_response(&err_response)); inflight.mark_error(extract_error_type_from_response(&err_response));
err_response err_response
...@@ -1184,6 +1194,11 @@ async fn chat_completions( ...@@ -1184,6 +1194,11 @@ async fn chat_completions(
// issue the generate call on the engine // issue the generate call on the engine
let stream = engine.generate(request).await.map_err(|e| { let stream = engine.generate(request).await.map_err(|e| {
if super::metrics::request_was_rejected(e.as_ref()) {
state
.metrics_clone()
.inc_rejection(&model, super::metrics::Endpoint::ChatCompletions);
}
let err_response = ErrorMessage::from_anyhow(e, "Failed to generate completions"); let err_response = ErrorMessage::from_anyhow(e, "Failed to generate completions");
inflight_guard.mark_error(extract_error_type_from_response(&err_response)); inflight_guard.mark_error(extract_error_type_from_response(&err_response));
err_response err_response
...@@ -1590,6 +1605,11 @@ async fn responses( ...@@ -1590,6 +1605,11 @@ async fn responses(
// issue the generate call on the engine // issue the generate call on the engine
let engine_stream = engine.generate(request).await.map_err(|e| { let engine_stream = engine.generate(request).await.map_err(|e| {
if super::metrics::request_was_rejected(e.as_ref()) {
state
.metrics_clone()
.inc_rejection(&model, super::metrics::Endpoint::Responses);
}
let err_response = ErrorMessage::from_anyhow(e, "Failed to generate completions"); let err_response = ErrorMessage::from_anyhow(e, "Failed to generate completions");
inflight_guard.mark_error(extract_error_type_from_response(&err_response)); inflight_guard.mark_error(extract_error_type_from_response(&err_response));
err_response err_response
...@@ -1972,10 +1992,14 @@ async fn images( ...@@ -1972,10 +1992,14 @@ async fn images(
// Note: This uses ServerStreamingEngine for internal routing/distribution, // Note: This uses ServerStreamingEngine for internal routing/distribution,
// NOT for client-facing SSE streaming. The stream is immediately folded into // NOT for client-facing SSE streaming. The stream is immediately folded into
// a single response below. // a single response below.
let stream = engine let stream = engine.generate(request).await.map_err(|e| {
.generate(request) if super::metrics::request_was_rejected(e.as_ref()) {
.await state
.map_err(|e| ErrorMessage::from_anyhow(e, "Failed to generate images"))?; .metrics_clone()
.inc_rejection(&model, super::metrics::Endpoint::Images);
}
ErrorMessage::from_anyhow(e, "Failed to generate images")
})?;
// Process stream to collect metrics and drop http_queue_guard on first response // Process stream to collect metrics and drop http_queue_guard on first response
let mut http_queue_guard = Some(http_queue_guard); let mut http_queue_guard = Some(http_queue_guard);
...@@ -2055,10 +2079,14 @@ async fn videos( ...@@ -2055,10 +2079,14 @@ async fn videos(
let mut response_collector = state.metrics_clone().create_response_collector(&model); let mut response_collector = state.metrics_clone().create_response_collector(&model);
// issue the generate call on the engine // issue the generate call on the engine
let stream = engine let stream = engine.generate(request).await.map_err(|e| {
.generate(request) if super::metrics::request_was_rejected(e.as_ref()) {
.await state
.map_err(|e| ErrorMessage::from_anyhow(e, "Failed to generate videos"))?; .metrics_clone()
.inc_rejection(&model, super::metrics::Endpoint::Videos);
}
ErrorMessage::from_anyhow(e, "Failed to generate videos")
})?;
// Process stream to collect metrics and drop http_queue_guard on first token // Process stream to collect metrics and drop http_queue_guard on first token
let mut http_queue_guard = Some(http_queue_guard); let mut http_queue_guard = Some(http_queue_guard);
...@@ -2116,10 +2144,14 @@ async fn video_stream( ...@@ -2116,10 +2144,14 @@ async fn video_stream(
let mut response_collector = state.metrics_clone().create_response_collector(&model); let mut response_collector = state.metrics_clone().create_response_collector(&model);
let stream = engine let stream = engine.generate(request).await.map_err(|e| {
.generate(request) if super::metrics::request_was_rejected(e.as_ref()) {
.await state
.map_err(|e| ErrorMessage::from_anyhow(e, "Failed to start video stream"))?; .metrics_clone()
.inc_rejection(&model, super::metrics::Endpoint::Videos);
}
ErrorMessage::from_anyhow(e, "Failed to start video stream")
})?;
// Capture the context to cancel the stream if the client disconnects. // Capture the context to cancel the stream if the client disconnects.
let ctx = stream.context(); let ctx = stream.context();
...@@ -2435,18 +2467,24 @@ mod tests { ...@@ -2435,18 +2467,24 @@ mod tests {
} }
#[test] #[test]
fn test_service_overloaded_error_response_from_anyhow() { fn test_resource_exhausted_error_response_from_anyhow() {
use dynamo_runtime::error::{DynamoError, ErrorType};
use dynamo_runtime::pipeline::error::PipelineError; use dynamo_runtime::pipeline::error::PipelineError;
let err: anyhow::Error = PipelineError::ServiceOverloaded( let cause = PipelineError::ServiceOverloaded(
"All workers are busy, please retry later".to_string(), "All workers are busy, please retry later".to_string(),
) );
let err: anyhow::Error = DynamoError::builder()
.error_type(ErrorType::ResourceExhausted)
.message("All workers are busy, please retry later")
.cause(cause)
.build()
.into(); .into();
let response = ErrorMessage::from_anyhow(err, BACKUP_ERROR_MESSAGE); let response = ErrorMessage::from_anyhow(err, BACKUP_ERROR_MESSAGE);
assert_eq!(response.0, StatusCode::SERVICE_UNAVAILABLE); assert_eq!(response.0, StatusCode::SERVICE_UNAVAILABLE);
assert_eq!( assert_eq!(
response.1.message, response.1.message,
"Service temporarily unavailable: All workers are busy, please retry later" "ResourceExhausted: All workers are busy, please retry later"
); );
} }
......
...@@ -27,7 +27,7 @@ fn is_migratable(err: &(dyn StdError + 'static)) -> bool { ...@@ -27,7 +27,7 @@ fn is_migratable(err: &(dyn StdError + 'static)) -> bool {
ErrorType::ConnectionTimeout, ErrorType::ConnectionTimeout,
ErrorType::Backend(BackendError::EngineShutdown), ErrorType::Backend(BackendError::EngineShutdown),
]; ];
const NON_MIGRATABLE: &[ErrorType] = &[ErrorType::Cancelled]; const NON_MIGRATABLE: &[ErrorType] = &[ErrorType::Cancelled, ErrorType::ResourceExhausted];
error::match_error_chain(err, MIGRATABLE, NON_MIGRATABLE) error::match_error_chain(err, MIGRATABLE, NON_MIGRATABLE)
} }
......
...@@ -53,6 +53,8 @@ pub enum ErrorType { ...@@ -53,6 +53,8 @@ pub enum ErrorType {
ConnectionTimeout, ConnectionTimeout,
/// The request was cancelled (e.g., client disconnected). /// The request was cancelled (e.g., client disconnected).
Cancelled, Cancelled,
/// The system does not have enough resources to handle the request.
ResourceExhausted,
/// Error originating from a backend engine. /// Error originating from a backend engine.
Backend(BackendError), Backend(BackendError),
} }
...@@ -66,6 +68,7 @@ impl fmt::Display for ErrorType { ...@@ -66,6 +68,7 @@ impl fmt::Display for ErrorType {
ErrorType::Disconnected => write!(f, "Disconnected"), ErrorType::Disconnected => write!(f, "Disconnected"),
ErrorType::ConnectionTimeout => write!(f, "ConnectionTimeout"), ErrorType::ConnectionTimeout => write!(f, "ConnectionTimeout"),
ErrorType::Cancelled => write!(f, "Cancelled"), ErrorType::Cancelled => write!(f, "Cancelled"),
ErrorType::ResourceExhausted => write!(f, "ResourceExhausted"),
ErrorType::Backend(sub) => write!(f, "Backend{sub}"), ErrorType::Backend(sub) => write!(f, "Backend{sub}"),
} }
} }
......
...@@ -235,6 +235,9 @@ pub mod frontend_service { ...@@ -235,6 +235,9 @@ pub mod frontend_service {
/// Total number of request cancellations /// Total number of request cancellations
pub const MODEL_CANCELLATION_TOTAL: &str = "model_cancellation_total"; pub const MODEL_CANCELLATION_TOTAL: &str = "model_cancellation_total";
/// Total number of requests rejected due to resource exhaustion
pub const MODEL_REJECTION_TOTAL: &str = "model_rejection_total";
/// Active decode blocks (KV cache blocks) per worker /// Active decode blocks (KV cache blocks) per worker
/// Gauge metric tracking current KV cache block utilization for each worker /// Gauge metric tracking current KV cache block utilization for each worker
pub const WORKER_ACTIVE_DECODE_BLOCKS: &str = "worker_active_decode_blocks"; pub const WORKER_ACTIVE_DECODE_BLOCKS: &str = "worker_active_decode_blocks";
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use super::{AsyncEngineContextProvider, ResponseStream}; use super::{AsyncEngineContextProvider, ResponseStream};
use crate::error::{BackendError, ErrorType, match_error_chain}; use crate::error::{BackendError, DynamoError, ErrorType, match_error_chain};
/// Check if an error chain indicates the worker should be reported as down. /// Check if an error chain indicates the worker should be reported as down.
fn is_inhibited(err: &(dyn std::error::Error + 'static)) -> bool { fn is_inhibited(err: &(dyn std::error::Error + 'static)) -> bool {
...@@ -518,9 +518,14 @@ where ...@@ -518,9 +518,14 @@ where
total_workers = all_instances.len(), total_workers = all_instances.len(),
"Rejecting request: all workers are busy" "Rejecting request: all workers are busy"
); );
return Err(PipelineError::ServiceOverloaded( let cause = PipelineError::ServiceOverloaded(
"All workers are busy, please retry later".to_string(), "All workers are busy, please retry later".to_string(),
) );
return Err(DynamoError::builder()
.error_type(ErrorType::ResourceExhausted)
.message("All workers are busy, please retry later")
.cause(cause)
.build()
.into()); .into());
} }
} }
......
...@@ -11,8 +11,10 @@ from typing import TYPE_CHECKING, Any, Optional ...@@ -11,8 +11,10 @@ from typing import TYPE_CHECKING, Any, Optional
import aiohttp import aiohttp
import nats import nats
import requests
from dynamo.llm import AicPerfConfig, KvRouter, KvRouterConfig from dynamo.llm import AicPerfConfig, KvRouter, KvRouterConfig
from dynamo.prometheus_names import frontend_service, name_prefix
from tests.router.helper import ( from tests.router.helper import (
_nats_server, _nats_server,
assert_event_dumps_equal, assert_event_dumps_equal,
...@@ -604,6 +606,66 @@ def _test_router_query_instance_id( ...@@ -604,6 +606,66 @@ def _test_router_query_instance_id(
logger.info(f"Token count: {result['token_count']}") logger.info(f"Token count: {result['token_count']}")
def _parse_frontend_rejection_metric(
    metrics_text: str, model_name: str, endpoint: str
) -> int:
    """Extract the frontend rejection counter value from Prometheus text output.

    Scans ``metrics_text`` line by line for a sample of the metric named
    ``{name_prefix.FRONTEND}_{frontend_service.MODEL_REJECTION_TOTAL}`` whose
    label set contains both ``model="<model_name>"`` and
    ``endpoint="<endpoint>"``, and returns the first value that parses.

    Args:
        metrics_text: Raw Prometheus metrics text.
        model_name: Value expected in the ``model`` label.
        endpoint: Value expected in the ``endpoint`` label
            (e.g. "chat_completions").

    Returns:
        The sample value truncated to ``int``, or 0 when no matching line
        with a parseable value is found.
    """
    metric_name = f"{name_prefix.FRONTEND}_{frontend_service.MODEL_REJECTION_TOTAL}"
    # Requiring the opening brace right after the name skips HELP/TYPE comment
    # lines and any metric whose name merely starts with the same prefix.
    sample_prefix = f"{metric_name}{{"
    wanted_labels = (f'model="{model_name}"', f'endpoint="{endpoint}"')

    for sample in metrics_text.splitlines():
        if not sample.startswith(sample_prefix):
            continue
        if not all(label in sample for label in wanted_labels):
            continue
        # The value is the last whitespace-separated field on the line.
        fields = sample.rsplit(None, 1)
        if len(fields) != 2:
            continue
        try:
            # Values may be rendered as floats, so go through float first.
            return int(float(fields[1]))
        except ValueError:
            # Unparseable value: keep looking at the remaining lines.
            continue
    return 0
def _verify_frontend_rejection_metrics(
    frontend_port: int,
    model_name: str,
    endpoint: str,
    expected_count: int,
) -> None:
    """Scrape the frontend ``/metrics`` endpoint and check the rejection counter.

    Args:
        frontend_port: Port on localhost where the frontend serves ``/metrics``.
        model_name: Value expected in the ``model`` label.
        endpoint: Value expected in the ``endpoint`` label
            (e.g. "chat_completions").
        expected_count: Rejection count the metric must equal exactly.

    Raises:
        AssertionError: If the metrics endpoint cannot be fetched, or the
            parsed counter differs from ``expected_count``.
    """
    metrics_url = f"http://localhost:{frontend_port}/metrics"

    # Report transport and HTTP-status failures as assertion failures,
    # keeping the underlying requests exception chained as the cause.
    try:
        scrape = requests.get(metrics_url, timeout=5)
        scrape.raise_for_status()
    except requests.RequestException as e:
        raise AssertionError(
            f"Failed to fetch frontend metrics from {metrics_url}: {e}"
        ) from e

    observed = _parse_frontend_rejection_metric(scrape.text, model_name, endpoint)
    logger.info(f"Frontend rejection metric: model_rejection_total={observed}")

    assert observed == expected_count, (
        f"Frontend model_rejection_total ({observed}) does not match "
        f"expected count ({expected_count})"
    )
def _test_router_overload_503( def _test_router_overload_503(
engine_workers, engine_workers,
block_size: int, block_size: int,
...@@ -612,11 +674,16 @@ def _test_router_overload_503( ...@@ -612,11 +674,16 @@ def _test_router_overload_503(
test_payload: dict, test_payload: dict,
blocks_threshold: float = 0.2, blocks_threshold: float = 0.2,
): ):
"""Test that KV router returns 503 when all workers are busy. """Test that 503 is returned when all workers are busy, and verify rejection metrics.
Assumes engine_workers are already initialized. This function manages router lifecycle. Assumes engine_workers are already initialized. This function manages router lifecycle.
Uses limited resources to intentionally trigger the overload condition. Uses limited resources to intentionally trigger the overload condition.
Sends staggered requests (0.1s apart) to exhaust worker resources, then verifies:
1. At least one request succeeds (routed before busy state propagates)
2. At least one request is rejected with 503 (worker busy)
3. The frontend model_rejection_total metric matches the observed 503 count
Args: Args:
engine_workers: Backend workers (mocker/vllm) already initialized with __enter__() engine_workers: Backend workers (mocker/vllm) already initialized with __enter__()
block_size: Block size for KV cache (should be small to exhaust quickly, e.g. 4) block_size: Block size for KV cache (should be small to exhaust quickly, e.g. 4)
...@@ -626,9 +693,8 @@ def _test_router_overload_503( ...@@ -626,9 +693,8 @@ def _test_router_overload_503(
blocks_threshold: Active decode blocks threshold for the router (default 0.2) blocks_threshold: Active decode blocks threshold for the router (default 0.2)
Raises: Raises:
AssertionError: If 503 response is not received when expected AssertionError: If success/rejection counts or metrics don't meet expectations
""" """
logger.info( logger.info(
f"Starting KV router frontend on port {frontend_port} with limited resources" f"Starting KV router frontend on port {frontend_port} with limited resources"
) )
...@@ -662,8 +728,6 @@ def _test_router_overload_503( ...@@ -662,8 +728,6 @@ def _test_router_overload_503(
async def exhaust_resources_and_verify_503(): async def exhaust_resources_and_verify_503():
stop_event = asyncio.Event() stop_event = asyncio.Event()
overload_response = {}
unexpected_statuses = []
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
tasks = [] tasks = []
...@@ -681,23 +745,24 @@ def _test_router_overload_503( ...@@ -681,23 +745,24 @@ def _test_router_overload_503(
logger.info( logger.info(
f"Request {req_id} got expected 503: {body}" f"Request {req_id} got expected 503: {body}"
) )
overload_response["status"] = response.status
overload_response["body"] = body
stop_event.set() stop_event.set()
error_msg = body.get("message", "")
assert (
"Service temporarily unavailable" in error_msg
or "All workers are busy" in error_msg
), f"Expected service overload error message, got: {body}"
return response.status return response.status
body = await response.text() body = await response.text()
logger.info( logger.info(
f"Request {req_id} got unexpected status {response.status}: {body}" f"Request {req_id} got unexpected status {response.status}: {body}"
) )
unexpected_statuses.append((response.status, body))
return response.status return response.status
except asyncio.CancelledError: except asyncio.CancelledError:
raise raise
except Exception as e: except Exception as e:
logger.info(f"Request {req_id} failed: {e}") logger.info(f"Request {req_id} failed: {e}")
unexpected_statuses.append(("exception", str(e))) raise
return None
try: try:
for i in range(50): for i in range(50):
...@@ -732,27 +797,42 @@ def _test_router_overload_503( ...@@ -732,27 +797,42 @@ def _test_router_overload_503(
for task in pending: for task in pending:
task.cancel() task.cancel()
await asyncio.gather(*pending, return_exceptions=True) await asyncio.gather(*pending, return_exceptions=True)
for task in done:
task.result()
if overload_response.get("status") != 503: return [t.result() for t in done]
logger.error(
f"Observed statuses before timeout: {unexpected_statuses}" results = asyncio.run(exhaust_resources_and_verify_503())
# Count outcomes
num_succeeded = sum(1 for s in results if s == 200)
num_rejected = sum(1 for s in results if s == 503)
num_other = sum(1 for s in results if s not in (200, 503))
logger.info(
f"Results: {num_succeeded} succeeded, {num_rejected} rejected (503), "
f"{num_other} other"
) )
return False
error_msg = overload_response["body"].get("message", "") # Assert minimum thresholds
assert ( assert (
"Service temporarily unavailable" in error_msg num_other == 0
or "All workers are busy" in error_msg ), f"Expected only 200 or 503 responses, but got {num_other} other"
), f"Expected service overload error message, got: {overload_response['body']}" assert (
return True num_rejected > 0
), f"Expected at least 1 rejection, but got {num_rejected}"
assert (
num_succeeded > 0
), f"Expected at least 1 success, but got {num_succeeded}"
# Run the test # Verify rejection metrics from frontend /metrics endpoint
success = asyncio.run(exhaust_resources_and_verify_503()) model_name = test_payload.get("model", "")
assert success, "Failed to verify 503 response when resources are exhausted" _verify_frontend_rejection_metrics(
frontend_port, model_name, "chat_completions", num_rejected
)
logger.info("Successfully verified 503 response when all workers are busy") logger.info(
f"Successfully verified overload 503: {num_rejected} rejected, "
f"{num_succeeded} succeeded, metrics match"
)
async def _zmq_replay_cycle( async def _zmq_replay_cycle(
......
...@@ -809,7 +809,7 @@ def test_mocker_two_kv_router( ...@@ -809,7 +809,7 @@ def test_mocker_two_kv_router(
@pytest.mark.parametrize( @pytest.mark.parametrize(
"durable_kv_events", [False], ids=["nondurable"], indirect=True "durable_kv_events", [False], ids=["nondurable"], indirect=True
) # Use NATS Core (local indexer) ) # Use NATS Core (local indexer)
@pytest.mark.timeout(60) # ~3x average (~19.86s), rounded up (when enabled) @pytest.mark.timeout(45) # ~3x average (~13.10s), rounded up (when enabled)
def test_mocker_kv_router_overload_503( def test_mocker_kv_router_overload_503(
request, runtime_services_dynamic_ports, predownload_tokenizers, durable_kv_events request, runtime_services_dynamic_ports, predownload_tokenizers, durable_kv_events
): ):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment