config.rs 19 KB
Newer Older
1
2
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
Ryan Olson's avatar
Ryan Olson committed
3
4
5
6
7

use super::Result;
use derive_builder::Builder;
use figment::{
    Figment,
8
    providers::{Env, Format, Serialized, Toml},
Ryan Olson's avatar
Ryan Olson committed
9
10
};
use serde::{Deserialize, Serialize};
11
use std::fmt;
Ryan Olson's avatar
Ryan Olson committed
12
13
use validator::Validate;

14
15
/// Default system host for health and metrics endpoints
const DEFAULT_SYSTEM_HOST: &str = "0.0.0.0";

/// Default system port for health and metrics endpoints
const DEFAULT_SYSTEM_PORT: u16 = 9090;

/// Default path of the health endpoint
const DEFAULT_SYSTEM_HEALTH_PATH: &str = "/health";
/// Default path of the live endpoint
const DEFAULT_SYSTEM_LIVE_PATH: &str = "/live";

Ryan Olson's avatar
Ryan Olson committed
24
25
/// Worker-level settings, loaded by [`WorkerConfig::from_settings`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerConfig {
    /// Graceful shutdown period for the system server, in seconds.
    pub graceful_shutdown_timeout: u64,
}

impl WorkerConfig {
31
32
    /// Instantiates and reads server configurations from appropriate sources.
    /// Panics on invalid configuration.
Ryan Olson's avatar
Ryan Olson committed
33
34
35
36
    pub fn from_settings() -> Self {
        // All calls should be global and thread safe.
        Figment::new()
            .merge(Serialized::defaults(Self::default()))
37
            .merge(Env::prefixed("DYN_WORKER_"))
Ryan Olson's avatar
Ryan Olson committed
38
            .extract()
39
            .unwrap() // safety: Called on startup, so panic is reasonable
Ryan Olson's avatar
Ryan Olson committed
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
    }
}

impl Default for WorkerConfig {
    fn default() -> Self {
        WorkerConfig {
            graceful_shutdown_timeout: if cfg!(debug_assertions) {
                1 // Debug build: 1 second
            } else {
                30 // Release build: 30 seconds
            },
        }
    }
}

55
56
57
58
59
60
61
/// Health status value, used as the type of
/// `RuntimeConfig::starting_health_status`.
///
/// Because of `rename_all = "lowercase"`, serde reads and writes the variants
/// as `"ready"` and `"notready"`.
#[derive(Debug, Deserialize, Serialize, PartialEq, Clone)]
#[serde(rename_all = "lowercase")]
pub enum HealthStatus {
    /// Serialized as `"ready"`.
    Ready,
    /// Serialized as `"notready"`.
    NotReady,
}

Ryan Olson's avatar
Ryan Olson committed
62
63
64
65
66
/// Runtime configuration
/// Defines the configuration for Tokio runtimes
#[derive(Serialize, Deserialize, Validate, Debug, Builder, Clone)]
#[builder(build_fn(private, name = "build_internal"), derive(Debug, Serialize))]
pub struct RuntimeConfig {
67
    /// Number of async worker threads
Ryan Olson's avatar
Ryan Olson committed
68
    /// If set to 1, the runtime will run in single-threaded mode
69
70
    /// Set this at runtime with environment variable DYN_RUNTIME_NUM_WORKER_THREADS. Defaults to
    /// number of cores.
Ryan Olson's avatar
Ryan Olson committed
71
72
    #[validate(range(min = 1))]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
73
    pub num_worker_threads: Option<usize>,
Ryan Olson's avatar
Ryan Olson committed
74
75
76

    /// Maximum number of blocking threads
    /// Blocking threads are used for blocking operations, this value must be greater than 0.
77
78
    /// Set this at runtime with environment variable DYN_RUNTIME_MAX_BLOCKING_THREADS. Defaults to
    /// 512.
Ryan Olson's avatar
Ryan Olson committed
79
    #[validate(range(min = 1))]
80
    #[builder(default = "512")]
Ryan Olson's avatar
Ryan Olson committed
81
82
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub max_blocking_threads: usize,
83

84
    /// System status server host for health and metrics endpoints
85
86
    /// Set this at runtime with environment variable DYN_SYSTEM_HOST
    #[builder(default = "DEFAULT_SYSTEM_HOST.to_string()")]
87
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
88
    pub system_host: String,
89

90
    /// System status server port for health and metrics endpoints
91
    /// If set to 0, the system will assign a random available port
92
93
    /// Set this at runtime with environment variable DYN_SYSTEM_PORT
    #[builder(default = "DEFAULT_SYSTEM_PORT")]
94
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
95
    pub system_port: u16,
96

97
    /// Health and metrics System status server enabled
98
    /// Set this at runtime with environment variable DYN_SYSTEM_ENABLED
99
100
    #[builder(default = "false")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
101
    pub system_enabled: bool,
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116

    /// Starting Health Status
    /// Set this at runtime with environment variable DYN_SYSTEM_STARTING_HEALTH_STATUS
    #[builder(default = "HealthStatus::NotReady")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub starting_health_status: HealthStatus,

    /// Use Endpoint Health Status
    /// When using endpoint health status, health status
    /// is the AND of individual endpoint health
    /// Set this at runtime with environment variable DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS
    /// with the list of endpoints to consider for system health
    #[builder(default = "vec![]")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub use_endpoint_health_status: Vec<String>,
117
118
119
120
121
122
123
124
125
126

    /// Health endpoint paths
    /// Set this at runtime with environment variable DYN_SYSTEM_HEALTH_PATH
    #[builder(default = "DEFAULT_SYSTEM_HEALTH_PATH.to_string()")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub system_health_path: String,
    /// Set this at runtime with environment variable DYN_SYSTEM_LIVE_PATH
    #[builder(default = "DEFAULT_SYSTEM_LIVE_PATH.to_string()")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub system_live_path: String,
Ryan Olson's avatar
Ryan Olson committed
127
128
}

129
130
131
132
133
134
135
136
137
impl fmt::Display for RuntimeConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // If None, it defaults to "number of cores", so we indicate that.
        match self.num_worker_threads {
            Some(val) => write!(f, "num_worker_threads={val}, ")?,
            None => write!(f, "num_worker_threads=default (num_cores), ")?,
        }

        write!(f, "max_blocking_threads={}, ", self.max_blocking_threads)?;
138
139
140
        write!(f, "system_host={}, ", self.system_host)?;
        write!(f, "system_port={}, ", self.system_port)?;
        write!(f, "system_enabled={}", self.system_enabled)?;
141
142
143
144
145
146
147
148
149
150
        write!(
            f,
            "use_endpoint_health_status={:?}",
            self.use_endpoint_health_status
        )?;
        write!(
            f,
            "starting_health_status={:?}",
            self.starting_health_status
        )?;
151
152
        write!(f, ", system_health_path={}", self.system_health_path)?;
        write!(f, ", system_live_path={}", self.system_live_path)?;
153
154
155
156
157

        Ok(())
    }
}

Ryan Olson's avatar
Ryan Olson committed
158
159
160
161
162
163
164
165
impl RuntimeConfig {
    pub fn builder() -> RuntimeConfigBuilder {
        RuntimeConfigBuilder::default()
    }

    pub(crate) fn figment() -> Figment {
        Figment::new()
            .merge(Serialized::defaults(RuntimeConfig::default()))
Neelay Shah's avatar
Neelay Shah committed
166
167
            .merge(Toml::file("/opt/dynamo/defaults/runtime.toml"))
            .merge(Toml::file("/opt/dynamo/etc/runtime.toml"))
168
169
            .merge(Env::prefixed("DYN_RUNTIME_").filter_map(|k| {
                let full_key = format!("DYN_RUNTIME_{}", k.as_str());
170
171
172
173
174
175
                // filters out empty environment variables
                match std::env::var(&full_key) {
                    Ok(v) if !v.is_empty() => Some(k.into()),
                    _ => None,
                }
            }))
176
177
178
179
180
181
182
183
184
185
            .merge(Env::prefixed("DYN_SYSTEM_").filter_map(|k| {
                let full_key = format!("DYN_SYSTEM_{}", k.as_str());
                // filters out empty environment variables
                match std::env::var(&full_key) {
                    Ok(v) if !v.is_empty() => {
                        // Map DYN_SYSTEM_* to the correct field names
                        let mapped_key = match k.as_str() {
                            "HOST" => "system_host",
                            "PORT" => "system_port",
                            "ENABLED" => "system_enabled",
186
187
                            "USE_ENDPOINT_HEALTH_STATUS" => "use_endpoint_health_status",
                            "STARTING_HEALTH_STATUS" => "starting_health_status",
188
189
                            "HEALTH_PATH" => "system_health_path",
                            "LIVE_PATH" => "system_live_path",
190
191
192
193
194
195
196
                            _ => k.as_str(),
                        };
                        Some(mapped_key.into())
                    }
                    _ => None,
                }
            }))
Ryan Olson's avatar
Ryan Olson committed
197
198
199
200
201
    }

    /// Load the runtime configuration from the environment and configuration files
    /// Configuration is priorities in the following order, where the last has the lowest priority:
    /// 1. Environment variables (top priority)
202
    ///    TO DO: Add documentation for configuration files. Paths should be configurable.
Neelay Shah's avatar
Neelay Shah committed
203
204
    /// 2. /opt/dynamo/etc/runtime.toml
    /// 3. /opt/dynamo/defaults/runtime.toml (lowest priority)
Ryan Olson's avatar
Ryan Olson committed
205
    ///
206
    /// Environment variables are prefixed with `DYN_RUNTIME_` and `DYN_SYSTEM`
Ryan Olson's avatar
Ryan Olson committed
207
208
209
210
211
212
    pub fn from_settings() -> Result<RuntimeConfig> {
        let config: RuntimeConfig = Self::figment().extract()?;
        config.validate()?;
        Ok(config)
    }

213
214
215
216
    /// Check if System server should be enabled
    /// System server is disabled by default, but can be enabled by setting DYN_SYSTEM_ENABLED to true
    pub fn system_server_enabled(&self) -> bool {
        self.system_enabled
217
218
    }

Ryan Olson's avatar
Ryan Olson committed
219
220
    pub fn single_threaded() -> Self {
        RuntimeConfig {
221
            num_worker_threads: Some(1),
Ryan Olson's avatar
Ryan Olson committed
222
            max_blocking_threads: 1,
223
224
225
            system_host: DEFAULT_SYSTEM_HOST.to_string(),
            system_port: DEFAULT_SYSTEM_PORT,
            system_enabled: false,
226
227
            starting_health_status: HealthStatus::NotReady,
            use_endpoint_health_status: vec![],
228
229
            system_health_path: DEFAULT_SYSTEM_HEALTH_PATH.to_string(),
            system_live_path: DEFAULT_SYSTEM_LIVE_PATH.to_string(),
Ryan Olson's avatar
Ryan Olson committed
230
231
232
233
        }
    }

    /// Create a new default runtime configuration
234
235
236
237
238
239
    pub(crate) fn create_runtime(&self) -> std::io::Result<tokio::runtime::Runtime> {
        tokio::runtime::Builder::new_multi_thread()
            .worker_threads(
                self.num_worker_threads
                    .unwrap_or_else(|| std::thread::available_parallelism().unwrap().get()),
            )
Ryan Olson's avatar
Ryan Olson committed
240
241
            .max_blocking_threads(self.max_blocking_threads)
            .enable_all()
242
            .build()
Ryan Olson's avatar
Ryan Olson committed
243
244
245
246
247
    }
}

impl Default for RuntimeConfig {
    fn default() -> Self {
248
        let num_cores = std::thread::available_parallelism().unwrap().get();
249
        Self {
250
251
            num_worker_threads: Some(num_cores),
            max_blocking_threads: num_cores,
252
253
254
            system_host: DEFAULT_SYSTEM_HOST.to_string(),
            system_port: DEFAULT_SYSTEM_PORT,
            system_enabled: false,
255
256
            starting_health_status: HealthStatus::NotReady,
            use_endpoint_health_status: vec![],
257
258
            system_health_path: DEFAULT_SYSTEM_HEALTH_PATH.to_string(),
            system_live_path: DEFAULT_SYSTEM_LIVE_PATH.to_string(),
259
        }
Ryan Olson's avatar
Ryan Olson committed
260
261
262
263
264
265
266
267
268
269
270
    }
}

impl RuntimeConfigBuilder {
    /// Builds a [`RuntimeConfig`] from the builder and validates it.
    ///
    /// # Errors
    ///
    /// Returns an error if the internal build step fails or if the resulting
    /// configuration does not pass validation.
    pub fn build(&self) -> Result<RuntimeConfig> {
        let candidate = self.build_internal()?;
        match candidate.validate() {
            Ok(()) => Ok(candidate),
            Err(violations) => Err(violations.into()),
        }
    }
}
271

272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
/// Returns `true` when `val` spells an affirmative boolean.
///
/// Intended for environment variables and other user-supplied settings that
/// should be read as a boolean. The comparison ignores case; the accepted
/// spellings are `1`, `true`, `on` and `yes`. Surrounding whitespace is not
/// trimmed.
pub fn is_truthy(val: &str) -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "on", "yes"];

    let normalized = val.to_lowercase();
    TRUTHY.contains(&normalized.as_str())
}

/// Returns `true` when `val` spells a negative boolean.
///
/// Counterpart of `is_truthy` for environment variables and other
/// user-supplied settings. The comparison ignores case; the accepted spellings
/// are `0`, `false`, `off` and `no`. Surrounding whitespace is not trimmed.
pub fn is_falsey(val: &str) -> bool {
    const FALSEY: [&str; 4] = ["0", "false", "off", "no"];

    let normalized = val.to_lowercase();
    FALSEY.contains(&normalized.as_str())
}

288
289
290
291
292
293
294
295
/// Returns `true` when the environment variable `env` holds a truthy value.
///
/// A variable that is unset, or whose value cannot be read as a string, is
/// treated as not truthy.
pub fn env_is_truthy(env: &str) -> bool {
    std::env::var(env)
        .map(|value| is_truthy(&value))
        .unwrap_or(false)
}

296
297
298
299
300
301
/// Check if an environment variable is falsey
pub fn env_is_falsey(env: &str) -> bool {
    match std::env::var(env) {
        Ok(val) => is_falsey(val.as_str()),
        Err(_) => false,
    }
302
303
304
}

/// Check whether JSONL logging enabled
305
/// Set the `DYN_LOGGING_JSONL` environment variable a [`is_truthy`] value
306
pub fn jsonl_logging_enabled() -> bool {
307
    env_is_truthy("DYN_LOGGING_JSONL")
308
309
310
}

/// Check whether logging with ANSI terminal escape codes and colors is disabled.
311
/// Set the `DYN_SDK_DISABLE_ANSI_LOGGING` environment variable a [`is_truthy`] value
312
pub fn disable_ansi_logging() -> bool {
313
    env_is_truthy("DYN_SDK_DISABLE_ANSI_LOGGING")
314
}
315

Ryan Olson's avatar
Ryan Olson committed
316
317
318
319
320
321
/// Check whether to use local timezone for logging timestamps (default is UTC)
/// Set the `DYN_LOG_USE_LOCAL_TZ` environment variable to a [`is_truthy`] value
///
/// Returns `false` when the variable is unset or holds any value that is not
/// truthy.
pub fn use_local_timezone() -> bool {
    env_is_truthy("DYN_LOG_USE_LOCAL_TZ")
}

322
323
324
325
326
327
328
329
#[cfg(test)]
mod tests {
    use super::*;

    /// Explicit `DYN_RUNTIME_*` values override the built-in defaults.
    #[test]
    fn test_runtime_config_with_env_vars() -> Result<()> {
        temp_env::with_vars(
            vec![
                ("DYN_RUNTIME_NUM_WORKER_THREADS", Some("24")),
                ("DYN_RUNTIME_MAX_BLOCKING_THREADS", Some("32")),
            ],
            || {
                let config = RuntimeConfig::from_settings()?;
                assert_eq!(config.num_worker_threads, Some(24));
                assert_eq!(config.max_blocking_threads, 32);
                Ok(())
            },
        )
    }

    /// Unset and empty environment variables both fall back to the defaults.
    #[test]
    fn test_runtime_config_defaults() -> Result<()> {
        temp_env::with_vars(
            vec![
                ("DYN_RUNTIME_NUM_WORKER_THREADS", None::<&str>),
                ("DYN_RUNTIME_MAX_BLOCKING_THREADS", Some("")),
            ],
            || {
                let config = RuntimeConfig::from_settings()?;

                let default_config = RuntimeConfig::default();
                assert_eq!(config.num_worker_threads, default_config.num_worker_threads);
                assert_eq!(
                    config.max_blocking_threads,
                    default_config.max_blocking_threads
                );
                Ok(())
            },
        )
    }

    /// Thread counts of zero are rejected by validation, one error per field.
    #[test]
    fn test_runtime_config_rejects_invalid_thread_count() -> Result<()> {
        temp_env::with_vars(
            vec![
                ("DYN_RUNTIME_NUM_WORKER_THREADS", Some("0")),
                ("DYN_RUNTIME_MAX_BLOCKING_THREADS", Some("0")),
            ],
            || {
                let err = RuntimeConfig::from_settings()
                    .expect_err("thread counts of zero must fail validation");
                let message = err.to_string();
                assert!(message.contains("num_worker_threads: Validation error"));
                assert!(message.contains("max_blocking_threads: Validation error"));
                Ok(())
            },
        )
    }

    /// `DYN_SYSTEM_HOST` and `DYN_SYSTEM_PORT` map onto the `system_*` fields.
    #[test]
    fn test_runtime_config_system_server_env_vars() -> Result<()> {
        temp_env::with_vars(
            vec![
                ("DYN_SYSTEM_HOST", Some("127.0.0.1")),
                ("DYN_SYSTEM_PORT", Some("9090")),
            ],
            || {
                let config = RuntimeConfig::from_settings()?;
                assert_eq!(config.system_host, "127.0.0.1");
                assert_eq!(config.system_port, 9090);
                Ok(())
            },
        )
    }

    // Despite its name, this test checks that the system server is DISABLED
    // when `DYN_SYSTEM_ENABLED` is not set.
    #[test]
    fn test_system_server_enabled_by_default() {
        temp_env::with_vars(vec![("DYN_SYSTEM_ENABLED", None::<&str>)], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert!(!config.system_server_enabled());
        });
    }

    #[test]
    fn test_system_server_disabled_explicitly() {
        temp_env::with_vars(vec![("DYN_SYSTEM_ENABLED", Some("false"))], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert!(!config.system_server_enabled());
        });
    }

    #[test]
    fn test_system_server_enabled_explicitly() {
        temp_env::with_vars(vec![("DYN_SYSTEM_ENABLED", Some("true"))], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert!(config.system_server_enabled());
        });
    }

    // Despite its name, this test checks that setting only the port does NOT
    // enable the system server; the port value is still applied.
    #[test]
    fn test_system_server_enabled_by_port() {
        temp_env::with_vars(vec![("DYN_SYSTEM_PORT", Some("8080"))], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert!(!config.system_server_enabled());
            assert_eq!(config.system_port, 8080);
        });
    }

    #[test]
    fn test_system_server_starting_health_status_ready() {
        temp_env::with_vars(
            vec![("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready"))],
            || {
                let config = RuntimeConfig::from_settings().unwrap();
                assert_eq!(config.starting_health_status, HealthStatus::Ready);
            },
        );
    }

    #[test]
    fn test_system_use_endpoint_health_status() {
        temp_env::with_vars(
            vec![("DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Some("[\"ready\"]"))],
            || {
                let config = RuntimeConfig::from_settings().unwrap();
                assert_eq!(config.use_endpoint_health_status, vec!["ready"]);
            },
        );
    }

    #[test]
    fn test_system_health_endpoint_path_default() {
        temp_env::with_vars(vec![("DYN_SYSTEM_HEALTH_PATH", None::<&str>)], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert_eq!(
                config.system_health_path,
                DEFAULT_SYSTEM_HEALTH_PATH.to_string()
            );
        });

        temp_env::with_vars(vec![("DYN_SYSTEM_LIVE_PATH", None::<&str>)], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert_eq!(
                config.system_live_path,
                DEFAULT_SYSTEM_LIVE_PATH.to_string()
            );
        });
    }

    #[test]
    fn test_system_health_endpoint_path_custom() {
        temp_env::with_vars(
            vec![("DYN_SYSTEM_HEALTH_PATH", Some("/custom/health"))],
            || {
                let config = RuntimeConfig::from_settings().unwrap();
                assert_eq!(config.system_health_path, "/custom/health");
            },
        );

        temp_env::with_vars(vec![("DYN_SYSTEM_LIVE_PATH", Some("/custom/live"))], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert_eq!(config.system_live_path, "/custom/live");
        });
    }

    #[test]
    fn test_is_truthy_and_falsey() {
        // Test truthy values
        assert!(is_truthy("1"));
        assert!(is_truthy("true"));
        assert!(is_truthy("TRUE"));
        assert!(is_truthy("on"));
        assert!(is_truthy("yes"));

        // Test falsey values
        assert!(is_falsey("0"));
        assert!(is_falsey("false"));
        assert!(is_falsey("FALSE"));
        assert!(is_falsey("off"));
        assert!(is_falsey("no"));

        // Test opposite behavior
        assert!(!is_truthy("0"));
        assert!(!is_falsey("1"));

        // Test env functions
        temp_env::with_vars(vec![("TEST_TRUTHY", Some("true"))], || {
            assert!(env_is_truthy("TEST_TRUTHY"));
            assert!(!env_is_falsey("TEST_TRUTHY"));
        });

        temp_env::with_vars(vec![("TEST_FALSEY", Some("false"))], || {
            assert!(!env_is_truthy("TEST_FALSEY"));
            assert!(env_is_falsey("TEST_FALSEY"));
        });

        // Test missing env vars
        temp_env::with_vars(vec![("TEST_MISSING", None::<&str>)], || {
            assert!(!env_is_truthy("TEST_MISSING"));
            assert!(!env_is_falsey("TEST_MISSING"));
        });
    }
}