Unverified Commit 819bb62f authored by Graham King's avatar Graham King Committed by GitHub
Browse files

fix: Tests now pass with RUST_BACKTRACE set (#2647)

parent b92a805e
...@@ -100,10 +100,13 @@ impl RetryManager { ...@@ -100,10 +100,13 @@ impl RetryManager {
if let Some(response) = response_stream.next().await { if let Some(response) = response_stream.next().await {
if let Some(err) = response.err() { if let Some(err) = response.err() {
const STREAM_ERR_MSG: &str = "Stream ended before generation completed"; const STREAM_ERR_MSG: &str = "Stream ended before generation completed";
if format!("{:?}", err) == STREAM_ERR_MSG { if err
.chain()
.any(|e| e.to_string().starts_with(STREAM_ERR_MSG))
{
tracing::warn!("Stream disconnected... recreating stream..."); tracing::warn!("Stream disconnected... recreating stream...");
if let Err(err) = self.new_stream().await { if let Err(err) = self.new_stream().await {
tracing::warn!("Cannot recreate stream: {:?}", err); tracing::warn!("Cannot recreate stream: {:#}", err);
} else { } else {
continue; continue;
} }
...@@ -462,6 +465,7 @@ mod tests { ...@@ -462,6 +465,7 @@ mod tests {
/// Expected behavior: All 10 responses should be received successfully. /// Expected behavior: All 10 responses should be received successfully.
#[tokio::test] #[tokio::test]
async fn test_retry_manager_no_migration() { async fn test_retry_manager_no_migration() {
dynamo_runtime::logging::init();
let request = create_mock_request(10); let request = create_mock_request(10);
let mock_engine = Arc::new(MockEngine::new(MockBehavior::Success, 10, 100)); let mock_engine = Arc::new(MockEngine::new(MockBehavior::Success, 10, 100));
let next_generate: ServerStreamingEngine<PreprocessedRequest, Annotated<LLMEngineOutput>> = let next_generate: ServerStreamingEngine<PreprocessedRequest, Annotated<LLMEngineOutput>> =
...@@ -493,6 +497,7 @@ mod tests { ...@@ -493,6 +497,7 @@ mod tests {
/// Expected behavior: All 10 responses should be received successfully after retry. /// Expected behavior: All 10 responses should be received successfully after retry.
#[tokio::test] #[tokio::test]
async fn test_retry_manager_new_request_migration() { async fn test_retry_manager_new_request_migration() {
dynamo_runtime::logging::init();
let request = create_mock_request(10); let request = create_mock_request(10);
let mock_engine = Arc::new(MockEngine::new(MockBehavior::FailThenSuccess, 10, 100)); let mock_engine = Arc::new(MockEngine::new(MockBehavior::FailThenSuccess, 10, 100));
let next_generate: ServerStreamingEngine<PreprocessedRequest, Annotated<LLMEngineOutput>> = let next_generate: ServerStreamingEngine<PreprocessedRequest, Annotated<LLMEngineOutput>> =
...@@ -524,6 +529,8 @@ mod tests { ...@@ -524,6 +529,8 @@ mod tests {
/// Expected behavior: 5 responses from first stream + 5 responses from retry stream = 10 total. /// Expected behavior: 5 responses from first stream + 5 responses from retry stream = 10 total.
#[tokio::test] #[tokio::test]
async fn test_retry_manager_ongoing_request_migration() { async fn test_retry_manager_ongoing_request_migration() {
dynamo_runtime::logging::init();
let request = create_mock_request(10); let request = create_mock_request(10);
let mock_engine = Arc::new(MockEngine::new( let mock_engine = Arc::new(MockEngine::new(
MockBehavior::MidStreamFail { fail_after: 5 }, MockBehavior::MidStreamFail { fail_after: 5 },
...@@ -560,6 +567,7 @@ mod tests { ...@@ -560,6 +567,7 @@ mod tests {
/// Expected behavior: Should receive an error after all retries are exhausted, with the original error. /// Expected behavior: Should receive an error after all retries are exhausted, with the original error.
#[tokio::test] #[tokio::test]
async fn test_retry_manager_new_request_migration_indefinite_failure() { async fn test_retry_manager_new_request_migration_indefinite_failure() {
dynamo_runtime::logging::init();
let request = create_mock_request(0); let request = create_mock_request(0);
let mock_engine = Arc::new(MockEngine::new(MockBehavior::AlwaysFail, 0, 100)); let mock_engine = Arc::new(MockEngine::new(MockBehavior::AlwaysFail, 0, 100));
let next_generate: ServerStreamingEngine<PreprocessedRequest, Annotated<LLMEngineOutput>> = let next_generate: ServerStreamingEngine<PreprocessedRequest, Annotated<LLMEngineOutput>> =
...@@ -580,6 +588,7 @@ mod tests { ...@@ -580,6 +588,7 @@ mod tests {
/// Expected behavior: Should receive some responses from first stream, then error after retries exhausted. /// Expected behavior: Should receive some responses from first stream, then error after retries exhausted.
#[tokio::test] #[tokio::test]
async fn test_retry_manager_ongoing_request_migration_indefinite_failure() { async fn test_retry_manager_ongoing_request_migration_indefinite_failure() {
dynamo_runtime::logging::init();
let request = create_mock_request(10); let request = create_mock_request(10);
let mock_engine = Arc::new(MockEngine::new( let mock_engine = Arc::new(MockEngine::new(
MockBehavior::MidStreamFailAlways { fail_after: 3 }, MockBehavior::MidStreamFailAlways { fail_after: 3 },
...@@ -627,6 +636,7 @@ mod tests { ...@@ -627,6 +636,7 @@ mod tests {
/// Expected behavior: Should receive some responses from first stream, then error after retries exhausted. /// Expected behavior: Should receive some responses from first stream, then error after retries exhausted.
#[tokio::test] #[tokio::test]
async fn test_retry_manager_ongoing_request_migration_indefinite_failure_stream_error() { async fn test_retry_manager_ongoing_request_migration_indefinite_failure_stream_error() {
dynamo_runtime::logging::init();
let request = create_mock_request(10); let request = create_mock_request(10);
let mock_engine = Arc::new(MockEngine::new( let mock_engine = Arc::new(MockEngine::new(
MockBehavior::MidStreamFailAlwaysStreamError { fail_after: 3 }, MockBehavior::MidStreamFailAlwaysStreamError { fail_after: 3 },
......
...@@ -157,9 +157,9 @@ impl MaybeError for LLMEngineOutput { ...@@ -157,9 +157,9 @@ impl MaybeError for LLMEngineOutput {
LLMEngineOutput::error(format!("{:?}", err)) LLMEngineOutput::error(format!("{:?}", err))
} }
fn err(&self) -> Option<Box<dyn std::error::Error + Send + Sync>> { fn err(&self) -> Option<anyhow::Error> {
if let Some(FinishReason::Error(err_msg)) = &self.finish_reason { if let Some(FinishReason::Error(err_msg)) = &self.finish_reason {
Some(anyhow::Error::msg(err_msg.clone()).into()) Some(anyhow::Error::msg(err_msg.clone()))
} else { } else {
None None
} }
......
...@@ -143,14 +143,14 @@ where ...@@ -143,14 +143,14 @@ where
Annotated::from_error(format!("{:?}", err)) Annotated::from_error(format!("{:?}", err))
} }
fn err(&self) -> Option<Box<dyn std::error::Error + Send + Sync>> { fn err(&self) -> Option<anyhow::Error> {
if self.is_error() { if self.is_error() {
if let Some(comment) = &self.comment { if let Some(comment) = &self.comment {
if !comment.is_empty() { if !comment.is_empty() {
return Some(anyhow::Error::msg(comment.join("; ")).into()); return Some(anyhow::Error::msg(comment.join("; ")));
} }
} }
Some(anyhow::Error::msg("unknown error").into()) Some(anyhow::Error::msg("unknown error"))
} else { } else {
None None
} }
......
...@@ -20,7 +20,7 @@ pub trait MaybeError { ...@@ -20,7 +20,7 @@ pub trait MaybeError {
fn from_err(err: Box<dyn Error + Send + Sync>) -> Self; fn from_err(err: Box<dyn Error + Send + Sync>) -> Self;
/// Construct into an error instance. /// Construct into an error instance.
fn err(&self) -> Option<Box<dyn Error + Send + Sync>>; fn err(&self) -> Option<anyhow::Error>;
/// Check if the current instance represents a success. /// Check if the current instance represents a success.
fn is_ok(&self) -> bool { fn is_ok(&self) -> bool {
...@@ -46,8 +46,8 @@ mod tests { ...@@ -46,8 +46,8 @@ mod tests {
message: err.to_string(), message: err.to_string(),
} }
} }
fn err(&self) -> Option<Box<dyn Error + Send + Sync>> { fn err(&self) -> Option<anyhow::Error> {
Some(anyhow::Error::msg(self.message.clone()).into()) Some(anyhow::Error::msg(self.message.clone()))
} }
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment