service.rs 16.4 KB
Newer Older
1
2
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
Ryan Olson's avatar
Ryan Olson committed
3
4
5
6
7
8
9

// TODO - refactor this entire module
//
// We want to carry forward the concept of live vs. ready for the components,
// and we will want to associate each component's cancellation token with the
// component's "service state".

10
use crate::{
11
    DistributedRuntime, Result,
12
13
    component::Component,
    error,
14
    metrics::{MetricsHierarchy, prometheus_names, prometheus_names::nats_service},
15
16
17
18
    traits::*,
    transports::nats,
    utils::stream,
};
Ryan Olson's avatar
Ryan Olson committed
19
20
21
22
23

use async_nats::Message;
use async_stream::try_stream;
use bytes::Bytes;
use derive_getters::Dissolve;
Ryan Olson's avatar
Ryan Olson committed
24
use futures::stream::{StreamExt, TryStreamExt};
25
use prometheus;
26
use serde::{Deserialize, Serialize, de::DeserializeOwned};
Ryan Olson's avatar
Ryan Olson committed
27
28
29
30
31
32
33
use std::time::Duration;

/// Client for querying NATS micro-services, backed by a [`nats::Client`].
pub struct ServiceClient {
    nats_client: nats::Client,
}

impl ServiceClient {
    /// Wrap an existing NATS client; no I/O is performed here.
    pub fn new(nats_client: nats::Client) -> Self {
        Self { nats_client }
    }
}

39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/// ServiceSet contains a collection of services with their endpoints and metrics
///
/// Tree structure:
/// Structure:
/// - ServiceSet
///   - services: Vec<ServiceInfo>
///     - name: String
///     - id: String
///     - version: String
///     - started: String
///     - endpoints: Vec<EndpointInfo>
///       - name: String
///       - subject: String
///       - data: Option<NatsStatsMetrics>
///         - average_processing_time: f64
///         - last_error: String
///         - num_errors: u64
///         - num_requests: u64
///         - processing_time: u64
///         - queue_group: String
///         - data: serde_json::Value (custom stats)
Ryan Olson's avatar
Ryan Olson committed
60
#[derive(Debug, Clone, Serialize, Deserialize)]
Ryan Olson's avatar
Ryan Olson committed
61
62
63
64
pub struct ServiceSet {
    services: Vec<ServiceInfo>,
}

65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
/// This is a example JSON from `nats req '$SRV.STATS.dynamo_backend'`:
/// {
///   "type": "io.nats.micro.v1.stats_response",
///   "name": "dynamo_backend",
///   "id": "bdu7nA8tbhy9mEkxIWlkBA",
///   "version": "0.0.1",
///   "started": "2025-08-08T05:07:17.720783523Z",
///   "endpoints": [
///     {
///       "name": "dynamo_backend-generate-694d988806b92e39",
///       "subject": "dynamo_backend.generate-694d988806b92e39",
///       "num_requests": 0,
///       "num_errors": 0,
///       "processing_time": 0,
///       "average_processing_time": 0,
///       "last_error": "",
///       "data": {
///         "val": 10
///       },
///       "queue_group": "q"
///     }
///   ]
/// }
Ryan Olson's avatar
Ryan Olson committed
88
89
90
91
92
93
94
95
96
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceInfo {
    pub name: String,
    pub id: String,
    pub version: String,
    pub started: String,
    pub endpoints: Vec<EndpointInfo>,
}

97
/// Each endpoint has name, subject, num_requests, num_errors, processing_time, average_processing_time, last_error, queue_group, and data
Ryan Olson's avatar
Ryan Olson committed
98
99
100
101
102
#[derive(Debug, Clone, Serialize, Deserialize, Dissolve)]
pub struct EndpointInfo {
    pub name: String,
    pub subject: String,

103
    /// Extra fields that don't fit in EndpointInfo will be flattened into the Metrics struct.
Ryan Olson's avatar
Ryan Olson committed
104
    #[serde(flatten)]
105
    pub data: Option<NatsStatsMetrics>,
Ryan Olson's avatar
Ryan Olson committed
106
107
}

Ryan Olson's avatar
Ryan Olson committed
108
109
110
111
112
impl EndpointInfo {
    pub fn id(&self) -> Result<i64> {
        let id = self
            .subject
            .split('-')
113
            .next_back()
Ryan Olson's avatar
Ryan Olson committed
114
115
116
117
118
            .ok_or_else(|| error!("No id found in subject"))?;

        i64::from_str_radix(id, 16).map_err(|e| error!("Invalid id format: {}", e))
    }
}
119
120
121
122
123
124

// TODO: This is _really_ close to the async_nats::service::Stats object,
// but it's missing a few fields like "name", so use a temporary struct
// for easy deserialization. Ideally, this type already exists or can
// be exposed in the library somewhere.
/// Stats structure returned from NATS service API
125
/// https://github.com/nats-io/nats.rs/blob/main/async-nats/src/service/endpoint.rs
Ryan Olson's avatar
Ryan Olson committed
126
#[derive(Debug, Clone, Serialize, Deserialize, Dissolve)]
127
128
129
pub struct NatsStatsMetrics {
    // Standard NATS Stats Service API fields from $SRV.STATS.<service_name> requests
    pub average_processing_time: u64, // in nanoseconds according to nats-io
130
131
132
    pub last_error: String,
    pub num_errors: u64,
    pub num_requests: u64,
133
    pub processing_time: u64, // in nanoseconds according to nats-io
134
135
136
137
    pub queue_group: String,
    // Field containing custom stats handler data
    pub data: serde_json::Value,
}
Ryan Olson's avatar
Ryan Olson committed
138

139
impl NatsStatsMetrics {
Ryan Olson's avatar
Ryan Olson committed
140
    pub fn decode<T: for<'de> Deserialize<'de>>(self) -> Result<T> {
141
        serde_json::from_value(self.data).map_err(Into::into)
Ryan Olson's avatar
Ryan Olson committed
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
    }
}

impl ServiceClient {
    pub async fn unary(
        &self,
        subject: impl Into<String>,
        payload: impl Into<Bytes>,
    ) -> Result<Message> {
        let response = self
            .nats_client
            .client()
            .request(subject.into(), payload.into())
            .await?;
        Ok(response)
    }

159
160
161
    pub async fn collect_services(
        &self,
        service_name: &str,
162
        timeout: Duration,
163
    ) -> Result<ServiceSet> {
Ryan Olson's avatar
Ryan Olson committed
164
        let sub = self.nats_client.scrape_service(service_name).await?;
165
166
        if timeout.is_zero() {
            tracing::warn!("collect_services: timeout is zero");
Ryan Olson's avatar
Ryan Olson committed
167
        }
168
169
        if timeout > Duration::from_secs(10) {
            tracing::warn!("collect_services: timeout is greater than 10 seconds");
Ryan Olson's avatar
Ryan Olson committed
170
        }
171
        let deadline = tokio::time::Instant::now() + timeout;
Ryan Olson's avatar
Ryan Olson committed
172

173
174
175
176
177
178
179
180
181
182
183
184
185
186
        let mut services = vec![];
        let mut s = stream::until_deadline(sub, deadline);
        while let Some(message) = s.next().await {
            if message.payload.is_empty() {
                // Expected while we wait for KV metrics in worker to start
                tracing::trace!(service_name, "collect_services: empty payload from nats");
                continue;
            }
            let info = serde_json::from_slice::<ServiceInfo>(&message.payload);
            match info {
                Ok(info) => services.push(info),
                Err(err) => {
                    let payload = String::from_utf8_lossy(&message.payload);
                    tracing::debug!(%err, service_name, %payload, "error decoding service info");
Ryan Olson's avatar
Ryan Olson committed
187
                }
188
189
            }
        }
Ryan Olson's avatar
Ryan Olson committed
190

Ryan Olson's avatar
Ryan Olson committed
191
        Ok(ServiceSet { services })
Ryan Olson's avatar
Ryan Olson committed
192
193
194
195
196
197
198
199
200
    }
}

impl ServiceSet {
    pub fn into_endpoints(self) -> impl Iterator<Item = EndpointInfo> {
        self.services
            .into_iter()
            .flat_map(|s| s.endpoints.into_iter())
    }
201
202
203
204
205

    /// Get a reference to the services in this ServiceSet
    pub fn services(&self) -> &[ServiceInfo] {
        &self.services
    }
Ryan Olson's avatar
Ryan Olson committed
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Build an endpoint with fixed stats; only the custom `data` payload varies.
    fn endpoint(name: &str, subject: &str, custom: serde_json::Value) -> EndpointInfo {
        EndpointInfo {
            name: name.to_string(),
            subject: subject.to_string(),
            data: Some(NatsStatsMetrics {
                average_processing_time: 100_000, // 0.1ms = 100,000 nanoseconds
                last_error: "none".to_string(),
                num_errors: 0,
                num_requests: 10,
                processing_time: 100,
                queue_group: "group1".to_string(),
                data: custom,
            }),
        }
    }

    /// Build a `service1` instance with the given id and endpoints.
    fn service(id: &str, endpoints: Vec<EndpointInfo>) -> ServiceInfo {
        ServiceInfo {
            name: "service1".to_string(),
            id: id.to_string(),
            version: "1.0".to_string(),
            started: "2021-01-01".to_string(),
            endpoints,
        }
    }

    #[test]
    fn test_service_set() {
        let first = service(
            "1",
            vec![
                endpoint("endpoint1", "subject1", serde_json::json!({"key": "value1"})),
                endpoint("endpoint2-foo", "subject2", serde_json::json!({"key": "value1"})),
            ],
        );
        let second = service(
            "2",
            vec![
                endpoint("endpoint1", "subject1", serde_json::json!({"key": "value1"})),
                endpoint("endpoint2-bar", "subject2", serde_json::json!({"key": "value2"})),
            ],
        );

        let service_set = ServiceSet {
            services: vec![first, second],
        };

        let matching = service_set
            .into_endpoints()
            .filter(|ep| ep.name.starts_with("endpoint2"))
            .count();

        assert_eq!(matching, 2);
    }
}
296
297
298
299
300
301
302
303
304
305
306

/// Prometheus metrics for component service statistics (ordered to match NatsStatsMetrics)
///
/// ⚠️  IMPORTANT: These Prometheus Gauges are COPIES of NATS data, not live references!
///
/// How it works:
/// 1. NATS provides source data via NatsStatsMetrics
/// 2. Metrics callbacks read current NATS values and update these Prometheus Gauges
/// 3. Prometheus scrapes these Gauge values (snapshots, not live data)
///
/// Flow: NATS Service → NatsStatsMetrics (Counters) → Metrics Callback → Prometheus Gauge
307
/// Note: These are snapshots updated when execute_prometheus_update_callbacks() is called.
308
#[derive(Debug, Clone)]
309
310
311
/// Prometheus metrics for NATS server components.
/// Note: Metrics with `_total` names use IntGauge because we copy counter values
/// from underlying services rather than incrementing directly.
312
pub struct ComponentNatsServerPrometheusMetrics {
313
    /// Average processing time in milliseconds (maps to: average_processing_time)
314
    pub service_processing_ms_avg: prometheus::Gauge,
315
    /// Total errors across all endpoints (maps to: num_errors)
316
    pub service_errors_total: prometheus::IntGauge,
317
    /// Total requests across all endpoints (maps to: num_requests)
318
    pub service_requests_total: prometheus::IntGauge,
319
    /// Total processing time in milliseconds (maps to: processing_time)
320
    pub service_processing_ms_total: prometheus::IntGauge,
321
    /// Number of active services (derived from ServiceSet.services)
322
    pub service_active_services: prometheus::IntGauge,
323
    /// Number of active endpoints (derived from ServiceInfo.endpoints)
324
    pub service_active_endpoints: prometheus::IntGauge,
325
326
}

327
impl ComponentNatsServerPrometheusMetrics {
328
329
    /// Create new ComponentServiceMetrics using Component's DistributedRuntime's Prometheus constructors
    pub fn new(component: &Component) -> Result<Self> {
330
331
332
333
334
335
336
337
338
339
340
341
        let service_name = component.service_name();

        // Build labels: service_name first, then component's labels
        let mut labels_vec = vec![("service_name", service_name.as_str())];

        // Add component's labels (convert from (String, String) to (&str, &str))
        for (key, value) in component.labels() {
            labels_vec.push((key.as_str(), value.as_str()));
        }

        let labels: &[(&str, &str)] = &labels_vec;

342
        let service_processing_ms_avg = component.metrics().create_gauge(
343
            nats_service::PROCESSING_MS_AVG,
344
            "Average processing time across all component endpoints in milliseconds",
345
            labels,
346
347
        )?;

348
        let service_errors_total = component.metrics().create_intgauge(
349
            nats_service::ERRORS_TOTAL,
350
            "Total number of errors across all component endpoints",
351
            labels,
352
353
        )?;

354
        let service_requests_total = component.metrics().create_intgauge(
355
            nats_service::REQUESTS_TOTAL,
356
            "Total number of requests across all component endpoints",
357
            labels,
358
359
        )?;

360
        let service_processing_ms_total = component.metrics().create_intgauge(
361
            nats_service::PROCESSING_MS_TOTAL,
362
            "Total processing time across all component endpoints in milliseconds",
363
            labels,
364
365
        )?;

366
        let service_active_services = component.metrics().create_intgauge(
367
            nats_service::ACTIVE_SERVICES,
368
            "Number of active services in this component",
369
            labels,
370
371
        )?;

372
        let service_active_endpoints = component.metrics().create_intgauge(
373
            nats_service::ACTIVE_ENDPOINTS,
374
            "Number of active endpoints across all services",
375
            labels,
376
377
378
        )?;

        Ok(Self {
379
380
381
382
            service_processing_ms_avg,
            service_errors_total,
            service_requests_total,
            service_processing_ms_total,
383
384
            service_active_services,
            service_active_endpoints,
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
        })
    }

    /// Update metrics from scraped ServiceSet data
    pub fn update_from_service_set(&self, service_set: &ServiceSet) {
        // Variables ordered to match NatsStatsMetrics fields
        let mut processing_time_samples = 0u64; // for average_processing_time calculation
        let mut total_errors = 0u64; // maps to: num_errors
        let mut total_requests = 0u64; // maps to: num_requests
        let mut total_processing_time_nanos = 0u64; // maps to: processing_time (nanoseconds from NATS)
        let mut endpoint_count = 0u64; // for derived metrics

        let service_count = service_set.services().len() as i64;

        for service in service_set.services() {
            for endpoint in &service.endpoints {
                endpoint_count += 1;

                if let Some(ref stats) = endpoint.data {
                    total_errors += stats.num_errors;
                    total_requests += stats.num_requests;
                    total_processing_time_nanos += stats.processing_time;

                    if stats.num_requests > 0 {
                        processing_time_samples += 1;
                    }
                }
            }
        }

        // Update metrics (ordered to match NatsStatsMetrics fields)
        // Calculate average processing time in milliseconds (maps to: average_processing_time)
        if processing_time_samples > 0 && total_requests > 0 {
            let avg_time_nanos = total_processing_time_nanos as f64 / total_requests as f64;
            let avg_time_ms = avg_time_nanos / 1_000_000.0; // Convert nanoseconds to milliseconds
420
            self.service_processing_ms_avg.set(avg_time_ms);
421
        } else {
422
            self.service_processing_ms_avg.set(0.0);
423
424
        }

425
426
427
        self.service_errors_total.set(total_errors as i64); // maps to: num_errors
        self.service_requests_total.set(total_requests as i64); // maps to: num_requests
        self.service_processing_ms_total
428
            .set((total_processing_time_nanos / 1_000_000) as i64); // maps to: processing_time (converted to milliseconds)
429
430
        self.service_active_services.set(service_count); // derived from ServiceSet.services
        self.service_active_endpoints.set(endpoint_count as i64); // derived from ServiceInfo.endpoints
431
432
433
434
    }

    /// Reset all metrics to zero. Useful when no data is available or to clear stale values.
    pub fn reset_to_zeros(&self) {
435
436
437
438
        self.service_processing_ms_avg.set(0.0);
        self.service_errors_total.set(0);
        self.service_requests_total.set(0);
        self.service_processing_ms_total.set(0);
439
440
        self.service_active_services.set(0);
        self.service_active_endpoints.set(0);
441
442
    }
}