"lib/vscode:/vscode.git/clone" did not exist on "ca63c49d83ed2d6ac7eda756eee5b5630336372e"
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! Dynamo

#![allow(dead_code)]
#![allow(unused_imports)]

9
10
use std::{
    collections::HashMap,
11
    sync::{Arc, OnceLock, Weak},
12
};
Ryan Olson's avatar
Ryan Olson committed
13

Ryan Olson's avatar
Ryan Olson committed
14
pub use anyhow::{
15
    Context as ErrorContext, Error, Ok as OK, Result, anyhow as error, bail as raise,
Ryan Olson's avatar
Ryan Olson committed
16
};
Ryan Olson's avatar
Ryan Olson committed
17
18
19
20
21
22
23

use async_once_cell::OnceCell;

mod config;
pub use config::RuntimeConfig;

pub mod component;
24
pub mod compute;
Ryan Olson's avatar
Ryan Olson committed
25
26
pub mod discovery;
pub mod engine;
27
pub mod health_check;
28
29
pub mod system_status_server;
pub use system_status_server::SystemStatusServerInfo;
30
pub mod instances;
31
pub mod logging;
32
pub mod metrics;
Ryan Olson's avatar
Ryan Olson committed
33
pub mod pipeline;
34
pub mod prelude;
Ryan Olson's avatar
Ryan Olson committed
35
pub mod protocols;
Ryan Olson's avatar
Ryan Olson committed
36
pub mod runnable;
Ryan Olson's avatar
Ryan Olson committed
37
38
pub mod runtime;
pub mod service;
39
pub mod slug;
40
pub mod storage;
41
pub mod system_health;
Ryan Olson's avatar
Ryan Olson committed
42
pub mod traits;
Ryan Olson's avatar
Ryan Olson committed
43
pub mod transports;
Ryan Olson's avatar
Ryan Olson committed
44
pub mod utils;
Ryan Olson's avatar
Ryan Olson committed
45
46
47
pub mod worker;

pub mod distributed;
48
pub use distributed::distributed_test_utils;
Ryan Olson's avatar
Ryan Olson committed
49
pub use futures::stream;
50
pub use system_health::{HealthCheckTarget, SystemHealth};
Ryan Olson's avatar
Ryan Olson committed
51
52
53
pub use tokio_util::sync::CancellationToken;
pub use worker::Worker;

54
use crate::{
55
56
    metrics::prometheus_names::distributed_runtime,
    storage::key_value_store::{KeyValueStore, KeyValueStoreManager},
57
};
58

59
use component::{Endpoint, InstanceSource};
60
use utils::GracefulShutdownTracker;
61

62
63
use config::HealthStatus;

Neelay Shah's avatar
Neelay Shah committed
64
/// Types of Tokio runtimes that can be used to construct a Dynamo [Runtime].
Ryan Olson's avatar
Ryan Olson committed
65
66
67
68
69
70
71
72
73
74
75
#[derive(Clone)]
enum RuntimeType {
    Shared(Arc<tokio::runtime::Runtime>),
    External(tokio::runtime::Handle),
}

/// Local [Runtime] which provides access to shared resources local to the physical node/machine.
#[derive(Debug, Clone)]
pub struct Runtime {
    id: Arc<String>,
    primary: RuntimeType,
76
    secondary: RuntimeType,
Ryan Olson's avatar
Ryan Olson committed
77
    cancellation_token: CancellationToken,
78
79
    endpoint_shutdown_token: CancellationToken,
    graceful_shutdown_tracker: Arc<GracefulShutdownTracker>,
80
81
    compute_pool: Option<Arc<compute::ComputePool>>,
    block_in_place_permits: Option<Arc<tokio::sync::Semaphore>>,
Ryan Olson's avatar
Ryan Olson committed
82
83
}

84
85
86
87
88
89
90
91
/// Type alias for runtime callback functions to reduce complexity
///
/// This type represents an Arc-wrapped callback function that can be:
/// - Shared efficiently across multiple threads and contexts
/// - Cloned without duplicating the underlying closure
/// - Used in generic contexts requiring 'static lifetime
///
/// The Arc wrapper is included in the type to make sharing explicit.
92
93
94
95
96
type PrometheusUpdateCallback = Arc<dyn Fn() -> anyhow::Result<()> + Send + Sync + 'static>;

/// Type alias for exposition text callback functions that return Prometheus text
type PrometheusExpositionFormatCallback =
    Arc<dyn Fn() -> anyhow::Result<String> + Send + Sync + 'static>;
97
98
99
100
101

/// Structure to hold Prometheus registries and associated callbacks for a given hierarchy
pub struct MetricsRegistryEntry {
    /// The Prometheus registry for this prefix
    pub prometheus_registry: prometheus::Registry,
102
103
104
105
    /// List of update callbacks invoked before metrics are scraped
    pub prometheus_update_callbacks: Vec<PrometheusUpdateCallback>,
    /// List of callbacks that return Prometheus exposition text to be appended to metrics output
    pub prometheus_expfmt_callbacks: Vec<PrometheusExpositionFormatCallback>,
106
107
108
109
110
111
112
}

impl MetricsRegistryEntry {
    /// Create a new metrics registry entry with an empty registry and no callbacks
    pub fn new() -> Self {
        Self {
            prometheus_registry: prometheus::Registry::new(),
113
114
            prometheus_update_callbacks: Vec::new(),
            prometheus_expfmt_callbacks: Vec::new(),
115
116
117
118
        }
    }

    /// Add a callback function that receives a reference to any MetricsRegistry
119
120
121
122
123
124
125
    pub fn add_prometheus_update_callback(&mut self, callback: PrometheusUpdateCallback) {
        self.prometheus_update_callbacks.push(callback);
    }

    /// Add an exposition text callback that returns Prometheus text
    pub fn add_prometheus_expfmt_callback(&mut self, callback: PrometheusExpositionFormatCallback) {
        self.prometheus_expfmt_callbacks.push(callback);
126
127
    }

128
129
130
    /// Execute all update callbacks and return their results
    pub fn execute_prometheus_update_callbacks(&self) -> Vec<anyhow::Result<()>> {
        self.prometheus_update_callbacks
131
132
133
134
135
            .iter()
            .map(|callback| callback())
            .collect()
    }

136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
    /// Execute all exposition text callbacks and return their concatenated text
    pub fn execute_prometheus_expfmt_callbacks(&self) -> String {
        let mut result = String::new();
        for callback in &self.prometheus_expfmt_callbacks {
            match callback() {
                Ok(text) => {
                    if !text.is_empty() {
                        if !result.is_empty() && !result.ends_with('\n') {
                            result.push('\n');
                        }
                        result.push_str(&text);
                    }
                }
                Err(e) => {
                    tracing::error!("Error executing exposition text callback: {}", e);
                }
            }
        }
        result
    }

157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
    /// Returns true if a metric with the given name already exists in the Prometheus registry
    pub fn has_metric_named(&self, metric_name: &str) -> bool {
        self.prometheus_registry
            .gather()
            .iter()
            .any(|mf| mf.name() == metric_name)
    }
}

impl Default for MetricsRegistryEntry {
    fn default() -> Self {
        Self::new()
    }
}

impl Clone for MetricsRegistryEntry {
    fn clone(&self) -> Self {
        Self {
            prometheus_registry: self.prometheus_registry.clone(),
176
177
            prometheus_update_callbacks: Vec::new(), // Callbacks cannot be cloned, so we start with an empty list
            prometheus_expfmt_callbacks: Vec::new(), // Callbacks cannot be cloned, so we start with an empty list
178
179
180
181
        }
    }
}

Ryan Olson's avatar
Ryan Olson committed
182
183
184
185
186
187
188
189
/// Distributed [Runtime] which provides access to shared resources across the cluster, this includes
/// communication protocols and transports.
#[derive(Clone)]
pub struct DistributedRuntime {
    // local runtime
    runtime: Runtime,

    // we might consider a unifed transport manager here
190
    etcd_client: Option<transports::etcd::Client>,
191
    nats_client: Option<transports::nats::Client>,
192
    store: KeyValueStoreManager,
Ryan Olson's avatar
Ryan Olson committed
193
    tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
194
    system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,
Ryan Olson's avatar
Ryan Olson committed
195
196
197

    // local registry for components
    // the registry allows us to use share runtime resources across instances of the same component object.
198
    // take for example two instances of a client to the same remote component. The registry allows us to use
Ryan Olson's avatar
Ryan Olson committed
199
200
201
    // a single endpoint watcher for both clients, this keeps the number background tasking watching specific
    // paths in etcd to a minimum.
    component_registry: component::Registry,
202
203
204
205

    // Will only have static components that are not discoverable via etcd, they must be know at
    // startup. Will not start etcd.
    is_static: bool,
206

207
    instance_sources: Arc<tokio::sync::Mutex<HashMap<Endpoint, Weak<InstanceSource>>>>,
208

209
    // Health Status
210
    system_health: Arc<parking_lot::Mutex<SystemHealth>>,
211

212
213
214
    // This map associates metric prefixes with their corresponding Prometheus registries and callbacks.
    // Uses RwLock for better concurrency - multiple threads can read (execute callbacks) simultaneously.
    hierarchy_to_metricsregistry: Arc<std::sync::RwLock<HashMap<String, MetricsRegistryEntry>>>,
Ryan Olson's avatar
Ryan Olson committed
215
}