// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-License-Identifier: Apache-2.0 use crate::component::{Component, Instance}; use crate::pipeline::PipelineError; use crate::pipeline::network::manager::NetworkManager; use crate::service::{ServiceClient, ServiceSet}; use crate::storage::kv::{self, Store as _}; use crate::{ component::{self, ComponentBuilder, Endpoint, Namespace}, discovery::Discovery, metrics::PrometheusUpdateCallback, metrics::{MetricsHierarchy, MetricsRegistry}, transports::{etcd, nats, tcp}, }; use crate::{discovery, system_status_server, transports}; use super::utils::GracefulShutdownTracker; use crate::SystemHealth; use crate::runtime::Runtime; // Used instead of std::cell::OnceCell because get_or_try_init there is nightly use async_once_cell::OnceCell; use std::fmt; use std::sync::{Arc, OnceLock, Weak}; use std::time::Duration; use tokio::sync::watch::Receiver; use anyhow::Result; use derive_getters::Dissolve; use figment::error; use std::collections::HashMap; use tokio::sync::Mutex; use tokio_util::sync::CancellationToken; type InstanceMap = HashMap>>>; /// Distributed [Runtime] which provides access to shared resources across the cluster, this includes /// communication protocols and transports. #[derive(Clone)] pub struct DistributedRuntime { // local runtime runtime: Runtime, nats_client: Option, store: kv::Manager, network_manager: Arc, tcp_server: Arc>>, system_status_server: Arc>>, request_plane: RequestPlaneMode, // Service discovery client discovery_client: Arc, // Discovery metadata (only used for Kubernetes backend) // Shared with system status server to expose via /metadata endpoint discovery_metadata: Option>>, // local registry for components // the registry allows us to use share runtime resources across instances of the same component object. // take for example two instances of a client to the same remote component. The registry allows us to use // a single endpoint watcher for both clients, this keeps the number background tasking watching specific // paths in etcd to a minimum. component_registry: component::Registry, instance_sources: Arc>, // Health Status system_health: Arc>, // Local endpoint registry for in-process calls local_endpoint_registry: crate::local_endpoint_registry::LocalEndpointRegistry, // This hierarchy's own metrics registry metrics_registry: MetricsRegistry, // Registry for /engine/* route callbacks engine_routes: crate::engine_routes::EngineRouteRegistry, } impl MetricsHierarchy for DistributedRuntime { fn basename(&self) -> String { "".to_string() // drt has no basename. Basename only begins with the Namespace. } fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> { vec![] // drt is the root, so no parent hierarchies } fn get_metrics_registry(&self) -> &MetricsRegistry { &self.metrics_registry } } impl std::fmt::Debug for DistributedRuntime { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "DistributedRuntime") } } impl DistributedRuntime { pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result { let (selected_kv_store, nats_config, request_plane) = config.dissolve(); let runtime_clone = runtime.clone(); let store = match selected_kv_store { kv::Selector::Etcd(etcd_config) => { let etcd_client = etcd::Client::new(*etcd_config, runtime_clone).await.inspect_err(|err| // The returned error doesn't show because of a dropped runtime error, so // log it first. tracing::error!(%err, "Could not connect to etcd. Pass `--store-kv ..` to use a different backend or start etcd."))?; kv::Manager::etcd(etcd_client) } kv::Selector::File(root) => kv::Manager::file(runtime.primary_token(), root), kv::Selector::Memory => kv::Manager::memory(), }; let nats_client = match nats_config { Some(nc) => Some(nc.connect().await?), None => None, }; // Start system status server for health and metrics if enabled in configuration let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default(); // IMPORTANT: We must extract cancel_token from runtime BEFORE moving runtime into the struct below. // This is because after moving, runtime is no longer accessible in this scope (ownership rules). let cancel_token = if config.system_server_enabled() { Some(runtime.clone().child_token()) } else { None }; let starting_health_status = config.starting_health_status.clone(); let use_endpoint_health_status = config.use_endpoint_health_status.clone(); let health_endpoint_path = config.system_health_path.clone(); let live_endpoint_path = config.system_live_path.clone(); let system_health = Arc::new(parking_lot::Mutex::new(SystemHealth::new( starting_health_status, use_endpoint_health_status, health_endpoint_path, live_endpoint_path, ))); // Initialize discovery client based on backend configuration let discovery_backend = std::env::var("DYN_DISCOVERY_BACKEND").unwrap_or_else(|_| "kv_store".to_string()); let (discovery_client, discovery_metadata) = match discovery_backend.as_str() { "kubernetes" => { tracing::info!("Initializing Kubernetes discovery backend"); let metadata = Arc::new(tokio::sync::RwLock::new( crate::discovery::DiscoveryMetadata::new(), )); let client = crate::discovery::KubeDiscoveryClient::new( metadata.clone(), runtime.primary_token(), ) .await .inspect_err( |err| tracing::error!(%err, "Failed to initialize Kubernetes discovery client"), )?; (Arc::new(client) as Arc, Some(metadata)) } _ => { tracing::info!("Initializing KV store discovery backend"); use crate::discovery::KVStoreDiscovery; ( Arc::new(KVStoreDiscovery::new( store.clone(), runtime.primary_token(), )) as Arc, None, ) } }; let component_registry = component::Registry::new(); // NetworkManager for request plane let network_manager = NetworkManager::new( runtime.child_token(), nats_client.clone().map(|c| c.client().clone()), component_registry.clone(), request_plane, ); let distributed_runtime = Self { runtime, store, network_manager: Arc::new(network_manager), nats_client, tcp_server: Arc::new(OnceCell::new()), system_status_server: Arc::new(OnceLock::new()), discovery_client, discovery_metadata, component_registry, instance_sources: Arc::new(Mutex::new(HashMap::new())), metrics_registry: crate::MetricsRegistry::new(), system_health, request_plane, local_endpoint_registry: crate::local_endpoint_registry::LocalEndpointRegistry::new(), engine_routes: crate::engine_routes::EngineRouteRegistry::new(), }; // Initialize the uptime gauge in SystemHealth distributed_runtime .system_health .lock() .initialize_uptime_gauge(&distributed_runtime)?; // Handle system status server initialization if let Some(cancel_token) = cancel_token { // System server is enabled - start both the state and HTTP server let host = config.system_host.clone(); let port = config.system_port as u16; // Start system status server (it creates SystemStatusState internally) match crate::system_status_server::spawn_system_status_server( &host, port, cancel_token, Arc::new(distributed_runtime.clone()), distributed_runtime.discovery_metadata.clone(), ) .await { Ok((addr, handle)) => { tracing::info!("System status server started successfully on {}", addr); // Store system status server information let system_status_server_info = crate::system_status_server::SystemStatusServerInfo::new( addr, Some(handle), ); // Initialize the system_status_server field distributed_runtime .system_status_server .set(Arc::new(system_status_server_info)) .expect("System status server info should only be set once"); } Err(e) => { tracing::error!("System status server startup failed: {}", e); } } } else { // System server HTTP is disabled, but uptime metrics are still being tracked via SystemHealth tracing::debug!( "System status server HTTP endpoints disabled, but uptime metrics are being tracked" ); } // Start health check manager if enabled if config.health_check_enabled { let health_check_config = crate::health_check::HealthCheckConfig { canary_wait_time: std::time::Duration::from_secs(config.canary_wait_time_secs), request_timeout: std::time::Duration::from_secs( config.health_check_request_timeout_secs, ), }; // Start the health check manager (spawns per-endpoint monitoring tasks) match crate::health_check::start_health_check_manager( distributed_runtime.clone(), Some(health_check_config), ) .await { Ok(()) => tracing::info!( "Health check manager started (canary_wait_time: {}s, request_timeout: {}s)", config.canary_wait_time_secs, config.health_check_request_timeout_secs ), Err(e) => tracing::error!("Health check manager failed to start: {}", e), } } Ok(distributed_runtime) } pub async fn from_settings(runtime: Runtime) -> Result { let config = DistributedConfig::from_settings(); Self::new(runtime, config).await } pub fn runtime(&self) -> &Runtime { &self.runtime } pub fn primary_token(&self) -> CancellationToken { self.runtime.primary_token() } // TODO: Don't hand out pointers, instead have methods to use the registry in friendly ways // (without being aware of async locks and so on) pub fn component_registry(&self) -> &component::Registry { &self.component_registry } // TODO: Don't hand out pointers, instead provide system health related services. pub fn system_health(&self) -> Arc> { self.system_health.clone() } /// Get the local endpoint registry for in-process endpoint calls pub fn local_endpoint_registry( &self, ) -> &crate::local_endpoint_registry::LocalEndpointRegistry { &self.local_endpoint_registry } /// Get the engine route registry for registering custom /engine/* routes pub fn engine_routes(&self) -> &crate::engine_routes::EngineRouteRegistry { &self.engine_routes } pub fn connection_id(&self) -> u64 { self.discovery_client.instance_id() } pub fn shutdown(&self) { self.runtime.shutdown(); self.store.shutdown(); } /// Create a [`Namespace`] pub fn namespace(&self, name: impl Into) -> Result { Namespace::new(self.clone(), name.into()) } /// Returns the discovery interface for service registration and discovery pub fn discovery(&self) -> Arc { self.discovery_client.clone() } pub async fn tcp_server(&self) -> Result> { Ok(self .tcp_server .get_or_try_init(async move { let options = tcp::server::ServerOptions::default(); let server = tcp::server::TcpStreamServer::new(options).await?; Ok::<_, PipelineError>(server) }) .await? .clone()) } /// Get the network manager /// /// The network manager consolidates all network configuration and provides /// unified access to request plane servers and clients. pub fn network_manager(&self) -> Arc { self.network_manager.clone() } /// Get the request plane server (convenience method) /// /// This is a shortcut for `network_manager().await?.server().await`. pub async fn request_plane_server( &self, ) -> Result> { self.network_manager().server().await } /// Get system status server information if available pub fn system_status_server_info( &self, ) -> Option> { self.system_status_server.get().cloned() } /// An interface to store things outside of the process. Usually backed by something like etcd. /// Currently does key-value, but will grow to include whatever we need to store. pub fn store(&self) -> &kv::Manager { &self.store } /// How the frontend should talk to the backend. pub fn request_plane(&self) -> RequestPlaneMode { self.request_plane } pub fn child_token(&self) -> CancellationToken { self.runtime.child_token() } pub(crate) fn graceful_shutdown_tracker(&self) -> Arc { self.runtime.graceful_shutdown_tracker() } pub fn instance_sources(&self) -> Arc> { self.instance_sources.clone() } /// TODO: This is a temporary KV router measure for component/component.rs EventPublisher impl for /// Component, to allow it to publish to NATS. KV Router is the only user. pub async fn kv_router_nats_publish( &self, subject: String, payload: bytes::Bytes, ) -> anyhow::Result<()> { let Some(nats_client) = self.nats_client.as_ref() else { anyhow::bail!("KV router's EventPublisher requires NATS"); }; Ok(nats_client.client().publish(subject, payload).await?) } /// TODO: This is a temporary KV router measure for component/component.rs EventSubscriber impl for /// Component, to allow it to subscribe to NATS. KV Router is the only user. pub(crate) async fn kv_router_nats_subscribe( &self, subject: String, ) -> Result { let Some(nats_client) = self.nats_client.as_ref() else { anyhow::bail!("KV router's EventSubscriber requires NATS"); }; Ok(nats_client.client().subscribe(subject).await?) } /// TODO (karenc): This is a temporary KV router measure for worker query requests. /// Allows KV Router to perform request/reply with workers. (versus the pub/sub pattern above) /// KV Router is the only user, made public for use in dynamo-llm crate pub async fn kv_router_nats_request( &self, subject: String, payload: bytes::Bytes, timeout: std::time::Duration, ) -> anyhow::Result { let Some(nats_client) = self.nats_client.as_ref() else { anyhow::bail!("KV router's request requires NATS"); }; let response = tokio::time::timeout(timeout, nats_client.client().request(subject, payload)) .await .map_err(|_| anyhow::anyhow!("Request timed out after {:?}", timeout))??; Ok(response) } /// DEPRECATED: This method exists only for NATS request plane support. /// Once everything uses the TCP request plane, this can be removed along with /// the NATS service registration infrastructure. /// /// Returns a receiver that signals when the NATS service registration is complete. /// The caller should use `blocking_recv()` to wait for completion. pub fn register_nats_service( &self, component: Component, ) -> tokio::sync::mpsc::Receiver> { // Create a oneshot-style channel (capacity 1) to signal completion let (tx, rx) = tokio::sync::mpsc::channel::>(1); let drt = self.clone(); self.runtime().secondary().spawn(async move { let service_name = component.service_name(); // Pre-check to save cost of creating the service, but don't hold the lock if drt .component_registry() .inner .lock() .await .services .contains_key(&service_name) { // The NATS service is per component, but it is called from `serve_endpoint`, and there // are often multiple endpoints for a component (e.g. `clear_kv_blocks` and `generate`). tracing::trace!("Service {service_name} already exists"); // Signal success - service already exists let _ = tx.send(Ok(())).await; return; } let Some(nats_client) = drt.nats_client.as_ref() else { tracing::error!("Cannot create NATS service without NATS."); let _ = tx .send(Err("Cannot create NATS service without NATS".to_string())) .await; return; }; let description = None; let nats_service = match crate::component::service::build_nats_service( nats_client, &component, description, ) .await { Ok(service) => service, Err(err) => { tracing::error!(error = %err, component = service_name, "Failed to build NATS service"); let _ = tx.send(Err(format!("Failed to build NATS service: {err}"))).await; return; } }; let mut guard = drt.component_registry().inner.lock().await; if !guard.services.contains_key(&service_name) { // Normal case guard.services.insert(service_name.clone(), nats_service); tracing::info!("Added NATS service {service_name}"); drop(guard); } else { drop(guard); let _ = nats_service.stop().await; // The NATS service is per component, but it is called from `serve_endpoint`, and there // are often multiple endpoints for a component (e.g. `clear_kv_blocks` and `generate`). // TODO: Is this still true? } // Signal completion - service registered successfully let _ = tx.send(Ok(())).await; }); rx } } #[derive(Dissolve)] pub struct DistributedConfig { pub store_backend: kv::Selector, pub nats_config: Option, pub request_plane: RequestPlaneMode, } impl DistributedConfig { pub fn from_settings() -> DistributedConfig { let request_plane = RequestPlaneMode::from_env(); // NATS is used for more than just NATS request-plane RPC: // - KV router events (JetStream or NATS core + local indexer) // - inter-router replica sync (NATS core) // // Historically we only connected to NATS when the request plane was NATS, which made // `DYN_REQUEST_PLANE=tcp|http` incompatible with KV routing modes that rely on NATS. // If a NATS server is configured via env, enable the client regardless of request plane. let nats_enabled = request_plane.is_nats() || std::env::var(crate::config::environment_names::nats::NATS_SERVER).is_ok(); DistributedConfig { store_backend: kv::Selector::Etcd(Box::default()), nats_config: if nats_enabled { Some(nats::ClientOptions::default()) } else { None }, request_plane, } } pub fn for_cli() -> DistributedConfig { let etcd_config = etcd::ClientOptions { attach_lease: false, ..Default::default() }; let request_plane = RequestPlaneMode::from_env(); let nats_enabled = request_plane.is_nats() || std::env::var(crate::config::environment_names::nats::NATS_SERVER).is_ok(); DistributedConfig { store_backend: kv::Selector::Etcd(Box::new(etcd_config)), nats_config: if nats_enabled { Some(nats::ClientOptions::default()) } else { None }, request_plane, } } /// A DistributedConfig that isn't distributed, for when the frontend and backend are in the /// same process. pub fn process_local() -> DistributedConfig { DistributedConfig { store_backend: kv::Selector::Memory, nats_config: None, // This won't be used in process local, so we likely need a "none" option to // communicate that and avoid opening the ports. request_plane: RequestPlaneMode::Tcp, } } } /// Request plane transport mode configuration /// /// This determines how requests are distributed from routers to workers: /// - `Nats`: Use NATS for request distribution (legacy) /// - `Http`: Use HTTP/2 for request distribution /// - `Tcp`: Use raw TCP for request distribution with msgpack support (default) #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum RequestPlaneMode { /// Use NATS for request plane Nats, /// Use HTTP/2 for request plane Http, /// Use raw TCP for request plane with msgpack support Tcp, } impl Default for RequestPlaneMode { fn default() -> Self { Self::Tcp } } impl fmt::Display for RequestPlaneMode { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::Nats => write!(f, "nats"), Self::Http => write!(f, "http"), Self::Tcp => write!(f, "tcp"), } } } impl std::str::FromStr for RequestPlaneMode { type Err = anyhow::Error; fn from_str(s: &str) -> std::result::Result { match s.to_lowercase().as_str() { "nats" => Ok(Self::Nats), "http" => Ok(Self::Http), "tcp" => Ok(Self::Tcp), _ => Err(anyhow::anyhow!( "Invalid request plane mode: '{}'. Valid options are: 'nats', 'http', 'tcp'", s )), } } } impl RequestPlaneMode { /// Get the request plane mode from environment variable (uncached) /// Reads from `DYN_REQUEST_PLANE` environment variable. fn from_env() -> Self { std::env::var("DYN_REQUEST_PLANE") .ok() .and_then(|s| s.parse().ok()) .unwrap_or_default() } pub fn is_nats(&self) -> bool { matches!(self, RequestPlaneMode::Nats) } } pub mod distributed_test_utils { //! Common test helper functions for DistributedRuntime tests /// Helper function to create a DRT instance for integration-only tests. /// Uses from_current to leverage existing tokio runtime /// Note: Settings are read from environment variables inside DistributedRuntime::from_settings #[cfg(feature = "integration")] pub async fn create_test_drt_async() -> super::DistributedRuntime { use crate::{storage::kv, transports::nats}; let rt = crate::Runtime::from_current().unwrap(); let config = super::DistributedConfig { store_backend: kv::Selector::Memory, nats_config: Some(nats::ClientOptions::default()), request_plane: crate::distributed::RequestPlaneMode::default(), }; super::DistributedRuntime::new(rt, config).await.unwrap() } /// Helper function to create a DRT instance which points at /// a (shared) file-backed KV store and ephemeral NATS transport so that /// multiple DRT instances may observe the same registration state. /// NOTE: This gets around the fact that create_test_drt_async() is /// hardcoded to spin up a memory-backed discovery store /// which means we can't share discovery state across runtimes. pub async fn create_test_shared_drt_async( store_path: &std::path::Path, ) -> super::DistributedRuntime { use crate::{storage::kv, transports::nats}; let rt = crate::Runtime::from_current().unwrap(); let config = super::DistributedConfig { store_backend: kv::Selector::File(store_path.to_path_buf()), nats_config: Some(nats::ClientOptions::default()), request_plane: crate::distributed::RequestPlaneMode::default(), }; super::DistributedRuntime::new(rt, config).await.unwrap() } } #[cfg(all(test, feature = "integration"))] mod tests { use super::RequestPlaneMode; use super::distributed_test_utils::create_test_drt_async; #[tokio::test] async fn test_drt_uptime_after_delay_system_disabled() { use crate::config::environment_names::runtime::system as env_system; // Test uptime with system status server disabled temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async { // Start a DRT let drt = create_test_drt_async().await; // Wait 50ms tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; // Check that uptime is 50+ ms let uptime = drt.system_health.lock().uptime(); assert!( uptime >= std::time::Duration::from_millis(50), "Expected uptime to be at least 50ms, but got {:?}", uptime ); println!( "✓ DRT uptime test passed (system disabled): uptime = {:?}", uptime ); }) .await; } #[tokio::test] async fn test_drt_uptime_after_delay_system_enabled() { use crate::config::environment_names::runtime::system as env_system; // Test uptime with system status server enabled temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, Some("8081"))], async { // Start a DRT let drt = create_test_drt_async().await; // Wait 50ms tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; // Check that uptime is 50+ ms let uptime = drt.system_health.lock().uptime(); assert!( uptime >= std::time::Duration::from_millis(50), "Expected uptime to be at least 50ms, but got {:?}", uptime ); println!( "✓ DRT uptime test passed (system enabled): uptime = {:?}", uptime ); }) .await; } #[test] fn test_request_plane_mode_from_str() { assert_eq!( "nats".parse::().unwrap(), RequestPlaneMode::Nats ); assert_eq!( "http".parse::().unwrap(), RequestPlaneMode::Http ); assert_eq!( "tcp".parse::().unwrap(), RequestPlaneMode::Tcp ); assert_eq!( "NATS".parse::().unwrap(), RequestPlaneMode::Nats ); assert_eq!( "HTTP".parse::().unwrap(), RequestPlaneMode::Http ); assert_eq!( "TCP".parse::().unwrap(), RequestPlaneMode::Tcp ); assert!("invalid".parse::().is_err()); } #[test] fn test_request_plane_mode_display() { assert_eq!(RequestPlaneMode::Nats.to_string(), "nats"); assert_eq!(RequestPlaneMode::Http.to_string(), "http"); assert_eq!(RequestPlaneMode::Tcp.to_string(), "tcp"); } }