"vscode:/vscode.git/clone" did not exist on "489b7626a02993285b9d6a6c77c38997b03e3ff6"
lib.rs 5.61 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
Ryan Olson's avatar
Ryan Olson committed
15

Neelay Shah's avatar
Neelay Shah committed
16
//! Dynamo
Ryan Olson's avatar
Ryan Olson committed
17
18
19
20

#![allow(dead_code)]
#![allow(unused_imports)]

21
22
use std::{
    collections::HashMap,
23
    sync::{Arc, OnceLock, Weak},
24
25
};
use tokio::sync::Mutex;
Ryan Olson's avatar
Ryan Olson committed
26

Ryan Olson's avatar
Ryan Olson committed
27
28
29
pub use anyhow::{
    anyhow as error, bail as raise, Context as ErrorContext, Error, Ok as OK, Result,
};
Ryan Olson's avatar
Ryan Olson committed
30
31
32
33
34
35
36
37
38

use async_once_cell::OnceCell;

mod config;
pub use config::RuntimeConfig;

pub mod component;
pub mod discovery;
pub mod engine;
39
pub mod http_server;
40
pub use http_server::HttpServerInfo;
41
pub mod instances;
42
pub mod logging;
43
pub mod metrics;
Ryan Olson's avatar
Ryan Olson committed
44
pub mod pipeline;
45
pub mod prelude;
Ryan Olson's avatar
Ryan Olson committed
46
pub mod protocols;
Ryan Olson's avatar
Ryan Olson committed
47
pub mod runnable;
Ryan Olson's avatar
Ryan Olson committed
48
49
pub mod runtime;
pub mod service;
50
pub mod slug;
51
pub mod storage;
Ryan Olson's avatar
Ryan Olson committed
52
pub mod traits;
Ryan Olson's avatar
Ryan Olson committed
53
pub mod transports;
Ryan Olson's avatar
Ryan Olson committed
54
pub mod utils;
Ryan Olson's avatar
Ryan Olson committed
55
56
57
pub mod worker;

pub mod distributed;
Ryan Olson's avatar
Ryan Olson committed
58
pub use futures::stream;
Ryan Olson's avatar
Ryan Olson committed
59
60
61
pub use tokio_util::sync::CancellationToken;
pub use worker::Worker;

62
63
use component::{Endpoint, InstanceSource};

64
65
use config::HealthStatus;

Neelay Shah's avatar
Neelay Shah committed
66
/// Types of Tokio runtimes that can be used to construct a Dynamo [Runtime].
Ryan Olson's avatar
Ryan Olson committed
67
68
69
70
71
72
73
74
75
76
77
#[derive(Clone)]
enum RuntimeType {
    Shared(Arc<tokio::runtime::Runtime>),
    External(tokio::runtime::Handle),
}

/// Local [Runtime] which provides access to shared resources local to the physical node/machine.
#[derive(Debug, Clone)]
pub struct Runtime {
    id: Arc<String>,
    primary: RuntimeType,
78
    secondary: RuntimeType,
Ryan Olson's avatar
Ryan Olson committed
79
80
81
    cancellation_token: CancellationToken,
}

82
83
84
85
86
87
88
89
90
/// Current Health Status
/// If use_endpoint_health_status is set then
/// initialize the endpoint_health hashmap to the
/// starting health status
#[derive(Clone)]
pub struct SystemHealth {
    system_health: HealthStatus,
    endpoint_health: HashMap<String, HealthStatus>,
    use_endpoint_health_status: Vec<String>,
91
92
    health_path: String,
    live_path: String,
93
94
95
96
97
98
}

impl SystemHealth {
    /// Creates a new [`SystemHealth`].
    ///
    /// `starting_health_status` becomes the initial system-wide status and is also recorded
    /// for every endpoint listed in `use_endpoint_health_status`.
    ///
    /// # Arguments
    /// * `starting_health_status` - initial status for the system and each listed endpoint.
    /// * `use_endpoint_health_status` - endpoints that must all be ready for the system to be
    ///   considered healthy; when empty, the system-wide status is used instead.
    /// * `health_path`, `live_path` - stored unchanged.
    pub fn new(
        starting_health_status: HealthStatus,
        use_endpoint_health_status: Vec<String>,
        health_path: String,
        live_path: String,
    ) -> Self {
        let endpoint_health: HashMap<String, HealthStatus> = use_endpoint_health_status
            .iter()
            .map(|endpoint| (endpoint.clone(), starting_health_status.clone()))
            .collect();
        SystemHealth {
            system_health: starting_health_status,
            endpoint_health,
            use_endpoint_health_status,
            health_path,
            live_path,
        }
    }

    /// Sets the system-wide status.
    ///
    /// This value only affects [`SystemHealth::get_health_status`] when no endpoints were
    /// listed in `use_endpoint_health_status`.
    pub fn set_health_status(&mut self, status: HealthStatus) {
        self.system_health = status;
    }

    /// Records `status` for `endpoint`, inserting the endpoint if it is not yet tracked.
    ///
    /// Endpoints not listed in `use_endpoint_health_status` are still reported by
    /// [`SystemHealth::get_health_status`] but do not affect overall health.
    pub fn set_endpoint_health_status(&mut self, endpoint: String, status: HealthStatus) {
        self.endpoint_health.insert(endpoint, status);
    }

    /// Returns the overall health status and endpoint health statuses.
    ///
    /// The returned tuple contains:
    /// * `bool` - `true` when every listed endpoint is `Ready` (a listed endpoint with no
    ///   recorded status counts as not ready), or, if no endpoints are listed, when the
    ///   system-wide status is `Ready`.
    /// * `HashMap<String, String>` - every tracked endpoint mapped to `"ready"` or
    ///   `"notready"`.
    pub fn get_health_status(&self) -> (bool, HashMap<String, String>) {
        let endpoints: HashMap<String, String> = self
            .endpoint_health
            .iter()
            .map(|(endpoint, status)| {
                let label = if *status == HealthStatus::Ready {
                    "ready"
                } else {
                    "notready"
                };
                (endpoint.clone(), label.to_string())
            })
            .collect();

        let healthy = if self.use_endpoint_health_status.is_empty() {
            self.system_health == HealthStatus::Ready
        } else {
            self.use_endpoint_health_status.iter().all(|endpoint| {
                self.endpoint_health
                    .get(endpoint)
                    .is_some_and(|status| *status == HealthStatus::Ready)
            })
        };

        (healthy, endpoints)
    }
}

Ryan Olson's avatar
Ryan Olson committed
150
151
152
153
154
155
156
157
/// Distributed [Runtime] which provides access to shared resources across the cluster, this includes
/// communication protocols and transports.
#[derive(Clone)]
pub struct DistributedRuntime {
    // local runtime
    runtime: Runtime,

    // we might consider a unifed transport manager here
158
    etcd_client: Option<transports::etcd::Client>,
Ryan Olson's avatar
Ryan Olson committed
159
160
    nats_client: transports::nats::Client,
    tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
161
    http_server: Arc<OnceLock<Arc<http_server::HttpServerInfo>>>,
Ryan Olson's avatar
Ryan Olson committed
162
163
164

    // local registry for components
    // the registry allows us to use share runtime resources across instances of the same component object.
165
    // take for example two instances of a client to the same remote component. The registry allows us to use
Ryan Olson's avatar
Ryan Olson committed
166
167
168
    // a single endpoint watcher for both clients, this keeps the number background tasking watching specific
    // paths in etcd to a minimum.
    component_registry: component::Registry,
169
170
171
172

    // Will only have static components that are not discoverable via etcd, they must be know at
    // startup. Will not start etcd.
    is_static: bool,
173
174

    instance_sources: Arc<Mutex<HashMap<Endpoint, Weak<InstanceSource>>>>,
175

176
    // Health Status
177
    system_health: Arc<std::sync::Mutex<SystemHealth>>,
178

179
180
    // This map associates metric prefixes with their corresponding Prometheus registries.
    prometheus_registries_by_prefix: Arc<std::sync::Mutex<HashMap<String, prometheus::Registry>>>,
Ryan Olson's avatar
Ryan Olson committed
181
}