annotated.rs 8.01 KB
Newer Older
1
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2
// SPDX-License-Identifier: Apache-2.0
Ryan Olson's avatar
Ryan Olson committed
3

4
use super::maybe_error::MaybeError;
5
use crate::error::DynamoError;
6
7
use anyhow::{Result, anyhow as error};
use serde::{Deserialize, Serialize};
Ryan Olson's avatar
Ryan Olson committed
8
9
10
11
12
13
14
15
16
17
18
19
20

pub trait AnnotationsProvider {
    fn annotations(&self) -> Option<Vec<String>>;
    fn has_annotation(&self, annotation: &str) -> bool {
        self.annotations()
            .map(|annotations| annotations.iter().any(|a| a == annotation))
            .unwrap_or(false)
    }
}

/// Our services have the option of returning an "annotated" stream, which allows use
/// to include additional information with each delta. This is useful for debugging,
/// performance benchmarking, and improved observability.
21
#[derive(Serialize, Deserialize, Clone, Debug)]
Ryan Olson's avatar
Ryan Olson committed
22
23
24
25
26
27
28
29
30
pub struct Annotated<R> {
    #[serde(skip_serializing_if = "Option::is_none")]
    pub data: Option<R>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub event: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub comment: Option<Vec<String>>,
31
32
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<DynamoError>,
Ryan Olson's avatar
Ryan Olson committed
33
34
35
}

impl<R> Annotated<R> {
36
    /// Create a new annotated stream from the given error string
Ryan Olson's avatar
Ryan Olson committed
37
38
39
40
41
    pub fn from_error(error: String) -> Self {
        Self {
            data: None,
            id: None,
            event: Some("error".to_string()),
42
43
            comment: None,
            error: Some(DynamoError::msg(error)),
Ryan Olson's avatar
Ryan Olson committed
44
45
46
47
48
49
50
51
52
53
        }
    }

    /// Create a new annotated stream from the given data
    pub fn from_data(data: R) -> Self {
        Self {
            data: Some(data),
            id: None,
            event: None,
            comment: None,
54
            error: None,
Ryan Olson's avatar
Ryan Olson committed
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
        }
    }

    /// Add an annotation to the stream
    ///
    /// Annotations populate the `event` field and the `comment` field
    pub fn from_annotation<S: Serialize>(
        name: impl Into<String>,
        value: &S,
    ) -> Result<Self, serde_json::Error> {
        Ok(Self {
            data: None,
            id: None,
            event: Some(name.into()),
            comment: Some(vec![serde_json::to_string(value)?]),
70
            error: None,
Ryan Olson's avatar
Ryan Olson committed
71
72
73
74
        })
    }

    /// Convert to a [`Result<Self, String>`]
75
    /// If [`Self::event`] is "error", return an error message
Ryan Olson's avatar
Ryan Olson committed
76
    pub fn ok(self) -> Result<Self, String> {
77
78
79
        if let Some(event) = &self.event
            && event == "error"
        {
80
81
82
83
            // First check DynamoError, then fallback to comment
            if let Some(ref err) = self.error {
                return Err(err.to_string());
            }
84
85
86
87
            return Err(self
                .comment
                .unwrap_or(vec!["unknown error".to_string()])
                .join(", "));
Ryan Olson's avatar
Ryan Olson committed
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
        }
        Ok(self)
    }

    pub fn is_ok(&self) -> bool {
        self.event.as_deref() != Some("error")
    }

    pub fn is_err(&self) -> bool {
        !self.is_ok()
    }

    pub fn is_event(&self) -> bool {
        self.event.is_some()
    }

    pub fn transfer<U: Serialize>(self, data: Option<U>) -> Annotated<U> {
        Annotated::<U> {
            data,
            id: self.id,
            event: self.event,
            comment: self.comment,
110
            error: self.error,
Ryan Olson's avatar
Ryan Olson committed
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
        }
    }

    /// Apply a mapping/transformation to the data field
    /// If the mapping fails, the error is returned as an annotated stream
    pub fn map_data<U, F>(self, transform: F) -> Annotated<U>
    where
        F: FnOnce(R) -> Result<U, String>,
    {
        match self.data.map(transform).transpose() {
            Ok(data) => Annotated::<U> {
                data,
                id: self.id,
                event: self.event,
                comment: self.comment,
126
                error: self.error,
Ryan Olson's avatar
Ryan Olson committed
127
128
129
130
131
132
133
134
135
136
137
138
139
            },
            Err(e) => Annotated::from_error(e),
        }
    }

    pub fn is_error(&self) -> bool {
        self.event.as_deref() == Some("error")
    }

    pub fn into_result(self) -> Result<Option<R>> {
        match self.data {
            Some(data) => Ok(Some(data)),
            None => match self.event {
140
141
142
143
144
145
146
147
148
149
150
151
                Some(event) if event == "error" => {
                    // First check DynamoError, then fallback to comment
                    if let Some(ref err) = self.error {
                        Err(error!("{}", err))?
                    } else {
                        Err(error!(
                            self.comment
                                .unwrap_or(vec!["unknown error".to_string()])
                                .join(", ")
                        ))?
                    }
                }
Ryan Olson's avatar
Ryan Olson committed
152
153
154
155
156
157
                _ => Ok(None),
            },
        }
    }
}

158
159
impl<R> MaybeError for Annotated<R>
where
160
    R: for<'de> Deserialize<'de>,
161
{
162
163
164
165
166
167
168
169
170
171
    fn from_err(err: impl std::error::Error + 'static) -> Self {
        Self {
            data: None,
            id: None,
            event: Some("error".to_string()),
            comment: None,
            error: Some(DynamoError::from(
                Box::new(err) as Box<dyn std::error::Error + 'static>
            )),
        }
172
173
    }

174
    fn err(&self) -> Option<DynamoError> {
175
        if self.is_error() {
176
177
178
179
180
181
            // First check DynamoError field
            if let Some(ref error) = self.error {
                return Some(error.clone());
            }

            // Fallback to comment-based error
182
183
184
            if let Some(comment) = &self.comment
                && !comment.is_empty()
            {
185
                return Some(DynamoError::msg(comment.join("; ")));
186
            }
187
            Some(DynamoError::msg("unknown error"))
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
        } else {
            None
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_maybe_error() {
        let annotated = Annotated::from_data("Test data".to_string());
        assert!(annotated.err().is_none());
        assert!(annotated.is_ok());

        let annotated = Annotated::<String>::from_error("Test error 2".to_string());
205
206
207
208
209
        assert!(annotated.err().is_some());
        assert!(annotated.is_err());

        let dynamo_err = DynamoError::msg("Test error 3");
        let annotated = Annotated::<String>::from_err(dynamo_err);
210
        assert!(annotated.is_err());
211
212
213
214
215
216
    }

    #[test]
    fn test_from_err() {
        let err = DynamoError::msg("connection lost");
        let annotated = Annotated::<String>::from_err(err);
217
218

        assert!(annotated.is_err());
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
        let err = annotated.err().unwrap();
        assert!(err.to_string().contains("connection lost"));
    }

    #[test]
    fn test_error_serialization() {
        let err = DynamoError::msg("test error");
        let annotated = Annotated::<String>::from_err(err);

        // Serialize and deserialize
        let json = serde_json::to_string(&annotated).unwrap();
        let deserialized: Annotated<String> = serde_json::from_str(&json).unwrap();

        assert!(deserialized.is_err());
        assert!(
            deserialized
                .err()
                .unwrap()
                .to_string()
                .contains("test error")
        );
    }

    #[test]
    fn test_transfer_preserves_error() {
        let err = DynamoError::msg("request timed out");
        let annotated = Annotated::<String>::from_err(err);

        let transferred: Annotated<i32> = annotated.transfer(None);
        assert!(transferred.err().is_some());
    }

    #[test]
    fn test_ok_method() {
        let err = DynamoError::msg("connection lost");
        let annotated = Annotated::<String>::from_err(err);

        let result = annotated.ok();
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("connection lost"));
    }

    #[test]
    fn test_into_result() {
        let err = DynamoError::msg("connection lost");
        let annotated = Annotated::<String>::from_err(err);

        let result = annotated.into_result();
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("connection lost"));
269
270
    }
}