Unverified commit c8276cd2, authored by Jacky, committed by GitHub
Browse files

feat: Standardized Dynamo Error Type (#6303)


Signed-off-by: Jacky <18255193+kthui@users.noreply.github.com>
parent eb76a8b5
...@@ -14,10 +14,11 @@ use tokio::sync::mpsc; ...@@ -14,10 +14,11 @@ use tokio::sync::mpsc;
use tokio_stream::{StreamExt, wrappers::ReceiverStream}; use tokio_stream::{StreamExt, wrappers::ReceiverStream};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use dynamo_runtime::error::{BackendError, DynamoError, ErrorType};
use dynamo_runtime::logging::get_distributed_tracing_context; use dynamo_runtime::logging::get_distributed_tracing_context;
pub use dynamo_runtime::{ pub use dynamo_runtime::{
pipeline::{AsyncEngine, AsyncEngineContextProvider, Data, ManyOut, ResponseStream, SingleIn}, pipeline::{AsyncEngine, AsyncEngineContextProvider, Data, ManyOut, ResponseStream, SingleIn},
protocols::annotated::Annotated, protocols::{annotated::Annotated, maybe_error::MaybeError},
}; };
use super::context::{Context, callable_accepts_kwarg}; use super::context::{Context, callable_accepts_kwarg};
...@@ -237,38 +238,36 @@ where ...@@ -237,38 +238,36 @@ where
Err(e) => { Err(e) => {
done = true; done = true;
let msg = match &e { match e {
ResponseProcessingError::DeserializeError(e) => { ResponseProcessingError::DeserializeError(e) => {
// tell the python async generator to stop generating // tell the python async generator to stop generating
// right now, this is impossible as we are not passing the context to the python async generator // right now, this is impossible as we are not passing the context to the python async generator
// todo: add task-local context to the python async generator // todo: add task-local context to the python async generator
ctx.stop_generating(); ctx.stop_generating();
let msg = format!( Annotated::from_error(format!(
"critical error: invalid response object from python async generator; application-logic-mismatch: {}", "critical error: invalid response object from python async generator; application-logic-mismatch: {}",
e e
); ))
msg
}
ResponseProcessingError::PyGeneratorExit(_) => {
"Stream ended before generation completed".to_string()
} }
ResponseProcessingError::PyGeneratorExit(_) => Annotated::from_err(
DynamoError::builder()
.error_type(ErrorType::Backend(BackendError::EngineShutdown))
.message("engine shutting down")
.build(),
),
ResponseProcessingError::PythonException(e) => { ResponseProcessingError::PythonException(e) => {
let msg = format!( Annotated::from_error(format!(
"a python exception was caught while processing the async generator: {}", "a python exception was caught while processing the async generator: {}",
e e
); ))
msg
} }
ResponseProcessingError::OffloadError(e) => { ResponseProcessingError::OffloadError(e) => {
let msg = format!( Annotated::from_error(format!(
"critical error: failed to offload the python async generator to a new thread: {}", "critical error: failed to offload the python async generator to a new thread: {}",
e e
); ))
msg
} }
}; }
Annotated::from_error(msg)
} }
}; };
......
...@@ -36,6 +36,7 @@ use std::time::Instant; ...@@ -36,6 +36,7 @@ use std::time::Instant;
use async_trait::async_trait; use async_trait::async_trait;
use dashmap::DashMap; use dashmap::DashMap;
use dynamo_runtime::error::DynamoError;
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
pub use dynamo_runtime::protocols::maybe_error::MaybeError; pub use dynamo_runtime::protocols::maybe_error::MaybeError;
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
...@@ -51,9 +52,9 @@ use rustc_hash::FxBuildHasher; ...@@ -51,9 +52,9 @@ use rustc_hash::FxBuildHasher;
#[cfg(not(feature = "metrics"))] #[cfg(not(feature = "metrics"))]
pub trait MaybeError { pub trait MaybeError {
/// Construct an instance from an error. /// Construct an instance from an error.
fn from_err(err: Box<dyn std::error::Error + Send + Sync>) -> Self; fn from_err(err: impl std::error::Error + 'static) -> Self;
/// Convert to an error instance if this represents an error. /// Convert to an error instance if this represents an error.
fn err(&self) -> Option<anyhow::Error>; fn err(&self) -> Option<DynamoError>;
} }
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
...@@ -125,14 +126,15 @@ pub enum WorkerKvQueryResponse { ...@@ -125,14 +126,15 @@ pub enum WorkerKvQueryResponse {
Error(String), Error(String),
} }
#[cfg(feature = "metrics")]
impl MaybeError for WorkerKvQueryResponse { impl MaybeError for WorkerKvQueryResponse {
fn from_err(err: Box<dyn std::error::Error + Send + Sync>) -> Self { fn from_err(err: impl std::error::Error + 'static) -> Self {
WorkerKvQueryResponse::Error(err.to_string()) WorkerKvQueryResponse::Error(err.to_string())
} }
fn err(&self) -> Option<anyhow::Error> { fn err(&self) -> Option<DynamoError> {
match self { match self {
WorkerKvQueryResponse::Error(msg) => Some(anyhow::Error::msg(msg.clone())), WorkerKvQueryResponse::Error(msg) => Some(DynamoError::msg(msg.clone())),
_ => None, _ => None,
} }
} }
......
...@@ -238,6 +238,7 @@ pub fn final_response_to_one_chunk_stream( ...@@ -238,6 +238,7 @@ pub fn final_response_to_one_chunk_stream(
id: None, id: None,
event: None, event: None,
comment: None, comment: None,
error: None,
}; };
Box::pin(futures::stream::once(async move { annotated })) Box::pin(futures::stream::once(async move { annotated }))
} }
...@@ -290,6 +291,7 @@ mod tests { ...@@ -290,6 +291,7 @@ mod tests {
id: None, id: None,
event: None, event: None,
comment: None, comment: None,
error: None,
} }
} }
...@@ -328,6 +330,7 @@ mod tests { ...@@ -328,6 +330,7 @@ mod tests {
id: None, id: None,
event: None, event: None,
comment: None, comment: None,
error: None,
} }
} }
...@@ -451,6 +454,7 @@ mod tests { ...@@ -451,6 +454,7 @@ mod tests {
id: Some("correlation-123".to_string()), id: Some("correlation-123".to_string()),
event: Some("test-event".to_string()), event: Some("test-event".to_string()),
comment: Some(vec!["test-comment".to_string()]), comment: Some(vec!["test-comment".to_string()]),
error: None,
}; };
let input_stream = stream::iter(vec![chunk_with_metadata.clone()]); let input_stream = stream::iter(vec![chunk_with_metadata.clone()]);
......
...@@ -160,12 +160,12 @@ impl ...@@ -160,12 +160,12 @@ impl
// we are returning characters not tokens, so there will be some postprocessing overhead // we are returning characters not tokens, so there will be some postprocessing overhead
tokio::time::sleep(*TOKEN_ECHO_DELAY).await; tokio::time::sleep(*TOKEN_ECHO_DELAY).await;
let response = deltas.create_choice(0, Some(c.to_string()), None, None, None); let response = deltas.create_choice(0, Some(c.to_string()), None, None, None);
yield Annotated{ id: Some(id.to_string()), data: Some(response), event: None, comment: None }; yield Annotated{ id: Some(id.to_string()), data: Some(response), event: None, comment: None, error: None };
id += 1; id += 1;
} }
let response = deltas.create_choice(0, None, Some(dynamo_async_openai::types::FinishReason::Stop), None, None); let response = deltas.create_choice(0, None, Some(dynamo_async_openai::types::FinishReason::Stop), None, None);
yield Annotated { id: Some(id.to_string()), data: Some(response), event: None, comment: None }; yield Annotated { id: Some(id.to_string()), data: Some(response), event: None, comment: None, error: None };
}; };
Ok(ResponseStream::new(Box::pin(output), ctx)) Ok(ResponseStream::new(Box::pin(output), ctx))
...@@ -193,11 +193,11 @@ impl ...@@ -193,11 +193,11 @@ impl
for c in chars_string.chars() { for c in chars_string.chars() {
tokio::time::sleep(*TOKEN_ECHO_DELAY).await; tokio::time::sleep(*TOKEN_ECHO_DELAY).await;
let response = deltas.create_choice(0, Some(c.to_string()), None, None); let response = deltas.create_choice(0, Some(c.to_string()), None, None);
yield Annotated{ id: Some(id.to_string()), data: Some(response), event: None, comment: None }; yield Annotated{ id: Some(id.to_string()), data: Some(response), event: None, comment: None, error: None };
id += 1; id += 1;
} }
let response = deltas.create_choice(0, None, Some(dynamo_async_openai::types::CompletionFinishReason::Stop), None); let response = deltas.create_choice(0, None, Some(dynamo_async_openai::types::CompletionFinishReason::Stop), None);
yield Annotated { id: Some(id.to_string()), data: Some(response), event: None, comment: None }; yield Annotated { id: Some(id.to_string()), data: Some(response), event: None, comment: None, error: None };
}; };
......
...@@ -1766,6 +1766,7 @@ mod tests { ...@@ -1766,6 +1766,7 @@ mod tests {
data: None, data: None,
event: Some(crate::preprocessor::ANNOTATION_LLM_METRICS.to_string()), event: Some(crate::preprocessor::ANNOTATION_LLM_METRICS.to_string()),
comment: None, comment: None,
error: None,
}; };
// Add metrics annotation with cached_tokens // Add metrics annotation with cached_tokens
...@@ -1870,6 +1871,7 @@ mod tests { ...@@ -1870,6 +1871,7 @@ mod tests {
data: None, data: None,
event: Some(crate::preprocessor::ANNOTATION_LLM_METRICS.to_string()), event: Some(crate::preprocessor::ANNOTATION_LLM_METRICS.to_string()),
comment: None, comment: None,
error: None,
}; };
let llm_metrics = LLMMetricAnnotation { let llm_metrics = LLMMetricAnnotation {
......
...@@ -756,23 +756,40 @@ fn extract_backend_error_if_present<T: serde::Serialize>( ...@@ -756,23 +756,40 @@ fn extract_backend_error_if_present<T: serde::Serialize>(
if let Some(event_type) = &event.event if let Some(event_type) = &event.event
&& event_type == "error" && event_type == "error"
{ {
let comment_str = event // Extract error string: prefer DynamoError field, fallback to legacy comment.
.comment // Use message() instead of to_string() for DynamoError to avoid prefixing
.as_ref() // the ErrorType (e.g., "Unknown: {...}"), which would break JSON parsing.
.map(|c| c.join(", ")) let error_str = if let Some(ref dynamo_err) = event.error {
.unwrap_or_else(|| "Unknown error".to_string()); let mut parts = Vec::new();
let mut current: Option<&dyn std::error::Error> = Some(dynamo_err);
// Try to parse comment as error JSON to extract status code while let Some(e) = current {
if let Ok(error_payload) = serde_json::from_str::<ErrorPayload>(&comment_str) { if let Some(de) = e.downcast_ref::<dynamo_runtime::error::DynamoError>() {
parts.push(de.message().to_string());
} else {
parts.push(e.to_string());
}
current = e.source();
}
parts.join(", ")
} else {
event
.comment
.as_ref()
.map(|c| c.join(", "))
.unwrap_or_else(|| "Unknown error".to_string())
};
// Try to parse as error JSON to extract status code
if let Ok(error_payload) = serde_json::from_str::<ErrorPayload>(&error_str) {
let code = error_payload let code = error_payload
.code .code
.and_then(|c| StatusCode::from_u16(c).ok()) .and_then(|c| StatusCode::from_u16(c).ok())
.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
let message = error_payload.message.unwrap_or(comment_str); let message = error_payload.message.unwrap_or(error_str);
return Some((message, code)); return Some((message, code));
} }
return Some((comment_str, StatusCode::INTERNAL_SERVER_ERROR)); return Some((error_str, StatusCode::INTERNAL_SERVER_ERROR));
} }
// Check if the data payload itself contains an error structure with code >= 400 // Check if the data payload itself contains an error structure with code >= 400
...@@ -2455,6 +2472,7 @@ mod tests { ...@@ -2455,6 +2472,7 @@ mod tests {
id: None, id: None,
event: Some("error".to_string()), event: Some("error".to_string()),
comment: Some(vec!["Backend service unavailable".to_string()]), comment: Some(vec!["Backend service unavailable".to_string()]),
error: None,
}; };
let test_stream = stream::iter(vec![error_event]); let test_stream = stream::iter(vec![error_event]);
...@@ -2481,6 +2499,7 @@ mod tests { ...@@ -2481,6 +2499,7 @@ mod tests {
id: None, id: None,
event: Some("error".to_string()), event: Some("error".to_string()),
comment: Some(vec![error_json.to_string()]), comment: Some(vec![error_json.to_string()]),
error: None,
}; };
let test_stream = stream::iter(vec![error_event]); let test_stream = stream::iter(vec![error_event]);
...@@ -2517,6 +2536,7 @@ mod tests { ...@@ -2517,6 +2536,7 @@ mod tests {
id: Some("msg-1".to_string()), id: Some("msg-1".to_string()),
event: None, event: None,
comment: None, comment: None,
error: None,
}; };
let test_stream = stream::iter(vec![normal_event.clone()]); let test_stream = stream::iter(vec![normal_event.clone()]);
...@@ -2563,6 +2583,7 @@ mod tests { ...@@ -2563,6 +2583,7 @@ mod tests {
id: None, id: None,
event: None, event: None,
comment: Some(vec!["Connection timeout".to_string()]), comment: Some(vec!["Connection timeout".to_string()]),
error: None,
}; };
let test_stream = stream::iter(vec![error_event]); let test_stream = stream::iter(vec![error_event]);
......
...@@ -6,6 +6,7 @@ use serde::{Deserialize, Serialize}; ...@@ -6,6 +6,7 @@ use serde::{Deserialize, Serialize};
use dynamo_runtime::{ use dynamo_runtime::{
component::Component, component::Component,
error::DynamoError,
pipeline::{ pipeline::{
AsyncEngine, AsyncEngineContextProvider, ManyOut, ResponseStream, SingleIn, async_trait, AsyncEngine, AsyncEngineContextProvider, ManyOut, ResponseStream, SingleIn, async_trait,
network::Ingress, network::Ingress,
...@@ -42,13 +43,13 @@ pub enum IndexerQueryResponse { ...@@ -42,13 +43,13 @@ pub enum IndexerQueryResponse {
} }
impl MaybeError for IndexerQueryResponse { impl MaybeError for IndexerQueryResponse {
fn from_err(err: Box<dyn std::error::Error + Send + Sync>) -> Self { fn from_err(err: impl std::error::Error + 'static) -> Self {
IndexerQueryResponse::Error(err.to_string()) IndexerQueryResponse::Error(err.to_string())
} }
fn err(&self) -> Option<anyhow::Error> { fn err(&self) -> Option<DynamoError> {
match self { match self {
IndexerQueryResponse::Error(msg) => Some(anyhow::Error::msg(msg.clone())), IndexerQueryResponse::Error(msg) => Some(DynamoError::msg(msg.clone())),
_ => None, _ => None,
} }
} }
......
...@@ -34,10 +34,13 @@ pub enum PrefillError { ...@@ -34,10 +34,13 @@ pub enum PrefillError {
#[error("Prefill router not yet activated")] #[error("Prefill router not yet activated")]
NotActivated, NotActivated,
/// Error during prefill execution
/// TODO: Separate prefill worker error from prefill router error /// TODO: Separate prefill worker error from prefill router error
/// Error during prefill execution
#[error("Prefill execution failed: {0}")] #[error("Prefill execution failed: {0}")]
PrefillError(String), PrefillError(
String,
#[source] Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
),
/// Disaggregated params not found in prefill response /// Disaggregated params not found in prefill response
#[error("No disaggregated params in prefill response: {0}")] #[error("No disaggregated params in prefill response: {0}")]
...@@ -360,7 +363,12 @@ impl PrefillRouter { ...@@ -360,7 +363,12 @@ impl PrefillRouter {
let mut prefill_response = router let mut prefill_response = router
.generate_to_worker(request, target_worker) .generate_to_worker(request, target_worker)
.await .await
.map_err(|e| PrefillError::PrefillError(e.to_string()))?; .map_err(|e| {
PrefillError::PrefillError(
"failed to route to prefill worker".to_string(),
Some(e.into()),
)
})?;
// Drop phase permit now - routing is complete, record_worker_full was called in select_worker. // Drop phase permit now - routing is complete, record_worker_full was called in select_worker.
// This unblocks set_phase(Decode) in the main task without waiting for prefill output. // This unblocks set_phase(Decode) in the main task without waiting for prefill output.
...@@ -369,6 +377,7 @@ impl PrefillRouter { ...@@ -369,6 +377,7 @@ impl PrefillRouter {
let Some(first_output) = prefill_response.next().await else { let Some(first_output) = prefill_response.next().await else {
return Err(PrefillError::PrefillError( return Err(PrefillError::PrefillError(
"Prefill router returned no output (stream ended)".to_string(), "Prefill router returned no output (stream ended)".to_string(),
None,
)); ));
}; };
...@@ -390,9 +399,10 @@ impl PrefillRouter { ...@@ -390,9 +399,10 @@ impl PrefillRouter {
} }
if let Some(err) = first_output.err() { if let Some(err) = first_output.err() {
return Err(PrefillError::PrefillError(format!( return Err(PrefillError::PrefillError(
"Prefill router returned error in output: {err:?}" "Prefill router returned error in output".to_string(),
))); Some(Box::new(err)),
));
} }
let Some(output) = &first_output.data else { let Some(output) = &first_output.data else {
......
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use std::error::Error as StdError;
use std::sync::Arc; use std::sync::Arc;
use anyhow::{Error, Result}; use anyhow::{Error, Result};
use futures::{stream, stream::StreamExt}; use futures::{stream, stream::StreamExt};
use async_nats::client::{
RequestError as NatsRequestError, RequestErrorKind::NoResponders as NatsNoResponders,
};
use crate::{ use crate::{
http::service::metrics::Metrics, model_card::ModelDeploymentCard, preprocessor::BackendOutput, http::service::metrics::Metrics, model_card::ModelDeploymentCard, preprocessor::BackendOutput,
protocols::common::llm_backend::PreprocessedRequest, protocols::common::llm_backend::PreprocessedRequest,
}; };
use dynamo_runtime::error::{self, BackendError, DynamoError, ErrorType};
use dynamo_runtime::pipeline::{ use dynamo_runtime::pipeline::{
AsyncEngineContext, AsyncEngineContextProvider, Context, ManyOut, Operator, ResponseStream, AsyncEngineContext, AsyncEngineContextProvider, Context, ManyOut, Operator, ResponseStream,
ServerStreamingEngine, SingleIn, async_trait, network::STREAM_ERR_MSG, ServerStreamingEngine, SingleIn, async_trait,
}; };
use dynamo_runtime::protocols::{annotated::Annotated, maybe_error::MaybeError}; use dynamo_runtime::protocols::{annotated::Annotated, maybe_error::MaybeError};
/// Check if an error chain indicates the request should be migrated.
fn is_migratable(err: &(dyn StdError + 'static)) -> bool {
const MIGRATABLE: &[ErrorType] = &[
ErrorType::CannotConnect,
ErrorType::Disconnected,
ErrorType::ConnectionTimeout,
ErrorType::Backend(BackendError::EngineShutdown),
];
const NON_MIGRATABLE: &[ErrorType] = &[
// Future: ErrorType::Cancelled, ErrorType::ValidationError, etc.
];
error::match_error_chain(err, MIGRATABLE, NON_MIGRATABLE)
}
pub struct Migration { pub struct Migration {
migration_limit: u32, migration_limit: u32,
model_name: Arc<String>, model_name: Arc<String>,
...@@ -121,18 +133,15 @@ impl RetryManager { ...@@ -121,18 +133,15 @@ impl RetryManager {
Some(stream) => stream, Some(stream) => stream,
None => { None => {
tracing::error!("next() called with next_stream is None - should not happen"); tracing::error!("next() called with next_stream is None - should not happen");
return Some(Annotated::from_err( return Some(Annotated::from_err(DynamoError::msg("next_stream is None")));
Error::msg("next_stream is None").into(),
));
} }
}; };
if let Some(response) = response_stream.next().await { if let Some(response) = response_stream.next().await {
// Check if this is a migratable error that should trigger stream recreation.
if let Some(err) = response.err() if let Some(err) = response.err()
&& err && is_migratable(&err)
.chain()
.any(|e| e.to_string().starts_with(STREAM_ERR_MSG))
{ {
tracing::warn!("Stream disconnected... recreating stream..."); tracing::warn!("Stream disconnected... recreating stream... {}", err);
self.metrics.inc_migration_ongoing_request(&self.model_name); self.metrics.inc_migration_ongoing_request(&self.model_name);
if let Err(err) = self.new_stream().await { if let Err(err) = self.new_stream().await {
tracing::warn!("Cannot recreate stream: {:#}", err); tracing::warn!("Cannot recreate stream: {:#}", err);
...@@ -162,10 +171,9 @@ impl RetryManager { ...@@ -162,10 +171,9 @@ impl RetryManager {
} }
response_stream = Some(self.next_generate.generate(request).await); response_stream = Some(self.next_generate.generate(request).await);
if let Some(err) = response_stream.as_ref().unwrap().as_ref().err() if let Some(err) = response_stream.as_ref().unwrap().as_ref().err()
&& let Some(req_err) = err.downcast_ref::<NatsRequestError>() && is_migratable(err.as_ref())
&& matches!(req_err.kind(), NatsNoResponders)
{ {
tracing::warn!("Creating new stream... retrying..."); tracing::warn!("Creating new stream... retrying... {}", err);
self.metrics.inc_migration_new_request(&self.model_name); self.metrics.inc_migration_new_request(&self.model_name);
continue; continue;
} }
...@@ -206,6 +214,7 @@ mod tests { ...@@ -206,6 +214,7 @@ mod tests {
use super::*; use super::*;
use crate::http::service::metrics::Metrics; use crate::http::service::metrics::Metrics;
use crate::protocols::common::{OutputOptions, SamplingOptions, StopConditions}; use crate::protocols::common::{OutputOptions, SamplingOptions, StopConditions};
use dynamo_runtime::error::{DynamoError, ErrorType};
use dynamo_runtime::pipeline::AsyncEngine; use dynamo_runtime::pipeline::AsyncEngine;
use dynamo_runtime::pipeline::context::Controller; use dynamo_runtime::pipeline::context::Controller;
use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::atomic::{AtomicU32, Ordering};
...@@ -337,8 +346,12 @@ mod tests { ...@@ -337,8 +346,12 @@ mod tests {
MockBehavior::FailThenSuccess => { MockBehavior::FailThenSuccess => {
if call_num == 0 { if call_num == 0 {
// First call - return "No responders available" error to trigger retry // First call - return "No responders available" error to trigger retry
let nats_error: NatsRequestError = NatsNoResponders.into(); return Err(anyhow::anyhow!(
return Err(nats_error.into()); DynamoError::builder()
.error_type(ErrorType::CannotConnect)
.message("no responders")
.build()
));
} else { } else {
// Subsequent calls - succeed with remaining responses // Subsequent calls - succeed with remaining responses
self.send_responses(responses_already_generated, self.num_responses) self.send_responses(responses_already_generated, self.num_responses)
...@@ -362,8 +375,12 @@ mod tests { ...@@ -362,8 +375,12 @@ mod tests {
} }
} }
// Send the specific error that triggers retry logic // Send the specific error that triggers retry logic
let error_response = let error_response = Annotated::from_err(
Annotated::from_err(anyhow::Error::msg(STREAM_ERR_MSG).into()); DynamoError::builder()
.error_type(ErrorType::Disconnected)
.message("Stream ended before generation completed")
.build(),
);
let _ = tx.send(error_response).await; let _ = tx.send(error_response).await;
}); });
} else { } else {
...@@ -402,8 +419,12 @@ mod tests { ...@@ -402,8 +419,12 @@ mod tests {
} }
} }
// Send the specific error that triggers retry logic // Send the specific error that triggers retry logic
let error_response = let error_response = Annotated::from_err(
Annotated::from_err(anyhow::Error::msg(STREAM_ERR_MSG).into()); DynamoError::builder()
.error_type(ErrorType::Disconnected)
.message("Stream ended before generation completed")
.build(),
);
let _ = tx.send(error_response).await; let _ = tx.send(error_response).await;
}); });
...@@ -415,8 +436,12 @@ mod tests { ...@@ -415,8 +436,12 @@ mod tests {
)) ))
} else { } else {
// Subsequent calls - always fail with NoResponders error (same as AlwaysFail) // Subsequent calls - always fail with NoResponders error (same as AlwaysFail)
let nats_error: NatsRequestError = NatsNoResponders.into(); Err(anyhow::anyhow!(
Err(nats_error.into()) DynamoError::builder()
.error_type(ErrorType::CannotConnect)
.message("no responders")
.build()
))
} }
} }
MockBehavior::MidStreamFailAlwaysStreamError { fail_after } => { MockBehavior::MidStreamFailAlwaysStreamError { fail_after } => {
...@@ -436,8 +461,12 @@ mod tests { ...@@ -436,8 +461,12 @@ mod tests {
} }
} }
// Send the specific error that triggers retry logic // Send the specific error that triggers retry logic
let error_response = let error_response = Annotated::from_err(
Annotated::from_err(anyhow::Error::msg(STREAM_ERR_MSG).into()); DynamoError::builder()
.error_type(ErrorType::Disconnected)
.message("Stream ended before generation completed")
.build(),
);
let _ = tx.send(error_response).await; let _ = tx.send(error_response).await;
}); });
...@@ -451,8 +480,12 @@ mod tests { ...@@ -451,8 +480,12 @@ mod tests {
// Subsequent calls - immediately send stream error (no successful responses) // Subsequent calls - immediately send stream error (no successful responses)
tokio::spawn(async move { tokio::spawn(async move {
// Send the stream error immediately // Send the stream error immediately
let error_response = let error_response = Annotated::from_err(
Annotated::from_err(anyhow::Error::msg(STREAM_ERR_MSG).into()); DynamoError::builder()
.error_type(ErrorType::Disconnected)
.message("Stream ended before generation completed")
.build(),
);
let _ = tx.send(error_response).await; let _ = tx.send(error_response).await;
}); });
...@@ -466,8 +499,12 @@ mod tests { ...@@ -466,8 +499,12 @@ mod tests {
} }
MockBehavior::AlwaysFail => { MockBehavior::AlwaysFail => {
// Always fail with NoResponders error (same as FailThenSuccess first call) // Always fail with NoResponders error (same as FailThenSuccess first call)
let nats_error: NatsRequestError = NatsNoResponders.into(); Err(anyhow::anyhow!(
Err(nats_error.into()) DynamoError::builder()
.error_type(ErrorType::CannotConnect)
.message("no responders")
.build()
))
} }
} }
} }
...@@ -741,12 +778,10 @@ mod tests { ...@@ -741,12 +778,10 @@ mod tests {
} }
} }
// 4th response should be an error after retries are exhausted // 4th response should be a Disconnected error after retries are exhausted
let error_response = &responses[3]; let error_response = &responses[3];
assert!(error_response.err().is_some()); let err = error_response.err().expect("expected error response");
if let Some(error) = error_response.err() { assert_eq!(err.error_type(), ErrorType::Disconnected);
assert!(error.to_string().contains(STREAM_ERR_MSG));
}
assert_eq!(metrics.get_migration_new_request_count(TEST_MODEL), 3); // 2 retries + 1 final failure assert_eq!(metrics.get_migration_new_request_count(TEST_MODEL), 3); // 2 retries + 1 final failure
assert_eq!(metrics.get_migration_ongoing_request_count(TEST_MODEL), 1); // initial ongoing failure retry assert_eq!(metrics.get_migration_ongoing_request_count(TEST_MODEL), 1); // initial ongoing failure retry
...@@ -801,12 +836,10 @@ mod tests { ...@@ -801,12 +836,10 @@ mod tests {
} }
} }
// 4th response should be an error after retries are exhausted // 4th response should be a Disconnected error after retries are exhausted
let error_response = &responses[3]; let error_response = &responses[3];
assert!(error_response.err().is_some()); let err = error_response.err().expect("expected error response");
if let Some(error) = error_response.err() { assert_eq!(err.error_type(), ErrorType::Disconnected);
assert!(error.to_string().contains(STREAM_ERR_MSG));
}
assert_eq!(metrics.get_migration_new_request_count(TEST_MODEL), 0); assert_eq!(metrics.get_migration_new_request_count(TEST_MODEL), 0);
assert_eq!(metrics.get_migration_ongoing_request_count(TEST_MODEL), 4); // 3 retries + 1 final failure assert_eq!(metrics.get_migration_ongoing_request_count(TEST_MODEL), 4); // 3 retries + 1 final failure
......
...@@ -810,6 +810,7 @@ impl OpenAIPreprocessor { ...@@ -810,6 +810,7 @@ impl OpenAIPreprocessor {
data, data,
event: Some(ANNOTATION_LLM_METRICS.to_string()), event: Some(ANNOTATION_LLM_METRICS.to_string()),
comment: annotation.comment, comment: annotation.comment,
error: None,
}; };
tracing::trace!( tracing::trace!(
......
...@@ -108,6 +108,7 @@ where ...@@ -108,6 +108,7 @@ where
id: value.id, id: value.id,
event: value.event, event: value.event,
comment: value.comments, comment: value.comments,
error: None,
}) })
} }
} }
......
...@@ -8,6 +8,7 @@ pub use super::preprocessor::PreprocessedRequest; ...@@ -8,6 +8,7 @@ pub use super::preprocessor::PreprocessedRequest;
use crate::protocols::TokenIdType; use crate::protocols::TokenIdType;
use dynamo_async_openai::types::CompletionUsage; use dynamo_async_openai::types::CompletionUsage;
use dynamo_async_openai::types::StopReason; use dynamo_async_openai::types::StopReason;
use dynamo_runtime::error::DynamoError;
use dynamo_runtime::protocols::maybe_error::MaybeError; use dynamo_runtime::protocols::maybe_error::MaybeError;
pub type TokenType = Option<String>; pub type TokenType = Option<String>;
...@@ -245,13 +246,13 @@ impl LLMEngineOutput { ...@@ -245,13 +246,13 @@ impl LLMEngineOutput {
} }
impl MaybeError for LLMEngineOutput { impl MaybeError for LLMEngineOutput {
fn from_err(err: Box<dyn std::error::Error + Send + Sync>) -> Self { fn from_err(err: impl std::error::Error + 'static) -> Self {
LLMEngineOutput::error(format!("{:?}", err)) LLMEngineOutput::error(err.to_string())
} }
fn err(&self) -> Option<anyhow::Error> { fn err(&self) -> Option<DynamoError> {
if let Some(FinishReason::Error(err_msg)) = &self.finish_reason { if let Some(FinishReason::Error(err_msg)) = &self.finish_reason {
Some(anyhow::Error::msg(err_msg.clone())) Some(DynamoError::msg(err_msg.clone()))
} else { } else {
None None
} }
...@@ -281,7 +282,7 @@ mod tests { ...@@ -281,7 +282,7 @@ mod tests {
assert!(!output.is_err()); assert!(!output.is_err());
let output = LLMEngineOutput::error("Test error".to_string()); let output = LLMEngineOutput::error("Test error".to_string());
assert_eq!(format!("{}", output.err().unwrap()), "Test error"); assert!(format!("{}", output.err().unwrap()).contains("Test error"));
assert!(!output.is_ok()); assert!(!output.is_ok());
assert!(output.is_err()); assert!(output.is_err());
} }
......
...@@ -457,6 +457,7 @@ mod tests { ...@@ -457,6 +457,7 @@ mod tests {
id: Some("test_id".to_string()), id: Some("test_id".to_string()),
event: None, event: None,
comment: None, comment: None,
error: None,
} }
} }
...@@ -694,6 +695,7 @@ mod tests { ...@@ -694,6 +695,7 @@ mod tests {
id: Some("test_id".to_string()), id: Some("test_id".to_string()),
event: None, event: None,
comment: None, comment: None,
error: None,
}; };
let stream = Box::pin(stream::iter(vec![annotated_delta])); let stream = Box::pin(stream::iter(vec![annotated_delta]));
...@@ -759,6 +761,7 @@ mod tests { ...@@ -759,6 +761,7 @@ mod tests {
id: Some("test_id".to_string()), id: Some("test_id".to_string()),
event: None, event: None,
comment: None, comment: None,
error: None,
}; };
let stream = Box::pin(stream::iter(vec![annotated_delta])); let stream = Box::pin(stream::iter(vec![annotated_delta]));
...@@ -801,6 +804,7 @@ mod tests { ...@@ -801,6 +804,7 @@ mod tests {
id: Some("test_id".to_string()), id: Some("test_id".to_string()),
event: None, event: None,
comment: None, comment: None,
error: None,
}; };
let stream = Box::pin(stream::iter(vec![annotated_delta])); let stream = Box::pin(stream::iter(vec![annotated_delta]));
...@@ -843,6 +847,7 @@ mod tests { ...@@ -843,6 +847,7 @@ mod tests {
id: Some("test_id".to_string()), id: Some("test_id".to_string()),
event: None, event: None,
comment: None, comment: None,
error: None,
}; };
let stream = Box::pin(stream::iter(vec![annotated_delta])); let stream = Box::pin(stream::iter(vec![annotated_delta]));
...@@ -883,6 +888,7 @@ mod tests { ...@@ -883,6 +888,7 @@ mod tests {
id: Some("test_id".to_string()), id: Some("test_id".to_string()),
event: None, event: None,
comment: None, comment: None,
error: None,
}; };
let stream = Box::pin(stream::iter(vec![annotated_delta])); let stream = Box::pin(stream::iter(vec![annotated_delta]));
...@@ -927,6 +933,7 @@ mod tests { ...@@ -927,6 +933,7 @@ mod tests {
id: Some("test_id".to_string()), id: Some("test_id".to_string()),
event: None, event: None,
comment: None, comment: None,
error: None,
}; };
let stream = Box::pin(stream::iter(vec![annotated_delta])); let stream = Box::pin(stream::iter(vec![annotated_delta]));
...@@ -969,6 +976,7 @@ mod tests { ...@@ -969,6 +976,7 @@ mod tests {
id: Some("test_id".to_string()), id: Some("test_id".to_string()),
event: None, event: None,
comment: None, comment: None,
error: None,
}; };
let stream = Box::pin(stream::iter(vec![annotated_delta])); let stream = Box::pin(stream::iter(vec![annotated_delta]));
......
...@@ -677,6 +677,7 @@ impl JailedStream { ...@@ -677,6 +677,7 @@ impl JailedStream {
id, id,
event, event,
comment, comment,
error: None,
}] }]
} }
EmissionMode::SingleChoicePerChunk => { EmissionMode::SingleChoicePerChunk => {
...@@ -692,6 +693,7 @@ impl JailedStream { ...@@ -692,6 +693,7 @@ impl JailedStream {
id: id.clone(), id: id.clone(),
event: event.clone(), event: event.clone(),
comment: comment.clone(), comment: comment.clone(),
error: None,
} }
}) })
.collect() .collect()
......
...@@ -265,6 +265,7 @@ mod tests { ...@@ -265,6 +265,7 @@ mod tests {
id: Some("test_id".to_string()), id: Some("test_id".to_string()),
event: None, event: None,
comment: None, comment: None,
error: None,
} }
} }
...@@ -395,6 +396,7 @@ mod tests { ...@@ -395,6 +396,7 @@ mod tests {
id: Some("test_id".to_string()), id: Some("test_id".to_string()),
event: None, event: None,
comment: None, comment: None,
error: None,
}; };
// Create a stream // Create a stream
......
...@@ -64,6 +64,7 @@ mod tests { ...@@ -64,6 +64,7 @@ mod tests {
id: None, id: None,
event: None, event: None,
comment: None, comment: None,
error: None,
} }
} }
...@@ -104,6 +105,7 @@ mod tests { ...@@ -104,6 +105,7 @@ mod tests {
id: None, id: None,
event: None, event: None,
comment: None, comment: None,
error: None,
} }
} }
...@@ -148,6 +150,7 @@ mod tests { ...@@ -148,6 +150,7 @@ mod tests {
id, id,
event, event,
comment, comment,
error: None,
} }
} }
...@@ -193,6 +196,7 @@ mod tests { ...@@ -193,6 +196,7 @@ mod tests {
id: None, id: None,
event: None, event: None,
comment: None, comment: None,
error: None,
} }
} }
...@@ -238,6 +242,7 @@ mod tests { ...@@ -238,6 +242,7 @@ mod tests {
id: None, id: None,
event: None, event: None,
comment: None, comment: None,
error: None,
} }
} }
...@@ -2395,6 +2400,7 @@ mod parallel_jail_tests { ...@@ -2395,6 +2400,7 @@ mod parallel_jail_tests {
id: None, id: None,
event: None, event: None,
comment: None, comment: None,
error: None,
} }
} }
......
...@@ -55,6 +55,7 @@ fn create_mock_response_chunk( ...@@ -55,6 +55,7 @@ fn create_mock_response_chunk(
data: Some(response), data: Some(response),
event: None, event: None,
comment: None, comment: None,
error: None,
} }
} }
......
...@@ -123,6 +123,7 @@ fn load_test_data(file_path: &str) -> TestData { ...@@ -123,6 +123,7 @@ fn load_test_data(file_path: &str) -> TestData {
data: Some(response), data: Some(response),
event: None, event: None,
comment: None, comment: None,
error: None,
} }
}) })
.collect(); .collect();
......
...@@ -58,6 +58,7 @@ async fn apply_jail_transformation( ...@@ -58,6 +58,7 @@ async fn apply_jail_transformation(
id: None, id: None,
event: None, event: None,
comment: None, comment: None,
error: None,
}]); }]);
let mut builder = JailedStream::builder(); let mut builder = JailedStream::builder();
...@@ -95,6 +96,7 @@ async fn apply_jail_transformation_streaming( ...@@ -95,6 +96,7 @@ async fn apply_jail_transformation_streaming(
id: None, id: None,
event: None, event: None,
comment: None, comment: None,
error: None,
})); }));
let mut builder = JailedStream::builder(); let mut builder = JailedStream::builder();
......
...@@ -67,6 +67,7 @@ async fn apply_jail_transformation( ...@@ -67,6 +67,7 @@ async fn apply_jail_transformation(
id: None, id: None,
event: None, event: None,
comment: None, comment: None,
error: None,
}]); }]);
let mut builder = JailedStream::builder(); let mut builder = JailedStream::builder();
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment