distributed.rs 12.1 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
Ryan Olson's avatar
Ryan Olson committed
15
16

pub use crate::component::Component;
17
use crate::transports::nats::DRTNatsPrometheusMetrics;
Ryan Olson's avatar
Ryan Olson committed
18
use crate::{
19
    component::{self, ComponentBuilder, Endpoint, InstanceSource, Namespace},
Ryan Olson's avatar
Ryan Olson committed
20
    discovery::DiscoveryClient,
21
    metrics::MetricsRegistry,
Ryan Olson's avatar
Ryan Olson committed
22
23
    service::ServiceClient,
    transports::{etcd, nats, tcp},
24
    ErrorContext, RuntimeCallback,
Ryan Olson's avatar
Ryan Olson committed
25
26
};

27
use super::{error, Arc, DistributedRuntime, OnceCell, Result, Runtime, SystemHealth, Weak, OK};
28
use std::sync::OnceLock;
Ryan Olson's avatar
Ryan Olson committed
29
30
31

use derive_getters::Dissolve;
use figment::error;
32
33
use std::collections::HashMap;
use tokio::sync::Mutex;
34
use tokio_util::sync::CancellationToken;
Ryan Olson's avatar
Ryan Olson committed
35

36
37
38
39
40
41
42
43
44
45
/// [`MetricsRegistry`] implementation for the distributed runtime (DRT), which is the
/// root of the metrics hierarchy.
impl MetricsRegistry for DistributedRuntime {
    /// Returns an empty string: the DRT has no basename of its own, basenames only
    /// begin at the Namespace level.
    fn basename(&self) -> String {
        String::new()
    }

    /// Returns an empty list: the DRT is the root, so nothing sits above it.
    fn parent_hierarchy(&self) -> Vec<String> {
        Vec::new()
    }
}

Ryan Olson's avatar
Ryan Olson committed
46
47
impl DistributedRuntime {
    pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> {
48
        let (etcd_config, nats_config, is_static) = config.dissolve();
Ryan Olson's avatar
Ryan Olson committed
49
50
51

        let runtime_clone = runtime.clone();

52
53
54
        let etcd_client = if is_static {
            None
        } else {
55
            Some(etcd::Client::new(etcd_config.clone(), runtime_clone).await?)
56
        };
Ryan Olson's avatar
Ryan Olson committed
57

58
        let nats_client = nats_config.clone().connect().await?;
Ryan Olson's avatar
Ryan Olson committed
59

60
        // Start system status server for health and metrics if enabled in configuration
61
62
63
64
65
66
67
68
        let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default();
        // IMPORTANT: We must extract cancel_token from runtime BEFORE moving runtime into the struct below.
        // This is because after moving, runtime is no longer accessible in this scope (ownership rules).
        let cancel_token = if config.system_server_enabled() {
            Some(runtime.clone().child_token())
        } else {
            None
        };
69
70
        let starting_health_status = config.starting_health_status.clone();
        let use_endpoint_health_status = config.use_endpoint_health_status.clone();
71
72
73
        let health_endpoint_path = config.system_health_path.clone();
        let live_endpoint_path = config.system_live_path.clone();
        let system_health = Arc::new(std::sync::Mutex::new(SystemHealth::new(
74
75
            starting_health_status,
            use_endpoint_health_status,
76
77
            health_endpoint_path,
            live_endpoint_path,
78
        )));
79

80
81
        let nats_client_for_metrics = nats_client.clone();

82
        let distributed_runtime = Self {
Ryan Olson's avatar
Ryan Olson committed
83
84
85
86
            runtime,
            etcd_client,
            nats_client,
            tcp_server: Arc::new(OnceCell::new()),
87
            system_status_server: Arc::new(OnceLock::new()),
Ryan Olson's avatar
Ryan Olson committed
88
            component_registry: component::Registry::new(),
89
            is_static,
90
            instance_sources: Arc::new(Mutex::new(HashMap::new())),
91
            hierarchy_to_metricsregistry: Arc::new(std::sync::RwLock::new(HashMap::<
92
                String,
93
                crate::MetricsRegistryEntry,
94
            >::new())),
95
            system_health,
96
97
        };

98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
        let sys_nats_metrics = DRTNatsPrometheusMetrics::new(
            &distributed_runtime,
            nats_client_for_metrics.client().clone(),
        )?;
        let mut drt_hierarchies = distributed_runtime.parent_hierarchy();
        drt_hierarchies.push(distributed_runtime.hierarchy());
        // Register a callback to update NATS client metrics
        let nats_metrics_callback = Arc::new({
            let sys_nats_metrics_clone = sys_nats_metrics.clone();
            move || {
                sys_nats_metrics_clone.set_from_client_stats();
                Ok(())
            }
        });
        distributed_runtime.register_metrics_callback(drt_hierarchies, nats_metrics_callback);

114
        // Start system status server if enabled
115
116
117
118
        if let Some(cancel_token) = cancel_token {
            let host = config.system_host.clone();
            let port = config.system_port;

119
120
            // Start system status server (it spawns its own task internally)
            match crate::system_status_server::spawn_system_status_server(
121
122
123
124
                &host,
                port,
                cancel_token,
                Arc::new(distributed_runtime.clone()),
125
126
127
            )
            .await
            {
128
                Ok((addr, handle)) => {
129
                    tracing::info!("System status server started successfully on {}", addr);
130

131
132
133
134
135
136
                    // Store system status server information
                    let system_status_server_info =
                        crate::system_status_server::SystemStatusServerInfo::new(
                            addr,
                            Some(handle),
                        );
137

138
                    // Initialize the system_status_server field
139
                    distributed_runtime
140
141
142
                        .system_status_server
                        .set(Arc::new(system_status_server_info))
                        .expect("System status server info should only be set once");
143
144
                }
                Err(e) => {
145
                    tracing::error!("System status server startup failed: {}", e);
146
                }
147
            }
148
        } else {
149
            tracing::debug!("Health and system status server is disabled via DYN_SYSTEM_ENABLED");
150
151
152
        }

        Ok(distributed_runtime)
Ryan Olson's avatar
Ryan Olson committed
153
154
155
    }

    /// Creates a runtime from the default (non-static) [`DistributedConfig`].
    ///
    /// # Errors
    /// Propagates any error returned by [`DistributedRuntime::new`].
    pub async fn from_settings(runtime: Runtime) -> Result<Self> {
        Self::new(runtime, DistributedConfig::from_settings(false)).await
    }

    // Call this if you are using static workers that do not need etcd-based discovery.
    pub async fn from_settings_without_discovery(runtime: Runtime) -> Result<Self> {
        let config = DistributedConfig::from_settings(true);
Ryan Olson's avatar
Ryan Olson committed
163
164
165
166
167
168
169
        Self::new(runtime, config).await
    }

    pub fn runtime(&self) -> &Runtime {
        &self.runtime
    }

170
171
172
173
    /// Returns the primary [`CancellationToken`] of the underlying [`Runtime`].
    pub fn primary_token(&self) -> CancellationToken {
        self.runtime().primary_token()
    }

174
175
176
177
    /// The etcd lease all our components will be attached to.
    ///
    /// Returns `None` when there is no etcd client, i.e. for static workers.
    pub fn primary_lease(&self) -> Option<etcd::Lease> {
        let client = self.etcd_client.as_ref()?;
        Some(client.primary_lease())
    }

    /// Requests shutdown by delegating to the underlying [`Runtime`].
    pub fn shutdown(&self) {
        self.runtime().shutdown();
    }

    /// Create a [`Namespace`]
    pub fn namespace(&self, name: impl Into<String>) -> Result<Namespace> {
186
        Namespace::new(self.clone(), name.into(), self.is_static)
Ryan Olson's avatar
Ryan Olson committed
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
    }

    // /// Create a [`Component`]
    // pub fn component(
    //     &self,
    //     name: impl Into<String>,
    //     namespace: impl Into<String>,
    // ) -> Result<Component> {
    //     Ok(ComponentBuilder::from_runtime(self.clone())
    //         .name(name.into())
    //         .namespace(namespace.into())
    //         .build()?)
    // }

    pub(crate) fn discovery_client(&self, namespace: impl Into<String>) -> DiscoveryClient {
202
203
204
205
206
207
        DiscoveryClient::new(
            namespace.into(),
            self.etcd_client
                .clone()
                .expect("Attempt to get discovery_client on static DistributedRuntime"),
        )
Ryan Olson's avatar
Ryan Olson committed
208
209
210
211
212
213
    }

    /// Builds a [`ServiceClient`] from a clone of this runtime's NATS client.
    pub(crate) fn service_client(&self) -> ServiceClient {
        let nats_client = self.nats_client.clone();
        ServiceClient::new(nats_client)
    }

214
    pub async fn tcp_server(&self) -> Result<Arc<tcp::server::TcpStreamServer>> {
Ryan Olson's avatar
Ryan Olson committed
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
        Ok(self
            .tcp_server
            .get_or_try_init(async move {
                let options = tcp::server::ServerOptions::default();
                let server = tcp::server::TcpStreamServer::new(options).await?;
                OK(server)
            })
            .await?
            .clone())
    }

    /// Returns a clone of the NATS client owned by this runtime.
    pub fn nats_client(&self) -> nats::Client {
        Clone::clone(&self.nats_client)
    }

230
231
232
233
234
    /// Get system status server information if available.
    ///
    /// Returns `None` while the `system_status_server` cell has not been set.
    pub fn system_status_server_info(
        &self,
    ) -> Option<Arc<crate::system_status_server::SystemStatusServerInfo>> {
        self.system_status_server.get().map(Arc::clone)
    }

237
    // todo(ryan): deprecate this as we move to Discovery traits and Component Identifiers
238
    pub fn etcd_client(&self) -> Option<etcd::Client> {
Ryan Olson's avatar
Ryan Olson committed
239
240
        self.etcd_client.clone()
    }
241
242
243
244

    /// Returns a child [`CancellationToken`] obtained from the underlying [`Runtime`].
    pub fn child_token(&self) -> CancellationToken {
        self.runtime().child_token()
    }
245
246
247
248

    /// Returns a shared handle to the map from [`Endpoint`] to weakly-held [`InstanceSource`].
    pub fn instance_sources(&self) -> Arc<Mutex<HashMap<Endpoint, Weak<InstanceSource>>>> {
        Arc::clone(&self.instance_sources)
    }
249

250
251
    /// Add a Prometheus metric to a specific hierarchy's registry. Note that it is possible
    /// to register the same metric name multiple times, as long as the labels are different.
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
    pub fn add_prometheus_metric(
        &self,
        hierarchy: &str,
        metric_name: &str,
        prometheus_metric: Box<dyn prometheus::core::Collector>,
    ) -> anyhow::Result<()> {
        let mut registries = self.hierarchy_to_metricsregistry.write().unwrap();
        let entry = registries.entry(hierarchy.to_string()).or_default();

        // Try to register the metric and provide better error information
        match entry.prometheus_registry.register(prometheus_metric) {
            Ok(_) => Ok(()),
            Err(e) => {
                let error_msg = e.to_string();
                tracing::error!(
                    hierarchy = ?hierarchy,
                    error = ?error_msg,
                    metric_name = ?metric_name,
                    "Metric registration failed"
                );
                Err(e.into())
            }
        }
    }

    /// Add a callback function to metrics registries for the given hierarchies.
    ///
    /// Each hierarchy receives its own clone of `callback`; missing registry entries are
    /// created with their default value.
    ///
    /// # Panics
    /// Panics if the registry lock is poisoned.
    pub fn register_metrics_callback(&self, hierarchies: Vec<String>, callback: RuntimeCallback) {
        let mut registries = self.hierarchy_to_metricsregistry.write().unwrap();
        hierarchies.into_iter().for_each(|hierarchy| {
            let entry = registries.entry(hierarchy).or_default();
            entry.add_callback(callback.clone());
        });
    }

    /// Execute all callbacks for a given hierarchy key and return their results.
    ///
    /// Returns an empty vector when nothing is registered under `hierarchy`.
    ///
    /// # Panics
    /// Panics if the registry lock is poisoned.
    pub fn execute_metrics_callbacks(&self, hierarchy: &str) -> Vec<anyhow::Result<()>> {
        // Snapshot the callbacks inside a narrow scope so the read lock is released
        // before any callback runs.
        let snapshot = {
            let registries = self.hierarchy_to_metricsregistry.read().unwrap();
            registries
                .get(hierarchy)
                .map(|entry| entry.runtime_callbacks.clone())
        };

        let Some(callbacks) = snapshot else {
            return Vec::new();
        };

        // The lock is no longer held here.
        callbacks.iter().map(|callback| callback()).collect()
    }

    /// Get all registered hierarchy keys. Private because it is only used for testing.
    ///
    /// # Panics
    /// Panics if the registry lock is poisoned.
    fn get_registered_hierarchies(&self) -> Vec<String> {
        self.hierarchy_to_metricsregistry
            .read()
            .unwrap()
            .keys()
            .cloned()
            .collect()
    }
Ryan Olson's avatar
Ryan Olson committed
310
311
312
313
314
315
}

/// Connection settings consumed by [`DistributedRuntime::new`].
///
/// `DistributedRuntime::new` destructures the result of `dissolve()` positionally as
/// `(etcd_config, nats_config, is_static)`, so the field order below must be kept.
#[derive(Dissolve)]
pub struct DistributedConfig {
    /// Options for the etcd client; not used to connect when `is_static` is `true`.
    pub etcd_config: etcd::ClientOptions,
    /// Options used to connect to NATS.
    pub nats_config: nats::ClientOptions,
    /// When `true`, the runtime is created without an etcd client.
    pub is_static: bool,
}

impl DistributedConfig {
320
    pub fn from_settings(is_static: bool) -> DistributedConfig {
Ryan Olson's avatar
Ryan Olson committed
321
322
323
        DistributedConfig {
            etcd_config: etcd::ClientOptions::default(),
            nats_config: nats::ClientOptions::default(),
324
            is_static,
Ryan Olson's avatar
Ryan Olson committed
325
326
        }
    }
Ryan Olson's avatar
Ryan Olson committed
327
328
329
330
331

    pub fn for_cli() -> DistributedConfig {
        let mut config = DistributedConfig {
            etcd_config: etcd::ClientOptions::default(),
            nats_config: nats::ClientOptions::default(),
332
            is_static: false,
Ryan Olson's avatar
Ryan Olson committed
333
334
335
336
337
338
        };

        config.etcd_config.attach_lease = false;

        config
    }
Ryan Olson's avatar
Ryan Olson committed
339
}