lib.rs 5.72 KB
Newer Older
1
2
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
Ryan Olson's avatar
Ryan Olson committed
3

Neelay Shah's avatar
Neelay Shah committed
4
//! Dynamo
Ryan Olson's avatar
Ryan Olson committed
5
6
7
8

#![allow(dead_code)]
#![allow(unused_imports)]

9
10
use std::{
    collections::HashMap,
11
    sync::{Arc, OnceLock, Weak},
12
};
Ryan Olson's avatar
Ryan Olson committed
13

Ryan Olson's avatar
Ryan Olson committed
14
pub use anyhow::{
15
    Context as ErrorContext, Error, Ok as OK, Result, anyhow as error, bail as raise,
Ryan Olson's avatar
Ryan Olson committed
16
};
Ryan Olson's avatar
Ryan Olson committed
17
18
19
20
21
22
23

use async_once_cell::OnceCell;

mod config;
pub use config::RuntimeConfig;

pub mod component;
24
pub mod compute;
Ryan Olson's avatar
Ryan Olson committed
25
26
pub mod discovery;
pub mod engine;
27
pub mod health_check;
28
29
pub mod system_status_server;
pub use system_status_server::SystemStatusServerInfo;
30
pub mod instances;
31
pub mod logging;
32
pub mod metrics;
Ryan Olson's avatar
Ryan Olson committed
33
pub mod pipeline;
34
pub mod prelude;
Ryan Olson's avatar
Ryan Olson committed
35
pub mod protocols;
Ryan Olson's avatar
Ryan Olson committed
36
pub mod runnable;
Ryan Olson's avatar
Ryan Olson committed
37
38
pub mod runtime;
pub mod service;
39
pub mod slug;
40
pub mod storage;
41
pub mod system_health;
Ryan Olson's avatar
Ryan Olson committed
42
pub mod traits;
Ryan Olson's avatar
Ryan Olson committed
43
pub mod transports;
Ryan Olson's avatar
Ryan Olson committed
44
pub mod utils;
Ryan Olson's avatar
Ryan Olson committed
45
46
47
pub mod worker;

pub mod distributed;
48
pub use distributed::distributed_test_utils;
Ryan Olson's avatar
Ryan Olson committed
49
pub use futures::stream;
50
pub use system_health::{HealthCheckTarget, SystemHealth};
Ryan Olson's avatar
Ryan Olson committed
51
52
53
pub use tokio_util::sync::CancellationToken;
pub use worker::Worker;

54
55
use crate::metrics::prometheus_names::distributed_runtime;

56
use component::{Endpoint, InstanceSource};
57
use utils::GracefulShutdownTracker;
58

59
60
use config::HealthStatus;

Neelay Shah's avatar
Neelay Shah committed
61
/// Types of Tokio runtimes that can be used to construct a Dynamo [Runtime].
Ryan Olson's avatar
Ryan Olson committed
62
63
64
65
66
67
68
69
70
71
72
#[derive(Clone)]
enum RuntimeType {
    Shared(Arc<tokio::runtime::Runtime>),
    External(tokio::runtime::Handle),
}

/// Local [Runtime] which provides access to shared resources local to the physical node/machine.
#[derive(Debug, Clone)]
pub struct Runtime {
    id: Arc<String>,
    primary: RuntimeType,
73
    secondary: RuntimeType,
Ryan Olson's avatar
Ryan Olson committed
74
    cancellation_token: CancellationToken,
75
76
    endpoint_shutdown_token: CancellationToken,
    graceful_shutdown_tracker: Arc<GracefulShutdownTracker>,
77
78
    compute_pool: Option<Arc<compute::ComputePool>>,
    block_in_place_permits: Option<Arc<tokio::sync::Semaphore>>,
Ryan Olson's avatar
Ryan Olson committed
79
80
}

81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
/// Shared, thread-safe callback stored in a [`MetricsRegistryEntry`].
///
/// The callback takes no arguments and reports success or failure through
/// `anyhow::Result<()>`. Wrapping it in an `Arc` means it:
/// - can be shared across threads and contexts (`Send + Sync`),
/// - can be cloned without duplicating the underlying closure,
/// - satisfies `'static` bounds in generic contexts.
///
/// The `Arc` is part of the alias to make the sharing explicit.
type RuntimeCallback = Arc<dyn Fn() -> anyhow::Result<()> + Send + Sync + 'static>;

/// A Prometheus registry together with the callbacks associated with one metrics hierarchy.
pub struct MetricsRegistryEntry {
    /// The Prometheus registry for this hierarchy prefix.
    pub prometheus_registry: prometheus::Registry,
    /// Callbacks registered for this hierarchy. Each callback is invoked with no arguments
    /// (see [`MetricsRegistryEntry::execute_callbacks`]).
    pub runtime_callbacks: Vec<RuntimeCallback>,
}

impl MetricsRegistryEntry {
    /// Creates an entry with a fresh, empty Prometheus registry and no callbacks.
    pub fn new() -> Self {
        Self {
            prometheus_registry: prometheus::Registry::new(),
            runtime_callbacks: Vec::new(),
        }
    }

    /// Registers a callback to be run by [`Self::execute_callbacks`].
    ///
    /// The callback takes no arguments. Callbacks are appended, so they run in
    /// registration order.
    pub fn add_callback(&mut self, callback: RuntimeCallback) {
        self.runtime_callbacks.push(callback);
    }

    /// Runs every registered callback in registration order and collects each result.
    ///
    /// A callback returning `Err` does not stop the remaining callbacks from running. The
    /// returned vector has exactly one entry per callback, in the same order.
    pub fn execute_callbacks(&self) -> Vec<anyhow::Result<()>> {
        self.runtime_callbacks
            .iter()
            .map(|callback| callback())
            .collect()
    }

    /// Returns `true` if the Prometheus registry already contains a metric family named
    /// `metric_name`.
    ///
    /// This calls `gather()` on the registry, so its cost grows with the number of
    /// registered metrics; avoid calling it on hot paths.
    pub fn has_metric_named(&self, metric_name: &str) -> bool {
        self.prometheus_registry
            .gather()
            .iter()
            .any(|mf| mf.name() == metric_name)
    }
}

impl Default for MetricsRegistryEntry {
    fn default() -> Self {
        Self::new()
    }
}

impl Clone for MetricsRegistryEntry {
    fn clone(&self) -> Self {
        Self {
            prometheus_registry: self.prometheus_registry.clone(),
            runtime_callbacks: Vec::new(), // Callbacks cannot be cloned, so we start with an empty list
        }
    }
}

Ryan Olson's avatar
Ryan Olson committed
145
146
147
148
149
150
151
152
/// Distributed [Runtime] which provides access to shared resources across the cluster, this includes
/// communication protocols and transports.
#[derive(Clone)]
pub struct DistributedRuntime {
    // local runtime
    runtime: Runtime,

    // we might consider a unifed transport manager here
153
    etcd_client: Option<transports::etcd::Client>,
Ryan Olson's avatar
Ryan Olson committed
154
155
    nats_client: transports::nats::Client,
    tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
156
    system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,
Ryan Olson's avatar
Ryan Olson committed
157
158
159

    // local registry for components
    // the registry allows us to use share runtime resources across instances of the same component object.
160
    // take for example two instances of a client to the same remote component. The registry allows us to use
Ryan Olson's avatar
Ryan Olson committed
161
162
163
    // a single endpoint watcher for both clients, this keeps the number background tasking watching specific
    // paths in etcd to a minimum.
    component_registry: component::Registry,
164
165
166
167

    // Will only have static components that are not discoverable via etcd, they must be know at
    // startup. Will not start etcd.
    is_static: bool,
168

169
    instance_sources: Arc<tokio::sync::Mutex<HashMap<Endpoint, Weak<InstanceSource>>>>,
170

171
    // Health Status
172
    system_health: Arc<std::sync::Mutex<SystemHealth>>,
173

174
175
176
    // This map associates metric prefixes with their corresponding Prometheus registries and callbacks.
    // Uses RwLock for better concurrency - multiple threads can read (execute callbacks) simultaneously.
    hierarchy_to_metricsregistry: Arc<std::sync::RwLock<HashMap<String, MetricsRegistryEntry>>>,
Ryan Olson's avatar
Ryan Olson committed
177
}