distributed.rs 17.7 KB
Newer Older
1
2
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
Ryan Olson's avatar
Ryan Olson committed
3

4
use crate::component::{Component, Instance};
5
use crate::pipeline::PipelineError;
6
use crate::storage::key_value_store::{
7
8
    EtcdStore, KeyValueStore, KeyValueStoreEnum, KeyValueStoreManager, KeyValueStoreSelect,
    MemoryStore,
9
};
10
use crate::transports::nats::DRTNatsClientPrometheusMetrics;
Ryan Olson's avatar
Ryan Olson committed
11
use crate::{
12
    component::{self, ComponentBuilder, Endpoint, Namespace},
13
    discovery::Discovery,
14
15
    metrics::PrometheusUpdateCallback,
    metrics::{MetricsHierarchy, MetricsRegistry},
Ryan Olson's avatar
Ryan Olson committed
16
17
18
    service::ServiceClient,
    transports::{etcd, nats, tcp},
};
19
use crate::{discovery, system_status_server, transports};
Ryan Olson's avatar
Ryan Olson committed
20

21
use super::utils::GracefulShutdownTracker;
22
23
use crate::SystemHealth;
use crate::runtime::Runtime;
Ryan Olson's avatar
Ryan Olson committed
24

25
26
use async_once_cell::OnceCell;
use std::sync::{Arc, OnceLock, Weak};
27
use tokio::sync::watch::Receiver;
28
29

use anyhow::Result;
Ryan Olson's avatar
Ryan Olson committed
30
31
use derive_getters::Dissolve;
use figment::error;
32
33
use std::collections::HashMap;
use tokio::sync::Mutex;
34
use tokio_util::sync::CancellationToken;
Ryan Olson's avatar
Ryan Olson committed
35

36
37
type InstanceMap = HashMap<Endpoint, Weak<Receiver<Vec<Instance>>>>;

38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/// Distributed [Runtime] which provides access to shared resources across the cluster, this includes
/// communication protocols and transports.
#[derive(Clone)]
pub struct DistributedRuntime {
    // local runtime
    runtime: Runtime,

    // we might consider a unifed transport manager here
    etcd_client: Option<transports::etcd::Client>,
    nats_client: Option<transports::nats::Client>,
    store: KeyValueStoreManager,
    tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
    system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,

    // Service discovery client
    discovery_client: Arc<dyn discovery::Discovery>,

55
56
57
58
    // Discovery metadata (only used for Kubernetes backend)
    // Shared with system status server to expose via /metadata endpoint
    discovery_metadata: Option<Arc<tokio::sync::RwLock<discovery::DiscoveryMetadata>>>,

59
60
61
62
63
64
65
    // local registry for components
    // the registry allows us to use share runtime resources across instances of the same component object.
    // take for example two instances of a client to the same remote component. The registry allows us to use
    // a single endpoint watcher for both clients, this keeps the number background tasking watching specific
    // paths in etcd to a minimum.
    component_registry: component::Registry,

66
    instance_sources: Arc<tokio::sync::Mutex<InstanceMap>>,
67
68
69
70
71
72
73
74

    // Health Status
    system_health: Arc<parking_lot::Mutex<SystemHealth>>,

    // This hierarchy's own metrics registry
    metrics_registry: MetricsRegistry,
}

75
impl MetricsHierarchy for DistributedRuntime {
76
77
78
79
    fn basename(&self) -> String {
        "".to_string() // drt has no basename. Basename only begins with the Namespace.
    }

80
81
82
83
84
85
    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
        vec![] // drt is the root, so no parent hierarchies
    }

    fn get_metrics_registry(&self) -> &MetricsRegistry {
        &self.metrics_registry
86
87
88
    }
}

Ryan Olson's avatar
Ryan Olson committed
89
90
91
92
93
94
impl std::fmt::Debug for DistributedRuntime {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "DistributedRuntime")
    }
}

Ryan Olson's avatar
Ryan Olson committed
95
96
impl DistributedRuntime {
    pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> {
97
        let (selected_kv_store, nats_config) = config.dissolve();
Ryan Olson's avatar
Ryan Olson committed
98
99
100

        let runtime_clone = runtime.clone();

101
102
        let (etcd_client, store) = match selected_kv_store {
            KeyValueStoreSelect::Etcd(etcd_config) => {
103
104
105
106
107
108
109
                let etcd_client = etcd::Client::new(*etcd_config, runtime_clone).await.inspect_err(|err|
                    // The returned error doesn't show because of a dropped runtime error, so
                    // log it first.
                    tracing::error!(%err, "Could not connect to etcd. Pass `--store-kv ..` to use a different backend or start etcd."))?;
                let store = KeyValueStoreManager::etcd(etcd_client.clone());
                (Some(etcd_client), store)
            }
110
111
            KeyValueStoreSelect::File(root) => (None, KeyValueStoreManager::file(root)),
            KeyValueStoreSelect::Memory => (None, KeyValueStoreManager::memory()),
112
        };
Ryan Olson's avatar
Ryan Olson committed
113

114
        let nats_client = Some(nats_config.clone().connect().await?);
Ryan Olson's avatar
Ryan Olson committed
115

116
        // Start system status server for health and metrics if enabled in configuration
117
118
119
120
121
122
123
124
        let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default();
        // IMPORTANT: We must extract cancel_token from runtime BEFORE moving runtime into the struct below.
        // This is because after moving, runtime is no longer accessible in this scope (ownership rules).
        let cancel_token = if config.system_server_enabled() {
            Some(runtime.clone().child_token())
        } else {
            None
        };
125
126
        let starting_health_status = config.starting_health_status.clone();
        let use_endpoint_health_status = config.use_endpoint_health_status.clone();
127
128
        let health_endpoint_path = config.system_health_path.clone();
        let live_endpoint_path = config.system_live_path.clone();
129
        let system_health = Arc::new(parking_lot::Mutex::new(SystemHealth::new(
130
131
            starting_health_status,
            use_endpoint_health_status,
132
133
            health_endpoint_path,
            live_endpoint_path,
134
        )));
135

136
137
        let nats_client_for_metrics = nats_client.clone();

138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
        // Initialize discovery client based on backend configuration
        let discovery_backend =
            std::env::var("DYN_DISCOVERY_BACKEND").unwrap_or_else(|_| "kv_store".to_string());

        let (discovery_client, discovery_metadata) = match discovery_backend.as_str() {
            "kubernetes" => {
                tracing::info!("Initializing Kubernetes discovery backend");
                let metadata = Arc::new(tokio::sync::RwLock::new(
                    crate::discovery::DiscoveryMetadata::new(),
                ));
                let client = crate::discovery::KubeDiscoveryClient::new(
                    metadata.clone(),
                    runtime.primary_token(),
                )
                .await
                .inspect_err(
                    |err| tracing::error!(%err, "Failed to initialize Kubernetes discovery client"),
                )?;
                (Arc::new(client) as Arc<dyn Discovery>, Some(metadata))
            }
            _ => {
                tracing::info!("Initializing KV store discovery backend");
                use crate::discovery::KVStoreDiscovery;
                (
                    Arc::new(KVStoreDiscovery::new(
                        store.clone(),
                        runtime.primary_token(),
                    )) as Arc<dyn Discovery>,
                    None,
                )
            }
169
170
        };

171
        let distributed_runtime = Self {
Ryan Olson's avatar
Ryan Olson committed
172
173
            runtime,
            etcd_client,
174
            store,
Ryan Olson's avatar
Ryan Olson committed
175
176
            nats_client,
            tcp_server: Arc::new(OnceCell::new()),
177
            system_status_server: Arc::new(OnceLock::new()),
178
            discovery_client,
179
            discovery_metadata,
Ryan Olson's avatar
Ryan Olson committed
180
            component_registry: component::Registry::new(),
181
            instance_sources: Arc::new(Mutex::new(HashMap::new())),
182
            metrics_registry: crate::MetricsRegistry::new(),
183
            system_health,
184
185
        };

186
187
188
189
190
        if let Some(nats_client_for_metrics) = nats_client_for_metrics {
            let nats_client_metrics = DRTNatsClientPrometheusMetrics::new(
                &distributed_runtime,
                nats_client_for_metrics.client().clone(),
            )?;
191
            // Register a callback to update NATS client metrics on the DRT's metrics registry
192
193
194
195
196
197
198
199
            let nats_client_callback = Arc::new({
                let nats_client_clone = nats_client_metrics.clone();
                move || {
                    nats_client_clone.set_from_client_stats();
                    Ok(())
                }
            });
            distributed_runtime
200
201
                .metrics_registry
                .add_update_callback(nats_client_callback);
202
        }
203

204
205
206
207
208
209
        // Initialize the uptime gauge in SystemHealth
        distributed_runtime
            .system_health
            .lock()
            .initialize_uptime_gauge(&distributed_runtime)?;

210
        // Handle system status server initialization
211
        if let Some(cancel_token) = cancel_token {
212
            // System server is enabled - start both the state and HTTP server
213
            let host = config.system_host.clone();
214
            let port = config.system_port as u16;
215

216
            // Start system status server (it creates SystemStatusState internally)
217
            match crate::system_status_server::spawn_system_status_server(
218
219
220
221
                &host,
                port,
                cancel_token,
                Arc::new(distributed_runtime.clone()),
222
                distributed_runtime.discovery_metadata.clone(),
223
224
225
            )
            .await
            {
226
                Ok((addr, handle)) => {
227
                    tracing::info!("System status server started successfully on {}", addr);
228

229
230
231
232
233
234
                    // Store system status server information
                    let system_status_server_info =
                        crate::system_status_server::SystemStatusServerInfo::new(
                            addr,
                            Some(handle),
                        );
235

236
                    // Initialize the system_status_server field
237
                    distributed_runtime
238
239
240
                        .system_status_server
                        .set(Arc::new(system_status_server_info))
                        .expect("System status server info should only be set once");
241
242
                }
                Err(e) => {
243
                    tracing::error!("System status server startup failed: {}", e);
244
                }
245
            }
246
        } else {
247
            // System server HTTP is disabled, but uptime metrics are still being tracked via SystemHealth
248
249
250
            tracing::debug!(
                "System status server HTTP endpoints disabled, but uptime metrics are being tracked"
            );
251
252
        }

253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
        // Start health check manager if enabled
        if config.health_check_enabled {
            let health_check_config = crate::health_check::HealthCheckConfig {
                canary_wait_time: std::time::Duration::from_secs(config.canary_wait_time_secs),
                request_timeout: std::time::Duration::from_secs(
                    config.health_check_request_timeout_secs,
                ),
            };

            // Start the health check manager (spawns per-endpoint monitoring tasks)
            match crate::health_check::start_health_check_manager(
                distributed_runtime.clone(),
                Some(health_check_config),
            )
            .await
            {
                Ok(()) => tracing::info!(
                    "Health check manager started (canary_wait_time: {}s, request_timeout: {}s)",
                    config.canary_wait_time_secs,
                    config.health_check_request_timeout_secs
                ),
                Err(e) => tracing::error!("Health check manager failed to start: {}", e),
            }
        }

278
        Ok(distributed_runtime)
Ryan Olson's avatar
Ryan Olson committed
279
280
281
    }

    pub async fn from_settings(runtime: Runtime) -> Result<Self> {
282
        let config = DistributedConfig::from_settings();
Ryan Olson's avatar
Ryan Olson committed
283
284
285
286
287
288
289
        Self::new(runtime, config).await
    }

    pub fn runtime(&self) -> &Runtime {
        &self.runtime
    }

290
291
292
293
    pub fn primary_token(&self) -> CancellationToken {
        self.runtime.primary_token()
    }

294
295
296
297
298
299
300
301
302
303
304
    // TODO: Don't hand out pointers, instead have methods to use the registry in friendly ways
    // (without being aware of async locks and so on)
    pub fn component_registry(&self) -> &component::Registry {
        &self.component_registry
    }

    // TODO: Don't hand out pointers, instead provide system health related services.
    pub fn system_health(&self) -> Arc<parking_lot::Mutex<SystemHealth>> {
        self.system_health.clone()
    }

305
    pub fn connection_id(&self) -> u64 {
306
        self.discovery_client.instance_id()
Ryan Olson's avatar
Ryan Olson committed
307
308
309
310
    }

    pub fn shutdown(&self) {
        self.runtime.shutdown();
311
        self.store.shutdown();
Ryan Olson's avatar
Ryan Olson committed
312
313
314
315
    }

    /// Create a [`Namespace`]
    pub fn namespace(&self, name: impl Into<String>) -> Result<Namespace> {
316
        Namespace::new(self.clone(), name.into())
Ryan Olson's avatar
Ryan Olson committed
317
318
    }

319
320
321
    /// Returns the discovery interface for service registration and discovery
    pub fn discovery(&self) -> Arc<dyn Discovery> {
        self.discovery_client.clone()
322
323
    }

324
325
    pub(crate) fn service_client(&self) -> Option<ServiceClient> {
        self.nats_client().map(|nc| ServiceClient::new(nc.clone()))
Ryan Olson's avatar
Ryan Olson committed
326
327
    }

328
    pub async fn tcp_server(&self) -> Result<Arc<tcp::server::TcpStreamServer>> {
Ryan Olson's avatar
Ryan Olson committed
329
330
331
332
333
        Ok(self
            .tcp_server
            .get_or_try_init(async move {
                let options = tcp::server::ServerOptions::default();
                let server = tcp::server::TcpStreamServer::new(options).await?;
334
                Ok::<_, PipelineError>(server)
Ryan Olson's avatar
Ryan Olson committed
335
336
337
338
339
            })
            .await?
            .clone())
    }

340
341
    pub fn nats_client(&self) -> Option<&nats::Client> {
        self.nats_client.as_ref()
Ryan Olson's avatar
Ryan Olson committed
342
343
    }

344
345
346
347
348
    /// Get system status server information if available
    pub fn system_status_server_info(
        &self,
    ) -> Option<Arc<crate::system_status_server::SystemStatusServerInfo>> {
        self.system_status_server.get().cloned()
349
350
    }

351
    // todo(ryan): deprecate this as we move to Discovery traits and Component Identifiers
352
353
354
    //
    // Try to use `store()` instead of this. Only use this if you have not been able to migrate
    // yet, or if you require etcd-specific features like distributed locking (rare).
355
    pub fn etcd_client(&self) -> Option<etcd::Client> {
Ryan Olson's avatar
Ryan Olson committed
356
357
        self.etcd_client.clone()
    }
358

359
360
    /// An interface to store things. Will eventually replace `etcd_client`.
    /// Currently does key-value, but will grow to include whatever we need to store.
361
362
    pub fn store(&self) -> &KeyValueStoreManager {
        &self.store
363
364
    }

365
366
367
    pub fn child_token(&self) -> CancellationToken {
        self.runtime.child_token()
    }
368

369
370
371
372
    pub(crate) fn graceful_shutdown_tracker(&self) -> Arc<GracefulShutdownTracker> {
        self.runtime.graceful_shutdown_tracker()
    }

373
    pub fn instance_sources(&self) -> Arc<Mutex<InstanceMap>> {
374
375
        self.instance_sources.clone()
    }
Ryan Olson's avatar
Ryan Olson committed
376
377
378
379
}

#[derive(Dissolve)]
pub struct DistributedConfig {
380
    pub store_backend: KeyValueStoreSelect,
Ryan Olson's avatar
Ryan Olson committed
381
382
383
384
    pub nats_config: nats::ClientOptions,
}

impl DistributedConfig {
385
    pub fn from_settings() -> DistributedConfig {
Ryan Olson's avatar
Ryan Olson committed
386
        DistributedConfig {
387
            store_backend: KeyValueStoreSelect::Etcd(Box::default()),
Ryan Olson's avatar
Ryan Olson committed
388
389
390
            nats_config: nats::ClientOptions::default(),
        }
    }
Ryan Olson's avatar
Ryan Olson committed
391
392

    pub fn for_cli() -> DistributedConfig {
393
394
395
396
397
398
        let etcd_config = etcd::ClientOptions {
            attach_lease: false,
            ..Default::default()
        };
        DistributedConfig {
            store_backend: KeyValueStoreSelect::Etcd(Box::new(etcd_config)),
Ryan Olson's avatar
Ryan Olson committed
399
            nats_config: nats::ClientOptions::default(),
400
        }
Ryan Olson's avatar
Ryan Olson committed
401
    }
Ryan Olson's avatar
Ryan Olson committed
402
}
403

404
pub mod distributed_test_utils {
    //! Common test helper functions for DistributedRuntime tests

    /// Creates a DistributedRuntime instance for integration-only tests.
    ///
    /// Uses `Runtime::from_current` to run on the already-existing tokio runtime, and an
    /// in-memory key-value store so etcd is not required. NATS is still connected with default
    /// client options, so it must be reachable.
    ///
    /// Runtime settings (system status server, health checks, ...) are read inside
    /// `DistributedRuntime::new` via `RuntimeConfig::from_settings`, so environment variables set
    /// by the caller before this call take effect.
    ///
    /// # Panics
    ///
    /// Panics if `Runtime::from_current` fails or the DistributedRuntime cannot be created.
    #[cfg(feature = "integration")]
    pub async fn create_test_drt_async() -> super::DistributedRuntime {
        use crate::{storage::key_value_store::KeyValueStoreSelect, transports::nats};

        let rt = crate::Runtime::from_current()
            .expect("Runtime::from_current failed; call this from within a tokio runtime");
        let config = super::DistributedConfig {
            store_backend: KeyValueStoreSelect::Memory,
            nats_config: nats::ClientOptions::default(),
        };
        super::DistributedRuntime::new(rt, config)
            .await
            .expect("failed to create DistributedRuntime for test")
    }
}

#[cfg(all(test, feature = "integration"))]
mod tests {
    use super::distributed_test_utils::create_test_drt_async;
    use std::time::Duration;

    /// How long each test waits, and therefore the minimum uptime it expects afterwards.
    const MIN_UPTIME: Duration = Duration::from_millis(50);

    /// Starts a DRT with `DYN_SYSTEM_PORT` set to `system_port` (unset for `None`), sleeps for
    /// [`MIN_UPTIME`], and asserts that the reported uptime covers the sleep.
    async fn check_uptime_after_delay(system_port: Option<&'static str>, label: &'static str) {
        temp_env::async_with_vars([("DYN_SYSTEM_PORT", system_port)], async {
            let drt = create_test_drt_async().await;

            tokio::time::sleep(MIN_UPTIME).await;

            let uptime = drt.system_health.lock().uptime();
            assert!(
                uptime >= MIN_UPTIME,
                "Expected uptime to be at least 50ms, but got {:?}",
                uptime
            );

            println!("✓ DRT uptime test passed ({}): uptime = {:?}", label, uptime);
        })
        .await;
    }

    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_disabled() {
        // System status server disabled: no port configured.
        check_uptime_after_delay(None, "system disabled").await;
    }

    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_enabled() {
        // System status server enabled on a fixed port.
        check_uptime_after_delay(Some("8081"), "system enabled").await;
    }
}