lib.rs 5.67 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
Ryan Olson's avatar
Ryan Olson committed
15

Neelay Shah's avatar
Neelay Shah committed
16
//! Dynamo
Ryan Olson's avatar
Ryan Olson committed
17
18
19
20

#![allow(dead_code)]
#![allow(unused_imports)]

21
22
use std::{
    collections::HashMap,
23
    sync::{Arc, OnceLock, Weak},
24
25
};
use tokio::sync::Mutex;
Ryan Olson's avatar
Ryan Olson committed
26

Ryan Olson's avatar
Ryan Olson committed
27
28
29
pub use anyhow::{
    anyhow as error, bail as raise, Context as ErrorContext, Error, Ok as OK, Result,
};
Ryan Olson's avatar
Ryan Olson committed
30
31
32
33
34
35
36
37
38

use async_once_cell::OnceCell;

mod config;
pub use config::RuntimeConfig;

pub mod component;
pub mod discovery;
pub mod engine;
39
40
pub mod system_status_server;
pub use system_status_server::SystemStatusServerInfo;
41
pub mod instances;
42
pub mod logging;
43
pub mod metrics;
Ryan Olson's avatar
Ryan Olson committed
44
pub mod pipeline;
45
pub mod prelude;
Ryan Olson's avatar
Ryan Olson committed
46
pub mod protocols;
Ryan Olson's avatar
Ryan Olson committed
47
pub mod runnable;
Ryan Olson's avatar
Ryan Olson committed
48
49
pub mod runtime;
pub mod service;
50
pub mod slug;
51
pub mod storage;
Ryan Olson's avatar
Ryan Olson committed
52
pub mod traits;
Ryan Olson's avatar
Ryan Olson committed
53
pub mod transports;
Ryan Olson's avatar
Ryan Olson committed
54
pub mod utils;
Ryan Olson's avatar
Ryan Olson committed
55
56
57
pub mod worker;

pub mod distributed;
Ryan Olson's avatar
Ryan Olson committed
58
pub use futures::stream;
Ryan Olson's avatar
Ryan Olson committed
59
60
61
pub use tokio_util::sync::CancellationToken;
pub use worker::Worker;

62
63
use component::{Endpoint, InstanceSource};

64
65
use config::HealthStatus;

Neelay Shah's avatar
Neelay Shah committed
66
/// Types of Tokio runtimes that can be used to construct a Dynamo [Runtime].
Ryan Olson's avatar
Ryan Olson committed
67
68
69
70
71
72
73
74
75
76
77
#[derive(Clone)]
enum RuntimeType {
    Shared(Arc<tokio::runtime::Runtime>),
    External(tokio::runtime::Handle),
}

/// Local [Runtime] which provides access to shared resources local to the physical node/machine.
#[derive(Debug, Clone)]
pub struct Runtime {
    id: Arc<String>,
    primary: RuntimeType,
78
    secondary: RuntimeType,
Ryan Olson's avatar
Ryan Olson committed
79
80
81
    cancellation_token: CancellationToken,
}

82
83
84
85
86
87
88
89
90
/// Tracks the health of the process as a whole and of individual endpoints.
///
/// When `use_endpoint_health_status` is non-empty, every listed endpoint is seeded in
/// `endpoint_health` with the starting status at construction time, and the overall verdict is
/// derived from those endpoints instead of from `system_health`.
#[derive(Clone)]
pub struct SystemHealth {
    /// Process-wide health, consulted when no endpoints are listed in `use_endpoint_health_status`.
    system_health: HealthStatus,
    /// Most recently recorded status per endpoint, keyed by endpoint name.
    endpoint_health: HashMap<String, HealthStatus>,
    /// Endpoints that must all be `Ready` for the system to be reported healthy.
    use_endpoint_health_status: Vec<String>,
    /// Path associated with the health check (presumably an HTTP route — confirm in the status server).
    health_path: String,
    /// Path associated with the liveness check (presumably an HTTP route — confirm in the status server).
    live_path: String,
}

impl SystemHealth {
    pub fn new(
        starting_health_status: HealthStatus,
        use_endpoint_health_status: Vec<String>,
99
100
        health_path: String,
        live_path: String,
101
102
103
104
105
106
107
108
109
    ) -> Self {
        let mut endpoint_health = HashMap::new();
        for endpoint in &use_endpoint_health_status {
            endpoint_health.insert(endpoint.clone(), starting_health_status.clone());
        }
        SystemHealth {
            system_health: starting_health_status,
            endpoint_health,
            use_endpoint_health_status,
110
111
            health_path,
            live_path,
112
113
114
115
116
117
        }
    }
    pub fn set_health_status(&mut self, status: HealthStatus) {
        self.system_health = status;
    }

118
119
    pub fn set_endpoint_health_status(&mut self, endpoint: &str, status: HealthStatus) {
        self.endpoint_health.insert(endpoint.to_string(), status);
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
    }

    /// Returns the overall health status and endpoint health statuses
    pub fn get_health_status(&self) -> (bool, HashMap<String, String>) {
        let mut endpoints: HashMap<String, String> = HashMap::new();
        for (endpoint, ready) in &self.endpoint_health {
            endpoints.insert(
                endpoint.clone(),
                if *ready == HealthStatus::Ready {
                    "ready".to_string()
                } else {
                    "notready".to_string()
                },
            );
        }

        let healthy = if !self.use_endpoint_health_status.is_empty() {
            self.use_endpoint_health_status.iter().all(|endpoint| {
                self.endpoint_health
                    .get(endpoint)
                    .is_some_and(|status| *status == HealthStatus::Ready)
            })
        } else {
            self.system_health == HealthStatus::Ready
        };

        (healthy, endpoints)
    }
}

Ryan Olson's avatar
Ryan Olson committed
150
151
152
153
154
155
156
157
/// Distributed [Runtime] which provides access to shared resources across the cluster, this includes
/// communication protocols and transports.
#[derive(Clone)]
pub struct DistributedRuntime {
    // local runtime
    runtime: Runtime,

    // we might consider a unifed transport manager here
158
    etcd_client: Option<transports::etcd::Client>,
Ryan Olson's avatar
Ryan Olson committed
159
160
    nats_client: transports::nats::Client,
    tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
161
    system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,
Ryan Olson's avatar
Ryan Olson committed
162
163
164

    // local registry for components
    // the registry allows us to use share runtime resources across instances of the same component object.
165
    // take for example two instances of a client to the same remote component. The registry allows us to use
Ryan Olson's avatar
Ryan Olson committed
166
167
168
    // a single endpoint watcher for both clients, this keeps the number background tasking watching specific
    // paths in etcd to a minimum.
    component_registry: component::Registry,
169
170
171
172

    // Will only have static components that are not discoverable via etcd, they must be know at
    // startup. Will not start etcd.
    is_static: bool,
173
174

    instance_sources: Arc<Mutex<HashMap<Endpoint, Weak<InstanceSource>>>>,
175

176
    // Health Status
177
    system_health: Arc<std::sync::Mutex<SystemHealth>>,
178

179
180
    // This map associates metric prefixes with their corresponding Prometheus registries.
    prometheus_registries_by_prefix: Arc<std::sync::Mutex<HashMap<String, prometheus::Registry>>>,
Ryan Olson's avatar
Ryan Olson committed
181
}