// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-License-Identifier: Apache-2.0 pub use crate::component::Component; use crate::storage::key_value_store::{ EtcdStore, KeyValueStore, KeyValueStoreEnum, KeyValueStoreManager, KeyValueStoreSelect, MemoryStore, }; use crate::transports::nats::DRTNatsClientPrometheusMetrics; use crate::{ ErrorContext, component::{self, ComponentBuilder, Endpoint, InstanceSource, Namespace}, discovery::Discovery, metrics::PrometheusUpdateCallback, metrics::{MetricsHierarchy, MetricsRegistry}, service::ServiceClient, transports::{etcd, nats, tcp}, }; use super::utils::GracefulShutdownTracker; use super::{Arc, DistributedRuntime, OK, OnceCell, Result, Runtime, SystemHealth, Weak, error}; use std::sync::OnceLock; use derive_getters::Dissolve; use figment::error; use std::collections::HashMap; use tokio::sync::Mutex; use tokio_util::sync::CancellationToken; impl MetricsHierarchy for DistributedRuntime { fn basename(&self) -> String { "".to_string() // drt has no basename. Basename only begins with the Namespace. } fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> { vec![] // drt is the root, so no parent hierarchies } fn get_metrics_registry(&self) -> &MetricsRegistry { &self.metrics_registry } } impl std::fmt::Debug for DistributedRuntime { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "DistributedRuntime") } } impl DistributedRuntime { pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result { let (selected_kv_store, nats_config, is_static) = config.dissolve(); let runtime_clone = runtime.clone(); let (etcd_client, store) = match (is_static, selected_kv_store) { (false, KeyValueStoreSelect::Etcd(etcd_config)) => { let etcd_client = etcd::Client::new(*etcd_config, runtime_clone).await.inspect_err(|err| // The returned error doesn't show because of a dropped runtime error, so // log it first. tracing::error!(%err, "Could not connect to etcd. Pass `--store-kv ..` to use a different backend or start etcd."))?; let store = KeyValueStoreManager::etcd(etcd_client.clone()); (Some(etcd_client), store) } (false, KeyValueStoreSelect::File(root)) => (None, KeyValueStoreManager::file(root)), (true, _) | (false, KeyValueStoreSelect::Memory) => { (None, KeyValueStoreManager::memory()) } }; let nats_client = Some(nats_config.clone().connect().await?); // Start system status server for health and metrics if enabled in configuration let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default(); // IMPORTANT: We must extract cancel_token from runtime BEFORE moving runtime into the struct below. // This is because after moving, runtime is no longer accessible in this scope (ownership rules). let cancel_token = if config.system_server_enabled() { Some(runtime.clone().child_token()) } else { None }; let starting_health_status = config.starting_health_status.clone(); let use_endpoint_health_status = config.use_endpoint_health_status.clone(); let health_endpoint_path = config.system_health_path.clone(); let live_endpoint_path = config.system_live_path.clone(); let system_health = Arc::new(parking_lot::Mutex::new(SystemHealth::new( starting_health_status, use_endpoint_health_status, health_endpoint_path, live_endpoint_path, ))); let nats_client_for_metrics = nats_client.clone(); // Initialize discovery backed by KV store let discovery_client = { use crate::discovery::KVStoreDiscovery; Arc::new(KVStoreDiscovery::new( store.clone(), runtime.primary_token(), )) as Arc }; let distributed_runtime = Self { runtime, etcd_client, store, nats_client, tcp_server: Arc::new(OnceCell::new()), system_status_server: Arc::new(OnceLock::new()), discovery_client, component_registry: component::Registry::new(), is_static, instance_sources: Arc::new(Mutex::new(HashMap::new())), metrics_registry: crate::MetricsRegistry::new(), system_health, }; if let Some(nats_client_for_metrics) = nats_client_for_metrics { let nats_client_metrics = DRTNatsClientPrometheusMetrics::new( &distributed_runtime, nats_client_for_metrics.client().clone(), )?; // Register a callback to update NATS client metrics on the DRT's metrics registry let nats_client_callback = Arc::new({ let nats_client_clone = nats_client_metrics.clone(); move || { nats_client_clone.set_from_client_stats(); Ok(()) } }); distributed_runtime .metrics_registry .add_update_callback(nats_client_callback); } // Initialize the uptime gauge in SystemHealth distributed_runtime .system_health .lock() .initialize_uptime_gauge(&distributed_runtime)?; // Handle system status server initialization if let Some(cancel_token) = cancel_token { // System server is enabled - start both the state and HTTP server let host = config.system_host.clone(); let port = config.system_port as u16; // Start system status server (it creates SystemStatusState internally) match crate::system_status_server::spawn_system_status_server( &host, port, cancel_token, Arc::new(distributed_runtime.clone()), ) .await { Ok((addr, handle)) => { tracing::info!("System status server started successfully on {}", addr); // Store system status server information let system_status_server_info = crate::system_status_server::SystemStatusServerInfo::new( addr, Some(handle), ); // Initialize the system_status_server field distributed_runtime .system_status_server .set(Arc::new(system_status_server_info)) .expect("System status server info should only be set once"); } Err(e) => { tracing::error!("System status server startup failed: {}", e); } } } else { // System server HTTP is disabled, but uptime metrics are still being tracked via SystemHealth tracing::debug!( "System status server HTTP endpoints disabled, but uptime metrics are being tracked" ); } // Start health check manager if enabled if config.health_check_enabled { let health_check_config = crate::health_check::HealthCheckConfig { canary_wait_time: std::time::Duration::from_secs(config.canary_wait_time_secs), request_timeout: std::time::Duration::from_secs( config.health_check_request_timeout_secs, ), }; // Start the health check manager (spawns per-endpoint monitoring tasks) match crate::health_check::start_health_check_manager( distributed_runtime.clone(), Some(health_check_config), ) .await { Ok(()) => tracing::info!( "Health check manager started (canary_wait_time: {}s, request_timeout: {}s)", config.canary_wait_time_secs, config.health_check_request_timeout_secs ), Err(e) => tracing::error!("Health check manager failed to start: {}", e), } } Ok(distributed_runtime) } pub async fn from_settings(runtime: Runtime) -> Result { let config = DistributedConfig::from_settings(false); Self::new(runtime, config).await } // Call this if you are using static workers that do not need etcd-based discovery. pub async fn from_settings_without_discovery(runtime: Runtime) -> Result { let config = DistributedConfig::from_settings(true); Self::new(runtime, config).await } pub fn runtime(&self) -> &Runtime { &self.runtime } pub fn primary_token(&self) -> CancellationToken { self.runtime.primary_token() } pub fn connection_id(&self) -> u64 { self.store.connection_id() } pub fn shutdown(&self) { self.runtime.shutdown(); self.store.shutdown(); } /// Create a [`Namespace`] pub fn namespace(&self, name: impl Into) -> Result { Namespace::new(self.clone(), name.into(), self.is_static) } /// Returns the discovery interface for service registration and discovery pub fn discovery(&self) -> Arc { self.discovery_client.clone() } pub(crate) fn service_client(&self) -> Option { self.nats_client().map(|nc| ServiceClient::new(nc.clone())) } pub async fn tcp_server(&self) -> Result> { Ok(self .tcp_server .get_or_try_init(async move { let options = tcp::server::ServerOptions::default(); let server = tcp::server::TcpStreamServer::new(options).await?; OK(server) }) .await? .clone()) } pub fn nats_client(&self) -> Option<&nats::Client> { self.nats_client.as_ref() } /// Get system status server information if available pub fn system_status_server_info( &self, ) -> Option> { self.system_status_server.get().cloned() } // todo(ryan): deprecate this as we move to Discovery traits and Component Identifiers // // Try to use `store()` instead of this. Only use this if you have not been able to migrate // yet, or if you require etcd-specific features like distributed locking (rare). pub fn etcd_client(&self) -> Option { self.etcd_client.clone() } /// An interface to store things. Will eventually replace `etcd_client`. /// Currently does key-value, but will grow to include whatever we need to store. pub fn store(&self) -> &KeyValueStoreManager { &self.store } pub fn child_token(&self) -> CancellationToken { self.runtime.child_token() } pub(crate) fn graceful_shutdown_tracker(&self) -> Arc { self.runtime.graceful_shutdown_tracker() } pub fn instance_sources(&self) -> Arc>>> { self.instance_sources.clone() } } #[derive(Dissolve)] pub struct DistributedConfig { pub store_backend: KeyValueStoreSelect, pub nats_config: nats::ClientOptions, pub is_static: bool, } impl DistributedConfig { pub fn from_settings(is_static: bool) -> DistributedConfig { DistributedConfig { store_backend: KeyValueStoreSelect::Etcd(Box::default()), nats_config: nats::ClientOptions::default(), is_static, } } pub fn for_cli() -> DistributedConfig { let etcd_config = etcd::ClientOptions { attach_lease: false, ..Default::default() }; DistributedConfig { store_backend: KeyValueStoreSelect::Etcd(Box::new(etcd_config)), nats_config: nats::ClientOptions::default(), is_static: false, } } } pub mod distributed_test_utils { //! Common test helper functions for DistributedRuntime tests // TODO: Use in-memory DistributedRuntime for tests instead of full runtime when available. /// Helper function to create a DRT instance for integration-only tests. /// Uses from_current to leverage existing tokio runtime /// Note: Settings are read from environment variables inside DistributedRuntime::from_settings_without_discovery #[cfg(feature = "integration")] pub async fn create_test_drt_async() -> crate::DistributedRuntime { let rt = crate::Runtime::from_current().unwrap(); crate::DistributedRuntime::from_settings_without_discovery(rt) .await .unwrap() } } #[cfg(all(test, feature = "integration"))] mod tests { use super::distributed_test_utils::create_test_drt_async; #[tokio::test] async fn test_drt_uptime_after_delay_system_disabled() { // Test uptime with system status server disabled temp_env::async_with_vars([("DYN_SYSTEM_ENABLED", Some("false"))], async { // Start a DRT let drt = create_test_drt_async().await; // Wait 50ms tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; // Check that uptime is 50+ ms let uptime = drt.system_health.lock().uptime(); assert!( uptime >= std::time::Duration::from_millis(50), "Expected uptime to be at least 50ms, but got {:?}", uptime ); println!( "✓ DRT uptime test passed (system disabled): uptime = {:?}", uptime ); }) .await; } #[tokio::test] async fn test_drt_uptime_after_delay_system_enabled() { // Test uptime with system status server enabled temp_env::async_with_vars([("DYN_SYSTEM_PORT", Some("8081"))], async { // Start a DRT let drt = create_test_drt_async().await; // Wait 50ms tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; // Check that uptime is 50+ ms let uptime = drt.system_health.lock().uptime(); assert!( uptime >= std::time::Duration::from_millis(50), "Expected uptime to be at least 50ms, but got {:?}", uptime ); println!( "✓ DRT uptime test passed (system enabled): uptime = {:?}", uptime ); }) .await; } }