// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-License-Identifier: Apache-2.0 use super::maybe_error::MaybeError; use crate::error::DynamoError; use anyhow::{Result, anyhow as error}; use serde::{Deserialize, Serialize}; pub trait AnnotationsProvider { fn annotations(&self) -> Option>; fn has_annotation(&self, annotation: &str) -> bool { self.annotations() .map(|annotations| annotations.iter().any(|a| a == annotation)) .unwrap_or(false) } } /// Our services have the option of returning an "annotated" stream, which allows use /// to include additional information with each delta. This is useful for debugging, /// performance benchmarking, and improved observability. #[derive(Serialize, Deserialize, Clone, Debug)] pub struct Annotated { #[serde(skip_serializing_if = "Option::is_none")] pub data: Option, #[serde(skip_serializing_if = "Option::is_none")] pub id: Option, #[serde(skip_serializing_if = "Option::is_none")] pub event: Option, #[serde(skip_serializing_if = "Option::is_none")] pub comment: Option>, #[serde(skip_serializing_if = "Option::is_none")] pub error: Option, } impl Annotated { /// Create a new annotated stream from the given error string pub fn from_error(error: String) -> Self { Self { data: None, id: None, event: Some("error".to_string()), comment: None, error: Some(DynamoError::msg(error)), } } /// Create a new annotated stream from the given data pub fn from_data(data: R) -> Self { Self { data: Some(data), id: None, event: None, comment: None, error: None, } } /// Add an annotation to the stream /// /// Annotations populate the `event` field and the `comment` field pub fn from_annotation( name: impl Into, value: &S, ) -> Result { Ok(Self { data: None, id: None, event: Some(name.into()), comment: Some(vec![serde_json::to_string(value)?]), error: None, }) } /// Convert to a [`Result`] /// If [`Self::event`] is "error", return an error message pub fn ok(self) -> Result { if let Some(event) = &self.event && event == "error" { // First check DynamoError, then fallback to comment if let Some(ref err) = self.error { return Err(err.to_string()); } return Err(self .comment .unwrap_or(vec!["unknown error".to_string()]) .join(", ")); } Ok(self) } pub fn is_ok(&self) -> bool { self.event.as_deref() != Some("error") } pub fn is_err(&self) -> bool { !self.is_ok() } pub fn is_event(&self) -> bool { self.event.is_some() } pub fn transfer(self, data: Option) -> Annotated { Annotated:: { data, id: self.id, event: self.event, comment: self.comment, error: self.error, } } /// Apply a mapping/transformation to the data field /// If the mapping fails, the error is returned as an annotated stream pub fn map_data(self, transform: F) -> Annotated where F: FnOnce(R) -> Result, { match self.data.map(transform).transpose() { Ok(data) => Annotated:: { data, id: self.id, event: self.event, comment: self.comment, error: self.error, }, Err(e) => Annotated::from_error(e), } } pub fn is_error(&self) -> bool { self.event.as_deref() == Some("error") } pub fn into_result(self) -> Result> { match self.data { Some(data) => Ok(Some(data)), None => match self.event { Some(event) if event == "error" => { // First check DynamoError, then fallback to comment if let Some(ref err) = self.error { Err(error!("{}", err))? } else { Err(error!( self.comment .unwrap_or(vec!["unknown error".to_string()]) .join(", ") ))? } } _ => Ok(None), }, } } } impl MaybeError for Annotated where R: for<'de> Deserialize<'de>, { fn from_err(err: impl std::error::Error + 'static) -> Self { Self { data: None, id: None, event: Some("error".to_string()), comment: None, error: Some(DynamoError::from( Box::new(err) as Box )), } } fn err(&self) -> Option { if self.is_error() { // First check DynamoError field if let Some(ref error) = self.error { return Some(error.clone()); } // Fallback to comment-based error if let Some(comment) = &self.comment && !comment.is_empty() { return Some(DynamoError::msg(comment.join("; "))); } Some(DynamoError::msg("unknown error")) } else { None } } } #[cfg(test)] mod tests { use super::*; #[test] fn test_maybe_error() { let annotated = Annotated::from_data("Test data".to_string()); assert!(annotated.err().is_none()); assert!(annotated.is_ok()); let annotated = Annotated::::from_error("Test error 2".to_string()); assert!(annotated.err().is_some()); assert!(annotated.is_err()); let dynamo_err = DynamoError::msg("Test error 3"); let annotated = Annotated::::from_err(dynamo_err); assert!(annotated.is_err()); } #[test] fn test_from_err() { let err = DynamoError::msg("connection lost"); let annotated = Annotated::::from_err(err); assert!(annotated.is_err()); let err = annotated.err().unwrap(); assert!(err.to_string().contains("connection lost")); } #[test] fn test_error_serialization() { let err = DynamoError::msg("test error"); let annotated = Annotated::::from_err(err); // Serialize and deserialize let json = serde_json::to_string(&annotated).unwrap(); let deserialized: Annotated = serde_json::from_str(&json).unwrap(); assert!(deserialized.is_err()); assert!( deserialized .err() .unwrap() .to_string() .contains("test error") ); } #[test] fn test_transfer_preserves_error() { let err = DynamoError::msg("request timed out"); let annotated = Annotated::::from_err(err); let transferred: Annotated = annotated.transfer(None); assert!(transferred.err().is_some()); } #[test] fn test_ok_method() { let err = DynamoError::msg("connection lost"); let annotated = Annotated::::from_err(err); let result = annotated.ok(); assert!(result.is_err()); assert!(result.unwrap_err().contains("connection lost")); } #[test] fn test_into_result() { let err = DynamoError::msg("connection lost"); let annotated = Annotated::::from_err(err); let result = annotated.into_result(); assert!(result.is_err()); assert!(result.unwrap_err().to_string().contains("connection lost")); } }