// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

// TODO - refactor this entire module
//
// We want to carry forward the concept of live vs. ready for the components,
// and to associate each component's cancellation token with that
// component's "service state".

10
use crate::{
11
    DistributedRuntime,
12
    component::Component,
13
    metrics::{MetricsHierarchy, prometheus_names},
14
15
16
17
    traits::*,
    transports::nats,
    utils::stream,
};
Ryan Olson's avatar
Ryan Olson committed
18

19
20
use anyhow::Result;
use anyhow::anyhow as error;
Ryan Olson's avatar
Ryan Olson committed
21
22
23
24
use async_nats::Message;
use async_stream::try_stream;
use bytes::Bytes;
use derive_getters::Dissolve;
Ryan Olson's avatar
Ryan Olson committed
25
use futures::stream::{StreamExt, TryStreamExt};
26
use prometheus;
27
use serde::{Deserialize, Serialize, de::DeserializeOwned};
Ryan Olson's avatar
Ryan Olson committed
28
29
30
31
32
33
34
use std::time::Duration;

/// Client for talking to NATS services: single request/reply calls and
/// stats scraping across all instances of a service.
pub struct ServiceClient {
    nats_client: nats::Client,
}

impl ServiceClient {
    /// Wrap an existing [`nats::Client`]; no I/O happens here.
    pub fn new(nats_client: nats::Client) -> Self {
        Self { nats_client }
    }
}

40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/// ServiceSet contains a collection of services with their endpoints and metrics
///
/// Tree structure:
/// Structure:
/// - ServiceSet
///   - services: Vec<ServiceInfo>
///     - name: String
///     - id: String
///     - version: String
///     - started: String
///     - endpoints: Vec<EndpointInfo>
///       - name: String
///       - subject: String
///       - data: Option<NatsStatsMetrics>
///         - average_processing_time: f64
///         - last_error: String
///         - num_errors: u64
///         - num_requests: u64
///         - processing_time: u64
///         - queue_group: String
///         - data: serde_json::Value (custom stats)
Ryan Olson's avatar
Ryan Olson committed
61
#[derive(Debug, Clone, Serialize, Deserialize)]
Ryan Olson's avatar
Ryan Olson committed
62
63
64
65
pub struct ServiceSet {
    services: Vec<ServiceInfo>,
}

66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
/// This is a example JSON from `nats req '$SRV.STATS.dynamo_backend'`:
/// {
///   "type": "io.nats.micro.v1.stats_response",
///   "name": "dynamo_backend",
///   "id": "bdu7nA8tbhy9mEkxIWlkBA",
///   "version": "0.0.1",
///   "started": "2025-08-08T05:07:17.720783523Z",
///   "endpoints": [
///     {
///       "name": "dynamo_backend-generate-694d988806b92e39",
///       "subject": "dynamo_backend.generate-694d988806b92e39",
///       "num_requests": 0,
///       "num_errors": 0,
///       "processing_time": 0,
///       "average_processing_time": 0,
///       "last_error": "",
///       "data": {
///         "val": 10
///       },
///       "queue_group": "q"
///     }
///   ]
/// }
Ryan Olson's avatar
Ryan Olson committed
89
90
91
92
93
94
95
96
97
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceInfo {
    pub name: String,
    pub id: String,
    pub version: String,
    pub started: String,
    pub endpoints: Vec<EndpointInfo>,
}

98
/// Each endpoint has name, subject, num_requests, num_errors, processing_time, average_processing_time, last_error, queue_group, and data
Ryan Olson's avatar
Ryan Olson committed
99
100
101
102
103
#[derive(Debug, Clone, Serialize, Deserialize, Dissolve)]
pub struct EndpointInfo {
    pub name: String,
    pub subject: String,

104
    /// Extra fields that don't fit in EndpointInfo will be flattened into the Metrics struct.
Ryan Olson's avatar
Ryan Olson committed
105
    #[serde(flatten)]
106
    pub data: Option<NatsStatsMetrics>,
Ryan Olson's avatar
Ryan Olson committed
107
108
}

Ryan Olson's avatar
Ryan Olson committed
109
110
111
112
113
impl EndpointInfo {
    pub fn id(&self) -> Result<i64> {
        let id = self
            .subject
            .split('-')
114
            .next_back()
Ryan Olson's avatar
Ryan Olson committed
115
116
117
118
119
            .ok_or_else(|| error!("No id found in subject"))?;

        i64::from_str_radix(id, 16).map_err(|e| error!("Invalid id format: {}", e))
    }
}
120
121
122
123
124
125

// TODO: This is _really_ close to the async_nats::service::Stats object,
// but it's missing a few fields like "name", so use a temporary struct
// for easy deserialization. Ideally, this type already exists or can
// be exposed in the library somewhere.
/// Stats structure returned from NATS service API
126
/// https://github.com/nats-io/nats.rs/blob/main/async-nats/src/service/endpoint.rs
Ryan Olson's avatar
Ryan Olson committed
127
#[derive(Debug, Clone, Serialize, Deserialize, Dissolve)]
128
129
130
pub struct NatsStatsMetrics {
    // Standard NATS Stats Service API fields from $SRV.STATS.<service_name> requests
    pub average_processing_time: u64, // in nanoseconds according to nats-io
131
132
133
    pub last_error: String,
    pub num_errors: u64,
    pub num_requests: u64,
134
    pub processing_time: u64, // in nanoseconds according to nats-io
135
136
137
138
    pub queue_group: String,
    // Field containing custom stats handler data
    pub data: serde_json::Value,
}
Ryan Olson's avatar
Ryan Olson committed
139

140
impl NatsStatsMetrics {
Ryan Olson's avatar
Ryan Olson committed
141
    pub fn decode<T: for<'de> Deserialize<'de>>(self) -> Result<T> {
142
        serde_json::from_value(self.data).map_err(Into::into)
Ryan Olson's avatar
Ryan Olson committed
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
    }
}

impl ServiceClient {
    pub async fn unary(
        &self,
        subject: impl Into<String>,
        payload: impl Into<Bytes>,
    ) -> Result<Message> {
        let response = self
            .nats_client
            .client()
            .request(subject.into(), payload.into())
            .await?;
        Ok(response)
    }

160
161
162
    pub async fn collect_services(
        &self,
        service_name: &str,
163
        timeout: Duration,
164
    ) -> Result<ServiceSet> {
Ryan Olson's avatar
Ryan Olson committed
165
        let sub = self.nats_client.scrape_service(service_name).await?;
166
167
        if timeout.is_zero() {
            tracing::warn!("collect_services: timeout is zero");
Ryan Olson's avatar
Ryan Olson committed
168
        }
169
170
        if timeout > Duration::from_secs(10) {
            tracing::warn!("collect_services: timeout is greater than 10 seconds");
Ryan Olson's avatar
Ryan Olson committed
171
        }
172
        let deadline = tokio::time::Instant::now() + timeout;
Ryan Olson's avatar
Ryan Olson committed
173

174
175
176
177
178
179
180
181
182
183
184
185
186
187
        let mut services = vec![];
        let mut s = stream::until_deadline(sub, deadline);
        while let Some(message) = s.next().await {
            if message.payload.is_empty() {
                // Expected while we wait for KV metrics in worker to start
                tracing::trace!(service_name, "collect_services: empty payload from nats");
                continue;
            }
            let info = serde_json::from_slice::<ServiceInfo>(&message.payload);
            match info {
                Ok(info) => services.push(info),
                Err(err) => {
                    let payload = String::from_utf8_lossy(&message.payload);
                    tracing::debug!(%err, service_name, %payload, "error decoding service info");
Ryan Olson's avatar
Ryan Olson committed
188
                }
189
190
            }
        }
Ryan Olson's avatar
Ryan Olson committed
191

Ryan Olson's avatar
Ryan Olson committed
192
        Ok(ServiceSet { services })
Ryan Olson's avatar
Ryan Olson committed
193
194
195
196
197
198
199
200
201
    }
}

impl ServiceSet {
    pub fn into_endpoints(self) -> impl Iterator<Item = EndpointInfo> {
        self.services
            .into_iter()
            .flat_map(|s| s.endpoints.into_iter())
    }
202
203
204
205
206

    /// Get a reference to the services in this ServiceSet
    pub fn services(&self) -> &[ServiceInfo] {
        &self.services
    }
Ryan Olson's avatar
Ryan Olson committed
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Build an endpoint with the fixed stats used throughout these tests.
    fn endpoint(name: &str, subject: &str, custom: serde_json::Value) -> EndpointInfo {
        EndpointInfo {
            name: name.to_string(),
            subject: subject.to_string(),
            data: Some(NatsStatsMetrics {
                average_processing_time: 100_000, // 0.1ms = 100,000 nanoseconds
                last_error: "none".to_string(),
                num_errors: 0,
                num_requests: 10,
                processing_time: 100,
                queue_group: "group1".to_string(),
                data: custom,
            }),
        }
    }

    /// Build a `service1` instance with the given instance id and endpoints.
    fn service(id: &str, endpoints: Vec<EndpointInfo>) -> ServiceInfo {
        ServiceInfo {
            name: "service1".to_string(),
            id: id.to_string(),
            version: "1.0".to_string(),
            started: "2021-01-01".to_string(),
            endpoints,
        }
    }

    #[test]
    fn test_service_set() {
        let services = vec![
            service(
                "1",
                vec![
                    endpoint("endpoint1", "subject1", serde_json::json!({"key": "value1"})),
                    endpoint("endpoint2-foo", "subject2", serde_json::json!({"key": "value1"})),
                ],
            ),
            service(
                "2",
                vec![
                    endpoint("endpoint1", "subject1", serde_json::json!({"key": "value1"})),
                    endpoint("endpoint2-bar", "subject2", serde_json::json!({"key": "value2"})),
                ],
            ),
        ];

        let service_set = ServiceSet { services };
        assert_eq!(service_set.services().len(), 2);

        let endpoints: Vec<_> = service_set
            .into_endpoints()
            .filter(|e| e.name.starts_with("endpoint2"))
            .collect();

        assert_eq!(endpoints.len(), 2);
        // `into_endpoints` preserves service order, then endpoint order.
        let names: Vec<&str> = endpoints.iter().map(|e| e.name.as_str()).collect();
        assert_eq!(names, ["endpoint2-foo", "endpoint2-bar"]);
    }

    #[test]
    fn test_endpoint_id_parses_hex_suffix() {
        let ep = endpoint(
            "dynamo_backend-generate-694d988806b92e39",
            "dynamo_backend.generate-694d988806b92e39",
            serde_json::Value::Null,
        );
        assert_eq!(ep.id().unwrap(), 0x694d_9888_06b9_2e39);

        let bad = endpoint(
            "bad",
            "dynamo_backend.generate-not_hex",
            serde_json::Value::Null,
        );
        assert!(bad.id().is_err());
    }

    #[test]
    fn test_deserialize_stats_response() {
        let raw = r#"{
            "type": "io.nats.micro.v1.stats_response",
            "name": "dynamo_backend",
            "id": "bdu7nA8tbhy9mEkxIWlkBA",
            "version": "0.0.1",
            "started": "2025-08-08T05:07:17.720783523Z",
            "endpoints": [
                {
                    "name": "dynamo_backend-generate-694d988806b92e39",
                    "subject": "dynamo_backend.generate-694d988806b92e39",
                    "num_requests": 0,
                    "num_errors": 0,
                    "processing_time": 0,
                    "average_processing_time": 0,
                    "last_error": "",
                    "data": { "val": 10 },
                    "queue_group": "q"
                }
            ]
        }"#;

        let info: ServiceInfo = serde_json::from_str(raw).expect("valid stats response");
        assert_eq!(info.name, "dynamo_backend");
        assert_eq!(info.endpoints.len(), 1);

        let ep = &info.endpoints[0];
        assert_eq!(ep.subject, "dynamo_backend.generate-694d988806b92e39");
        assert_eq!(ep.id().unwrap(), 0x694d_9888_06b9_2e39);

        let stats = ep
            .data
            .clone()
            .expect("flattened stats fields should be present");
        assert_eq!(stats.num_requests, 0);
        assert_eq!(stats.queue_group, "q");

        let custom: serde_json::Value = stats.decode().expect("custom stats should decode");
        assert_eq!(custom, serde_json::json!({ "val": 10 }));
    }
}