// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use crate::component::{Component, Instance};
use crate::pipeline::PipelineError;
use crate::storage::key_value_store::{
    EtcdStore, KeyValueStore, KeyValueStoreEnum, KeyValueStoreManager, KeyValueStoreSelect,
    MemoryStore,
};
use crate::transports::nats::DRTNatsClientPrometheusMetrics;
use crate::{
    component::{self, ComponentBuilder, Endpoint, Namespace},
    discovery::Discovery,
    metrics::PrometheusUpdateCallback,
    metrics::{MetricsHierarchy, MetricsRegistry},
    service::ServiceClient,
    transports::{etcd, nats, tcp},
};
use crate::{discovery, system_status_server, transports};

use super::utils::GracefulShutdownTracker;
use crate::SystemHealth;
use crate::runtime::Runtime;

use async_once_cell::OnceCell;
use std::fmt;
use std::sync::{Arc, OnceLock, Weak};
use tokio::sync::watch::Receiver;

use anyhow::Result;
use derive_getters::Dissolve;
use figment::error;
use std::collections::HashMap;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;

// NOTE(review): the generic arguments of this alias were lost in extraction and are
// reconstructed from the imported names (`Endpoint`, `Weak`, `Receiver`, `Instance`)
// and the nesting depth — confirm against the callers of `instance_sources()`.
type InstanceMap = HashMap<Endpoint, Weak<Receiver<Vec<Instance>>>>;

/// Distributed [Runtime] which provides access to shared resources across the cluster, this includes
/// communication protocols and transports.
#[derive(Clone)]
pub struct DistributedRuntime {
    // local runtime
    runtime: Runtime,

    nats_client: Option<nats::Client>,
    store: KeyValueStoreManager,
    // Lazily initialized by `tcp_server()`.
    tcp_server: Arc<OnceCell<Arc<tcp::server::TcpStreamServer>>>,
    // Lazily initialized by `network_manager()`.
    network_manager: Arc<OnceCell<Arc<crate::pipeline::network::manager::NetworkManager>>>,
    // Set at most once, in `new()`, when the system status server starts successfully.
    system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,
    request_plane: RequestPlaneMode,

    // Service discovery client
    discovery_client: Arc<dyn Discovery>,

    // Discovery metadata (only used for Kubernetes backend)
    // Shared with system status server to expose via /metadata endpoint
    discovery_metadata: Option<Arc<tokio::sync::RwLock<discovery::DiscoveryMetadata>>>,

    // local registry for components
    // the registry allows us to share runtime resources across instances of the same component object.
    // take for example two instances of a client to the same remote component. The registry allows us to use
    // a single endpoint watcher for both clients, this keeps the number of background tasks watching specific
    // paths in etcd to a minimum.
    component_registry: component::Registry,

    instance_sources: Arc<Mutex<InstanceMap>>,

    // Health Status
    system_health: Arc<parking_lot::Mutex<SystemHealth>>,

    // This hierarchy's own metrics registry
    metrics_registry: MetricsRegistry,
}

impl MetricsHierarchy for DistributedRuntime {
    fn basename(&self) -> String {
        "".to_string() // drt has no basename. Basename only begins with the Namespace.
    }

    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
        vec![] // drt is the root, so no parent hierarchies
    }

    fn get_metrics_registry(&self) -> &MetricsRegistry {
        &self.metrics_registry
    }
}

impl std::fmt::Debug for DistributedRuntime {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "DistributedRuntime")
    }
}

impl DistributedRuntime {
    /// Build a [`DistributedRuntime`] on top of `runtime` using `config`.
    ///
    /// In order, this:
    /// 1. creates the key-value store selected by `config` (etcd, file or in-memory),
    /// 2. connects to NATS when `config` carries NATS options,
    /// 3. creates the discovery client chosen by the `DYN_DISCOVERY_BACKEND` environment
    ///    variable (`"kubernetes"`, otherwise the KV store backend),
    /// 4. registers NATS client metrics and the uptime gauge,
    /// 5. starts the system status server and the health check manager when the
    ///    runtime configuration enables them.
    ///
    /// # Errors
    ///
    /// Returns an error if the etcd client, the NATS connection, the Kubernetes discovery
    /// client, the NATS metrics or the uptime gauge cannot be set up. Failures to start the
    /// system status server or the health check manager are only logged.
    pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> {
        let (selected_kv_store, nats_config, request_plane) = config.dissolve();

        let store = match selected_kv_store {
            KeyValueStoreSelect::Etcd(etcd_config) => {
                let etcd_client = etcd::Client::new(*etcd_config, runtime.clone())
                    .await
                    .inspect_err(|err| {
                        // The returned error doesn't show because of a dropped runtime error, so
                        // log it first.
                        tracing::error!(%err, "Could not connect to etcd. Pass `--store-kv ..` to use a different backend or start etcd.")
                    })?;
                KeyValueStoreManager::etcd(etcd_client)
            }
            KeyValueStoreSelect::File(root) => KeyValueStoreManager::file(root),
            KeyValueStoreSelect::Memory => KeyValueStoreManager::memory(),
        };

        let nats_client = match nats_config {
            Some(nc) => Some(nc.connect().await?),
            None => None,
        };

        // Start system status server for health and metrics if enabled in configuration
        let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default();

        // IMPORTANT: We must extract cancel_token from runtime BEFORE moving runtime into the struct below.
        // This is because after moving, runtime is no longer accessible in this scope (ownership rules).
        let cancel_token = config
            .system_server_enabled()
            .then(|| runtime.child_token());

        let starting_health_status = config.starting_health_status.clone();
        let use_endpoint_health_status = config.use_endpoint_health_status.clone();
        let health_endpoint_path = config.system_health_path.clone();
        let live_endpoint_path = config.system_live_path.clone();
        let system_health = Arc::new(parking_lot::Mutex::new(SystemHealth::new(
            starting_health_status,
            use_endpoint_health_status,
            health_endpoint_path,
            live_endpoint_path,
        )));

        let nats_client_for_metrics = nats_client.clone();

        // Initialize discovery client based on backend configuration
        let discovery_backend =
            std::env::var("DYN_DISCOVERY_BACKEND").unwrap_or_else(|_| "kv_store".to_string());

        let (discovery_client, discovery_metadata) = match discovery_backend.as_str() {
            "kubernetes" => {
                tracing::info!("Initializing Kubernetes discovery backend");
                let metadata = Arc::new(tokio::sync::RwLock::new(
                    crate::discovery::DiscoveryMetadata::new(),
                ));
                let client = crate::discovery::KubeDiscoveryClient::new(
                    metadata.clone(),
                    runtime.primary_token(),
                )
                .await
                .inspect_err(
                    |err| tracing::error!(%err, "Failed to initialize Kubernetes discovery client"),
                )?;
                (Arc::new(client) as Arc<dyn Discovery>, Some(metadata))
            }
            other => {
                // Any value other than "kubernetes" selects the KV store backend. Make a
                // mistyped value visible instead of silently ignoring it.
                if other != "kv_store" {
                    tracing::warn!(
                        backend = other,
                        "Unrecognized DYN_DISCOVERY_BACKEND value; falling back to the KV store discovery backend"
                    );
                }
                tracing::info!("Initializing KV store discovery backend");
                use crate::discovery::KVStoreDiscovery;
                (
                    Arc::new(KVStoreDiscovery::new(
                        store.clone(),
                        runtime.primary_token(),
                    )) as Arc<dyn Discovery>,
                    None,
                )
            }
        };

        let distributed_runtime = Self {
            runtime,
            store,
            nats_client,
            tcp_server: Arc::new(OnceCell::new()),
            network_manager: Arc::new(OnceCell::new()),
            system_status_server: Arc::new(OnceLock::new()),
            discovery_client,
            discovery_metadata,
            component_registry: component::Registry::new(),
            instance_sources: Arc::new(Mutex::new(HashMap::new())),
            metrics_registry: crate::MetricsRegistry::new(),
            system_health,
            request_plane,
        };

        if let Some(nats_client_for_metrics) = nats_client_for_metrics {
            let nats_client_metrics = DRTNatsClientPrometheusMetrics::new(
                &distributed_runtime,
                nats_client_for_metrics.client().clone(),
            )?;
            // Register a callback to update NATS client metrics on the DRT's metrics registry
            let nats_client_callback = Arc::new({
                let nats_client_clone = nats_client_metrics.clone();
                move || {
                    nats_client_clone.set_from_client_stats();
                    Ok(())
                }
            });
            distributed_runtime
                .metrics_registry
                .add_update_callback(nats_client_callback);
        }

        // Initialize the uptime gauge in SystemHealth
        distributed_runtime
            .system_health
            .lock()
            .initialize_uptime_gauge(&distributed_runtime)?;

        // Handle system status server initialization
        if let Some(cancel_token) = cancel_token {
            // System server is enabled - start both the state and HTTP server
            let host = config.system_host.clone();
            let port = config.system_port as u16;

            // Start system status server (it creates SystemStatusState internally)
            match crate::system_status_server::spawn_system_status_server(
                &host,
                port,
                cancel_token,
                Arc::new(distributed_runtime.clone()),
                distributed_runtime.discovery_metadata.clone(),
            )
            .await
            {
                Ok((addr, handle)) => {
                    tracing::info!("System status server started successfully on {}", addr);

                    // Store system status server information
                    let system_status_server_info =
                        crate::system_status_server::SystemStatusServerInfo::new(
                            addr,
                            Some(handle),
                        );

                    // Initialize the system_status_server field
                    distributed_runtime
                        .system_status_server
                        .set(Arc::new(system_status_server_info))
                        .expect("System status server info should only be set once");
                }
                Err(e) => {
                    tracing::error!("System status server startup failed: {}", e);
                }
            }
        } else {
            // System server HTTP is disabled, but uptime metrics are still being tracked via SystemHealth
            tracing::debug!(
                "System status server HTTP endpoints disabled, but uptime metrics are being tracked"
            );
        }

        // Start health check manager if enabled
        if config.health_check_enabled {
            let health_check_config = crate::health_check::HealthCheckConfig {
                canary_wait_time: std::time::Duration::from_secs(config.canary_wait_time_secs),
                request_timeout: std::time::Duration::from_secs(
                    config.health_check_request_timeout_secs,
                ),
            };

            // Start the health check manager (spawns per-endpoint monitoring tasks)
            match crate::health_check::start_health_check_manager(
                distributed_runtime.clone(),
                Some(health_check_config),
            )
            .await
            {
                Ok(()) => tracing::info!(
                    "Health check manager started (canary_wait_time: {}s, request_timeout: {}s)",
                    config.canary_wait_time_secs,
                    config.health_check_request_timeout_secs
                ),
                Err(e) => tracing::error!("Health check manager failed to start: {}", e),
            }
        }

        Ok(distributed_runtime)
    }

    /// Build a [`DistributedRuntime`] from [`DistributedConfig::from_settings`].
    pub async fn from_settings(runtime: Runtime) -> Result<Self> {
        let config = DistributedConfig::from_settings();
        Self::new(runtime, config).await
    }

    /// The local [`Runtime`] this distributed runtime wraps.
    pub fn runtime(&self) -> &Runtime {
        &self.runtime
    }

    /// The primary cancellation token of the wrapped [`Runtime`].
    pub fn primary_token(&self) -> CancellationToken {
        self.runtime.primary_token()
    }

    // TODO: Don't hand out pointers, instead have methods to use the registry in friendly ways
    // (without being aware of async locks and so on)
    pub fn component_registry(&self) -> &component::Registry {
        &self.component_registry
    }

    // TODO: Don't hand out pointers, instead provide system health related services.
pub fn system_health(&self) -> Arc> { self.system_health.clone() } pub fn connection_id(&self) -> u64 { self.discovery_client.instance_id() } pub fn shutdown(&self) { self.runtime.shutdown(); self.store.shutdown(); } /// Create a [`Namespace`] pub fn namespace(&self, name: impl Into) -> Result { Namespace::new(self.clone(), name.into()) } /// Returns the discovery interface for service registration and discovery pub fn discovery(&self) -> Arc { self.discovery_client.clone() } pub(crate) fn service_client(&self) -> Option { self.nats_client().map(|nc| ServiceClient::new(nc.clone())) } pub async fn tcp_server(&self) -> Result> { Ok(self .tcp_server .get_or_try_init(async move { let options = tcp::server::ServerOptions::default(); let server = tcp::server::TcpStreamServer::new(options).await?; Ok::<_, PipelineError>(server) }) .await? .clone()) } /// Get the network manager (lazy initialization) /// /// The network manager consolidates all network configuration and provides /// unified access to request plane servers and clients. pub async fn network_manager( &self, ) -> Result> { use crate::pipeline::network::manager::NetworkManager; let manager = self .network_manager .get_or_try_init(async { // Get NATS client if available let nats_client = self.nats_client().map(|c| c.client().clone()); // NetworkManager handles all config reading and mode selection anyhow::Ok(NetworkManager::new( self.child_token(), nats_client, self.component_registry.clone(), self.request_plane, )) }) .await?; Ok(manager.clone()) } /// Get the request plane server (convenience method) /// /// This is a shortcut for `network_manager().await?.server().await`. 
pub async fn request_plane_server( &self, ) -> Result> { let manager = self.network_manager().await?; manager.server().await } /// DEPRECATED: Use network_manager().server() instead #[deprecated(note = "Use request_plane_server() or network_manager().server() instead")] pub async fn http_server( &self, ) -> Result> { // For backward compatibility, try to downcast let _server = self.request_plane_server().await?; // This will only work if we're actually in HTTP mode // For now, just return an error suggesting the new API anyhow::bail!( "http_server() is deprecated. Use request_plane_server() instead, which returns a trait object that works with all transport types." ) } /// DEPRECATED: Use network_manager().server() instead #[deprecated(note = "Use request_plane_server() or network_manager().server() instead")] pub async fn shared_tcp_server( &self, ) -> Result> { // For backward compatibility, try to downcast let _server = self.request_plane_server().await?; // This will only work if we're actually in TCP mode // For now, just return an error suggesting the new API anyhow::bail!( "shared_tcp_server() is deprecated. Use request_plane_server() instead, which returns a trait object that works with all transport types." ) } pub fn nats_client(&self) -> Option<&nats::Client> { self.nats_client.as_ref() } /// Get system status server information if available pub fn system_status_server_info( &self, ) -> Option> { self.system_status_server.get().cloned() } /// An interface to store things outside of the process. Usually backed by something like etcd. /// Currently does key-value, but will grow to include whatever we need to store. pub fn store(&self) -> &KeyValueStoreManager { &self.store } /// How the frontend should talk to the backend. 
pub fn request_plane(&self) -> RequestPlaneMode {
    self.request_plane
}

/// A new child cancellation token from the wrapped runtime.
pub fn child_token(&self) -> CancellationToken {
    self.runtime.child_token()
}

/// The wrapped runtime's graceful shutdown tracker.
pub(crate) fn graceful_shutdown_tracker(&self) -> Arc<GracefulShutdownTracker> {
    self.runtime.graceful_shutdown_tracker()
}

/// Shared handle to the instance-source map of this runtime.
pub fn instance_sources(&self) -> Arc<Mutex<InstanceMap>> {
    Arc::clone(&self.instance_sources)
}
}

/// Inputs for [`DistributedRuntime::new`]: which key-value store to use, optional NATS
/// client options, and the request plane transport mode.
#[derive(Dissolve)]
pub struct DistributedConfig {
    pub store_backend: KeyValueStoreSelect,
    pub nats_config: Option<nats::ClientOptions>,
    pub request_plane: RequestPlaneMode,
}

impl DistributedConfig {
    /// Default configuration: etcd store with default options, request plane mode read
    /// from the environment, and NATS options only when that mode is NATS.
    pub fn from_settings() -> DistributedConfig {
        let request_plane = RequestPlaneMode::from_env();
        DistributedConfig {
            store_backend: KeyValueStoreSelect::Etcd(Box::default()),
            nats_config: Self::nats_config_for(request_plane),
            request_plane,
        }
    }

    /// Same as [`DistributedConfig::from_settings`], but the etcd client is configured
    /// with `attach_lease: false`.
    pub fn for_cli() -> DistributedConfig {
        let etcd_config = etcd::ClientOptions {
            attach_lease: false,
            ..Default::default()
        };
        let request_plane = RequestPlaneMode::from_env();
        DistributedConfig {
            store_backend: KeyValueStoreSelect::Etcd(Box::new(etcd_config)),
            nats_config: Self::nats_config_for(request_plane),
            request_plane,
        }
    }

    /// Default NATS client options when `request_plane` is NATS, otherwise `None`.
    fn nats_config_for(request_plane: RequestPlaneMode) -> Option<nats::ClientOptions> {
        request_plane
            .is_nats()
            .then(nats::ClientOptions::default)
    }
}

/// Request plane transport mode configuration
///
/// This determines how requests are distributed from routers to workers:
/// - `Nats`: Use NATS for request distribution (default, legacy)
/// - `Http`: Use HTTP/2 for request distribution
/// - `Tcp`: Use raw TCP for request distribution with msgpack support
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RequestPlaneMode {
    /// Use NATS for request plane (default for backward compatibility)
    #[default]
    Nats,
    /// Use HTTP/2 for request plane
    Http,
    /// Use raw TCP for request plane with msgpack support
    Tcp,
}

impl fmt::Display for RequestPlaneMode {
    /// Writes the lowercase name accepted by [`std::str::FromStr`]: `nats`, `http` or `tcp`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            Self::Nats => "nats",
            Self::Http => "http",
            Self::Tcp => "tcp",
        };
        write!(f, "{}", name)
    }
}

impl std::str::FromStr for RequestPlaneMode {
    type Err = anyhow::Error;

    /// Parses `nats`, `http` or `tcp`, ignoring ASCII case.
    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "nats" => Ok(Self::Nats),
            "http" => Ok(Self::Http),
            "tcp" => Ok(Self::Tcp),
            _ => Err(anyhow::anyhow!(
                "Invalid request plane mode: '{}'. Valid options are: 'nats', 'http', 'tcp'",
                s
            )),
        }
    }
}

impl RequestPlaneMode {
    /// Get the request plane mode from environment variable (uncached)
    /// Reads from `DYN_REQUEST_PLANE` environment variable.
    ///
    /// Falls back to the default mode when the variable is unset or holds an invalid
    /// value; an invalid value is logged so a typo does not go unnoticed.
    fn from_env() -> Self {
        match std::env::var("DYN_REQUEST_PLANE") {
            Ok(raw) => raw.parse().unwrap_or_else(|err: anyhow::Error| {
                tracing::warn!(
                    %err,
                    "Ignoring invalid DYN_REQUEST_PLANE; using the default request plane mode"
                );
                Self::default()
            }),
            Err(_) => Self::default(),
        }
    }

    /// `true` when the mode is [`RequestPlaneMode::Nats`].
    pub fn is_nats(&self) -> bool {
        matches!(self, RequestPlaneMode::Nats)
    }
}

pub mod distributed_test_utils {
    //! Common test helper functions for DistributedRuntime tests

    /// Helper function to create a DRT instance for integration-only tests.
    /// Uses from_current to leverage existing tokio runtime
    /// Note: Settings are read from environment variables inside DistributedRuntime::from_settings
    #[cfg(feature = "integration")]
    pub async fn create_test_drt_async() -> super::DistributedRuntime {
        use crate::{storage::key_value_store::KeyValueStoreSelect, transports::nats};

        let rt = crate::Runtime::from_current().unwrap();
        let config = super::DistributedConfig {
            store_backend: KeyValueStoreSelect::Memory,
            nats_config: Some(nats::ClientOptions::default()),
            request_plane: crate::distributed::RequestPlaneMode::default(),
        };
        super::DistributedRuntime::new(rt, config).await.unwrap()
    }
}

#[cfg(all(test, feature = "integration"))]
mod tests {
    use super::RequestPlaneMode;
    use super::distributed_test_utils::create_test_drt_async;

    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_disabled() {
        use crate::config::environment_names::runtime::system as env_system;
        // Test uptime with system status server disabled
        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
            // Start a DRT
            let drt = create_test_drt_async().await;

            // Wait 50ms
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

            // Check that uptime is 50+ ms
            let uptime = drt.system_health.lock().uptime();
            assert!(
                uptime >= std::time::Duration::from_millis(50),
                "Expected uptime to be at least 50ms, but got {:?}",
                uptime
            );

            println!(
                "✓ DRT uptime test passed (system disabled): uptime = {:?}",
                uptime
            );
        })
        .await;
    }

    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_enabled() {
        use crate::config::environment_names::runtime::system as env_system;
        // Test uptime with system status server enabled
        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, Some("8081"))], async {
            // Start a DRT
            let drt = create_test_drt_async().await;

            // Wait 50ms
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

            // Check that uptime is 50+ ms
            let uptime = drt.system_health.lock().uptime();
            assert!(
                uptime >= std::time::Duration::from_millis(50),
                "Expected uptime to be at least 50ms, but got {:?}",
                uptime
            );

            println!(
                "✓ DRT uptime test passed (system enabled): uptime = {:?}",
                uptime
            );
        })
        .await;
    }

    #[test]
    fn test_request_plane_mode_from_str() {
        assert_eq!(
            "nats".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Nats
        );
        assert_eq!(
            "http".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Http
        );
        assert_eq!(
            "tcp".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Tcp
        );
        assert_eq!(
            "NATS".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Nats
        );
        assert_eq!(
            "HTTP".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Http
        );
        assert_eq!(
            "TCP".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Tcp
        );
        assert!("invalid".parse::<RequestPlaneMode>().is_err());
    }

    #[test]
    fn test_request_plane_mode_display() {
        assert_eq!(RequestPlaneMode::Nats.to_string(), "nats");
        assert_eq!(RequestPlaneMode::Http.to_string(), "http");
        assert_eq!(RequestPlaneMode::Tcp.to_string(), "tcp");
    }
}