distributed.rs 16.8 KB
Newer Older
1
2
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
Ryan Olson's avatar
Ryan Olson committed
3

4
5
use crate::component::Component;
use crate::pipeline::PipelineError;
6
use crate::storage::key_value_store::{
7
8
    EtcdStore, KeyValueStore, KeyValueStoreEnum, KeyValueStoreManager, KeyValueStoreSelect,
    MemoryStore,
9
};
10
use crate::transports::nats::DRTNatsClientPrometheusMetrics;
Ryan Olson's avatar
Ryan Olson committed
11
use crate::{
12
    component::{self, ComponentBuilder, Endpoint, InstanceSource, Namespace},
13
    discovery::Discovery,
14
15
    metrics::PrometheusUpdateCallback,
    metrics::{MetricsHierarchy, MetricsRegistry},
Ryan Olson's avatar
Ryan Olson committed
16
17
18
    service::ServiceClient,
    transports::{etcd, nats, tcp},
};
19
use crate::{discovery, system_status_server, transports};
Ryan Olson's avatar
Ryan Olson committed
20

21
use super::utils::GracefulShutdownTracker;
22
23
use crate::SystemHealth;
use crate::runtime::Runtime;
Ryan Olson's avatar
Ryan Olson committed
24

25
26
27
28
use async_once_cell::OnceCell;
use std::sync::{Arc, OnceLock, Weak};

use anyhow::Result;
Ryan Olson's avatar
Ryan Olson committed
29
30
use derive_getters::Dissolve;
use figment::error;
31
32
use std::collections::HashMap;
use tokio::sync::Mutex;
33
use tokio_util::sync::CancellationToken;
Ryan Olson's avatar
Ryan Olson committed
34

35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/// Distributed [Runtime] which provides access to shared resources across the cluster, this includes
/// communication protocols and transports.
#[derive(Clone)]
pub struct DistributedRuntime {
    // local runtime
    runtime: Runtime,

    // we might consider a unifed transport manager here
    etcd_client: Option<transports::etcd::Client>,
    nats_client: Option<transports::nats::Client>,
    store: KeyValueStoreManager,
    tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
    system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,

    // Service discovery client
    discovery_client: Arc<dyn discovery::Discovery>,

    // local registry for components
    // the registry allows us to use share runtime resources across instances of the same component object.
    // take for example two instances of a client to the same remote component. The registry allows us to use
    // a single endpoint watcher for both clients, this keeps the number background tasking watching specific
    // paths in etcd to a minimum.
    component_registry: component::Registry,

    // Will only have static components that are not discoverable via etcd, they must be know at
    // startup. Will not start etcd.
    is_static: bool,

    instance_sources: Arc<tokio::sync::Mutex<HashMap<Endpoint, Weak<InstanceSource>>>>,

    // Health Status
    system_health: Arc<parking_lot::Mutex<SystemHealth>>,

    // This hierarchy's own metrics registry
    metrics_registry: MetricsRegistry,
}

72
impl MetricsHierarchy for DistributedRuntime {
73
74
75
76
    fn basename(&self) -> String {
        "".to_string() // drt has no basename. Basename only begins with the Namespace.
    }

77
78
79
80
81
82
    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
        vec![] // drt is the root, so no parent hierarchies
    }

    fn get_metrics_registry(&self) -> &MetricsRegistry {
        &self.metrics_registry
83
84
85
    }
}

Ryan Olson's avatar
Ryan Olson committed
86
87
88
89
90
91
impl std::fmt::Debug for DistributedRuntime {
    /// Emits only the type name; none of the runtime's fields are printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("DistributedRuntime")
    }
}

Ryan Olson's avatar
Ryan Olson committed
92
93
impl DistributedRuntime {
    pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> {
94
        let (selected_kv_store, nats_config, is_static) = config.dissolve();
Ryan Olson's avatar
Ryan Olson committed
95
96
97

        let runtime_clone = runtime.clone();

98
99
100
101
102
103
104
105
106
107
108
109
        let (etcd_client, store) = match (is_static, selected_kv_store) {
            (false, KeyValueStoreSelect::Etcd(etcd_config)) => {
                let etcd_client = etcd::Client::new(*etcd_config, runtime_clone).await.inspect_err(|err|
                    // The returned error doesn't show because of a dropped runtime error, so
                    // log it first.
                    tracing::error!(%err, "Could not connect to etcd. Pass `--store-kv ..` to use a different backend or start etcd."))?;
                let store = KeyValueStoreManager::etcd(etcd_client.clone());
                (Some(etcd_client), store)
            }
            (false, KeyValueStoreSelect::File(root)) => (None, KeyValueStoreManager::file(root)),
            (true, _) | (false, KeyValueStoreSelect::Memory) => {
                (None, KeyValueStoreManager::memory())
110
            }
111
        };
Ryan Olson's avatar
Ryan Olson committed
112

113
        let nats_client = Some(nats_config.clone().connect().await?);
Ryan Olson's avatar
Ryan Olson committed
114

115
        // Start system status server for health and metrics if enabled in configuration
116
117
118
119
120
121
122
123
        let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default();
        // IMPORTANT: We must extract cancel_token from runtime BEFORE moving runtime into the struct below.
        // This is because after moving, runtime is no longer accessible in this scope (ownership rules).
        let cancel_token = if config.system_server_enabled() {
            Some(runtime.clone().child_token())
        } else {
            None
        };
124
125
        let starting_health_status = config.starting_health_status.clone();
        let use_endpoint_health_status = config.use_endpoint_health_status.clone();
126
127
        let health_endpoint_path = config.system_health_path.clone();
        let live_endpoint_path = config.system_live_path.clone();
128
        let system_health = Arc::new(parking_lot::Mutex::new(SystemHealth::new(
129
130
            starting_health_status,
            use_endpoint_health_status,
131
132
            health_endpoint_path,
            live_endpoint_path,
133
        )));
134

135
136
        let nats_client_for_metrics = nats_client.clone();

137
        // Initialize discovery backed by KV store
138
        let discovery_client = {
139
140
141
142
143
            use crate::discovery::KVStoreDiscovery;
            Arc::new(KVStoreDiscovery::new(
                store.clone(),
                runtime.primary_token(),
            )) as Arc<dyn Discovery>
144
145
        };

146
        let distributed_runtime = Self {
Ryan Olson's avatar
Ryan Olson committed
147
148
            runtime,
            etcd_client,
149
            store,
Ryan Olson's avatar
Ryan Olson committed
150
151
            nats_client,
            tcp_server: Arc::new(OnceCell::new()),
152
            system_status_server: Arc::new(OnceLock::new()),
153
            discovery_client,
Ryan Olson's avatar
Ryan Olson committed
154
            component_registry: component::Registry::new(),
155
            is_static,
156
            instance_sources: Arc::new(Mutex::new(HashMap::new())),
157
            metrics_registry: crate::MetricsRegistry::new(),
158
            system_health,
159
160
        };

161
162
163
164
165
        if let Some(nats_client_for_metrics) = nats_client_for_metrics {
            let nats_client_metrics = DRTNatsClientPrometheusMetrics::new(
                &distributed_runtime,
                nats_client_for_metrics.client().clone(),
            )?;
166
            // Register a callback to update NATS client metrics on the DRT's metrics registry
167
168
169
170
171
172
173
174
            let nats_client_callback = Arc::new({
                let nats_client_clone = nats_client_metrics.clone();
                move || {
                    nats_client_clone.set_from_client_stats();
                    Ok(())
                }
            });
            distributed_runtime
175
176
                .metrics_registry
                .add_update_callback(nats_client_callback);
177
        }
178

179
180
181
182
183
184
        // Initialize the uptime gauge in SystemHealth
        distributed_runtime
            .system_health
            .lock()
            .initialize_uptime_gauge(&distributed_runtime)?;

185
        // Handle system status server initialization
186
        if let Some(cancel_token) = cancel_token {
187
            // System server is enabled - start both the state and HTTP server
188
            let host = config.system_host.clone();
189
            let port = config.system_port as u16;
190

191
            // Start system status server (it creates SystemStatusState internally)
192
            match crate::system_status_server::spawn_system_status_server(
193
194
195
196
                &host,
                port,
                cancel_token,
                Arc::new(distributed_runtime.clone()),
197
198
199
            )
            .await
            {
200
                Ok((addr, handle)) => {
201
                    tracing::info!("System status server started successfully on {}", addr);
202

203
204
205
206
207
208
                    // Store system status server information
                    let system_status_server_info =
                        crate::system_status_server::SystemStatusServerInfo::new(
                            addr,
                            Some(handle),
                        );
209

210
                    // Initialize the system_status_server field
211
                    distributed_runtime
212
213
214
                        .system_status_server
                        .set(Arc::new(system_status_server_info))
                        .expect("System status server info should only be set once");
215
216
                }
                Err(e) => {
217
                    tracing::error!("System status server startup failed: {}", e);
218
                }
219
            }
220
        } else {
221
            // System server HTTP is disabled, but uptime metrics are still being tracked via SystemHealth
222
223
224
            tracing::debug!(
                "System status server HTTP endpoints disabled, but uptime metrics are being tracked"
            );
225
226
        }

227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
        // Start health check manager if enabled
        if config.health_check_enabled {
            let health_check_config = crate::health_check::HealthCheckConfig {
                canary_wait_time: std::time::Duration::from_secs(config.canary_wait_time_secs),
                request_timeout: std::time::Duration::from_secs(
                    config.health_check_request_timeout_secs,
                ),
            };

            // Start the health check manager (spawns per-endpoint monitoring tasks)
            match crate::health_check::start_health_check_manager(
                distributed_runtime.clone(),
                Some(health_check_config),
            )
            .await
            {
                Ok(()) => tracing::info!(
                    "Health check manager started (canary_wait_time: {}s, request_timeout: {}s)",
                    config.canary_wait_time_secs,
                    config.health_check_request_timeout_secs
                ),
                Err(e) => tracing::error!("Health check manager failed to start: {}", e),
            }
        }

252
        Ok(distributed_runtime)
Ryan Olson's avatar
Ryan Olson committed
253
254
255
    }

    pub async fn from_settings(runtime: Runtime) -> Result<Self> {
256
257
258
259
260
261
262
        let config = DistributedConfig::from_settings(false);
        Self::new(runtime, config).await
    }

    // Call this if you are using static workers that do not need etcd-based discovery.
    pub async fn from_settings_without_discovery(runtime: Runtime) -> Result<Self> {
        let config = DistributedConfig::from_settings(true);
Ryan Olson's avatar
Ryan Olson committed
263
264
265
266
267
268
269
        Self::new(runtime, config).await
    }

    pub fn runtime(&self) -> &Runtime {
        &self.runtime
    }

270
271
272
273
    pub fn primary_token(&self) -> CancellationToken {
        self.runtime.primary_token()
    }

274
275
276
277
278
279
280
281
282
283
284
    // TODO: Don't hand out pointers, instead have methods to use the registry in friendly ways
    // (without being aware of async locks and so on)
    pub fn component_registry(&self) -> &component::Registry {
        &self.component_registry
    }

    // TODO: Don't hand out pointers, instead provide system health related services.
    pub fn system_health(&self) -> Arc<parking_lot::Mutex<SystemHealth>> {
        self.system_health.clone()
    }

285
286
    pub fn connection_id(&self) -> u64 {
        self.store.connection_id()
Ryan Olson's avatar
Ryan Olson committed
287
288
289
290
    }

    pub fn shutdown(&self) {
        self.runtime.shutdown();
291
        self.store.shutdown();
Ryan Olson's avatar
Ryan Olson committed
292
293
294
295
    }

    /// Create a [`Namespace`]
    pub fn namespace(&self, name: impl Into<String>) -> Result<Namespace> {
296
        Namespace::new(self.clone(), name.into(), self.is_static)
Ryan Olson's avatar
Ryan Olson committed
297
298
    }

299
300
301
    /// Returns the discovery interface for service registration and discovery
    pub fn discovery(&self) -> Arc<dyn Discovery> {
        self.discovery_client.clone()
302
303
    }

304
305
    pub(crate) fn service_client(&self) -> Option<ServiceClient> {
        self.nats_client().map(|nc| ServiceClient::new(nc.clone()))
Ryan Olson's avatar
Ryan Olson committed
306
307
    }

308
    pub async fn tcp_server(&self) -> Result<Arc<tcp::server::TcpStreamServer>> {
Ryan Olson's avatar
Ryan Olson committed
309
310
311
312
313
        Ok(self
            .tcp_server
            .get_or_try_init(async move {
                let options = tcp::server::ServerOptions::default();
                let server = tcp::server::TcpStreamServer::new(options).await?;
314
                Ok::<_, PipelineError>(server)
Ryan Olson's avatar
Ryan Olson committed
315
316
317
318
319
            })
            .await?
            .clone())
    }

320
321
    pub fn nats_client(&self) -> Option<&nats::Client> {
        self.nats_client.as_ref()
Ryan Olson's avatar
Ryan Olson committed
322
323
    }

324
325
326
327
328
    /// Get system status server information if available
    pub fn system_status_server_info(
        &self,
    ) -> Option<Arc<crate::system_status_server::SystemStatusServerInfo>> {
        self.system_status_server.get().cloned()
329
330
    }

331
    // todo(ryan): deprecate this as we move to Discovery traits and Component Identifiers
332
333
334
    //
    // Try to use `store()` instead of this. Only use this if you have not been able to migrate
    // yet, or if you require etcd-specific features like distributed locking (rare).
335
    pub fn etcd_client(&self) -> Option<etcd::Client> {
Ryan Olson's avatar
Ryan Olson committed
336
337
        self.etcd_client.clone()
    }
338

339
340
    /// An interface to store things. Will eventually replace `etcd_client`.
    /// Currently does key-value, but will grow to include whatever we need to store.
341
342
    pub fn store(&self) -> &KeyValueStoreManager {
        &self.store
343
344
    }

345
346
347
    pub fn child_token(&self) -> CancellationToken {
        self.runtime.child_token()
    }
348

349
350
351
352
    pub(crate) fn graceful_shutdown_tracker(&self) -> Arc<GracefulShutdownTracker> {
        self.runtime.graceful_shutdown_tracker()
    }

353
354
355
    pub fn instance_sources(&self) -> Arc<Mutex<HashMap<Endpoint, Weak<InstanceSource>>>> {
        self.instance_sources.clone()
    }
Ryan Olson's avatar
Ryan Olson committed
356
357
358
359
}

#[derive(Dissolve)]
pub struct DistributedConfig {
360
    pub store_backend: KeyValueStoreSelect,
Ryan Olson's avatar
Ryan Olson committed
361
    pub nats_config: nats::ClientOptions,
362
    pub is_static: bool,
Ryan Olson's avatar
Ryan Olson committed
363
364
365
}

impl DistributedConfig {
366
    pub fn from_settings(is_static: bool) -> DistributedConfig {
Ryan Olson's avatar
Ryan Olson committed
367
        DistributedConfig {
368
            store_backend: KeyValueStoreSelect::Etcd(Box::default()),
Ryan Olson's avatar
Ryan Olson committed
369
            nats_config: nats::ClientOptions::default(),
370
            is_static,
Ryan Olson's avatar
Ryan Olson committed
371
372
        }
    }
Ryan Olson's avatar
Ryan Olson committed
373
374

    pub fn for_cli() -> DistributedConfig {
375
376
377
378
379
380
        let etcd_config = etcd::ClientOptions {
            attach_lease: false,
            ..Default::default()
        };
        DistributedConfig {
            store_backend: KeyValueStoreSelect::Etcd(Box::new(etcd_config)),
Ryan Olson's avatar
Ryan Olson committed
381
            nats_config: nats::ClientOptions::default(),
382
            is_static: false,
383
        }
Ryan Olson's avatar
Ryan Olson committed
384
    }
Ryan Olson's avatar
Ryan Olson committed
385
}
386

387
pub mod distributed_test_utils {
    //! Common test helper functions for DistributedRuntime tests
    // TODO: Use in-memory DistributedRuntime for tests instead of full runtime when available.

    /// Create a [`crate::DistributedRuntime`] for integration-only tests.
    ///
    /// Attaches to the existing tokio runtime via `Runtime::from_current`, then builds a static
    /// runtime with `DistributedRuntime::from_settings_without_discovery`. Settings are read
    /// from environment variables inside that constructor, so set any overrides before calling.
    ///
    /// # Panics
    /// Panics with context if `Runtime::from_current` fails, or if the distributed runtime
    /// cannot be constructed (for example, when the NATS connection fails).
    #[cfg(feature = "integration")]
    pub async fn create_test_drt_async() -> crate::DistributedRuntime {
        let rt = crate::Runtime::from_current()
            .expect("create_test_drt_async: failed to attach to the current tokio runtime");
        crate::DistributedRuntime::from_settings_without_discovery(rt)
            .await
            .expect("create_test_drt_async: failed to build DistributedRuntime")
    }
}
402

403
#[cfg(all(test, feature = "integration"))]
mod tests {
    use super::distributed_test_utils::create_test_drt_async;
    use std::time::Duration;

    /// How long each test waits before reading uptime; also the minimum uptime expected.
    const DELAY: Duration = Duration::from_millis(50);

    /// Start a DRT under the caller's environment, wait `DELAY`, and return its uptime.
    async fn uptime_after_delay() -> Duration {
        let drt = create_test_drt_async().await;
        tokio::time::sleep(DELAY).await;
        let uptime = drt.system_health.lock().uptime();
        uptime
    }

    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_disabled() {
        // System status server disabled via environment.
        temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async {
            let uptime = uptime_after_delay().await;
            assert!(
                uptime >= DELAY,
                "Expected uptime to be at least 50ms, but got {:?}",
                uptime
            );
            println!(
                "✓ DRT uptime test passed (system disabled): uptime = {:?}",
                uptime
            );
        })
        .await;
    }

    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_enabled() {
        // System status server enabled by giving it a port.
        temp_env::async_with_vars([("DYN_SYSTEM_PORT", Some("8081"))], async {
            let uptime = uptime_after_delay().await;
            assert!(
                uptime >= DELAY,
                "Expected uptime to be at least 50ms, but got {:?}",
                uptime
            );
            println!(
                "✓ DRT uptime test passed (system enabled): uptime = {:?}",
                uptime
            );
        })
        .await;
    }
}