distributed.rs 24.5 KB
Newer Older
1
2
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
Ryan Olson's avatar
Ryan Olson committed
3

4
use crate::component::{Component, Instance};
5
use crate::pipeline::PipelineError;
6
use crate::storage::key_value_store::{
7
8
    EtcdStore, KeyValueStore, KeyValueStoreEnum, KeyValueStoreManager, KeyValueStoreSelect,
    MemoryStore,
9
};
10
use crate::transports::nats::DRTNatsClientPrometheusMetrics;
Ryan Olson's avatar
Ryan Olson committed
11
use crate::{
12
    component::{self, ComponentBuilder, Endpoint, Namespace},
13
    discovery::Discovery,
14
15
    metrics::PrometheusUpdateCallback,
    metrics::{MetricsHierarchy, MetricsRegistry},
Ryan Olson's avatar
Ryan Olson committed
16
17
18
    service::ServiceClient,
    transports::{etcd, nats, tcp},
};
19
use crate::{discovery, system_status_server, transports};
Ryan Olson's avatar
Ryan Olson committed
20

21
use super::utils::GracefulShutdownTracker;
22
23
use crate::SystemHealth;
use crate::runtime::Runtime;
Ryan Olson's avatar
Ryan Olson committed
24

25
use async_once_cell::OnceCell;
26
use std::fmt;
27
use std::sync::{Arc, OnceLock, Weak};
28
use tokio::sync::watch::Receiver;
29
30

use anyhow::Result;
Ryan Olson's avatar
Ryan Olson committed
31
32
use derive_getters::Dissolve;
use figment::error;
33
34
use std::collections::HashMap;
use tokio::sync::Mutex;
35
use tokio_util::sync::CancellationToken;
Ryan Olson's avatar
Ryan Olson committed
36

37
38
/// Per-endpoint table of instance-list watchers: each [`Endpoint`] maps to a [`Weak`] handle on
/// a `tokio::sync::watch` [`Receiver`] that carries a `Vec<Instance>`.
type InstanceMap = HashMap<Endpoint, Weak<Receiver<Vec<Instance>>>>;

39
40
41
42
43
44
45
/// Distributed [Runtime] which provides access to shared resources across the cluster, this includes
/// communication protocols and transports.
#[derive(Clone)]
pub struct DistributedRuntime {
    // local runtime
    runtime: Runtime,

46
    // Unified transport manager
47
48
49
50
    etcd_client: Option<transports::etcd::Client>,
    nats_client: Option<transports::nats::Client>,
    store: KeyValueStoreManager,
    tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
51
    network_manager: Arc<OnceCell<Arc<crate::pipeline::network::manager::NetworkManager>>>,
52
    system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,
53
    request_plane: RequestPlaneMode,
54
55
56
57

    // Service discovery client
    discovery_client: Arc<dyn discovery::Discovery>,

58
59
60
61
    // Discovery metadata (only used for Kubernetes backend)
    // Shared with system status server to expose via /metadata endpoint
    discovery_metadata: Option<Arc<tokio::sync::RwLock<discovery::DiscoveryMetadata>>>,

62
63
64
65
66
67
68
    // local registry for components
    // the registry allows us to use share runtime resources across instances of the same component object.
    // take for example two instances of a client to the same remote component. The registry allows us to use
    // a single endpoint watcher for both clients, this keeps the number background tasking watching specific
    // paths in etcd to a minimum.
    component_registry: component::Registry,

69
    instance_sources: Arc<tokio::sync::Mutex<InstanceMap>>,
70
71
72
73
74
75
76
77

    // Health Status
    system_health: Arc<parking_lot::Mutex<SystemHealth>>,

    // This hierarchy's own metrics registry
    metrics_registry: MetricsRegistry,
}

78
impl MetricsHierarchy for DistributedRuntime {
79
80
81
82
    fn basename(&self) -> String {
        "".to_string() // drt has no basename. Basename only begins with the Namespace.
    }

83
84
85
86
87
88
    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
        vec![] // drt is the root, so no parent hierarchies
    }

    fn get_metrics_registry(&self) -> &MetricsRegistry {
        &self.metrics_registry
89
90
91
    }
}

Ryan Olson's avatar
Ryan Olson committed
92
93
94
95
96
97
/// Opaque `Debug` output: only the type name is printed, none of the fields.
impl std::fmt::Debug for DistributedRuntime {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        formatter.write_str("DistributedRuntime")
    }
}

Ryan Olson's avatar
Ryan Olson committed
98
99
impl DistributedRuntime {
    pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> {
100
        let (selected_kv_store, nats_config, request_plane) = config.dissolve();
Ryan Olson's avatar
Ryan Olson committed
101
102
103

        let runtime_clone = runtime.clone();

104
105
        let (etcd_client, store) = match selected_kv_store {
            KeyValueStoreSelect::Etcd(etcd_config) => {
106
107
108
109
110
111
112
                let etcd_client = etcd::Client::new(*etcd_config, runtime_clone).await.inspect_err(|err|
                    // The returned error doesn't show because of a dropped runtime error, so
                    // log it first.
                    tracing::error!(%err, "Could not connect to etcd. Pass `--store-kv ..` to use a different backend or start etcd."))?;
                let store = KeyValueStoreManager::etcd(etcd_client.clone());
                (Some(etcd_client), store)
            }
113
114
            KeyValueStoreSelect::File(root) => (None, KeyValueStoreManager::file(root)),
            KeyValueStoreSelect::Memory => (None, KeyValueStoreManager::memory()),
115
        };
Ryan Olson's avatar
Ryan Olson committed
116

117
118
119
120
        let nats_client = match nats_config {
            Some(nc) => Some(nc.connect().await?),
            None => None,
        };
Ryan Olson's avatar
Ryan Olson committed
121

122
        // Start system status server for health and metrics if enabled in configuration
123
124
125
126
127
128
129
130
        let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default();
        // IMPORTANT: We must extract cancel_token from runtime BEFORE moving runtime into the struct below.
        // This is because after moving, runtime is no longer accessible in this scope (ownership rules).
        let cancel_token = if config.system_server_enabled() {
            Some(runtime.clone().child_token())
        } else {
            None
        };
131
132
        let starting_health_status = config.starting_health_status.clone();
        let use_endpoint_health_status = config.use_endpoint_health_status.clone();
133
134
        let health_endpoint_path = config.system_health_path.clone();
        let live_endpoint_path = config.system_live_path.clone();
135
        let system_health = Arc::new(parking_lot::Mutex::new(SystemHealth::new(
136
137
            starting_health_status,
            use_endpoint_health_status,
138
139
            health_endpoint_path,
            live_endpoint_path,
140
        )));
141

142
143
        let nats_client_for_metrics = nats_client.clone();

144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
        // Initialize discovery client based on backend configuration
        let discovery_backend =
            std::env::var("DYN_DISCOVERY_BACKEND").unwrap_or_else(|_| "kv_store".to_string());

        let (discovery_client, discovery_metadata) = match discovery_backend.as_str() {
            "kubernetes" => {
                tracing::info!("Initializing Kubernetes discovery backend");
                let metadata = Arc::new(tokio::sync::RwLock::new(
                    crate::discovery::DiscoveryMetadata::new(),
                ));
                let client = crate::discovery::KubeDiscoveryClient::new(
                    metadata.clone(),
                    runtime.primary_token(),
                )
                .await
                .inspect_err(
                    |err| tracing::error!(%err, "Failed to initialize Kubernetes discovery client"),
                )?;
                (Arc::new(client) as Arc<dyn Discovery>, Some(metadata))
            }
            _ => {
                tracing::info!("Initializing KV store discovery backend");
                use crate::discovery::KVStoreDiscovery;
                (
                    Arc::new(KVStoreDiscovery::new(
                        store.clone(),
                        runtime.primary_token(),
                    )) as Arc<dyn Discovery>,
                    None,
                )
            }
175
176
        };

177
        let distributed_runtime = Self {
Ryan Olson's avatar
Ryan Olson committed
178
179
            runtime,
            etcd_client,
180
            store,
Ryan Olson's avatar
Ryan Olson committed
181
182
            nats_client,
            tcp_server: Arc::new(OnceCell::new()),
183
            network_manager: Arc::new(OnceCell::new()),
184
            system_status_server: Arc::new(OnceLock::new()),
185
            discovery_client,
186
            discovery_metadata,
Ryan Olson's avatar
Ryan Olson committed
187
            component_registry: component::Registry::new(),
188
            instance_sources: Arc::new(Mutex::new(HashMap::new())),
189
            metrics_registry: crate::MetricsRegistry::new(),
190
            system_health,
191
            request_plane,
192
193
        };

194
195
196
197
198
        if let Some(nats_client_for_metrics) = nats_client_for_metrics {
            let nats_client_metrics = DRTNatsClientPrometheusMetrics::new(
                &distributed_runtime,
                nats_client_for_metrics.client().clone(),
            )?;
199
            // Register a callback to update NATS client metrics on the DRT's metrics registry
200
201
202
203
204
205
206
207
            let nats_client_callback = Arc::new({
                let nats_client_clone = nats_client_metrics.clone();
                move || {
                    nats_client_clone.set_from_client_stats();
                    Ok(())
                }
            });
            distributed_runtime
208
209
                .metrics_registry
                .add_update_callback(nats_client_callback);
210
        }
211

212
213
214
215
216
217
        // Initialize the uptime gauge in SystemHealth
        distributed_runtime
            .system_health
            .lock()
            .initialize_uptime_gauge(&distributed_runtime)?;

218
        // Handle system status server initialization
219
        if let Some(cancel_token) = cancel_token {
220
            // System server is enabled - start both the state and HTTP server
221
            let host = config.system_host.clone();
222
            let port = config.system_port as u16;
223

224
            // Start system status server (it creates SystemStatusState internally)
225
            match crate::system_status_server::spawn_system_status_server(
226
227
228
229
                &host,
                port,
                cancel_token,
                Arc::new(distributed_runtime.clone()),
230
                distributed_runtime.discovery_metadata.clone(),
231
232
233
            )
            .await
            {
234
                Ok((addr, handle)) => {
235
                    tracing::info!("System status server started successfully on {}", addr);
236

237
238
239
240
241
242
                    // Store system status server information
                    let system_status_server_info =
                        crate::system_status_server::SystemStatusServerInfo::new(
                            addr,
                            Some(handle),
                        );
243

244
                    // Initialize the system_status_server field
245
                    distributed_runtime
246
247
248
                        .system_status_server
                        .set(Arc::new(system_status_server_info))
                        .expect("System status server info should only be set once");
249
250
                }
                Err(e) => {
251
                    tracing::error!("System status server startup failed: {}", e);
252
                }
253
            }
254
        } else {
255
            // System server HTTP is disabled, but uptime metrics are still being tracked via SystemHealth
256
257
258
            tracing::debug!(
                "System status server HTTP endpoints disabled, but uptime metrics are being tracked"
            );
259
260
        }

261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
        // Start health check manager if enabled
        if config.health_check_enabled {
            let health_check_config = crate::health_check::HealthCheckConfig {
                canary_wait_time: std::time::Duration::from_secs(config.canary_wait_time_secs),
                request_timeout: std::time::Duration::from_secs(
                    config.health_check_request_timeout_secs,
                ),
            };

            // Start the health check manager (spawns per-endpoint monitoring tasks)
            match crate::health_check::start_health_check_manager(
                distributed_runtime.clone(),
                Some(health_check_config),
            )
            .await
            {
                Ok(()) => tracing::info!(
                    "Health check manager started (canary_wait_time: {}s, request_timeout: {}s)",
                    config.canary_wait_time_secs,
                    config.health_check_request_timeout_secs
                ),
                Err(e) => tracing::error!("Health check manager failed to start: {}", e),
            }
        }

286
        Ok(distributed_runtime)
Ryan Olson's avatar
Ryan Olson committed
287
288
289
    }

    pub async fn from_settings(runtime: Runtime) -> Result<Self> {
290
        let config = DistributedConfig::from_settings();
Ryan Olson's avatar
Ryan Olson committed
291
292
293
294
295
296
297
        Self::new(runtime, config).await
    }

    /// Returns a reference to the local [`Runtime`] this distributed runtime was built with.
    pub fn runtime(&self) -> &Runtime {
        &self.runtime
    }

298
299
300
301
    /// Returns the primary [`CancellationToken`] of the underlying local runtime.
    pub fn primary_token(&self) -> CancellationToken {
        self.runtime.primary_token()
    }

302
303
304
305
306
307
308
309
310
311
312
    // TODO: Don't hand out pointers, instead have methods to use the registry in friendly ways
    // (without being aware of async locks and so on)
    /// Returns a reference to the local component registry owned by this runtime.
    pub fn component_registry(&self) -> &component::Registry {
        &self.component_registry
    }

    // TODO: Don't hand out pointers, instead provide system health related services.
    /// Returns another shared handle to the mutex-guarded [`SystemHealth`] state.
    pub fn system_health(&self) -> Arc<parking_lot::Mutex<SystemHealth>> {
        Arc::clone(&self.system_health)
    }

313
    pub fn connection_id(&self) -> u64 {
314
        self.discovery_client.instance_id()
Ryan Olson's avatar
Ryan Olson committed
315
316
317
318
    }

    pub fn shutdown(&self) {
        self.runtime.shutdown();
319
        self.store.shutdown();
Ryan Olson's avatar
Ryan Olson committed
320
321
322
323
    }

    /// Create a [`Namespace`]
    pub fn namespace(&self, name: impl Into<String>) -> Result<Namespace> {
324
        Namespace::new(self.clone(), name.into())
Ryan Olson's avatar
Ryan Olson committed
325
326
    }

327
328
329
    /// Returns the discovery interface for service registration and discovery.
    ///
    /// The returned handle shares the client created in [`DistributedRuntime::new`].
    pub fn discovery(&self) -> Arc<dyn Discovery> {
        Arc::clone(&self.discovery_client)
    }

332
333
    pub(crate) fn service_client(&self) -> Option<ServiceClient> {
        self.nats_client().map(|nc| ServiceClient::new(nc.clone()))
Ryan Olson's avatar
Ryan Olson committed
334
335
    }

336
    pub async fn tcp_server(&self) -> Result<Arc<tcp::server::TcpStreamServer>> {
Ryan Olson's avatar
Ryan Olson committed
337
338
339
340
341
        Ok(self
            .tcp_server
            .get_or_try_init(async move {
                let options = tcp::server::ServerOptions::default();
                let server = tcp::server::TcpStreamServer::new(options).await?;
342
                Ok::<_, PipelineError>(server)
Ryan Olson's avatar
Ryan Olson committed
343
344
345
346
347
            })
            .await?
            .clone())
    }

348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
    /// Get the network manager (lazy initialization)
    ///
    /// The network manager consolidates all network configuration and provides
    /// unified access to request plane servers and clients.
    pub async fn network_manager(
        &self,
    ) -> Result<Arc<crate::pipeline::network::manager::NetworkManager>> {
        use crate::pipeline::network::manager::NetworkManager;

        let manager = self
            .network_manager
            .get_or_try_init(async {
                // Get NATS client if available
                let nats_client = self.nats_client().map(|c| c.client().clone());

                // NetworkManager handles all config reading and mode selection
                anyhow::Ok(NetworkManager::new(
                    self.child_token(),
                    nats_client,
                    self.component_registry.clone(),
368
                    self.request_plane,
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
                ))
            })
            .await?;

        Ok(manager.clone())
    }

    /// Get the request plane server (convenience method)
    ///
    /// Equivalent to `network_manager().await?.server().await`.
    ///
    /// # Errors
    ///
    /// Returns an error if the network manager cannot be obtained or if it fails to provide
    /// the server.
    pub async fn request_plane_server(
        &self,
    ) -> Result<Arc<dyn crate::pipeline::network::ingress::unified_server::RequestPlaneServer>>
    {
        self.network_manager().await?.server().await
    }

    /// DEPRECATED: Use network_manager().server() instead
    ///
    /// This method never returns a server. It resolves the request plane server first, so a
    /// failure there is propagated; otherwise it returns an error describing the new API.
    #[deprecated(note = "Use request_plane_server() or network_manager().server() instead")]
    pub async fn http_server(
        &self,
    ) -> Result<Arc<crate::pipeline::network::ingress::http_endpoint::SharedHttpServer>> {
        // Resolved only so that its errors keep surfacing here; no downcast to the
        // HTTP-specific server type is attempted.
        let _ = self.request_plane_server().await?;

        Err(anyhow::anyhow!(
            "http_server() is deprecated. Use request_plane_server() instead, which returns a trait object that works with all transport types."
        ))
    }

    /// DEPRECATED: Use network_manager().server() instead
    ///
    /// This method never returns a server. It resolves the request plane server first, so a
    /// failure there is propagated; otherwise it returns an error describing the new API.
    #[deprecated(note = "Use request_plane_server() or network_manager().server() instead")]
    pub async fn shared_tcp_server(
        &self,
    ) -> Result<Arc<crate::pipeline::network::ingress::shared_tcp_endpoint::SharedTcpServer>> {
        // Resolved only so that its errors keep surfacing here; no downcast to the
        // TCP-specific server type is attempted.
        let _ = self.request_plane_server().await?;

        Err(anyhow::anyhow!(
            "shared_tcp_server() is deprecated. Use request_plane_server() instead, which returns a trait object that works with all transport types."
        ))
    }

415
416
    /// Returns a reference to the NATS client, or `None` when none is configured.
    pub fn nats_client(&self) -> Option<&nats::Client> {
        self.nats_client.as_ref()
    }

419
420
421
422
423
    /// Get system status server information if available
    ///
    /// Returns `None` until the info has been set, which only happens in
    /// [`DistributedRuntime::new`] after the server was spawned successfully.
    pub fn system_status_server_info(
        &self,
    ) -> Option<Arc<crate::system_status_server::SystemStatusServerInfo>> {
        self.system_status_server.get().cloned()
    }

426
    // todo(ryan): deprecate this as we move to Discovery traits and Component Identifiers
427
428
429
    //
    // Try to use `store()` instead of this. Only use this if you have not been able to migrate
    // yet, or if you require etcd-specific features like distributed locking (rare).
430
    pub fn etcd_client(&self) -> Option<etcd::Client> {
Ryan Olson's avatar
Ryan Olson committed
431
432
        self.etcd_client.clone()
    }
433

434
435
    /// An interface to store things. Will eventually replace `etcd_client`.
    /// Currently does key-value, but will grow to include whatever we need to store.
436
437
    pub fn store(&self) -> &KeyValueStoreManager {
        &self.store
438
439
    }

440
441
442
443
444
    /// How the frontend should talk to the backend.
    ///
    /// Returns the [`RequestPlaneMode`] this runtime was constructed with.
    pub fn request_plane(&self) -> RequestPlaneMode {
        self.request_plane
    }

445
446
447
    /// Returns a child [`CancellationToken`] obtained from the underlying local runtime.
    pub fn child_token(&self) -> CancellationToken {
        self.runtime.child_token()
    }
448

449
450
451
452
    /// Returns the [`GracefulShutdownTracker`] provided by the underlying local runtime.
    pub(crate) fn graceful_shutdown_tracker(&self) -> Arc<GracefulShutdownTracker> {
        self.runtime.graceful_shutdown_tracker()
    }

453
    /// Returns another shared handle to the mutex-guarded per-endpoint instance watcher table.
    pub fn instance_sources(&self) -> Arc<Mutex<InstanceMap>> {
        Arc::clone(&self.instance_sources)
    }
Ryan Olson's avatar
Ryan Olson committed
456
457
458
459
}

#[derive(Dissolve)]
pub struct DistributedConfig {
460
    pub store_backend: KeyValueStoreSelect,
461
    pub nats_config: Option<nats::ClientOptions>,
462
    pub request_plane: RequestPlaneMode,
Ryan Olson's avatar
Ryan Olson committed
463
464
465
}

impl DistributedConfig {
466
    pub fn from_settings() -> DistributedConfig {
467
        let request_plane = RequestPlaneMode::from_env();
Ryan Olson's avatar
Ryan Olson committed
468
        DistributedConfig {
469
            store_backend: KeyValueStoreSelect::Etcd(Box::default()),
470
471
472
473
474
475
            nats_config: if request_plane.is_nats() {
                Some(nats::ClientOptions::default())
            } else {
                None
            },
            request_plane,
Ryan Olson's avatar
Ryan Olson committed
476
477
        }
    }
Ryan Olson's avatar
Ryan Olson committed
478
479

    pub fn for_cli() -> DistributedConfig {
480
481
482
483
        let etcd_config = etcd::ClientOptions {
            attach_lease: false,
            ..Default::default()
        };
484
        let request_plane = RequestPlaneMode::from_env();
485
486
        DistributedConfig {
            store_backend: KeyValueStoreSelect::Etcd(Box::new(etcd_config)),
487
488
489
490
491
492
            nats_config: if request_plane.is_nats() {
                Some(nats::ClientOptions::default())
            } else {
                None
            },
            request_plane,
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
        }
    }
}

/// Request plane transport mode configuration
///
/// This determines how requests are distributed from routers to workers:
/// - `Nats`: Use NATS for request distribution (default, legacy)
/// - `Http`: Use HTTP/2 for request distribution
/// - `Tcp`: Use raw TCP for request distribution with msgpack support
///
/// [`Default`] yields [`RequestPlaneMode::Nats`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum RequestPlaneMode {
    /// Use NATS for request plane (default for backward compatibility)
    #[default]
    Nats,
    /// Use HTTP/2 for request plane
    Http,
    /// Use raw TCP for request plane with msgpack support
    Tcp,
}

/// Renders the mode as its lowercase name: `nats`, `http`, or `tcp`.
impl fmt::Display for RequestPlaneMode {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            Self::Nats => "nats",
            Self::Http => "http",
            Self::Tcp => "tcp",
        };
        f.write_str(name)
    }
}

impl std::str::FromStr for RequestPlaneMode {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "nats" => Ok(Self::Nats),
            "http" => Ok(Self::Http),
            "tcp" => Ok(Self::Tcp),
            _ => Err(anyhow::anyhow!(
                "Invalid request plane mode: '{}'. Valid options are: 'nats', 'http', 'tcp'",
                s
            )),
541
        }
Ryan Olson's avatar
Ryan Olson committed
542
    }
Ryan Olson's avatar
Ryan Olson committed
543
}
544

545
546
547
548
549
550
551
552
553
impl RequestPlaneMode {
    /// Get the request plane mode from environment variable (uncached)
    /// Reads from `DYN_REQUEST_PLANE` environment variable.
    fn from_env() -> Self {
        std::env::var("DYN_REQUEST_PLANE")
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or_default()
    }
554
555
556
557

    pub fn is_nats(&self) -> bool {
        matches!(self, RequestPlaneMode::Nats)
    }
558
559
}

560
pub mod distributed_test_utils {
    //! Common test helper functions for DistributedRuntime tests

    /// Helper function to create a DRT instance for integration-only tests.
    ///
    /// Uses `Runtime::from_current` to leverage the existing tokio runtime and an explicit
    /// configuration: in-memory key-value store, default NATS client options, and the default
    /// request plane mode.
    ///
    /// Note: `DistributedRuntime::new` additionally reads `RuntimeConfig` from settings and the
    /// `DYN_DISCOVERY_BACKEND` environment variable.
    ///
    /// # Panics
    ///
    /// Panics if the current runtime cannot be obtained or the DRT cannot be constructed.
    #[cfg(feature = "integration")]
    pub async fn create_test_drt_async() -> super::DistributedRuntime {
        use crate::{storage::key_value_store::KeyValueStoreSelect, transports::nats};

        let rt = crate::Runtime::from_current()
            .expect("test helper must be called from within a tokio runtime");
        let config = super::DistributedConfig {
            store_backend: KeyValueStoreSelect::Memory,
            nats_config: Some(nats::ClientOptions::default()),
            request_plane: super::RequestPlaneMode::default(),
        };
        super::DistributedRuntime::new(rt, config)
            .await
            .expect("failed to create test DistributedRuntime")
    }
}
579

580
#[cfg(all(test, feature = "integration"))]
mod tests {
    use super::RequestPlaneMode;
    use super::distributed_test_utils::create_test_drt_async;

    /// How long the uptime tests wait, and the minimum uptime they then expect.
    const UPTIME_DELAY: std::time::Duration = std::time::Duration::from_millis(50);

    /// Starts a DRT with `DYN_SYSTEM_PORT` temporarily set to `system_port` (or unset for
    /// `None`), waits [`UPTIME_DELAY`], and asserts the reported uptime is at least that long.
    ///
    /// `system_state` is only used in the success message ("enabled" / "disabled").
    async fn assert_uptime_after_delay(
        system_port: Option<&'static str>,
        system_state: &'static str,
    ) {
        use crate::config::environment_names::runtime::system as env_system;

        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, system_port)], async move {
            // Start a DRT
            let drt = create_test_drt_async().await;

            // Wait 50ms
            tokio::time::sleep(UPTIME_DELAY).await;

            // Check that uptime is 50+ ms
            let uptime = drt.system_health.lock().uptime();
            assert!(
                uptime >= UPTIME_DELAY,
                "Expected uptime to be at least 50ms, but got {:?}",
                uptime
            );

            println!(
                "✓ DRT uptime test passed (system {}): uptime = {:?}",
                system_state, uptime
            );
        })
        .await;
    }

    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_disabled() {
        // Test uptime with system status server disabled
        assert_uptime_after_delay(None, "disabled").await;
    }

    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_enabled() {
        // Test uptime with system status server enabled
        assert_uptime_after_delay(Some("8081"), "enabled").await;
    }

    #[test]
    fn test_request_plane_mode_from_str() {
        // Parsing is case-insensitive.
        let accepted = [
            ("nats", RequestPlaneMode::Nats),
            ("http", RequestPlaneMode::Http),
            ("tcp", RequestPlaneMode::Tcp),
            ("NATS", RequestPlaneMode::Nats),
            ("HTTP", RequestPlaneMode::Http),
            ("TCP", RequestPlaneMode::Tcp),
        ];
        for (input, expected) in accepted {
            assert_eq!(input.parse::<RequestPlaneMode>().unwrap(), expected);
        }
        assert!("invalid".parse::<RequestPlaneMode>().is_err());
    }

    #[test]
    fn test_request_plane_mode_display() {
        assert_eq!(RequestPlaneMode::Nats.to_string(), "nats");
        assert_eq!(RequestPlaneMode::Http.to_string(), "http");
        assert_eq!(RequestPlaneMode::Tcp.to_string(), "tcp");
    }
}