lib.rs 9.25 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
Ryan Olson's avatar
Ryan Olson committed
15

Neelay Shah's avatar
Neelay Shah committed
16
//! Dynamo
Ryan Olson's avatar
Ryan Olson committed
17
18
19
20

#![allow(dead_code)]
#![allow(unused_imports)]

21
22
use std::{
    collections::HashMap,
23
    sync::{Arc, OnceLock, Weak},
24
    time::Instant,
25
};
Ryan Olson's avatar
Ryan Olson committed
26

Ryan Olson's avatar
Ryan Olson committed
27
pub use anyhow::{
28
    Context as ErrorContext, Error, Ok as OK, Result, anyhow as error, bail as raise,
Ryan Olson's avatar
Ryan Olson committed
29
};
Ryan Olson's avatar
Ryan Olson committed
30
31
32
33
34
35
36
37
38

use async_once_cell::OnceCell;

mod config;
pub use config::RuntimeConfig;

pub mod component;
pub mod discovery;
pub mod engine;
39
40
pub mod system_status_server;
pub use system_status_server::SystemStatusServerInfo;
41
pub mod instances;
42
pub mod logging;
43
pub mod metrics;
Ryan Olson's avatar
Ryan Olson committed
44
pub mod pipeline;
45
pub mod prelude;
Ryan Olson's avatar
Ryan Olson committed
46
pub mod protocols;
Ryan Olson's avatar
Ryan Olson committed
47
pub mod runnable;
Ryan Olson's avatar
Ryan Olson committed
48
49
pub mod runtime;
pub mod service;
50
pub mod slug;
51
pub mod storage;
Ryan Olson's avatar
Ryan Olson committed
52
pub mod traits;
Ryan Olson's avatar
Ryan Olson committed
53
pub mod transports;
Ryan Olson's avatar
Ryan Olson committed
54
pub mod utils;
Ryan Olson's avatar
Ryan Olson committed
55
56
57
pub mod worker;

pub mod distributed;
58
pub use distributed::distributed_test_utils;
Ryan Olson's avatar
Ryan Olson committed
59
pub use futures::stream;
Ryan Olson's avatar
Ryan Olson committed
60
61
62
pub use tokio_util::sync::CancellationToken;
pub use worker::Worker;

63
64
use crate::metrics::prometheus_names::distributed_runtime;

65
use component::{Endpoint, InstanceSource};
66
use utils::GracefulShutdownTracker;
67

68
69
use config::HealthStatus;

Neelay Shah's avatar
Neelay Shah committed
70
/// Types of Tokio runtimes that can be used to construct a Dynamo [Runtime].
Ryan Olson's avatar
Ryan Olson committed
71
72
73
74
75
76
77
78
79
80
81
#[derive(Clone)]
enum RuntimeType {
    Shared(Arc<tokio::runtime::Runtime>),
    External(tokio::runtime::Handle),
}

/// Local [Runtime] which provides access to shared resources local to the physical node/machine.
#[derive(Debug, Clone)]
pub struct Runtime {
    id: Arc<String>,
    primary: RuntimeType,
82
    secondary: RuntimeType,
Ryan Olson's avatar
Ryan Olson committed
83
    cancellation_token: CancellationToken,
84
85
    endpoint_shutdown_token: CancellationToken,
    graceful_shutdown_tracker: Arc<GracefulShutdownTracker>,
Ryan Olson's avatar
Ryan Olson committed
86
87
}

88
89
90
91
92
93
94
95
96
/// Current Health Status
/// If use_endpoint_health_status is set then
/// initialize the endpoint_health hashmap to the
/// starting health status
#[derive(Clone)]
pub struct SystemHealth {
    system_health: HealthStatus,
    endpoint_health: HashMap<String, HealthStatus>,
    use_endpoint_health_status: Vec<String>,
97
98
    health_path: String,
    live_path: String,
99
100
    start_time: Instant,
    uptime_gauge: OnceLock<prometheus::Gauge>,
101
102
103
104
105
106
}

impl SystemHealth {
    pub fn new(
        starting_health_status: HealthStatus,
        use_endpoint_health_status: Vec<String>,
107
108
        health_path: String,
        live_path: String,
109
110
111
112
113
114
115
116
117
    ) -> Self {
        let mut endpoint_health = HashMap::new();
        for endpoint in &use_endpoint_health_status {
            endpoint_health.insert(endpoint.clone(), starting_health_status.clone());
        }
        SystemHealth {
            system_health: starting_health_status,
            endpoint_health,
            use_endpoint_health_status,
118
119
            health_path,
            live_path,
120
121
            start_time: Instant::now(),
            uptime_gauge: OnceLock::new(),
122
123
124
125
126
127
        }
    }
    pub fn set_health_status(&mut self, status: HealthStatus) {
        self.system_health = status;
    }

128
129
    pub fn set_endpoint_health_status(&mut self, endpoint: &str, status: HealthStatus) {
        self.endpoint_health.insert(endpoint.to_string(), status);
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
    }

    /// Returns the overall health status and endpoint health statuses
    pub fn get_health_status(&self) -> (bool, HashMap<String, String>) {
        let mut endpoints: HashMap<String, String> = HashMap::new();
        for (endpoint, ready) in &self.endpoint_health {
            endpoints.insert(
                endpoint.clone(),
                if *ready == HealthStatus::Ready {
                    "ready".to_string()
                } else {
                    "notready".to_string()
                },
            );
        }

        let healthy = if !self.use_endpoint_health_status.is_empty() {
            self.use_endpoint_health_status.iter().all(|endpoint| {
                self.endpoint_health
                    .get(endpoint)
                    .is_some_and(|status| *status == HealthStatus::Ready)
            })
        } else {
            self.system_health == HealthStatus::Ready
        };

        (healthy, endpoints)
    }
158
159
160
161
162
163
164

    /// Initialize the uptime gauge using the provided metrics registry
    pub fn initialize_uptime_gauge<T: crate::metrics::MetricsRegistry>(
        &self,
        registry: &T,
    ) -> anyhow::Result<()> {
        let gauge = registry.create_gauge(
165
            distributed_runtime::UPTIME_SECONDS,
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
            "Total uptime of the DistributedRuntime in seconds",
            &[],
        )?;
        self.uptime_gauge
            .set(gauge)
            .map_err(|_| anyhow::anyhow!("uptime_gauge already initialized"))?;
        Ok(())
    }

    /// Get the current uptime as a Duration
    pub fn uptime(&self) -> std::time::Duration {
        self.start_time.elapsed()
    }

    /// Update the uptime gauge with the current uptime value
    pub fn update_uptime_gauge(&self) {
        if let Some(gauge) = self.uptime_gauge.get() {
            gauge.set(self.uptime().as_secs_f64());
        }
    }
186
187
}

188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
/// Type alias for runtime callback functions to reduce type complexity.
///
/// A callback takes no arguments and returns `anyhow::Result<()>`. It is
/// wrapped in an [`Arc`] so that it can be:
/// - Shared efficiently across multiple threads and contexts (`Send + Sync`)
/// - Cloned without duplicating the underlying closure
/// - Used in generic contexts requiring a `'static` lifetime
///
/// The Arc wrapper is included in the type to make sharing explicit.
type RuntimeCallback = Arc<dyn Fn() -> anyhow::Result<()> + Send + Sync + 'static>;

/// Structure to hold a Prometheus registry and its associated callbacks for a given hierarchy
pub struct MetricsRegistryEntry {
    /// The Prometheus registry for this prefix
    pub prometheus_registry: prometheus::Registry,
    /// Callbacks registered for this entry. Each callback takes no arguments and
    /// returns a `Result`; they are run in insertion order by `execute_callbacks`.
    pub runtime_callbacks: Vec<RuntimeCallback>,
}

impl MetricsRegistryEntry {
    /// Builds an entry backed by a fresh Prometheus registry with no callbacks registered.
    pub fn new() -> Self {
        let prometheus_registry = prometheus::Registry::new();
        let runtime_callbacks = Vec::new();
        Self {
            prometheus_registry,
            runtime_callbacks,
        }
    }

    /// Appends `callback` to the list of callbacks run by [`Self::execute_callbacks`].
    pub fn add_callback(&mut self, callback: RuntimeCallback) {
        self.runtime_callbacks.push(callback);
    }

    /// Invokes every registered callback in registration order and returns one
    /// result per callback. A failing callback does not stop the remaining ones.
    pub fn execute_callbacks(&self) -> Vec<anyhow::Result<()>> {
        let mut outcomes = Vec::with_capacity(self.runtime_callbacks.len());
        for callback in &self.runtime_callbacks {
            outcomes.push(callback());
        }
        outcomes
    }

    /// Reports whether the Prometheus registry currently gathers a metric
    /// family whose name equals `metric_name`.
    pub fn has_metric_named(&self, metric_name: &str) -> bool {
        let families = self.prometheus_registry.gather();
        families.iter().any(|family| family.name() == metric_name)
    }
}

impl Default for MetricsRegistryEntry {
    fn default() -> Self {
        Self::new()
    }
}

impl Clone for MetricsRegistryEntry {
    /// Clones the Prometheus registry but NOT the callbacks: the returned entry
    /// always starts with an empty callback list.
    fn clone(&self) -> Self {
        let prometheus_registry = self.prometheus_registry.clone();
        // Callbacks registered on `self` are deliberately not carried over.
        // NOTE(review): `RuntimeCallback` is an `Arc`, so the callbacks could be
        // cloned cheaply — confirm that dropping them here is intentional.
        let runtime_callbacks = Vec::new();
        Self {
            prometheus_registry,
            runtime_callbacks,
        }
    }
}

Ryan Olson's avatar
Ryan Olson committed
252
253
254
255
256
257
258
259
/// Distributed [Runtime] which provides access to shared resources across the cluster, this includes
/// communication protocols and transports.
#[derive(Clone)]
pub struct DistributedRuntime {
    // local runtime
    runtime: Runtime,

    // we might consider a unifed transport manager here
260
    etcd_client: Option<transports::etcd::Client>,
Ryan Olson's avatar
Ryan Olson committed
261
262
    nats_client: transports::nats::Client,
    tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
263
    system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,
Ryan Olson's avatar
Ryan Olson committed
264
265
266

    // local registry for components
    // the registry allows us to use share runtime resources across instances of the same component object.
267
    // take for example two instances of a client to the same remote component. The registry allows us to use
Ryan Olson's avatar
Ryan Olson committed
268
269
270
    // a single endpoint watcher for both clients, this keeps the number background tasking watching specific
    // paths in etcd to a minimum.
    component_registry: component::Registry,
271
272
273
274

    // Will only have static components that are not discoverable via etcd, they must be know at
    // startup. Will not start etcd.
    is_static: bool,
275

276
    instance_sources: Arc<tokio::sync::Mutex<HashMap<Endpoint, Weak<InstanceSource>>>>,
277

278
    // Health Status
279
    system_health: Arc<std::sync::Mutex<SystemHealth>>,
280

281
282
283
    // This map associates metric prefixes with their corresponding Prometheus registries and callbacks.
    // Uses RwLock for better concurrency - multiple threads can read (execute callbacks) simultaneously.
    hierarchy_to_metricsregistry: Arc<std::sync::RwLock<HashMap<String, MetricsRegistryEntry>>>,
Ryan Olson's avatar
Ryan Olson committed
284
}