service.rs 16.4 KB
Newer Older
1
2
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
Ryan Olson's avatar
Ryan Olson committed
3
4
5
6
7
8
9

// TODO - refactor this entire module
//
// we want to carry forward the concept of live vs ready for the components
// we will want to associate the component's cancellation token with the
// component's "service state"

10
use crate::{
11
    DistributedRuntime,
12
    component::Component,
13
    metrics::{MetricsHierarchy, prometheus_names, prometheus_names::nats_service},
14
15
16
17
    traits::*,
    transports::nats,
    utils::stream,
};
Ryan Olson's avatar
Ryan Olson committed
18

19
20
use anyhow::Result;
use anyhow::anyhow as error;
Ryan Olson's avatar
Ryan Olson committed
21
22
23
24
use async_nats::Message;
use async_stream::try_stream;
use bytes::Bytes;
use derive_getters::Dissolve;
Ryan Olson's avatar
Ryan Olson committed
25
use futures::stream::{StreamExt, TryStreamExt};
26
use prometheus;
27
use serde::{Deserialize, Serialize, de::DeserializeOwned};
Ryan Olson's avatar
Ryan Olson committed
28
29
30
31
32
33
34
use std::time::Duration;

/// Thin wrapper around a NATS client used to send requests to, and scrape
/// statistics from, NATS micro-services.
pub struct ServiceClient {
    nats_client: nats::Client,
}

impl ServiceClient {
    /// Wrap an existing NATS client.
    pub fn new(nats_client: nats::Client) -> Self {
        Self { nats_client }
    }
}

40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/// ServiceSet contains a collection of services with their endpoints and metrics
///
/// Tree structure:
/// Structure:
/// - ServiceSet
///   - services: Vec<ServiceInfo>
///     - name: String
///     - id: String
///     - version: String
///     - started: String
///     - endpoints: Vec<EndpointInfo>
///       - name: String
///       - subject: String
///       - data: Option<NatsStatsMetrics>
///         - average_processing_time: f64
///         - last_error: String
///         - num_errors: u64
///         - num_requests: u64
///         - processing_time: u64
///         - queue_group: String
///         - data: serde_json::Value (custom stats)
Ryan Olson's avatar
Ryan Olson committed
61
#[derive(Debug, Clone, Serialize, Deserialize)]
Ryan Olson's avatar
Ryan Olson committed
62
63
64
65
pub struct ServiceSet {
    services: Vec<ServiceInfo>,
}

66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
/// This is a example JSON from `nats req '$SRV.STATS.dynamo_backend'`:
/// {
///   "type": "io.nats.micro.v1.stats_response",
///   "name": "dynamo_backend",
///   "id": "bdu7nA8tbhy9mEkxIWlkBA",
///   "version": "0.0.1",
///   "started": "2025-08-08T05:07:17.720783523Z",
///   "endpoints": [
///     {
///       "name": "dynamo_backend-generate-694d988806b92e39",
///       "subject": "dynamo_backend.generate-694d988806b92e39",
///       "num_requests": 0,
///       "num_errors": 0,
///       "processing_time": 0,
///       "average_processing_time": 0,
///       "last_error": "",
///       "data": {
///         "val": 10
///       },
///       "queue_group": "q"
///     }
///   ]
/// }
Ryan Olson's avatar
Ryan Olson committed
89
90
91
92
93
94
95
96
97
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceInfo {
    pub name: String,
    pub id: String,
    pub version: String,
    pub started: String,
    pub endpoints: Vec<EndpointInfo>,
}

98
/// Each endpoint has name, subject, num_requests, num_errors, processing_time, average_processing_time, last_error, queue_group, and data
Ryan Olson's avatar
Ryan Olson committed
99
100
101
102
103
#[derive(Debug, Clone, Serialize, Deserialize, Dissolve)]
pub struct EndpointInfo {
    pub name: String,
    pub subject: String,

104
    /// Extra fields that don't fit in EndpointInfo will be flattened into the Metrics struct.
Ryan Olson's avatar
Ryan Olson committed
105
    #[serde(flatten)]
106
    pub data: Option<NatsStatsMetrics>,
Ryan Olson's avatar
Ryan Olson committed
107
108
}

Ryan Olson's avatar
Ryan Olson committed
109
110
111
112
113
impl EndpointInfo {
    pub fn id(&self) -> Result<i64> {
        let id = self
            .subject
            .split('-')
114
            .next_back()
Ryan Olson's avatar
Ryan Olson committed
115
116
117
118
119
            .ok_or_else(|| error!("No id found in subject"))?;

        i64::from_str_radix(id, 16).map_err(|e| error!("Invalid id format: {}", e))
    }
}
120
121
122
123
124
125

// TODO: This is _really_ close to the async_nats::service::Stats object,
// but it's missing a few fields like "name", so use a temporary struct
// for easy deserialization. Ideally, this type already exists or can
// be exposed in the library somewhere.
/// Stats structure returned from NATS service API
126
/// https://github.com/nats-io/nats.rs/blob/main/async-nats/src/service/endpoint.rs
Ryan Olson's avatar
Ryan Olson committed
127
#[derive(Debug, Clone, Serialize, Deserialize, Dissolve)]
128
129
130
pub struct NatsStatsMetrics {
    // Standard NATS Stats Service API fields from $SRV.STATS.<service_name> requests
    pub average_processing_time: u64, // in nanoseconds according to nats-io
131
132
133
    pub last_error: String,
    pub num_errors: u64,
    pub num_requests: u64,
134
    pub processing_time: u64, // in nanoseconds according to nats-io
135
136
137
138
    pub queue_group: String,
    // Field containing custom stats handler data
    pub data: serde_json::Value,
}
Ryan Olson's avatar
Ryan Olson committed
139

140
impl NatsStatsMetrics {
Ryan Olson's avatar
Ryan Olson committed
141
    pub fn decode<T: for<'de> Deserialize<'de>>(self) -> Result<T> {
142
        serde_json::from_value(self.data).map_err(Into::into)
Ryan Olson's avatar
Ryan Olson committed
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
    }
}

impl ServiceClient {
    pub async fn unary(
        &self,
        subject: impl Into<String>,
        payload: impl Into<Bytes>,
    ) -> Result<Message> {
        let response = self
            .nats_client
            .client()
            .request(subject.into(), payload.into())
            .await?;
        Ok(response)
    }

160
161
162
    pub async fn collect_services(
        &self,
        service_name: &str,
163
        timeout: Duration,
164
    ) -> Result<ServiceSet> {
Ryan Olson's avatar
Ryan Olson committed
165
        let sub = self.nats_client.scrape_service(service_name).await?;
166
167
        if timeout.is_zero() {
            tracing::warn!("collect_services: timeout is zero");
Ryan Olson's avatar
Ryan Olson committed
168
        }
169
170
        if timeout > Duration::from_secs(10) {
            tracing::warn!("collect_services: timeout is greater than 10 seconds");
Ryan Olson's avatar
Ryan Olson committed
171
        }
172
        let deadline = tokio::time::Instant::now() + timeout;
Ryan Olson's avatar
Ryan Olson committed
173

174
175
176
177
178
179
180
181
182
183
184
185
186
187
        let mut services = vec![];
        let mut s = stream::until_deadline(sub, deadline);
        while let Some(message) = s.next().await {
            if message.payload.is_empty() {
                // Expected while we wait for KV metrics in worker to start
                tracing::trace!(service_name, "collect_services: empty payload from nats");
                continue;
            }
            let info = serde_json::from_slice::<ServiceInfo>(&message.payload);
            match info {
                Ok(info) => services.push(info),
                Err(err) => {
                    let payload = String::from_utf8_lossy(&message.payload);
                    tracing::debug!(%err, service_name, %payload, "error decoding service info");
Ryan Olson's avatar
Ryan Olson committed
188
                }
189
190
            }
        }
Ryan Olson's avatar
Ryan Olson committed
191

Ryan Olson's avatar
Ryan Olson committed
192
        Ok(ServiceSet { services })
Ryan Olson's avatar
Ryan Olson committed
193
194
195
196
197
198
199
200
201
    }
}

impl ServiceSet {
    pub fn into_endpoints(self) -> impl Iterator<Item = EndpointInfo> {
        self.services
            .into_iter()
            .flat_map(|s| s.endpoints.into_iter())
    }
202
203
204
205
206

    /// Get a reference to the services in this ServiceSet
    pub fn services(&self) -> &[ServiceInfo] {
        &self.services
    }
Ryan Olson's avatar
Ryan Olson committed
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Stats shared by every fixture endpoint; only the custom payload varies.
    fn stats(custom: serde_json::Value) -> NatsStatsMetrics {
        NatsStatsMetrics {
            average_processing_time: 100_000, // 0.1ms = 100,000 nanoseconds
            last_error: "none".to_string(),
            num_errors: 0,
            num_requests: 10,
            processing_time: 100,
            queue_group: "group1".to_string(),
            data: custom,
        }
    }

    /// Build an endpoint fixture carrying `stats(custom)`.
    fn endpoint(name: &str, subject: &str, custom: serde_json::Value) -> EndpointInfo {
        EndpointInfo {
            name: name.to_string(),
            subject: subject.to_string(),
            data: Some(stats(custom)),
        }
    }

    /// Build a `service1` v1.0 fixture with the given instance id and endpoints.
    fn service(id: &str, endpoints: Vec<EndpointInfo>) -> ServiceInfo {
        ServiceInfo {
            name: "service1".to_string(),
            id: id.to_string(),
            version: "1.0".to_string(),
            started: "2021-01-01".to_string(),
            endpoints,
        }
    }

    #[test]
    fn test_service_set() {
        let service_set = ServiceSet {
            services: vec![
                service(
                    "1",
                    vec![
                        endpoint("endpoint1", "subject1", serde_json::json!({"key": "value1"})),
                        endpoint(
                            "endpoint2-foo",
                            "subject2",
                            serde_json::json!({"key": "value1"}),
                        ),
                    ],
                ),
                service(
                    "2",
                    vec![
                        endpoint("endpoint1", "subject1", serde_json::json!({"key": "value1"})),
                        endpoint(
                            "endpoint2-bar",
                            "subject2",
                            serde_json::json!({"key": "value2"}),
                        ),
                    ],
                ),
            ],
        };

        let matching = service_set
            .into_endpoints()
            .filter(|e| e.name.starts_with("endpoint2"))
            .count();

        assert_eq!(matching, 2);
    }
}
297
298
299
300
301
302
303
304
305
306
307

/// Prometheus metrics for component service statistics (ordered to match NatsStatsMetrics)
///
/// ⚠️  IMPORTANT: These Prometheus Gauges are COPIES of NATS data, not live references!
///
/// How it works:
/// 1. NATS provides source data via NatsStatsMetrics
/// 2. Metrics callbacks read current NATS values and update these Prometheus Gauges
/// 3. Prometheus scrapes these Gauge values (snapshots, not live data)
///
/// Flow: NATS Service → NatsStatsMetrics (Counters) → Metrics Callback → Prometheus Gauge
308
/// Note: These are snapshots updated when execute_prometheus_update_callbacks() is called.
309
#[derive(Debug, Clone)]
310
311
312
/// Prometheus metrics for NATS server components.
/// Note: Metrics with `_total` names use IntGauge because we copy counter values
/// from underlying services rather than incrementing directly.
313
pub struct ComponentNatsServerPrometheusMetrics {
314
    /// Average processing time in milliseconds (maps to: average_processing_time)
315
    pub service_processing_ms_avg: prometheus::Gauge,
316
    /// Total errors across all endpoints (maps to: num_errors)
317
    pub service_errors_total: prometheus::IntGauge,
318
    /// Total requests across all endpoints (maps to: num_requests)
319
    pub service_requests_total: prometheus::IntGauge,
320
    /// Total processing time in milliseconds (maps to: processing_time)
321
    pub service_processing_ms_total: prometheus::IntGauge,
322
    /// Number of active services (derived from ServiceSet.services)
323
    pub service_active_services: prometheus::IntGauge,
324
    /// Number of active endpoints (derived from ServiceInfo.endpoints)
325
    pub service_active_endpoints: prometheus::IntGauge,
326
327
}

328
impl ComponentNatsServerPrometheusMetrics {
329
330
    /// Create new ComponentServiceMetrics using Component's DistributedRuntime's Prometheus constructors
    pub fn new(component: &Component) -> Result<Self> {
331
332
333
334
335
336
337
338
339
340
341
342
        let service_name = component.service_name();

        // Build labels: service_name first, then component's labels
        let mut labels_vec = vec![("service_name", service_name.as_str())];

        // Add component's labels (convert from (String, String) to (&str, &str))
        for (key, value) in component.labels() {
            labels_vec.push((key.as_str(), value.as_str()));
        }

        let labels: &[(&str, &str)] = &labels_vec;

343
        let service_processing_ms_avg = component.metrics().create_gauge(
344
            nats_service::PROCESSING_MS_AVG,
345
            "Average processing time across all component endpoints in milliseconds",
346
            labels,
347
348
        )?;

349
        let service_errors_total = component.metrics().create_intgauge(
350
            nats_service::ERRORS_TOTAL,
351
            "Total number of errors across all component endpoints",
352
            labels,
353
354
        )?;

355
        let service_requests_total = component.metrics().create_intgauge(
356
            nats_service::REQUESTS_TOTAL,
357
            "Total number of requests across all component endpoints",
358
            labels,
359
360
        )?;

361
        let service_processing_ms_total = component.metrics().create_intgauge(
362
            nats_service::PROCESSING_MS_TOTAL,
363
            "Total processing time across all component endpoints in milliseconds",
364
            labels,
365
366
        )?;

367
        let service_active_services = component.metrics().create_intgauge(
368
            nats_service::ACTIVE_SERVICES,
369
            "Number of active services in this component",
370
            labels,
371
372
        )?;

373
        let service_active_endpoints = component.metrics().create_intgauge(
374
            nats_service::ACTIVE_ENDPOINTS,
375
            "Number of active endpoints across all services",
376
            labels,
377
378
379
        )?;

        Ok(Self {
380
381
382
383
            service_processing_ms_avg,
            service_errors_total,
            service_requests_total,
            service_processing_ms_total,
384
385
            service_active_services,
            service_active_endpoints,
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
        })
    }

    /// Update metrics from scraped ServiceSet data
    pub fn update_from_service_set(&self, service_set: &ServiceSet) {
        // Variables ordered to match NatsStatsMetrics fields
        let mut processing_time_samples = 0u64; // for average_processing_time calculation
        let mut total_errors = 0u64; // maps to: num_errors
        let mut total_requests = 0u64; // maps to: num_requests
        let mut total_processing_time_nanos = 0u64; // maps to: processing_time (nanoseconds from NATS)
        let mut endpoint_count = 0u64; // for derived metrics

        let service_count = service_set.services().len() as i64;

        for service in service_set.services() {
            for endpoint in &service.endpoints {
                endpoint_count += 1;

                if let Some(ref stats) = endpoint.data {
                    total_errors += stats.num_errors;
                    total_requests += stats.num_requests;
                    total_processing_time_nanos += stats.processing_time;

                    if stats.num_requests > 0 {
                        processing_time_samples += 1;
                    }
                }
            }
        }

        // Update metrics (ordered to match NatsStatsMetrics fields)
        // Calculate average processing time in milliseconds (maps to: average_processing_time)
        if processing_time_samples > 0 && total_requests > 0 {
            let avg_time_nanos = total_processing_time_nanos as f64 / total_requests as f64;
            let avg_time_ms = avg_time_nanos / 1_000_000.0; // Convert nanoseconds to milliseconds
421
            self.service_processing_ms_avg.set(avg_time_ms);
422
        } else {
423
            self.service_processing_ms_avg.set(0.0);
424
425
        }

426
427
428
        self.service_errors_total.set(total_errors as i64); // maps to: num_errors
        self.service_requests_total.set(total_requests as i64); // maps to: num_requests
        self.service_processing_ms_total
429
            .set((total_processing_time_nanos / 1_000_000) as i64); // maps to: processing_time (converted to milliseconds)
430
431
        self.service_active_services.set(service_count); // derived from ServiceSet.services
        self.service_active_endpoints.set(endpoint_count as i64); // derived from ServiceInfo.endpoints
432
433
434
435
    }

    /// Reset all metrics to zero. Useful when no data is available or to clear stale values.
    pub fn reset_to_zeros(&self) {
436
437
438
439
        self.service_processing_ms_avg.set(0.0);
        self.service_errors_total.set(0);
        self.service_requests_total.set(0);
        self.service_processing_ms_total.set(0);
440
441
        self.service_active_services.set(0);
        self.service_active_endpoints.set(0);
442
443
    }
}