Unverified Commit 065f466e authored by Tzu-Ling Kan's avatar Tzu-Ling Kan Committed by GitHub
Browse files

fix: backend error serialization and add HTTP status code detection (#4786)


Signed-off-by: default avatartzulingk@nvidia.com <tzulingk@nvidia.com>
parent fc161924
...@@ -369,8 +369,12 @@ class HandlerBase: ...@@ -369,8 +369,12 @@ class HandlerBase:
# 2. Per-request errors - send to client, don't shutdown # 2. Per-request errors - send to client, don't shutdown
except RequestError as e: except RequestError as e:
logging.warning(f"Request {request_id} error: {e}") error_msg = str(e)
yield {"finish_reason": "error", "token_ids": []} logging.warning(f"Request {request_id} error: {error_msg}")
yield {
"finish_reason": {"error": error_msg},
"token_ids": [],
}
# 3. ALL OTHER ERRORS - graceful shutdown # 3. ALL OTHER ERRORS - graceful shutdown
except Exception as e: except Exception as e:
...@@ -384,7 +388,7 @@ class HandlerBase: ...@@ -384,7 +388,7 @@ class HandlerBase:
# Try to send error to client before shutdown # Try to send error to client before shutdown
try: try:
yield { yield {
"finish_reason": "error", "finish_reason": {"error": error_msg},
"token_ids": [], "token_ids": [],
} }
except Exception: except Exception:
......
...@@ -41,7 +41,10 @@ use super::{ ...@@ -41,7 +41,10 @@ use super::{
use crate::engines::ValidateRequest; use crate::engines::ValidateRequest;
use crate::protocols::openai::chat_completions::aggregator::ChatCompletionAggregator; use crate::protocols::openai::chat_completions::aggregator::ChatCompletionAggregator;
use crate::protocols::openai::{ use crate::protocols::openai::{
chat_completions::{NvCreateChatCompletionRequest, NvCreateChatCompletionResponse}, chat_completions::{
NvCreateChatCompletionRequest, NvCreateChatCompletionResponse,
NvCreateChatCompletionStreamResponse,
},
completions::{NvCreateCompletionRequest, NvCreateCompletionResponse}, completions::{NvCreateCompletionRequest, NvCreateCompletionResponse},
embeddings::{NvCreateEmbeddingRequest, NvCreateEmbeddingResponse}, embeddings::{NvCreateEmbeddingRequest, NvCreateEmbeddingResponse},
responses::{NvCreateResponse, NvResponse}, responses::{NvCreateResponse, NvResponse},
...@@ -717,6 +720,116 @@ async fn handler_chat_completions( ...@@ -717,6 +720,116 @@ async fn handler_chat_completions(
response response
} }
/// Checks if an Annotated event represents a backend error and extracts error information.
/// Returns Some((message, status_code)) if it's an error, None otherwise.
fn extract_backend_error_if_present<T: serde::Serialize>(
event: &Annotated<T>,
) -> Option<(String, StatusCode)> {
#[derive(serde::Deserialize)]
struct ErrorPayload {
message: Option<String>,
code: Option<u16>,
}
// Check if event type is "error" (from postprocessor when FinishReason::Error is encountered)
if let Some(event_type) = &event.event
&& event_type == "error"
{
let comment_str = event
.comment
.as_ref()
.map(|c| c.join(", "))
.unwrap_or_else(|| "Unknown error".to_string());
// Try to parse comment as error JSON to extract status code
if let Ok(error_payload) = serde_json::from_str::<ErrorPayload>(&comment_str) {
let code = error_payload
.code
.and_then(|c| StatusCode::from_u16(c).ok())
.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
let message = error_payload.message.unwrap_or(comment_str);
return Some((message, code));
}
return Some((comment_str, StatusCode::INTERNAL_SERVER_ERROR));
}
// Check if the data payload itself contains an error structure with code >= 400
if let Some(data) = &event.data
&& let Ok(json_value) = serde_json::to_value(data)
&& let Ok(error_payload) = serde_json::from_value::<ErrorPayload>(json_value.clone())
&& let Some(code_num) = error_payload.code
&& code_num >= 400
{
let code = StatusCode::from_u16(code_num).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
let message = error_payload
.message
.unwrap_or_else(|| json_value.to_string());
return Some((message, code));
}
// Check if comment contains error information (without event: error)
if let Some(comments) = &event.comment
&& !comments.is_empty()
{
let comment_str = comments.join(", ");
// Try to parse comment as error JSON with code >= 400
if let Ok(error_payload) = serde_json::from_str::<ErrorPayload>(&comment_str)
&& let Some(code_num) = error_payload.code
&& code_num >= 400
{
let code = StatusCode::from_u16(code_num).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
let message = error_payload.message.unwrap_or(comment_str);
return Some((message, code));
}
// Comments present with no data AND no event type indicates error
// (events with event types like "request_id" or "event.dynamo.test.sentinel" are annotations)
if event.data.is_none() && event.event.is_none() {
return Some((comment_str, StatusCode::INTERNAL_SERVER_ERROR));
}
}
None
}
/// Checks if the first event in the stream is a backend error.
/// Returns Err(ErrorResponse) if error detected, Ok(stream) otherwise.
async fn check_for_backend_error(
mut stream: impl futures::Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>>
+ Send
+ Unpin
+ 'static,
) -> Result<
impl futures::Stream<Item = Annotated<NvCreateChatCompletionStreamResponse>> + Send,
ErrorResponse,
> {
use futures::stream::StreamExt;
// Peek at the first event
if let Some(first_event) = stream.next().await {
// Check if it's an error event
if let Some((error_msg, status_code)) = extract_backend_error_if_present(&first_event) {
return Err((
status_code,
Json(ErrorMessage {
message: error_msg,
error_type: map_error_code_to_error_type(status_code),
code: status_code.as_u16(),
}),
));
}
// Not an error - reconstruct stream with first event
let reconstructed_stream = futures::stream::iter(vec![first_event]).chain(stream);
Ok(reconstructed_stream)
} else {
// Empty stream - this shouldn't happen but handle gracefully
Ok(futures::stream::iter(vec![]).chain(stream))
}
}
/// OpenAI Chat Completions Request Handler /// OpenAI Chat Completions Request Handler
/// ///
/// This method will handle the incoming request for the /v1/chat/completions endpoint. The endpoint is a "source" /// This method will handle the incoming request for the /v1/chat/completions endpoint. The endpoint is a "source"
...@@ -822,11 +935,16 @@ async fn chat_completions( ...@@ -822,11 +935,16 @@ async fn chat_completions(
// note - we might do this as part of the post processing set to make it more generic // note - we might do this as part of the post processing set to make it more generic
if streaming { if streaming {
// For streaming responses, we return HTTP 200 immediately without checking for errors.
// Once HTTP 200 OK is sent, we cannot change the status code, so any backend errors
// must be delivered as SSE events with `event: error` in the stream (handled by
// EventConverter and monitor_for_disconnects). This is standard SSE behavior.
stream_handle.arm(); // allows the system to detect client disconnects and cancel the LLM generation stream_handle.arm(); // allows the system to detect client disconnects and cancel the LLM generation
let mut http_queue_guard = Some(http_queue_guard); let mut http_queue_guard = Some(http_queue_guard);
let stream = stream.map(move |response| { let stream = stream.map(move |response| {
// Calls observe_response() on each token // Calls observe_response() on each token
// EventConverter will detect `event: "error"` and convert to SSE error events
process_response_using_event_converter_and_observe_metrics( process_response_using_event_converter_and_observe_metrics(
EventConverter::from(response), EventConverter::from(response),
&mut response_collector, &mut response_collector,
...@@ -843,8 +961,17 @@ async fn chat_completions( ...@@ -843,8 +961,17 @@ async fn chat_completions(
Ok(sse_stream.into_response()) Ok(sse_stream.into_response())
} else { } else {
// Check first event for backend errors before aggregating (non-streaming only)
let stream_with_check =
check_for_backend_error(stream)
.await
.map_err(|error_response| {
tracing::error!(request_id, "Backend error detected: {:?}", error_response);
error_response
})?;
let mut http_queue_guard = Some(http_queue_guard); let mut http_queue_guard = Some(http_queue_guard);
let stream = stream.inspect(move |response| { let stream = stream_with_check.inspect(move |response| {
// Calls observe_response() on each token - drops http_queue_guard on first token // Calls observe_response() on each token - drops http_queue_guard on first token
process_response_and_observe_metrics( process_response_and_observe_metrics(
response, response,
...@@ -859,11 +986,11 @@ async fn chat_completions( ...@@ -859,11 +986,11 @@ async fn chat_completions(
.map_err(|e| { .map_err(|e| {
tracing::error!( tracing::error!(
request_id, request_id,
"Failed to fold chat completions stream for: {:?}", "Failed to parse chat completion response: {:?}",
e e
); );
ErrorMessage::internal_server_error(&format!( ErrorMessage::internal_server_error(&format!(
"Failed to fold chat completions stream: {}", "Failed to parse chat completion response: {}",
e e
)) ))
})?; })?;
...@@ -2055,4 +2182,136 @@ mod tests { ...@@ -2055,4 +2182,136 @@ mod tests {
assert!(msg.contains("response_format")); assert!(msg.contains("response_format"));
} }
} }
#[tokio::test]
async fn test_check_for_backend_error_with_error_event() {
use crate::types::openai::chat_completions::NvCreateChatCompletionStreamResponse;
use futures::stream;
// Create an error event
let error_event = Annotated::<NvCreateChatCompletionStreamResponse> {
data: None,
id: None,
event: Some("error".to_string()),
comment: Some(vec!["Backend service unavailable".to_string()]),
};
let test_stream = stream::iter(vec![error_event]);
let result = check_for_backend_error(test_stream).await;
// Should return an error
assert!(result.is_err());
if let Err(error_response) = result {
assert_eq!(error_response.0, StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(error_response.1.message, "Backend service unavailable");
}
}
#[tokio::test]
async fn test_check_for_backend_error_with_json_error_and_code() {
use crate::types::openai::chat_completions::NvCreateChatCompletionStreamResponse;
use futures::stream;
// Create an error event with JSON payload containing error code in comment
let error_json =
r#"{"message":"prompt > max_seq_len","type":"Internal Server Error","code":500}"#;
let error_event = Annotated::<NvCreateChatCompletionStreamResponse> {
data: None,
id: None,
event: Some("error".to_string()),
comment: Some(vec![error_json.to_string()]),
};
let test_stream = stream::iter(vec![error_event]);
let result = check_for_backend_error(test_stream).await;
// Should return an error with correct status code extracted from JSON
assert!(result.is_err());
if let Err(error_response) = result {
assert_eq!(error_response.0, StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(error_response.1.message, "prompt > max_seq_len");
assert_eq!(error_response.1.code, 500);
}
}
#[tokio::test]
async fn test_check_for_backend_error_with_normal_event() {
use crate::types::openai::chat_completions::NvCreateChatCompletionStreamResponse;
use dynamo_async_openai::types::CreateChatCompletionStreamResponse;
use futures::stream::{self, StreamExt};
// Create a normal data event
let normal_event = Annotated::<NvCreateChatCompletionStreamResponse> {
data: Some(CreateChatCompletionStreamResponse {
id: "test-id".to_string(),
choices: vec![],
created: 0,
model: "test-model".to_string(),
system_fingerprint: None,
object: "chat.completion.chunk".to_string(),
service_tier: None,
usage: None,
nvext: None,
}),
id: Some("msg-1".to_string()),
event: None,
comment: None,
};
let test_stream = stream::iter(vec![normal_event.clone()]);
let result = check_for_backend_error(test_stream).await;
// Should return Ok with the stream
assert!(result.is_ok());
let mut returned_stream = result.unwrap();
// Verify we can read the event back from the stream
let first = returned_stream.next().await;
assert!(first.is_some());
let first_event = first.unwrap();
assert_eq!(first_event.id, Some("msg-1".to_string()));
}
#[tokio::test]
async fn test_check_for_backend_error_with_empty_stream() {
use crate::types::openai::chat_completions::NvCreateChatCompletionStreamResponse;
use futures::stream::{self, StreamExt};
// Create an empty stream
let test_stream =
stream::iter::<Vec<Annotated<NvCreateChatCompletionStreamResponse>>>(vec![]);
let result = check_for_backend_error(test_stream).await;
// Should return Ok with an empty stream
assert!(result.is_ok());
let mut returned_stream = result.unwrap();
// Verify stream is empty
let first = returned_stream.next().await;
assert!(first.is_none());
}
#[tokio::test]
async fn test_check_for_backend_error_with_comment_but_no_event_type() {
use crate::types::openai::chat_completions::NvCreateChatCompletionStreamResponse;
use futures::stream;
// Create an event with comment but no event type and no data (error indicator)
let error_event = Annotated::<NvCreateChatCompletionStreamResponse> {
data: None,
id: None,
event: None,
comment: Some(vec!["Connection timeout".to_string()]),
};
let test_stream = stream::iter(vec![error_event]);
let result = check_for_backend_error(test_stream).await;
// Should return an error based on is_backend_error_event logic
assert!(result.is_err());
if let Err(error_response) = result {
assert_eq!(error_response.0, StatusCode::INTERNAL_SERVER_ERROR);
assert_eq!(error_response.1.message, "Connection timeout");
}
}
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment