// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub use crate::component::Component;
use crate::{
18
    component::{self, ComponentBuilder, Endpoint, InstanceSource, Namespace},
Ryan Olson's avatar
Ryan Olson committed
19
    discovery::DiscoveryClient,
20
    metrics::MetricsRegistry,
Ryan Olson's avatar
Ryan Olson committed
21
22
23
24
25
    service::ServiceClient,
    transports::{etcd, nats, tcp},
    ErrorContext,
};

26
use super::{error, Arc, DistributedRuntime, OnceCell, Result, Runtime, SystemHealth, Weak, OK};
27
use std::sync::OnceLock;
Ryan Olson's avatar
Ryan Olson committed
28
29
30

use derive_getters::Dissolve;
use figment::error;
31
32
use std::collections::HashMap;
use tokio::sync::Mutex;
33
use tokio_util::sync::CancellationToken;

/// The distributed runtime is the root of the metrics naming hierarchy: it contributes
/// no name component of its own and has nothing above it.
impl MetricsRegistry for DistributedRuntime {
    /// Always empty — metric basenames only begin at the [`Namespace`] level.
    fn basename(&self) -> String {
        String::new()
    }

    /// Always empty — the runtime is the root, so it has no parents.
    fn parent_hierarchy(&self) -> Vec<String> {
        Vec::new()
    }
}

impl DistributedRuntime {
    pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> {
        let secondary = runtime.secondary();
48
        let (etcd_config, nats_config, is_static) = config.dissolve();
Ryan Olson's avatar
Ryan Olson committed
49
50
51

        let runtime_clone = runtime.clone();

52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
        let etcd_client = if is_static {
            None
        } else {
            Some(
                secondary
                    .spawn(async move {
                        let client = etcd::Client::new(etcd_config.clone(), runtime_clone)
                            .await
                            .context(format!(
                                "Failed to connect to etcd server with config {:?}",
                                etcd_config
                            ))?;
                        OK(client)
                    })
                    .await??,
            )
        };
Ryan Olson's avatar
Ryan Olson committed
69
70
71
72
73
74
75
76
77
78
79

        let nats_client = secondary
            .spawn(async move {
                let client = nats_config.clone().connect().await.context(format!(
                    "Failed to connect to NATS server with config {:?}",
                    nats_config
                ))?;
                anyhow::Ok(client)
            })
            .await??;

80
        // Start system metrics server for health and metrics if enabled in configuration
81
82
83
84
85
86
87
88
        let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default();
        // IMPORTANT: We must extract cancel_token from runtime BEFORE moving runtime into the struct below.
        // This is because after moving, runtime is no longer accessible in this scope (ownership rules).
        let cancel_token = if config.system_server_enabled() {
            Some(runtime.clone().child_token())
        } else {
            None
        };
89
90
        let starting_health_status = config.starting_health_status.clone();
        let use_endpoint_health_status = config.use_endpoint_health_status.clone();
91
92
93
        let health_endpoint_path = config.system_health_path.clone();
        let live_endpoint_path = config.system_live_path.clone();
        let system_health = Arc::new(std::sync::Mutex::new(SystemHealth::new(
94
95
            starting_health_status,
            use_endpoint_health_status,
96
97
            health_endpoint_path,
            live_endpoint_path,
98
        )));
99

100
        let distributed_runtime = Self {
Ryan Olson's avatar
Ryan Olson committed
101
102
103
104
            runtime,
            etcd_client,
            nats_client,
            tcp_server: Arc::new(OnceCell::new()),
105
            metrics_server: Arc::new(OnceLock::new()),
Ryan Olson's avatar
Ryan Olson committed
106
            component_registry: component::Registry::new(),
107
            is_static,
108
            instance_sources: Arc::new(Mutex::new(HashMap::new())),
109
110
111
112
            prometheus_registries_by_prefix: Arc::new(std::sync::Mutex::new(HashMap::<
                String,
                prometheus::Registry,
            >::new())),
113
            system_health,
114
115
        };

116
        // Start metrics server if enabled
117
118
119
120
        if let Some(cancel_token) = cancel_token {
            let host = config.system_host.clone();
            let port = config.system_port;

121
122
            // Start metrics server (it spawns its own task internally)
            match crate::metrics_server::spawn_metrics_server(
123
124
125
126
                &host,
                port,
                cancel_token,
                Arc::new(distributed_runtime.clone()),
127
128
129
            )
            .await
            {
130
                Ok((addr, handle)) => {
131
                    tracing::info!("Metrics server started successfully on {}", addr);
132

133
134
135
                    // Store metrics server information
                    let metrics_server_info =
                        crate::metrics_server::MetricsServerInfo::new(addr, Some(handle));
136

137
                    // Initialize the metrics_server field
138
                    distributed_runtime
139
140
141
                        .metrics_server
                        .set(Arc::new(metrics_server_info))
                        .expect("Metrics server info should only be set once");
142
143
                }
                Err(e) => {
144
                    tracing::error!("Metrics server startup failed: {}", e);
145
                }
146
            }
147
        } else {
148
            tracing::debug!("Health and metrics server is disabled via DYN_SYSTEM_ENABLED");
149
150
151
        }

        Ok(distributed_runtime)
Ryan Olson's avatar
Ryan Olson committed
152
153
154
    }

    pub async fn from_settings(runtime: Runtime) -> Result<Self> {
155
156
157
158
159
160
161
        let config = DistributedConfig::from_settings(false);
        Self::new(runtime, config).await
    }

    // Call this if you are using static workers that do not need etcd-based discovery.
    pub async fn from_settings_without_discovery(runtime: Runtime) -> Result<Self> {
        let config = DistributedConfig::from_settings(true);
Ryan Olson's avatar
Ryan Olson committed
162
163
164
165
166
167
168
        Self::new(runtime, config).await
    }

    pub fn runtime(&self) -> &Runtime {
        &self.runtime
    }

169
170
171
172
    pub fn primary_token(&self) -> CancellationToken {
        self.runtime.primary_token()
    }

173
174
175
176
    /// The etcd lease all our components will be attached to.
    /// Not available for static workers.
    pub fn primary_lease(&self) -> Option<etcd::Lease> {
        self.etcd_client.as_ref().map(|c| c.primary_lease())
Ryan Olson's avatar
Ryan Olson committed
177
178
179
180
181
182
183
184
    }

    pub fn shutdown(&self) {
        self.runtime.shutdown();
    }

    /// Create a [`Namespace`]
    pub fn namespace(&self, name: impl Into<String>) -> Result<Namespace> {
185
        Namespace::new(self.clone(), name.into(), self.is_static)
Ryan Olson's avatar
Ryan Olson committed
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
    }

    // /// Create a [`Component`]
    // pub fn component(
    //     &self,
    //     name: impl Into<String>,
    //     namespace: impl Into<String>,
    // ) -> Result<Component> {
    //     Ok(ComponentBuilder::from_runtime(self.clone())
    //         .name(name.into())
    //         .namespace(namespace.into())
    //         .build()?)
    // }

    pub(crate) fn discovery_client(&self, namespace: impl Into<String>) -> DiscoveryClient {
201
202
203
204
205
206
        DiscoveryClient::new(
            namespace.into(),
            self.etcd_client
                .clone()
                .expect("Attempt to get discovery_client on static DistributedRuntime"),
        )
Ryan Olson's avatar
Ryan Olson committed
207
208
209
210
211
212
    }

    pub(crate) fn service_client(&self) -> ServiceClient {
        ServiceClient::new(self.nats_client.clone())
    }

213
    pub async fn tcp_server(&self) -> Result<Arc<tcp::server::TcpStreamServer>> {
Ryan Olson's avatar
Ryan Olson committed
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
        Ok(self
            .tcp_server
            .get_or_try_init(async move {
                let options = tcp::server::ServerOptions::default();
                let server = tcp::server::TcpStreamServer::new(options).await?;
                OK(server)
            })
            .await?
            .clone())
    }

    pub fn nats_client(&self) -> nats::Client {
        self.nats_client.clone()
    }

229
230
231
    /// Get metrics server information if available
    pub fn metrics_server_info(&self) -> Option<Arc<crate::metrics_server::MetricsServerInfo>> {
        self.metrics_server.get().cloned()
232
233
    }

234
    // todo(ryan): deprecate this as we move to Discovery traits and Component Identifiers
235
    pub fn etcd_client(&self) -> Option<etcd::Client> {
Ryan Olson's avatar
Ryan Olson committed
236
237
        self.etcd_client.clone()
    }
238
239
240
241

    pub fn child_token(&self) -> CancellationToken {
        self.runtime.child_token()
    }
242
243
244
245

    pub fn instance_sources(&self) -> Arc<Mutex<HashMap<Endpoint, Weak<InstanceSource>>>> {
        self.instance_sources.clone()
    }
Ryan Olson's avatar
Ryan Olson committed
246
247
248
249
250
251
}

/// Connection settings consumed by [`DistributedRuntime::new`].
#[derive(Debug, Clone, Dissolve)]
pub struct DistributedConfig {
    /// Options for the etcd client. Not used when `is_static` is `true`.
    pub etcd_config: etcd::ClientOptions,
    /// Options for the NATS client.
    pub nats_config: nats::ClientOptions,
    /// When `true`, the runtime is built without an etcd client (no etcd-based discovery).
    pub is_static: bool,
}

impl DistributedConfig {
256
    pub fn from_settings(is_static: bool) -> DistributedConfig {
Ryan Olson's avatar
Ryan Olson committed
257
258
259
        DistributedConfig {
            etcd_config: etcd::ClientOptions::default(),
            nats_config: nats::ClientOptions::default(),
260
            is_static,
Ryan Olson's avatar
Ryan Olson committed
261
262
        }
    }
Ryan Olson's avatar
Ryan Olson committed
263
264
265
266
267

    pub fn for_cli() -> DistributedConfig {
        let mut config = DistributedConfig {
            etcd_config: etcd::ClientOptions::default(),
            nats_config: nats::ClientOptions::default(),
268
            is_static: false,
Ryan Olson's avatar
Ryan Olson committed
269
270
271
272
273
274
        };

        config.etcd_config.attach_lease = false;

        config
    }
Ryan Olson's avatar
Ryan Olson committed
275
}