// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. // SPDX-License-Identifier: Apache-2.0 // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. //! Dynamo #![allow(dead_code)] #![allow(unused_imports)] use std::{ collections::HashMap, sync::{Arc, Weak}, }; use tokio::sync::Mutex; pub use anyhow::{ anyhow as error, bail as raise, Context as ErrorContext, Error, Ok as OK, Result, }; use async_once_cell::OnceCell; mod config; pub use config::RuntimeConfig; pub mod component; pub mod discovery; pub mod engine; pub mod http_server; pub mod logging; pub mod metrics; pub mod pipeline; pub mod prelude; pub mod protocols; pub mod runnable; pub mod runtime; pub mod service; pub mod slug; pub mod storage; pub mod traits; pub mod transports; pub mod utils; pub mod worker; pub mod distributed; pub use futures::stream; pub use tokio_util::sync::CancellationToken; pub use worker::Worker; use component::{Endpoint, InstanceSource}; use config::HealthStatus; /// Types of Tokio runtimes that can be used to construct a Dynamo [Runtime]. #[derive(Clone)] enum RuntimeType { Shared(Arc), External(tokio::runtime::Handle), } /// Local [Runtime] which provides access to shared resources local to the physical node/machine. #[derive(Debug, Clone)] pub struct Runtime { id: Arc, primary: RuntimeType, secondary: RuntimeType, cancellation_token: CancellationToken, } /// Current Health Status /// If use_endpoint_health_status is set then /// initialize the endpoint_health hashmap to the /// starting health status #[derive(Clone)] pub struct SystemHealth { system_health: HealthStatus, endpoint_health: HashMap, use_endpoint_health_status: Vec, } impl SystemHealth { pub fn new( starting_health_status: HealthStatus, use_endpoint_health_status: Vec, ) -> Self { let mut endpoint_health = HashMap::new(); for endpoint in &use_endpoint_health_status { endpoint_health.insert(endpoint.clone(), starting_health_status.clone()); } SystemHealth { system_health: starting_health_status, endpoint_health, use_endpoint_health_status, } } pub fn set_health_status(&mut self, status: HealthStatus) { self.system_health = status; } pub fn set_endpoint_health_status(&mut self, endpoint: String, status: HealthStatus) { self.endpoint_health.insert(endpoint, status); } /// Returns the overall health status and endpoint health statuses pub fn get_health_status(&self) -> (bool, HashMap) { let mut endpoints: HashMap = HashMap::new(); for (endpoint, ready) in &self.endpoint_health { endpoints.insert( endpoint.clone(), if *ready == HealthStatus::Ready { "ready".to_string() } else { "notready".to_string() }, ); } let healthy = if !self.use_endpoint_health_status.is_empty() { self.use_endpoint_health_status.iter().all(|endpoint| { self.endpoint_health .get(endpoint) .is_some_and(|status| *status == HealthStatus::Ready) }) } else { self.system_health == HealthStatus::Ready }; (healthy, endpoints) } } /// Distributed [Runtime] which provides access to shared resources across the cluster, this includes /// communication protocols and transports. #[derive(Clone)] pub struct DistributedRuntime { // local runtime runtime: Runtime, // we might consider a unifed transport manager here etcd_client: Option, nats_client: transports::nats::Client, tcp_server: Arc>>, // local registry for components // the registry allows us to use share runtime resources across instances of the same component object. // take for example two instances of a client to the same remote component. The registry allows us to use // a single endpoint watcher for both clients, this keeps the number background tasking watching specific // paths in etcd to a minimum. component_registry: component::Registry, // Will only have static components that are not discoverable via etcd, they must be know at // startup. Will not start etcd. is_static: bool, instance_sources: Arc>>>, // Health Status system_health: Arc>, // This map associates metric prefixes with their corresponding Prometheus registries. prometheus_registries_by_prefix: Arc>>, }