service.rs 7.94 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
Ryan Olson's avatar
Ryan Olson committed
15
16
17
18
19
20
21

// TODO - refactor this entire module
//
// we want to carry forward the concept of live vs ready for the components
// we will want to associate the components cancellation token with the
// component's "service state"

Ryan Olson's avatar
Ryan Olson committed
22
use crate::{error, transports::nats, utils::stream, Result};
Ryan Olson's avatar
Ryan Olson committed
23
24
25
26
27

use async_nats::Message;
use async_stream::try_stream;
use bytes::Bytes;
use derive_getters::Dissolve;
Ryan Olson's avatar
Ryan Olson committed
28
use futures::stream::{StreamExt, TryStreamExt};
Ryan Olson's avatar
Ryan Olson committed
29
30
31
32
33
34
35
36
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::time::Duration;

pub struct ServiceClient {
    nats_client: nats::Client,
}

impl ServiceClient {
37
    pub fn new(nats_client: nats::Client) -> Self {
Ryan Olson's avatar
Ryan Olson committed
38
39
40
41
        ServiceClient { nats_client }
    }
}

Ryan Olson's avatar
Ryan Olson committed
42
#[derive(Debug, Clone, Serialize, Deserialize)]
Ryan Olson's avatar
Ryan Olson committed
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
pub struct ServiceSet {
    services: Vec<ServiceInfo>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceInfo {
    pub name: String,
    pub id: String,
    pub version: String,
    pub started: String,
    pub endpoints: Vec<EndpointInfo>,
}

#[derive(Debug, Clone, Serialize, Deserialize, Dissolve)]
pub struct EndpointInfo {
    pub name: String,
    pub subject: String,

    #[serde(flatten)]
Ryan Olson's avatar
Ryan Olson committed
62
    pub data: Option<Metrics>,
Ryan Olson's avatar
Ryan Olson committed
63
64
}

Ryan Olson's avatar
Ryan Olson committed
65
66
67
68
69
impl EndpointInfo {
    pub fn id(&self) -> Result<i64> {
        let id = self
            .subject
            .split('-')
70
            .next_back()
Ryan Olson's avatar
Ryan Olson committed
71
72
73
74
75
            .ok_or_else(|| error!("No id found in subject"))?;

        i64::from_str_radix(id, 16).map_err(|e| error!("Invalid id format: {}", e))
    }
}
76
77
78
79
80
81

// TODO: This is _really_ close to the async_nats::service::Stats object,
// but it's missing a few fields like "name", so use a temporary struct
// for easy deserialization. Ideally, this type already exists or can
// be exposed in the library somewhere.
/// Stats structure returned from NATS service API
Ryan Olson's avatar
Ryan Olson committed
82
#[derive(Debug, Clone, Serialize, Deserialize, Dissolve)]
83
84
85
86
87
88
89
90
91
92
93
pub struct Metrics {
    // Standard NATS Service API fields
    pub average_processing_time: f64,
    pub last_error: String,
    pub num_errors: u64,
    pub num_requests: u64,
    pub processing_time: u64,
    pub queue_group: String,
    // Field containing custom stats handler data
    pub data: serde_json::Value,
}
Ryan Olson's avatar
Ryan Olson committed
94
95

impl Metrics {
Ryan Olson's avatar
Ryan Olson committed
96
    pub fn decode<T: for<'de> Deserialize<'de>>(self) -> Result<T> {
97
        serde_json::from_value(self.data).map_err(Into::into)
Ryan Olson's avatar
Ryan Olson committed
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
    }
}

impl ServiceClient {
    pub async fn unary(
        &self,
        subject: impl Into<String>,
        payload: impl Into<Bytes>,
    ) -> Result<Message> {
        let response = self
            .nats_client
            .client()
            .request(subject.into(), payload.into())
            .await?;
        Ok(response)
    }

115
116
117
    pub async fn collect_services(
        &self,
        service_name: &str,
118
        timeout: Duration,
119
    ) -> Result<ServiceSet> {
Ryan Olson's avatar
Ryan Olson committed
120
        let sub = self.nats_client.scrape_service(service_name).await?;
121
122
        if timeout.is_zero() {
            tracing::warn!("collect_services: timeout is zero");
Ryan Olson's avatar
Ryan Olson committed
123
        }
124
125
        if timeout > Duration::from_secs(10) {
            tracing::warn!("collect_services: timeout is greater than 10 seconds");
Ryan Olson's avatar
Ryan Olson committed
126
        }
127
        let deadline = tokio::time::Instant::now() + timeout;
Ryan Olson's avatar
Ryan Olson committed
128

129
        let services: Vec<ServiceInfo> = stream::until_deadline(sub, deadline)
Ryan Olson's avatar
Ryan Olson committed
130
131
132
133
134
135
136
137
            .map(|message| serde_json::from_slice::<ServiceInfo>(&message.payload))
            .filter_map(|info| async move {
                match info {
                    Ok(info) => Some(info),
                    Err(e) => {
                        log::debug!("error decoding service info: {:?}", e);
                        None
                    }
Ryan Olson's avatar
Ryan Olson committed
138
                }
Ryan Olson's avatar
Ryan Olson committed
139
140
141
            })
            .collect()
            .await;
Ryan Olson's avatar
Ryan Olson committed
142

Ryan Olson's avatar
Ryan Olson committed
143
        Ok(ServiceSet { services })
Ryan Olson's avatar
Ryan Olson committed
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
    }
}

impl ServiceSet {
    pub fn into_endpoints(self) -> impl Iterator<Item = EndpointInfo> {
        self.services
            .into_iter()
            .flat_map(|s| s.endpoints.into_iter())
    }
}

#[cfg(test)]
mod tests {

    use super::*;

    #[test]
    fn test_service_set() {
        let services = vec![
            ServiceInfo {
                name: "service1".to_string(),
                id: "1".to_string(),
                version: "1.0".to_string(),
                started: "2021-01-01".to_string(),
                endpoints: vec![
                    EndpointInfo {
                        name: "endpoint1".to_string(),
                        subject: "subject1".to_string(),
172
173
174
175
176
177
178
179
180
                        data: Some(Metrics {
                            average_processing_time: 0.1,
                            last_error: "none".to_string(),
                            num_errors: 0,
                            num_requests: 10,
                            processing_time: 100,
                            queue_group: "group1".to_string(),
                            data: serde_json::json!({"key": "value1"}),
                        }),
Ryan Olson's avatar
Ryan Olson committed
181
182
183
184
                    },
                    EndpointInfo {
                        name: "endpoint2-foo".to_string(),
                        subject: "subject2".to_string(),
185
186
187
188
189
190
191
192
193
                        data: Some(Metrics {
                            average_processing_time: 0.1,
                            last_error: "none".to_string(),
                            num_errors: 0,
                            num_requests: 10,
                            processing_time: 100,
                            queue_group: "group1".to_string(),
                            data: serde_json::json!({"key": "value1"}),
                        }),
Ryan Olson's avatar
Ryan Olson committed
194
195
196
197
198
199
200
201
202
203
204
205
                    },
                ],
            },
            ServiceInfo {
                name: "service1".to_string(),
                id: "2".to_string(),
                version: "1.0".to_string(),
                started: "2021-01-01".to_string(),
                endpoints: vec![
                    EndpointInfo {
                        name: "endpoint1".to_string(),
                        subject: "subject1".to_string(),
206
207
208
209
210
211
212
213
214
                        data: Some(Metrics {
                            average_processing_time: 0.1,
                            last_error: "none".to_string(),
                            num_errors: 0,
                            num_requests: 10,
                            processing_time: 100,
                            queue_group: "group1".to_string(),
                            data: serde_json::json!({"key": "value1"}),
                        }),
Ryan Olson's avatar
Ryan Olson committed
215
216
217
218
                    },
                    EndpointInfo {
                        name: "endpoint2-bar".to_string(),
                        subject: "subject2".to_string(),
219
220
221
222
223
224
225
226
227
                        data: Some(Metrics {
                            average_processing_time: 0.1,
                            last_error: "none".to_string(),
                            num_errors: 0,
                            num_requests: 10,
                            processing_time: 100,
                            queue_group: "group1".to_string(),
                            data: serde_json::json!({"key": "value2"}),
                        }),
Ryan Olson's avatar
Ryan Olson committed
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
                    },
                ],
            },
        ];

        let service_set = ServiceSet { services };

        let endpoints: Vec<_> = service_set
            .into_endpoints()
            .filter(|e| e.name.starts_with("endpoint2"))
            .collect();

        assert_eq!(endpoints.len(), 2);
    }
}