lib.rs 7.41 KB
Newer Older
1
2
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
Ryan Olson's avatar
Ryan Olson committed
3

Neelay Shah's avatar
Neelay Shah committed
4
//! Dynamo
Ryan Olson's avatar
Ryan Olson committed
5
6
7
8

#![allow(dead_code)]
#![allow(unused_imports)]

9
10
use std::{
    collections::HashMap,
11
    sync::{Arc, OnceLock, Weak},
12
};
Ryan Olson's avatar
Ryan Olson committed
13

Ryan Olson's avatar
Ryan Olson committed
14
pub use anyhow::{
15
    Context as ErrorContext, Error, Ok as OK, Result, anyhow as error, bail as raise,
Ryan Olson's avatar
Ryan Olson committed
16
};
Ryan Olson's avatar
Ryan Olson committed
17
18
19
20
21
22
23

use async_once_cell::OnceCell;

mod config;
pub use config::RuntimeConfig;

pub mod component;
24
pub mod compute;
Ryan Olson's avatar
Ryan Olson committed
25
26
pub mod discovery;
pub mod engine;
27
pub mod health_check;
28
29
pub mod system_status_server;
pub use system_status_server::SystemStatusServerInfo;
30
pub mod instances;
31
pub mod logging;
32
pub mod metrics;
Ryan Olson's avatar
Ryan Olson committed
33
pub mod pipeline;
34
pub mod prelude;
Ryan Olson's avatar
Ryan Olson committed
35
pub mod protocols;
Ryan Olson's avatar
Ryan Olson committed
36
pub mod runnable;
Ryan Olson's avatar
Ryan Olson committed
37
38
pub mod runtime;
pub mod service;
39
pub mod slug;
40
pub mod storage;
41
pub mod system_health;
Ryan Olson's avatar
Ryan Olson committed
42
pub mod traits;
Ryan Olson's avatar
Ryan Olson committed
43
pub mod transports;
Ryan Olson's avatar
Ryan Olson committed
44
pub mod utils;
Ryan Olson's avatar
Ryan Olson committed
45
46
47
pub mod worker;

pub mod distributed;
48
pub use distributed::distributed_test_utils;
Ryan Olson's avatar
Ryan Olson committed
49
pub use futures::stream;
50
pub use system_health::{HealthCheckTarget, SystemHealth};
Ryan Olson's avatar
Ryan Olson committed
51
52
53
pub use tokio_util::sync::CancellationToken;
pub use worker::Worker;

54
55
56
use crate::{
    metrics::prometheus_names::distributed_runtime, storage::key_value_store::KeyValueStore,
};
57

58
use component::{Endpoint, InstanceSource};
59
use utils::GracefulShutdownTracker;
60

61
62
use config::HealthStatus;

Neelay Shah's avatar
Neelay Shah committed
63
/// Types of Tokio runtimes that can be used to construct a Dynamo [Runtime].
Ryan Olson's avatar
Ryan Olson committed
64
65
66
67
68
69
70
71
72
73
74
#[derive(Clone)]
enum RuntimeType {
    Shared(Arc<tokio::runtime::Runtime>),
    External(tokio::runtime::Handle),
}

/// Local [Runtime] which provides access to shared resources local to the physical node/machine.
#[derive(Debug, Clone)]
pub struct Runtime {
    id: Arc<String>,
    primary: RuntimeType,
75
    secondary: RuntimeType,
Ryan Olson's avatar
Ryan Olson committed
76
    cancellation_token: CancellationToken,
77
78
    endpoint_shutdown_token: CancellationToken,
    graceful_shutdown_tracker: Arc<GracefulShutdownTracker>,
79
80
    compute_pool: Option<Arc<compute::ComputePool>>,
    block_in_place_permits: Option<Arc<tokio::sync::Semaphore>>,
Ryan Olson's avatar
Ryan Olson committed
81
82
}

83
84
85
86
87
88
89
90
/// Type alias for runtime callback functions to reduce complexity
///
/// This type represents an Arc-wrapped callback function that can be:
/// - Shared efficiently across multiple threads and contexts
/// - Cloned without duplicating the underlying closure
/// - Used in generic contexts requiring 'static lifetime
///
/// The Arc wrapper is included in the type to make sharing explicit.
91
92
93
94
95
type PrometheusUpdateCallback = Arc<dyn Fn() -> anyhow::Result<()> + Send + Sync + 'static>;

/// Type alias for exposition text callback functions that return Prometheus text.
///
/// Like the update-callback alias, this is an `Arc`-wrapped, argument-less
/// closure that is `Send + Sync + 'static`; it differs only in returning
/// `anyhow::Result<String>` (the text to emit) instead of `anyhow::Result<()>`.
type PrometheusExpositionFormatCallback =
    Arc<dyn Fn() -> anyhow::Result<String> + Send + Sync + 'static>;
96
97
98
99
100

/// Structure to hold Prometheus registries and associated callbacks for a given hierarchy
pub struct MetricsRegistryEntry {
    /// The Prometheus registry for this prefix
    pub prometheus_registry: prometheus::Registry,
101
102
103
104
    /// List of update callbacks invoked before metrics are scraped
    pub prometheus_update_callbacks: Vec<PrometheusUpdateCallback>,
    /// List of callbacks that return Prometheus exposition text to be appended to metrics output
    pub prometheus_expfmt_callbacks: Vec<PrometheusExpositionFormatCallback>,
105
106
107
108
109
110
111
}

impl MetricsRegistryEntry {
    /// Create a new metrics registry entry with an empty registry and no callbacks
    pub fn new() -> Self {
        Self {
            prometheus_registry: prometheus::Registry::new(),
112
113
            prometheus_update_callbacks: Vec::new(),
            prometheus_expfmt_callbacks: Vec::new(),
114
115
116
117
        }
    }

    /// Add a callback function that receives a reference to any MetricsRegistry
118
119
120
121
122
123
124
    pub fn add_prometheus_update_callback(&mut self, callback: PrometheusUpdateCallback) {
        self.prometheus_update_callbacks.push(callback);
    }

    /// Add an exposition text callback that returns Prometheus text
    pub fn add_prometheus_expfmt_callback(&mut self, callback: PrometheusExpositionFormatCallback) {
        self.prometheus_expfmt_callbacks.push(callback);
125
126
    }

127
128
129
    /// Execute all update callbacks and return their results
    pub fn execute_prometheus_update_callbacks(&self) -> Vec<anyhow::Result<()>> {
        self.prometheus_update_callbacks
130
131
132
133
134
            .iter()
            .map(|callback| callback())
            .collect()
    }

135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
    /// Execute all exposition text callbacks and return their concatenated text
    pub fn execute_prometheus_expfmt_callbacks(&self) -> String {
        let mut result = String::new();
        for callback in &self.prometheus_expfmt_callbacks {
            match callback() {
                Ok(text) => {
                    if !text.is_empty() {
                        if !result.is_empty() && !result.ends_with('\n') {
                            result.push('\n');
                        }
                        result.push_str(&text);
                    }
                }
                Err(e) => {
                    tracing::error!("Error executing exposition text callback: {}", e);
                }
            }
        }
        result
    }

156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
    /// Returns true if a metric with the given name already exists in the Prometheus registry
    pub fn has_metric_named(&self, metric_name: &str) -> bool {
        self.prometheus_registry
            .gather()
            .iter()
            .any(|mf| mf.name() == metric_name)
    }
}

impl Default for MetricsRegistryEntry {
    fn default() -> Self {
        Self::new()
    }
}

impl Clone for MetricsRegistryEntry {
    fn clone(&self) -> Self {
        Self {
            prometheus_registry: self.prometheus_registry.clone(),
175
176
            prometheus_update_callbacks: Vec::new(), // Callbacks cannot be cloned, so we start with an empty list
            prometheus_expfmt_callbacks: Vec::new(), // Callbacks cannot be cloned, so we start with an empty list
177
178
179
180
        }
    }
}

Ryan Olson's avatar
Ryan Olson committed
181
182
183
184
185
186
187
188
/// Distributed [Runtime] which provides access to shared resources across the cluster, this includes
/// communication protocols and transports.
#[derive(Clone)]
pub struct DistributedRuntime {
    // local runtime
    runtime: Runtime,

    // we might consider a unifed transport manager here
189
    etcd_client: Option<transports::etcd::Client>,
190
    nats_client: Option<transports::nats::Client>,
191
    store: Arc<dyn KeyValueStore>,
Ryan Olson's avatar
Ryan Olson committed
192
    tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
193
    system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,
Ryan Olson's avatar
Ryan Olson committed
194
195
196

    // local registry for components
    // the registry allows us to use share runtime resources across instances of the same component object.
197
    // take for example two instances of a client to the same remote component. The registry allows us to use
Ryan Olson's avatar
Ryan Olson committed
198
199
200
    // a single endpoint watcher for both clients, this keeps the number background tasking watching specific
    // paths in etcd to a minimum.
    component_registry: component::Registry,
201
202
203
204

    // Will only have static components that are not discoverable via etcd, they must be know at
    // startup. Will not start etcd.
    is_static: bool,
205

206
    instance_sources: Arc<tokio::sync::Mutex<HashMap<Endpoint, Weak<InstanceSource>>>>,
207

208
    // Health Status
209
    system_health: Arc<parking_lot::Mutex<SystemHealth>>,
210

211
212
213
    // This map associates metric prefixes with their corresponding Prometheus registries and callbacks.
    // Uses RwLock for better concurrency - multiple threads can read (execute callbacks) simultaneously.
    hierarchy_to_metricsregistry: Arc<std::sync::RwLock<HashMap<String, MetricsRegistryEntry>>>,
Ryan Olson's avatar
Ryan Olson committed
214
}