Unverified Commit 432fae67 authored by Vladislav Nosivskoy's avatar Vladislav Nosivskoy Committed by GitHub
Browse files

feat: add error_type label to reqs metric for fine-grained error classification (#5568)


Signed-off-by: Vladislav Nosivskoy <vladnosiv@gmail.com>
Signed-off-by: Keiven Chang <keivenchang@users.noreply.github.com>
Co-authored-by: Keiven Chang <keivenchang@users.noreply.github.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
parent c0f34b15
...@@ -33,7 +33,7 @@ use dynamo_runtime::engine::AsyncEngineContext; ...@@ -33,7 +33,7 @@ use dynamo_runtime::engine::AsyncEngineContext;
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use std::sync::Arc; use std::sync::Arc;
use crate::http::service::metrics::{InflightGuard, Metrics}; use crate::http::service::metrics::{ErrorType, InflightGuard, Metrics};
#[derive(Clone, Copy)] #[derive(Clone, Copy)]
pub enum ConnectionStatus { pub enum ConnectionStatus {
...@@ -171,6 +171,12 @@ pub fn monitor_for_disconnects( ...@@ -171,6 +171,12 @@ pub fn monitor_for_disconnects(
mut stream_handle: ConnectionHandle, mut stream_handle: ConnectionHandle,
) -> impl Stream<Item = Result<Event, axum::Error>> { ) -> impl Stream<Item = Result<Event, axum::Error>> {
stream_handle.arm(); stream_handle.arm();
// Default to Cancelled: if the stream is dropped unexpectedly (e.g. client
// disconnect causing a broken-pipe on the SSE write), the guard will report
// "cancelled" instead of "internal". The happy path overrides this via mark_ok().
inflight_guard.mark_error(ErrorType::Cancelled);
async_stream::try_stream! { async_stream::try_stream! {
tokio::pin!(stream); tokio::pin!(stream);
loop { loop {
...@@ -181,7 +187,11 @@ pub fn monitor_for_disconnects( ...@@ -181,7 +187,11 @@ pub fn monitor_for_disconnects(
yield event; yield event;
} }
Some(Err(err)) => { Some(Err(err)) => {
// Mark error as internal since it's a streaming error
inflight_guard.mark_error(ErrorType::Internal);
yield Event::default().event("error").comment(err.to_string()); yield Event::default().event("error").comment(err.to_string());
// Break to prevent any subsequent mark_ok() from overwriting the error
break;
} }
None => { None => {
// Stream ended normally // Stream ended normally
...@@ -197,6 +207,8 @@ pub fn monitor_for_disconnects( ...@@ -197,6 +207,8 @@ pub fn monitor_for_disconnects(
} }
_ = context.stopped() => { _ = context.stopped() => {
tracing::trace!("Context stopped; breaking stream"); tracing::trace!("Context stopped; breaking stream");
// Mark as cancelled when context is stopped (client disconnect or timeout)
inflight_guard.mark_error(ErrorType::Cancelled);
break; break;
} }
} }
......
...@@ -273,11 +273,13 @@ pub struct InflightGuard { ...@@ -273,11 +273,13 @@ pub struct InflightGuard {
endpoint: Endpoint, endpoint: Endpoint,
request_type: RequestType, request_type: RequestType,
status: Status, status: Status,
error_type: ErrorType,
timer: Instant, timer: Instant,
} }
/// Requests will be logged by the type of endpoint hit /// Requests will be logged by the type of endpoint hit
/// This will include llamastack in the future /// This will include llamastack in the future
#[derive(Clone, Copy)]
pub enum Endpoint { pub enum Endpoint {
/// OAI Completions /// OAI Completions
Completions, Completions,
...@@ -320,6 +322,25 @@ pub enum Status { ...@@ -320,6 +322,25 @@ pub enum Status {
Error, Error,
} }
/// Error type classification for fine-grained observability.
///
/// The variant is rendered through `as_str()` into the `error_type` label of
/// the requests counter. The status-code notes on each variant describe how
/// the HTTP handlers classify an error response before marking the guard.
#[derive(PartialEq, Eq, Clone, Debug)]
pub enum ErrorType {
    /// No error; set by `InflightGuard::mark_ok()` for successful requests.
    None,
    /// Client validation error: a 400 whose message starts with "Validation:",
    /// or any other 4xx that has no dedicated variant.
    Validation,
    /// Model or resource not found (404).
    NotFound,
    /// Service overloaded or too many requests (429 and 503).
    Overload,
    /// Request cancelled by the client or by a timeout.
    Cancelled,
    /// Internal server error: 500, a 400 without the "Validation:" prefix, and
    /// any other unexpected error. A newly created `InflightGuard` starts with
    /// this value until it is marked otherwise.
    Internal,
    /// Feature not implemented (501).
    NotImplemented,
}
/// Track response-specific metrics /// Track response-specific metrics
pub struct ResponseMetricCollector { pub struct ResponseMetricCollector {
metrics: Arc<Metrics>, metrics: Arc<Metrics>,
...@@ -422,7 +443,7 @@ impl Metrics { ...@@ -422,7 +443,7 @@ impl Metrics {
frontend_metric_name(frontend_service::REQUESTS_TOTAL), frontend_metric_name(frontend_service::REQUESTS_TOTAL),
"Total number of LLM requests processed", "Total number of LLM requests processed",
), ),
&["model", "endpoint", "request_type", "status"], &["model", "endpoint", "request_type", "status", "error_type"],
) )
.unwrap(); .unwrap();
...@@ -657,6 +678,7 @@ impl Metrics { ...@@ -657,6 +678,7 @@ impl Metrics {
endpoint: &Endpoint, endpoint: &Endpoint,
request_type: &RequestType, request_type: &RequestType,
status: &Status, status: &Status,
error_type: &ErrorType,
) -> u64 { ) -> u64 {
self.request_counter self.request_counter
.with_label_values(&[ .with_label_values(&[
...@@ -664,6 +686,7 @@ impl Metrics { ...@@ -664,6 +686,7 @@ impl Metrics {
endpoint.as_str(), endpoint.as_str(),
request_type.as_str(), request_type.as_str(),
status.as_str(), status.as_str(),
error_type.as_str(),
]) ])
.get() .get()
} }
...@@ -679,6 +702,7 @@ impl Metrics { ...@@ -679,6 +702,7 @@ impl Metrics {
endpoint: &Endpoint, endpoint: &Endpoint,
request_type: &RequestType, request_type: &RequestType,
status: &Status, status: &Status,
error_type: &ErrorType,
) { ) {
self.request_counter self.request_counter
.with_label_values(&[ .with_label_values(&[
...@@ -686,6 +710,7 @@ impl Metrics { ...@@ -686,6 +710,7 @@ impl Metrics {
endpoint.as_str(), endpoint.as_str(),
request_type.as_str(), request_type.as_str(),
status.as_str(), status.as_str(),
error_type.as_str(),
]) ])
.inc() .inc()
} }
...@@ -908,12 +933,19 @@ impl InflightGuard { ...@@ -908,12 +933,19 @@ impl InflightGuard {
endpoint, endpoint,
request_type, request_type,
status: Status::Error, status: Status::Error,
error_type: ErrorType::Internal,
timer, timer,
} }
} }
pub(crate) fn mark_ok(&mut self) { pub(crate) fn mark_ok(&mut self) {
self.status = Status::Success; self.status = Status::Success;
self.error_type = ErrorType::None;
}
pub(crate) fn mark_error(&mut self, error_type: ErrorType) {
self.status = Status::Error;
self.error_type = error_type;
} }
} }
...@@ -932,6 +964,7 @@ impl Drop for InflightGuard { ...@@ -932,6 +964,7 @@ impl Drop for InflightGuard {
&self.endpoint, &self.endpoint,
&self.request_type, &self.request_type,
&self.status, &self.status,
&self.error_type,
); );
// Record the duration of the request // Record the duration of the request
...@@ -990,6 +1023,20 @@ impl Status { ...@@ -990,6 +1023,20 @@ impl Status {
} }
} }
impl ErrorType {
    /// Returns the string recorded in the `error_type` label of the requests
    /// counter for this variant.
    ///
    /// The strings themselves are the constants in
    /// `frontend_service::error_type`; this method only selects among them.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Self::None => frontend_service::error_type::NONE,
            Self::Validation => frontend_service::error_type::VALIDATION,
            Self::NotFound => frontend_service::error_type::NOT_FOUND,
            Self::Overload => frontend_service::error_type::OVERLOAD,
            Self::Cancelled => frontend_service::error_type::CANCELLED,
            Self::Internal => frontend_service::error_type::INTERNAL,
            Self::NotImplemented => frontend_service::error_type::NOT_IMPLEMENTED,
        }
    }
}
impl ResponseMetricCollector { impl ResponseMetricCollector {
fn new(metrics: Arc<Metrics>, model: String) -> Self { fn new(metrics: Arc<Metrics>, model: String) -> Self {
ResponseMetricCollector { ResponseMetricCollector {
...@@ -1940,4 +1987,229 @@ mod tests { ...@@ -1940,4 +1987,229 @@ mod tests {
detokenize_metric.get_histogram().get_sample_sum() detokenize_metric.get_histogram().get_sample_sum()
); );
} }
/// Pins the label string produced for every `ErrorType` variant.
#[test]
fn test_error_type_as_str() {
    // Each variant paired with the label it must render as.
    let expected = [
        (ErrorType::None, ""),
        (ErrorType::Validation, "validation"),
        (ErrorType::NotFound, "not_found"),
        (ErrorType::Overload, "overload"),
        (ErrorType::Cancelled, "cancelled"),
        (ErrorType::Internal, "internal"),
        (ErrorType::NotImplemented, "not_implemented"),
    ];

    for (error_type, label) in &expected {
        assert_eq!(error_type.as_str(), *label);
    }
}
/// A guard marked OK must be counted as status=success with the `None`
/// error-type label once it is dropped.
#[test]
fn test_inflight_guard_marks_success_with_correct_error_type() {
    let registry = prometheus::Registry::new();
    let metrics = Arc::new(Metrics::new());
    metrics.register(&registry).unwrap();

    let model = "test-model";

    // Mark the request successful, then drop the guard so it records itself.
    let mut guard =
        Arc::clone(&metrics).create_inflight_guard(model, Endpoint::ChatCompletions, false);
    guard.mark_ok();
    drop(guard);

    // Exactly one sample is expected under the success / no-error label set.
    let recorded = metrics
        .request_counter
        .with_label_values(&[
            model,
            Endpoint::ChatCompletions.as_str(),
            RequestType::Unary.as_str(),
            Status::Success.as_str(),
            ErrorType::None.as_str(),
        ])
        .get();
    assert_eq!(recorded, 1);
}
/// A guard marked with `ErrorType::Validation` must be counted as
/// status=error, error_type=validation once it is dropped.
#[test]
fn test_inflight_guard_marks_validation_error() {
    let registry = prometheus::Registry::new();
    let metrics = Arc::new(Metrics::new());
    metrics.register(&registry).unwrap();

    let model = "test-model";

    // Mark the request as a validation failure, then drop the guard to record it.
    let mut guard =
        Arc::clone(&metrics).create_inflight_guard(model, Endpoint::ChatCompletions, false);
    guard.mark_error(ErrorType::Validation);
    drop(guard);

    // Exactly one sample is expected under the error / validation label set.
    let recorded = metrics
        .request_counter
        .with_label_values(&[
            model,
            Endpoint::ChatCompletions.as_str(),
            RequestType::Unary.as_str(),
            Status::Error.as_str(),
            ErrorType::Validation.as_str(),
        ])
        .get();
    assert_eq!(recorded, 1);
}
/// A guard that is never marked (neither `mark_ok()` nor `mark_error()`) must
/// be counted as status=error, error_type=internal when it is dropped.
#[test]
fn test_inflight_guard_defaults_to_internal_error_on_drop() {
    let registry = prometheus::Registry::new();
    let metrics = Arc::new(Metrics::new());
    metrics.register(&registry).unwrap();

    let model = "test-model";

    // Create the guard and drop it untouched, as would happen on a panic or
    // an unhandled error path.
    drop(Arc::clone(&metrics).create_inflight_guard(model, Endpoint::ChatCompletions, false));

    // Exactly one sample is expected under the error / internal label set.
    let recorded = metrics
        .request_counter
        .with_label_values(&[
            model,
            Endpoint::ChatCompletions.as_str(),
            RequestType::Unary.as_str(),
            Status::Error.as_str(),
            ErrorType::Internal.as_str(),
        ])
        .get();
    assert_eq!(recorded, 1);
}
/// Records one failed request per error classification and checks that each
/// classification ends up in its own counter series with a count of one.
#[test]
fn test_all_error_types_recorded_correctly() {
    let registry = prometheus::Registry::new();
    let metrics = Arc::new(Metrics::new());
    metrics.register(&registry).unwrap();

    let model = "test-model";
    let endpoint = Endpoint::ChatCompletions;

    // Every failure classification, i.e. all variants except `ErrorType::None`.
    let failure_kinds = [
        ErrorType::Validation,
        ErrorType::NotFound,
        ErrorType::Overload,
        ErrorType::Cancelled,
        ErrorType::Internal,
        ErrorType::NotImplemented,
    ];

    // Phase 1: one request per classification; dropping the guard records it.
    for kind in failure_kinds.iter() {
        let mut guard = Arc::clone(&metrics).create_inflight_guard(model, endpoint, false);
        guard.mark_error(kind.clone());
        drop(guard);
    }

    // Phase 2: each classification must have been counted exactly once.
    for kind in failure_kinds.iter() {
        let recorded = metrics
            .request_counter
            .with_label_values(&[
                model,
                endpoint.as_str(),
                RequestType::Unary.as_str(),
                Status::Error.as_str(),
                kind.as_str(),
            ])
            .get();
        assert_eq!(
            recorded,
            1,
            "Should have 1 request for error_type={}",
            kind.as_str()
        );
    }
}
/// Mixes outcomes across endpoints (2 validation errors, 3 internal errors,
/// 1 success) and checks that each lands in its own counter series.
#[test]
fn test_multiple_requests_different_error_types() {
    let registry = prometheus::Registry::new();
    let metrics = Arc::new(Metrics::new());
    metrics.register(&registry).unwrap();

    let model = "test-model";

    // Runs one request to completion: `Some(kind)` marks it failed with
    // `kind`, `None` marks it successful. The guard is dropped (and therefore
    // recorded) when the closure returns.
    let record = |endpoint: Endpoint, outcome: Option<ErrorType>| {
        let mut guard = Arc::clone(&metrics).create_inflight_guard(model, endpoint, false);
        match outcome {
            Some(kind) => guard.mark_error(kind),
            None => guard.mark_ok(),
        }
    };

    // Reads the request counter for one endpoint / status / error-type series.
    let count = |endpoint: Endpoint, status: Status, kind: ErrorType| {
        metrics
            .request_counter
            .with_label_values(&[
                model,
                endpoint.as_str(),
                RequestType::Unary.as_str(),
                status.as_str(),
                kind.as_str(),
            ])
            .get()
    };

    // 2 validation errors on ChatCompletions
    for _ in 0..2 {
        record(Endpoint::ChatCompletions, Some(ErrorType::Validation));
    }
    // 3 internal errors on Completions
    for _ in 0..3 {
        record(Endpoint::Completions, Some(ErrorType::Internal));
    }
    // 1 success on Embeddings
    record(Endpoint::Embeddings, None);

    assert_eq!(
        count(Endpoint::ChatCompletions, Status::Error, ErrorType::Validation),
        2
    );
    assert_eq!(
        count(Endpoint::Completions, Status::Error, ErrorType::Internal),
        3
    );
    assert_eq!(
        count(Endpoint::Embeddings, Status::Success, ErrorType::None),
        1
    );
}
} }
...@@ -34,7 +34,7 @@ use super::{ ...@@ -34,7 +34,7 @@ use super::{
disconnect::{ConnectionHandle, create_connection_monitor, monitor_for_disconnects}, disconnect::{ConnectionHandle, create_connection_monitor, monitor_for_disconnects},
error::HttpError, error::HttpError,
metrics::{ metrics::{
Endpoint, EventConverter, process_response_and_observe_metrics, Endpoint, ErrorType, EventConverter, process_response_and_observe_metrics,
process_response_using_event_converter_and_observe_metrics, process_response_using_event_converter_and_observe_metrics,
}, },
service_v2, service_v2,
...@@ -93,6 +93,32 @@ fn map_error_code_to_error_type(code: StatusCode) -> String { ...@@ -93,6 +93,32 @@ fn map_error_code_to_error_type(code: StatusCode) -> String {
} }
} }
/// Maps an HTTP status code (and, for 400s, the error message) to the
/// `ErrorType` used to label request metrics.
///
/// A 400 counts as `Validation` only when the message begins with
/// `"Validation:"`; any other 400 is reported as `Internal`. A 4xx code
/// without an explicit arm is treated as `Validation`, and every remaining
/// code is treated as `Internal`.
fn classify_error_for_metrics(code: StatusCode, message: &str) -> ErrorType {
    match code {
        // 400 carrying the validation prefix
        StatusCode::BAD_REQUEST if message.starts_with("Validation:") => ErrorType::Validation,
        // any other 400, and 500
        StatusCode::BAD_REQUEST | StatusCode::INTERNAL_SERVER_ERROR => ErrorType::Internal,
        // 404
        StatusCode::NOT_FOUND => ErrorType::NotFound,
        // 501
        StatusCode::NOT_IMPLEMENTED => ErrorType::NotImplemented,
        // 429 and 503
        StatusCode::TOO_MANY_REQUESTS | StatusCode::SERVICE_UNAVAILABLE => ErrorType::Overload,
        // remaining 4xx
        status if status.is_client_error() => ErrorType::Validation,
        // everything else
        _ => ErrorType::Internal,
    }
}
/// Derives the metrics `ErrorType` for an `ErrorResponse` by classifying its
/// status code (field `0`) together with the message of its body (field `1`).
fn extract_error_type_from_response(response: &ErrorResponse) -> ErrorType {
    let status = response.0;
    let message: &str = &response.1.message;
    classify_error_for_metrics(status, message)
}
impl ErrorMessage { impl ErrorMessage {
/// Not Found Error /// Not Found Error
pub fn model_not_found() -> ErrorResponse { pub fn model_not_found() -> ErrorResponse {
...@@ -368,6 +394,13 @@ async fn completions_single( ...@@ -368,6 +394,13 @@ async fn completions_single(
// todo - make the protocols be optional for model name // todo - make the protocols be optional for model name
// todo - when optional, if none, apply a default // todo - when optional, if none, apply a default
let model = request.inner.model.clone(); let model = request.inner.model.clone();
// Create inflight_guard early to ensure all errors are counted
let mut inflight_guard =
state
.metrics_clone()
.create_inflight_guard(&model, Endpoint::Completions, streaming);
// Create http_queue_guard early - tracks time waiting to be processed // Create http_queue_guard early - tracks time waiting to be processed
let http_queue_guard = state.metrics_clone().create_http_queue_guard(&model); let http_queue_guard = state.metrics_clone().create_http_queue_guard(&model);
...@@ -375,24 +408,23 @@ async fn completions_single( ...@@ -375,24 +408,23 @@ async fn completions_single(
let (engine, parsing_options) = state let (engine, parsing_options) = state
.manager() .manager()
.get_completions_engine_with_parsing(&model) .get_completions_engine_with_parsing(&model)
.map_err(|_| ErrorMessage::model_not_found())?; .map_err(|_| {
let err_response = ErrorMessage::model_not_found();
inflight_guard.mark_error(extract_error_type_from_response(&err_response));
err_response
})?;
let mut response_collector = state.metrics_clone().create_response_collector(&model); let mut response_collector = state.metrics_clone().create_response_collector(&model);
// prepare to process any annotations // prepare to process any annotations
let annotations = request.annotations(); let annotations = request.annotations();
// Create inflight_guard before calling engine to ensure errors are counted
let mut inflight_guard =
state
.metrics_clone()
.create_inflight_guard(&model, Endpoint::Completions, streaming);
// issue the generate call on the engine // issue the generate call on the engine
let stream = engine let stream = engine.generate(request).await.map_err(|e| {
.generate(request) let err_response = ErrorMessage::from_anyhow(e, "Failed to generate completions");
.await inflight_guard.mark_error(extract_error_type_from_response(&err_response));
.map_err(|e| ErrorMessage::from_anyhow(e, "Failed to generate completions"))?; err_response
})?;
// capture the context to cancel the stream if the client disconnects // capture the context to cancel the stream if the client disconnects
let ctx = stream.context(); let ctx = stream.context();
...@@ -463,13 +495,20 @@ async fn completions_single( ...@@ -463,13 +495,20 @@ async fn completions_single(
request_id, request_id,
e e
); );
ErrorMessage::internal_server_error(&format!( let err_response = ErrorMessage::internal_server_error(&format!(
"Failed to fold completions stream for {}: {:?}", "Failed to fold completions stream for {}: {:?}",
request_id, e request_id, e
)) ));
inflight_guard.mark_error(extract_error_type_from_response(&err_response));
err_response
})?; })?;
inflight_guard.mark_ok(); inflight_guard.mark_ok();
// If the engine context was killed (client disconnect), the response was
// assembled but never delivered. Override to cancelled.
if ctx.is_killed() {
inflight_guard.mark_error(ErrorType::Cancelled);
}
Ok(Json(response).into_response()) Ok(Json(response).into_response())
} }
} }
...@@ -490,25 +529,29 @@ async fn completions_batch( ...@@ -490,25 +529,29 @@ async fn completions_batch(
let streaming = request.inner.stream.unwrap_or(false); let streaming = request.inner.stream.unwrap_or(false);
let model = request.inner.model.clone(); let model = request.inner.model.clone();
// Create inflight_guard early to ensure all errors are counted
let mut inflight_guard =
state
.metrics_clone()
.create_inflight_guard(&model, Endpoint::Completions, streaming);
// Create http_queue_guard early - tracks time waiting to be processed // Create http_queue_guard early - tracks time waiting to be processed
let http_queue_guard = state.metrics_clone().create_http_queue_guard(&model); let http_queue_guard = state.metrics_clone().create_http_queue_guard(&model);
let (engine, parsing_options) = state let (engine, parsing_options) = state
.manager() .manager()
.get_completions_engine_with_parsing(&model) .get_completions_engine_with_parsing(&model)
.map_err(|_| ErrorMessage::model_not_found())?; .map_err(|_| {
let err_response = ErrorMessage::model_not_found();
inflight_guard.mark_error(extract_error_type_from_response(&err_response));
err_response
})?;
let mut response_collector = state.metrics_clone().create_response_collector(&model); let mut response_collector = state.metrics_clone().create_response_collector(&model);
// prepare to process any annotations // prepare to process any annotations
let annotations = request.annotations(); let annotations = request.annotations();
// Create inflight_guard before calling engine to ensure errors are counted
let mut inflight_guard =
state
.metrics_clone()
.create_inflight_guard(&model, Endpoint::Completions, streaming);
// Generate streams for each prompt in the batch // Generate streams for each prompt in the batch
let mut all_streams = Vec::new(); let mut all_streams = Vec::new();
let mut first_ctx = None; let mut first_ctx = None;
...@@ -526,10 +569,11 @@ async fn completions_batch( ...@@ -526,10 +569,11 @@ async fn completions_batch(
let single_request_context = Context::with_id(single_request, unique_request_id); let single_request_context = Context::with_id(single_request, unique_request_id);
// Generate stream for this prompt // Generate stream for this prompt
let stream = engine let stream = engine.generate(single_request_context).await.map_err(|e| {
.generate(single_request_context) let err_response = ErrorMessage::from_anyhow(e, "Failed to generate completions");
.await inflight_guard.mark_error(extract_error_type_from_response(&err_response));
.map_err(|e| ErrorMessage::from_anyhow(e, "Failed to generate completions"))?; err_response
})?;
// Capture context from first stream // Capture context from first stream
if first_ctx.is_none() { if first_ctx.is_none() {
...@@ -623,13 +667,20 @@ async fn completions_batch( ...@@ -623,13 +667,20 @@ async fn completions_batch(
request_id, request_id,
e e
); );
ErrorMessage::internal_server_error(&format!( let err_response = ErrorMessage::internal_server_error(&format!(
"Failed to fold completions stream for {}: {:?}", "Failed to fold completions stream for {}: {:?}",
request_id, e request_id, e
)) ));
inflight_guard.mark_error(extract_error_type_from_response(&err_response));
err_response
})?; })?;
inflight_guard.mark_ok(); inflight_guard.mark_ok();
// If the engine context was killed (client disconnect), the response was
// assembled but never delivered. Override to cancelled.
if ctx.is_killed() {
inflight_guard.mark_error(ErrorType::Cancelled);
}
Ok(Json(response).into_response()) Ok(Json(response).into_response())
} }
} }
...@@ -654,28 +705,30 @@ async fn embeddings( ...@@ -654,28 +705,30 @@ async fn embeddings(
// todo - when optional, if none, apply a default // todo - when optional, if none, apply a default
let model = &request.inner.model; let model = &request.inner.model;
// Create http_queue_guard early - tracks time waiting to be processed // Create inflight_guard early to ensure all errors are counted
let http_queue_guard = state.metrics_clone().create_http_queue_guard(model);
// todo - error handling should be more robust
let engine = state
.manager()
.get_embeddings_engine(model)
.map_err(|_| ErrorMessage::model_not_found())?;
// this will increment the inflight gauge for the model
let mut inflight = let mut inflight =
state state
.metrics_clone() .metrics_clone()
.create_inflight_guard(model, Endpoint::Embeddings, streaming); .create_inflight_guard(model, Endpoint::Embeddings, streaming);
// Create http_queue_guard early - tracks time waiting to be processed
let http_queue_guard = state.metrics_clone().create_http_queue_guard(model);
// todo - error handling should be more robust
let engine = state.manager().get_embeddings_engine(model).map_err(|_| {
let err_response = ErrorMessage::model_not_found();
inflight.mark_error(extract_error_type_from_response(&err_response));
err_response
})?;
let mut response_collector = state.metrics_clone().create_response_collector(model); let mut response_collector = state.metrics_clone().create_response_collector(model);
// issue the generate call on the engine // issue the generate call on the engine
let stream = engine let stream = engine.generate(request).await.map_err(|e| {
.generate(request) let err_response = ErrorMessage::from_anyhow(e, "Failed to generate embeddings");
.await inflight.mark_error(extract_error_type_from_response(&err_response));
.map_err(|e| ErrorMessage::from_anyhow(e, "Failed to generate embeddings"))?; err_response
})?;
// Process stream to collect metrics and drop http_queue_guard on first token // Process stream to collect metrics and drop http_queue_guard on first token
let mut http_queue_guard = Some(http_queue_guard); let mut http_queue_guard = Some(http_queue_guard);
...@@ -698,7 +751,10 @@ async fn embeddings( ...@@ -698,7 +751,10 @@ async fn embeddings(
request_id, request_id,
e e
); );
ErrorMessage::internal_server_error("Failed to fold embeddings stream") let err_response =
ErrorMessage::internal_server_error("Failed to fold embeddings stream");
inflight.mark_error(extract_error_type_from_response(&err_response));
err_response
})?; })?;
inflight.mark_ok(); inflight.mark_ok();
...@@ -887,22 +943,11 @@ async fn chat_completions( ...@@ -887,22 +943,11 @@ async fn chat_completions(
let request_id = request.id().to_string(); let request_id = request.id().to_string();
// Handle unsupported fields - if Some(resp) is returned by // Determine streaming mode early
// validate_chat_completion_unsupported_fields, // todo - decide on default
// then a field was used that is unsupported. We will log an error message let streaming = request.inner.stream.unwrap_or(false);
// and early return a 501 NOT_IMPLEMENTED status code. Otherwise, proceed.
validate_chat_completion_unsupported_fields(&request)?;
// Handle required fields like messages shouldn't be empty.
validate_chat_completion_required_fields(&request)?;
// Validate stream_options is only used when streaming (NVBug 5662680)
validate_chat_completion_stream_options(&request)?;
// Handle Rest of Validation Errors
validate_chat_completion_fields_generic(&request)?;
// Apply template values if present // Apply template values first to resolve the model before creating metrics guards
if let Some(template) = template { if let Some(template) = template {
if request.inner.model.is_empty() { if request.inner.model.is_empty() {
request.inner.model = template.model.clone(); request.inner.model = template.model.clone();
...@@ -914,16 +959,48 @@ async fn chat_completions( ...@@ -914,16 +959,48 @@ async fn chat_completions(
request.inner.max_completion_tokens = Some(template.max_completion_tokens); request.inner.max_completion_tokens = Some(template.max_completion_tokens);
} }
} }
tracing::trace!("Received chat completions request: {:?}", request.content());
// todo - decide on default
let streaming = request.inner.stream.unwrap_or(false);
// Capture the resolved model after template application for metrics and engine lookup
// todo - make the protocols be optional for model name // todo - make the protocols be optional for model name
// todo - when optional, if none, apply a default // todo - when optional, if none, apply a default
// todo - determine the proper error code for when a request model is not present // todo - determine the proper error code for when a request model is not present
let model = request.inner.model.clone(); let model = request.inner.model.clone();
tracing::trace!("Received chat completions request: {:?}", request.content());
// Create inflight_guard early to ensure all errors (including validation) are counted
let mut inflight_guard =
state
.metrics_clone()
.create_inflight_guard(&model, Endpoint::ChatCompletions, streaming);
// Handle unsupported fields - if Some(resp) is returned by
// validate_chat_completion_unsupported_fields,
// then a field was used that is unsupported. We will log an error message
// and early return a 501 NOT_IMPLEMENTED status code. Otherwise, proceed.
if let Err(err_response) = validate_chat_completion_unsupported_fields(&request) {
inflight_guard.mark_error(extract_error_type_from_response(&err_response));
return Err(err_response);
}
// Handle required fields like messages shouldn't be empty.
if let Err(err_response) = validate_chat_completion_required_fields(&request) {
inflight_guard.mark_error(extract_error_type_from_response(&err_response));
return Err(err_response);
}
// Validate stream_options is only used when streaming (NVBug 5662680)
if let Err(err_response) = validate_chat_completion_stream_options(&request) {
inflight_guard.mark_error(extract_error_type_from_response(&err_response));
return Err(err_response);
}
// Handle Rest of Validation Errors
if let Err(err_response) = validate_chat_completion_fields_generic(&request) {
inflight_guard.mark_error(extract_error_type_from_response(&err_response));
return Err(err_response);
}
// Create HTTP queue guard after template resolution so labels are correct // Create HTTP queue guard after template resolution so labels are correct
let http_queue_guard = state.metrics_clone().create_http_queue_guard(&model); let http_queue_guard = state.metrics_clone().create_http_queue_guard(&model);
...@@ -932,23 +1009,22 @@ async fn chat_completions( ...@@ -932,23 +1009,22 @@ async fn chat_completions(
let (engine, parsing_options) = state let (engine, parsing_options) = state
.manager() .manager()
.get_chat_completions_engine_with_parsing(&model) .get_chat_completions_engine_with_parsing(&model)
.map_err(|_| ErrorMessage::model_not_found())?; .map_err(|_| {
let err_response = ErrorMessage::model_not_found();
inflight_guard.mark_error(extract_error_type_from_response(&err_response));
err_response
})?;
let mut response_collector = state.metrics_clone().create_response_collector(&model); let mut response_collector = state.metrics_clone().create_response_collector(&model);
let annotations = request.annotations(); let annotations = request.annotations();
// Create inflight_guard before calling engine to ensure errors are counted
let mut inflight_guard =
state
.metrics_clone()
.create_inflight_guard(&model, Endpoint::ChatCompletions, streaming);
// issue the generate call on the engine // issue the generate call on the engine
let stream = engine let stream = engine.generate(request).await.map_err(|e| {
.generate(request) let err_response = ErrorMessage::from_anyhow(e, "Failed to generate completions");
.await inflight_guard.mark_error(extract_error_type_from_response(&err_response));
.map_err(|e| ErrorMessage::from_anyhow(e, "Failed to generate completions"))?; err_response
})?;
// capture the context to cancel the stream if the client disconnects // capture the context to cancel the stream if the client disconnects
let ctx = stream.context(); let ctx = stream.context();
...@@ -1012,6 +1088,7 @@ async fn chat_completions( ...@@ -1012,6 +1088,7 @@ async fn chat_completions(
.await .await
.map_err(|error_response| { .map_err(|error_response| {
tracing::error!(request_id, "Backend error detected: {:?}", error_response); tracing::error!(request_id, "Backend error detected: {:?}", error_response);
inflight_guard.mark_error(extract_error_type_from_response(&error_response));
error_response error_response
})?; })?;
...@@ -1034,13 +1111,20 @@ async fn chat_completions( ...@@ -1034,13 +1111,20 @@ async fn chat_completions(
"Failed to parse chat completion response: {:?}", "Failed to parse chat completion response: {:?}",
e e
); );
ErrorMessage::internal_server_error(&format!( let err_response = ErrorMessage::internal_server_error(&format!(
"Failed to parse chat completion response: {}", "Failed to parse chat completion response: {}",
e e
)) ));
inflight_guard.mark_error(extract_error_type_from_response(&err_response));
err_response
})?; })?;
inflight_guard.mark_ok(); inflight_guard.mark_ok();
// If the engine context was killed (client disconnect), the response was
// assembled but never delivered. Override to cancelled.
if ctx.is_killed() {
inflight_guard.mark_error(ErrorType::Cancelled);
}
Ok(Json(response).into_response()) Ok(Json(response).into_response())
} }
} }
...@@ -1198,18 +1282,6 @@ async fn responses( ...@@ -1198,18 +1282,6 @@ async fn responses(
// return a 503 if the service is not ready // return a 503 if the service is not ready
check_ready(&state)?; check_ready(&state)?;
// Create http_queue_guard early - tracks time waiting to be processed
// model is Option<String> in upstream; extract to String, defaulting to empty
let model = request.inner.model.clone().unwrap_or_default();
let http_queue_guard = state.metrics_clone().create_http_queue_guard(&model);
// Handle unsupported fields - if Some(resp) is returned by validate_unsupported_fields,
// then a field was used that is unsupported. We will log an error message
// and early return a 501 NOT_IMPLEMENTED status code. Otherwise, proceed.
if let Some(resp) = validate_response_unsupported_fields(&request) {
return Ok(resp.into_response());
}
// Apply template values if present, with sensible defaults for the Responses API. // Apply template values if present, with sensible defaults for the Responses API.
// Unlike chat completions where backends may have their own defaults, the Responses API // Unlike chat completions where backends may have their own defaults, the Responses API
// should provide a generous default to avoid truncated responses (especially with // should provide a generous default to avoid truncated responses (especially with
...@@ -1231,6 +1303,24 @@ async fn responses( ...@@ -1231,6 +1303,24 @@ async fn responses(
} }
tracing::trace!("Received responses request: {:?}", request.inner); tracing::trace!("Received responses request: {:?}", request.inner);
let model = request.inner.model.clone().unwrap_or_default();
let streaming = request.inner.stream.unwrap_or(false);
// Create http_queue_guard early - tracks time waiting to be processed
let http_queue_guard = state.metrics_clone().create_http_queue_guard(&model);
let mut inflight_guard =
state
.metrics_clone()
.create_inflight_guard(&model, Endpoint::Responses, streaming);
// Handle unsupported fields - if Some(resp) is returned by validate_unsupported_fields,
// then a field was used that is unsupported. We will log an error message
// and early return a 501 NOT_IMPLEMENTED status code.
if let Some(resp) = validate_response_unsupported_fields(&request) {
inflight_guard.mark_error(ErrorType::NotImplemented);
return Ok(resp.into_response());
}
// Extract request parameters before into_parts() consumes the request. // Extract request parameters before into_parts() consumes the request.
// These are echoed back in the Response object per the OpenAI spec. // These are echoed back in the Response object per the OpenAI spec.
let response_params = ResponseParams { let response_params = ResponseParams {
...@@ -1242,8 +1332,6 @@ async fn responses( ...@@ -1242,8 +1332,6 @@ async fn responses(
tool_choice: request.inner.tool_choice.clone(), tool_choice: request.inner.tool_choice.clone(),
instructions: request.inner.instructions.clone(), instructions: request.inner.instructions.clone(),
}; };
let streaming = request.inner.stream.unwrap_or(false);
let request_id = request.id().to_string(); let request_id = request.id().to_string();
let (orig_request, context) = request.into_parts(); let (orig_request, context) = request.into_parts();
...@@ -1254,11 +1342,13 @@ async fn responses( ...@@ -1254,11 +1342,13 @@ async fn responses(
error = %e, error = %e,
"Failed to convert NvCreateResponse to NvCreateChatCompletionRequest", "Failed to convert NvCreateResponse to NvCreateChatCompletionRequest",
); );
ErrorMessage::not_implemented_error( let err_response = ErrorMessage::not_implemented_error(
VALIDATION_PREFIX.to_string() VALIDATION_PREFIX.to_string()
+ "Failed to convert responses request: " + "Failed to convert responses request: "
+ &e.to_string(), + &e.to_string(),
) );
inflight_guard.mark_error(extract_error_type_from_response(&err_response));
err_response
})?; })?;
// For non-streaming responses, we still use internal streaming for aggregation, // For non-streaming responses, we still use internal streaming for aggregation,
...@@ -1274,27 +1364,26 @@ async fn responses( ...@@ -1274,27 +1364,26 @@ async fn responses(
let (engine, parsing_options) = state let (engine, parsing_options) = state
.manager() .manager()
.get_chat_completions_engine_with_parsing(&model) .get_chat_completions_engine_with_parsing(&model)
.map_err(|_| ErrorMessage::model_not_found())?; .map_err(|_| {
let err_response = ErrorMessage::model_not_found();
inflight_guard.mark_error(extract_error_type_from_response(&err_response));
err_response
})?;
let mut response_collector = state.metrics_clone().create_response_collector(&model); let mut response_collector = state.metrics_clone().create_response_collector(&model);
tracing::trace!("Issuing generate call for responses"); tracing::trace!("Issuing generate call for responses");
// issue the generate call on the engine // issue the generate call on the engine
let engine_stream = engine let engine_stream = engine.generate(request).await.map_err(|e| {
.generate(request) let err_response = ErrorMessage::from_anyhow(e, "Failed to generate completions");
.await inflight_guard.mark_error(extract_error_type_from_response(&err_response));
.map_err(|e| ErrorMessage::from_anyhow(e, "Failed to generate completions"))?; err_response
})?;
// Capture the context to cancel the stream if the client disconnects // Capture the context to cancel the stream if the client disconnects
let ctx = engine_stream.context(); let ctx = engine_stream.context();
// Create inflight_guard now that actual processing has begun
let mut inflight_guard =
state
.metrics_clone()
.create_inflight_guard(&model, Endpoint::Responses, streaming);
if streaming { if streaming {
// For streaming responses, we return HTTP 200 immediately without checking for errors. // For streaming responses, we return HTTP 200 immediately without checking for errors.
// Once HTTP 200 OK is sent, we cannot change the status code, so any backend errors // Once HTTP 200 OK is sent, we cannot change the status code, so any backend errors
...@@ -1388,6 +1477,7 @@ async fn responses( ...@@ -1388,6 +1477,7 @@ async fn responses(
.await .await
.map_err(|error_response| { .map_err(|error_response| {
tracing::error!(request_id, "Backend error detected: {:?}", error_response); tracing::error!(request_id, "Backend error detected: {:?}", error_response);
inflight_guard.mark_error(extract_error_type_from_response(&error_response));
error_response error_response
})?; })?;
...@@ -1405,10 +1495,12 @@ async fn responses( ...@@ -1405,10 +1495,12 @@ async fn responses(
.await .await
.map_err(|e| { .map_err(|e| {
tracing::error!(request_id, "Failed to fold responses stream: {:?}", e); tracing::error!(request_id, "Failed to fold responses stream: {:?}", e);
ErrorMessage::internal_server_error(&format!( let err_response = ErrorMessage::internal_server_error(&format!(
"Failed to fold responses stream: {}", "Failed to fold responses stream: {}",
e e
)) ));
inflight_guard.mark_error(extract_error_type_from_response(&err_response));
err_response
})?; })?;
// Convert NvCreateChatCompletionResponse --> NvResponse // Convert NvCreateChatCompletionResponse --> NvResponse
...@@ -1419,10 +1511,18 @@ async fn responses( ...@@ -1419,10 +1511,18 @@ async fn responses(
"Failed to convert NvCreateChatCompletionResponse to NvResponse: {:?}", "Failed to convert NvCreateChatCompletionResponse to NvResponse: {:?}",
e e
); );
ErrorMessage::internal_server_error("Failed to convert internal response") let err_response =
ErrorMessage::internal_server_error("Failed to convert internal response");
inflight_guard.mark_error(extract_error_type_from_response(&err_response));
err_response
})?; })?;
inflight_guard.mark_ok(); inflight_guard.mark_ok();
// If the engine context was killed (client disconnect), the response was
// assembled but never delivered. Override to cancelled.
if ctx.is_killed() {
inflight_guard.mark_error(ErrorType::Cancelled);
}
Ok(Json(response).into_response()) Ok(Json(response).into_response())
} }
...@@ -2596,4 +2696,92 @@ mod tests { ...@@ -2596,4 +2696,92 @@ mod tests {
assert_eq!(error_response.1.message, "Connection timeout"); assert_eq!(error_response.1.message, "Connection timeout");
} }
} }
/// A 400 Bad Request is classified by its message: the "Validation:" prefix
/// yields `Validation`, any other message falls back to `Internal`.
#[test]
fn test_classify_error_for_metrics_validation() {
    let with_prefix =
        classify_error_for_metrics(StatusCode::BAD_REQUEST, "Validation: Invalid parameter");
    let without_prefix = classify_error_for_metrics(StatusCode::BAD_REQUEST, "Some other error");

    // Prefixed message -> validation.
    assert_eq!(with_prefix, ErrorType::Validation);
    // Unprefixed message -> internal (fallback).
    assert_eq!(without_prefix, ErrorType::Internal);
}
/// Each status code below is expected to map to the listed metrics error type.
#[test]
fn test_classify_error_for_metrics_status_codes() {
    // (status, message, expected classification)
    let cases = [
        (StatusCode::NOT_FOUND, "Model not found", ErrorType::NotFound),
        (StatusCode::NOT_IMPLEMENTED, "Feature not supported", ErrorType::NotImplemented),
        (StatusCode::TOO_MANY_REQUESTS, "Rate limit exceeded", ErrorType::Overload),
        (StatusCode::SERVICE_UNAVAILABLE, "Overloaded", ErrorType::Overload),
        (StatusCode::INTERNAL_SERVER_ERROR, "Panic", ErrorType::Internal),
    ];

    for (status, message, expected) in cases {
        assert_eq!(classify_error_for_metrics(status, message), expected);
    }
}
/// 4xx statuses without a dedicated mapping are expected to count as validation errors.
#[test]
fn test_classify_error_for_metrics_client_errors() {
    let client_errors = [
        (StatusCode::UNAUTHORIZED, "Unauthorized"),
        (StatusCode::FORBIDDEN, "Forbidden"),
    ];

    for (status, message) in client_errors {
        assert_eq!(
            classify_error_for_metrics(status, message),
            ErrorType::Validation
        );
    }
}
/// An error response built from a 400 `HttpError` carrying the "Validation:"
/// prefix is extracted as `ErrorType::Validation`.
#[test]
fn test_extract_error_type_from_response_validation() {
    let http_error = HttpError {
        code: 400,
        message: String::from("Validation: bad input"),
    };
    let err_response = ErrorMessage::from_http_error(http_error);

    let classified = extract_error_type_from_response(&err_response);
    assert_eq!(classified, ErrorType::Validation);
}
/// The canned "model not found" response is extracted as `ErrorType::NotFound`.
#[test]
fn test_extract_error_type_from_response_not_found() {
    let err_response = ErrorMessage::model_not_found();
    let classified = extract_error_type_from_response(&err_response);
    assert_eq!(classified, ErrorType::NotFound);
}
/// An internal-server-error response is extracted as `ErrorType::Internal`.
#[test]
fn test_extract_error_type_from_response_internal() {
    let err_response = ErrorMessage::internal_server_error("Something went wrong");
    let classified = extract_error_type_from_response(&err_response);
    assert_eq!(classified, ErrorType::Internal);
}
/// A not-implemented error response is extracted as `ErrorType::NotImplemented`.
#[test]
fn test_extract_error_type_from_response_not_implemented() {
    let err_response = ErrorMessage::not_implemented_error("Feature not available");
    let classified = extract_error_type_from_response(&err_response);
    assert_eq!(classified, ErrorType::NotImplemented);
}
} }
...@@ -22,7 +22,7 @@ use dynamo_llm::{ ...@@ -22,7 +22,7 @@ use dynamo_llm::{
service::{ service::{
Metrics, Metrics,
error::HttpError, error::HttpError,
metrics::{Endpoint, RequestType, Status}, metrics::{Endpoint, ErrorType, RequestType, Status},
service_v2::HttpService, service_v2::HttpService,
}, },
}, },
...@@ -197,16 +197,18 @@ fn compare_counter( ...@@ -197,16 +197,18 @@ fn compare_counter(
endpoint: &Endpoint, endpoint: &Endpoint,
request_type: &RequestType, request_type: &RequestType,
status: &Status, status: &Status,
error_type: &ErrorType,
expected: u64, expected: u64,
) { ) {
assert_eq!( assert_eq!(
metrics.get_request_counter(model, endpoint, request_type, status), metrics.get_request_counter(model, endpoint, request_type, status, error_type),
expected, expected,
"model: {}, endpoint: {:?}, request_type: {:?}, status: {:?}", "model: {}, endpoint: {:?}, request_type: {:?}, status: {:?}, error_type: {:?}",
model, model,
endpoint.as_str(), endpoint.as_str(),
request_type.as_str(), request_type.as_str(),
status.as_str() status.as_str(),
error_type.as_str()
); );
} }
...@@ -240,12 +242,17 @@ fn compare_counters(metrics: &Metrics, model: &str, expected: &[u64; 8]) { ...@@ -240,12 +242,17 @@ fn compare_counters(metrics: &Metrics, model: &str, expected: &[u64; 8]) {
for request_type in &[RequestType::Unary, RequestType::Stream] { for request_type in &[RequestType::Unary, RequestType::Stream] {
for status in &[Status::Success, Status::Error] { for status in &[Status::Success, Status::Error] {
let index = compute_index(endpoint, request_type, status); let index = compute_index(endpoint, request_type, status);
let error_type = match status {
Status::Success => &ErrorType::None,
Status::Error => &ErrorType::Validation, // Test engines return 4xx errors
};
compare_counter( compare_counter(
metrics, metrics,
model, model,
endpoint, endpoint,
request_type, request_type,
status, status,
error_type,
expected[index], expected[index],
); );
} }
......
...@@ -249,6 +249,30 @@ pub mod frontend_service { ...@@ -249,6 +249,30 @@ pub mod frontend_service {
/// Value for unary requests /// Value for unary requests
pub const UNARY: &str = "unary"; pub const UNARY: &str = "unary";
} }
/// Error type label values for fine-grained error classification.
///
/// These are the string values of the `error_type` label on the requests
/// metric. Each constant names one failure category; `NONE` is the value
/// used when the request succeeded.
pub mod error_type {
/// No error (used for successful requests).
///
/// The value is the empty string. NOTE(review): Prometheus treats an empty
/// label value as equivalent to the label being absent — confirm dashboards
/// and queries that filter on `error_type` account for that.
pub const NONE: &str = "";
/// Client validation error (4xx with "Validation:" prefix).
///
/// NOTE(review): the classifier tests added alongside this module also expect
/// other 4xx statuses (401, 403) to map to validation, and a 400 without the
/// prefix to map to internal — confirm against the classifier.
pub const VALIDATION: &str = "validation";
/// Model or resource not found (404)
pub const NOT_FOUND: &str = "not_found";
/// Service overloaded, too many requests (503).
///
/// NOTE(review): the classifier tests added alongside this module expect
/// 429 Too Many Requests to map to overload as well — confirm.
pub const OVERLOAD: &str = "overload";
/// Request cancelled by client or timeout
pub const CANCELLED: &str = "cancelled";
/// Internal server error (500 and other unexpected errors)
pub const INTERNAL: &str = "internal";
/// Feature not implemented (501)
pub const NOT_IMPLEMENTED: &str = "not_implemented";
}
} }
/// Work handler Prometheus metric names /// Work handler Prometheus metric names
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment