config.rs 16.6 KB
Newer Older
1
2
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
Ryan Olson's avatar
Ryan Olson committed
3
4
5
6
7
8
9
10

use super::Result;
use derive_builder::Builder;
use figment::{
    providers::{Env, Format, Serialized, Toml},
    Figment,
};
use serde::{Deserialize, Serialize};
11
use std::fmt;
Ryan Olson's avatar
Ryan Olson committed
12
13
use validator::Validate;

14
15
/// Default host for the system server that exposes the health and metrics endpoints.
///
/// `0.0.0.0` is the IPv4 wildcard address.
const DEFAULT_SYSTEM_HOST: &str = "0.0.0.0";

/// Default port for the system server that exposes the health and metrics endpoints.
const DEFAULT_SYSTEM_PORT: u16 = 9090;
19

Ryan Olson's avatar
Ryan Olson committed
20
21
/// Worker settings, loadable from `DYN_WORKER_`-prefixed environment variables
/// via [`WorkerConfig::from_settings`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerConfig {
    /// Graceful shutdown period for the system server, in seconds.
    pub graceful_shutdown_timeout: u64,
}

impl WorkerConfig {
27
28
    /// Instantiates and reads server configurations from appropriate sources.
    /// Panics on invalid configuration.
Ryan Olson's avatar
Ryan Olson committed
29
30
31
32
    pub fn from_settings() -> Self {
        // All calls should be global and thread safe.
        Figment::new()
            .merge(Serialized::defaults(Self::default()))
33
            .merge(Env::prefixed("DYN_WORKER_"))
Ryan Olson's avatar
Ryan Olson committed
34
            .extract()
35
            .unwrap() // safety: Called on startup, so panic is reasonable
Ryan Olson's avatar
Ryan Olson committed
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
    }
}

impl Default for WorkerConfig {
    fn default() -> Self {
        WorkerConfig {
            graceful_shutdown_timeout: if cfg!(debug_assertions) {
                1 // Debug build: 1 second
            } else {
                30 // Release build: 30 seconds
            },
        }
    }
}

51
52
53
54
55
56
57
/// Health state used by the runtime configuration.
///
/// With `rename_all = "lowercase"`, serde reads and writes the variants as
/// `"ready"` and `"notready"`.
#[derive(Debug, Deserialize, Serialize, PartialEq, Clone)]
#[serde(rename_all = "lowercase")]
pub enum HealthStatus {
    /// The system is ready.
    Ready,
    /// The system is not ready; this is the default starting status.
    NotReady,
}

Ryan Olson's avatar
Ryan Olson committed
58
59
60
61
62
/// Runtime configuration
/// Defines the configuration for Tokio runtimes
#[derive(Serialize, Deserialize, Validate, Debug, Builder, Clone)]
#[builder(build_fn(private, name = "build_internal"), derive(Debug, Serialize))]
pub struct RuntimeConfig {
63
    /// Number of async worker threads
Ryan Olson's avatar
Ryan Olson committed
64
    /// If set to 1, the runtime will run in single-threaded mode
65
66
    /// Set this at runtime with environment variable DYN_RUNTIME_NUM_WORKER_THREADS. Defaults to
    /// number of cores.
Ryan Olson's avatar
Ryan Olson committed
67
68
    #[validate(range(min = 1))]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
69
    pub num_worker_threads: Option<usize>,
Ryan Olson's avatar
Ryan Olson committed
70
71
72

    /// Maximum number of blocking threads
    /// Blocking threads are used for blocking operations, this value must be greater than 0.
73
74
    /// Set this at runtime with environment variable DYN_RUNTIME_MAX_BLOCKING_THREADS. Defaults to
    /// 512.
Ryan Olson's avatar
Ryan Olson committed
75
    #[validate(range(min = 1))]
76
    #[builder(default = "512")]
Ryan Olson's avatar
Ryan Olson committed
77
78
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub max_blocking_threads: usize,
79

80
81
82
    /// System server host for health and metrics endpoints
    /// Set this at runtime with environment variable DYN_SYSTEM_HOST
    #[builder(default = "DEFAULT_SYSTEM_HOST.to_string()")]
83
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
84
    pub system_host: String,
85

86
    /// System server port for health and metrics endpoints
87
    /// If set to 0, the system will assign a random available port
88
89
    /// Set this at runtime with environment variable DYN_SYSTEM_PORT
    #[builder(default = "DEFAULT_SYSTEM_PORT")]
90
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
91
    pub system_port: u16,
92

93
94
    /// Health and metrics System server enabled
    /// Set this at runtime with environment variable DYN_SYSTEM_ENABLED
95
96
    #[builder(default = "false")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
97
    pub system_enabled: bool,
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112

    /// Starting Health Status
    /// Set this at runtime with environment variable DYN_SYSTEM_STARTING_HEALTH_STATUS
    #[builder(default = "HealthStatus::NotReady")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub starting_health_status: HealthStatus,

    /// Use Endpoint Health Status
    /// When using endpoint health status, health status
    /// is the AND of individual endpoint health
    /// Set this at runtime with environment variable DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS
    /// with the list of endpoints to consider for system health
    #[builder(default = "vec![]")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub use_endpoint_health_status: Vec<String>,
Ryan Olson's avatar
Ryan Olson committed
113
114
}

115
116
117
118
119
120
121
122
123
impl fmt::Display for RuntimeConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // If None, it defaults to "number of cores", so we indicate that.
        match self.num_worker_threads {
            Some(val) => write!(f, "num_worker_threads={val}, ")?,
            None => write!(f, "num_worker_threads=default (num_cores), ")?,
        }

        write!(f, "max_blocking_threads={}, ", self.max_blocking_threads)?;
124
125
126
        write!(f, "system_host={}, ", self.system_host)?;
        write!(f, "system_port={}, ", self.system_port)?;
        write!(f, "system_enabled={}", self.system_enabled)?;
127
128
129
130
131
132
133
134
135
136
        write!(
            f,
            "use_endpoint_health_status={:?}",
            self.use_endpoint_health_status
        )?;
        write!(
            f,
            "starting_health_status={:?}",
            self.starting_health_status
        )?;
137
138
139
140
141

        Ok(())
    }
}

Ryan Olson's avatar
Ryan Olson committed
142
143
144
145
146
147
148
149
impl RuntimeConfig {
    pub fn builder() -> RuntimeConfigBuilder {
        RuntimeConfigBuilder::default()
    }

    pub(crate) fn figment() -> Figment {
        Figment::new()
            .merge(Serialized::defaults(RuntimeConfig::default()))
Neelay Shah's avatar
Neelay Shah committed
150
151
            .merge(Toml::file("/opt/dynamo/defaults/runtime.toml"))
            .merge(Toml::file("/opt/dynamo/etc/runtime.toml"))
152
153
            .merge(Env::prefixed("DYN_RUNTIME_").filter_map(|k| {
                let full_key = format!("DYN_RUNTIME_{}", k.as_str());
154
155
156
157
158
159
                // filters out empty environment variables
                match std::env::var(&full_key) {
                    Ok(v) if !v.is_empty() => Some(k.into()),
                    _ => None,
                }
            }))
160
161
162
163
164
165
166
167
168
169
            .merge(Env::prefixed("DYN_SYSTEM_").filter_map(|k| {
                let full_key = format!("DYN_SYSTEM_{}", k.as_str());
                // filters out empty environment variables
                match std::env::var(&full_key) {
                    Ok(v) if !v.is_empty() => {
                        // Map DYN_SYSTEM_* to the correct field names
                        let mapped_key = match k.as_str() {
                            "HOST" => "system_host",
                            "PORT" => "system_port",
                            "ENABLED" => "system_enabled",
170
171
                            "USE_ENDPOINT_HEALTH_STATUS" => "use_endpoint_health_status",
                            "STARTING_HEALTH_STATUS" => "starting_health_status",
172
173
174
175
176
177
178
                            _ => k.as_str(),
                        };
                        Some(mapped_key.into())
                    }
                    _ => None,
                }
            }))
Ryan Olson's avatar
Ryan Olson committed
179
180
181
182
183
    }

    /// Load the runtime configuration from the environment and configuration files
    /// Configuration is priorities in the following order, where the last has the lowest priority:
    /// 1. Environment variables (top priority)
184
    ///    TO DO: Add documentation for configuration files. Paths should be configurable.
Neelay Shah's avatar
Neelay Shah committed
185
186
    /// 2. /opt/dynamo/etc/runtime.toml
    /// 3. /opt/dynamo/defaults/runtime.toml (lowest priority)
Ryan Olson's avatar
Ryan Olson committed
187
    ///
188
    /// Environment variables are prefixed with `DYN_RUNTIME_` and `DYN_SYSTEM`
Ryan Olson's avatar
Ryan Olson committed
189
190
191
192
193
194
    pub fn from_settings() -> Result<RuntimeConfig> {
        let config: RuntimeConfig = Self::figment().extract()?;
        config.validate()?;
        Ok(config)
    }

195
196
197
198
    /// Check if System server should be enabled
    /// System server is disabled by default, but can be enabled by setting DYN_SYSTEM_ENABLED to true
    pub fn system_server_enabled(&self) -> bool {
        self.system_enabled
199
200
    }

Ryan Olson's avatar
Ryan Olson committed
201
202
    pub fn single_threaded() -> Self {
        RuntimeConfig {
203
            num_worker_threads: Some(1),
Ryan Olson's avatar
Ryan Olson committed
204
            max_blocking_threads: 1,
205
206
207
            system_host: DEFAULT_SYSTEM_HOST.to_string(),
            system_port: DEFAULT_SYSTEM_PORT,
            system_enabled: false,
208
209
            starting_health_status: HealthStatus::NotReady,
            use_endpoint_health_status: vec![],
Ryan Olson's avatar
Ryan Olson committed
210
211
212
213
        }
    }

    /// Create a new default runtime configuration
214
215
216
217
218
219
    pub(crate) fn create_runtime(&self) -> std::io::Result<tokio::runtime::Runtime> {
        tokio::runtime::Builder::new_multi_thread()
            .worker_threads(
                self.num_worker_threads
                    .unwrap_or_else(|| std::thread::available_parallelism().unwrap().get()),
            )
Ryan Olson's avatar
Ryan Olson committed
220
221
            .max_blocking_threads(self.max_blocking_threads)
            .enable_all()
222
            .build()
Ryan Olson's avatar
Ryan Olson committed
223
224
225
226
227
    }
}

impl Default for RuntimeConfig {
    fn default() -> Self {
228
        let num_cores = std::thread::available_parallelism().unwrap().get();
229
        Self {
230
231
            num_worker_threads: Some(num_cores),
            max_blocking_threads: num_cores,
232
233
234
            system_host: DEFAULT_SYSTEM_HOST.to_string(),
            system_port: DEFAULT_SYSTEM_PORT,
            system_enabled: false,
235
236
            starting_health_status: HealthStatus::NotReady,
            use_endpoint_health_status: vec![],
237
        }
Ryan Olson's avatar
Ryan Olson committed
238
239
240
241
242
243
244
245
246
247
248
    }
}

impl RuntimeConfigBuilder {
    /// Produces a [`RuntimeConfig`] from the values set on this builder and
    /// validates it before handing it back.
    ///
    /// # Errors
    ///
    /// Returns an error if the generated `build_internal` step fails or if the
    /// resulting configuration does not pass validation.
    pub fn build(&self) -> Result<RuntimeConfig> {
        let built = self.build_internal()?;
        built.validate()?;
        Ok(built)
    }
}
249

250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
/// Returns `true` when `val` is one of the accepted "truthy" spellings.
///
/// The comparison ignores case; the accepted values are `1`, `true`, `on` and `yes`.
/// Intended for environment variables and other user-supplied settings that should
/// be read as a boolean.
pub fn is_truthy(val: &str) -> bool {
    const TRUTHY_VALUES: [&str; 4] = ["1", "true", "on", "yes"];

    let normalized = val.to_lowercase();
    TRUTHY_VALUES.contains(&normalized.as_str())
}

/// Returns `true` when `val` is one of the accepted "falsey" spellings.
///
/// The comparison ignores case; the accepted values are `0`, `false`, `off` and `no`.
/// This is the counterpart of `is_truthy` for environment variables and other
/// user-supplied settings that should be read as a boolean.
pub fn is_falsey(val: &str) -> bool {
    const FALSEY_VALUES: [&str; 4] = ["0", "false", "off", "no"];

    let normalized = val.to_lowercase();
    FALSEY_VALUES.contains(&normalized.as_str())
}

266
267
268
269
270
271
272
273
/// Returns `true` when the environment variable `env` can be read and its
/// value satisfies [`is_truthy`]; returns `false` otherwise.
pub fn env_is_truthy(env: &str) -> bool {
    std::env::var(env)
        .map(|value| is_truthy(&value))
        .unwrap_or(false)
}

274
275
276
277
278
279
/// Check if an environment variable is falsey
pub fn env_is_falsey(env: &str) -> bool {
    match std::env::var(env) {
        Ok(val) => is_falsey(val.as_str()),
        Err(_) => false,
    }
280
281
282
}

/// Check whether JSONL logging enabled
283
/// Set the `DYN_LOGGING_JSONL` environment variable a [`is_truthy`] value
284
pub fn jsonl_logging_enabled() -> bool {
285
    env_is_truthy("DYN_LOGGING_JSONL")
286
287
288
}

/// Check whether logging with ANSI terminal escape codes and colors is disabled.
289
/// Set the `DYN_SDK_DISABLE_ANSI_LOGGING` environment variable a [`is_truthy`] value
290
pub fn disable_ansi_logging() -> bool {
291
    env_is_truthy("DYN_SDK_DISABLE_ANSI_LOGGING")
292
}
293

Ryan Olson's avatar
Ryan Olson committed
294
295
296
297
298
299
/// Reports whether logging timestamps should use the local timezone instead of
/// the default, UTC.
/// Opt in by setting the `DYN_LOG_USE_LOCAL_TZ` environment variable to an
/// [`is_truthy`] value.
pub fn use_local_timezone() -> bool {
    const LOCAL_TZ_ENV: &str = "DYN_LOG_USE_LOCAL_TZ";

    env_is_truthy(LOCAL_TZ_ENV)
}

300
301
302
303
304
305
306
307
#[cfg(test)]
mod tests {
    use super::*;

    /// `DYN_RUNTIME_*` variables override the thread-count defaults.
    #[test]
    fn test_runtime_config_with_env_vars() -> Result<()> {
        temp_env::with_vars(
            vec![
                ("DYN_RUNTIME_NUM_WORKER_THREADS", Some("24")),
                ("DYN_RUNTIME_MAX_BLOCKING_THREADS", Some("32")),
            ],
            || {
                let config = RuntimeConfig::from_settings()?;
                assert_eq!(config.num_worker_threads, Some(24));
                assert_eq!(config.max_blocking_threads, 32);
                Ok(())
            },
        )
    }

    /// Unset and empty variables both leave the defaults in place.
    #[test]
    fn test_runtime_config_defaults() -> Result<()> {
        temp_env::with_vars(
            vec![
                ("DYN_RUNTIME_NUM_WORKER_THREADS", None::<&str>),
                ("DYN_RUNTIME_MAX_BLOCKING_THREADS", Some("")),
            ],
            || {
                let config = RuntimeConfig::from_settings()?;

                let default_config = RuntimeConfig::default();
                assert_eq!(config.num_worker_threads, default_config.num_worker_threads);
                assert_eq!(
                    config.max_blocking_threads,
                    default_config.max_blocking_threads
                );
                Ok(())
            },
        )
    }

    /// A thread count of zero is rejected by validation for both pools.
    #[test]
    fn test_runtime_config_rejects_invalid_thread_count() -> Result<()> {
        temp_env::with_vars(
            vec![
                ("DYN_RUNTIME_NUM_WORKER_THREADS", Some("0")),
                ("DYN_RUNTIME_MAX_BLOCKING_THREADS", Some("0")),
            ],
            || {
                let err = RuntimeConfig::from_settings()
                    .expect_err("zero thread counts must fail validation");
                let message = err.to_string();
                assert!(message.contains("num_worker_threads: Validation error"));
                assert!(message.contains("max_blocking_threads: Validation error"));
                Ok(())
            },
        )
    }

    /// `DYN_SYSTEM_HOST` and `DYN_SYSTEM_PORT` map onto the system server fields.
    #[test]
    fn test_runtime_config_system_server_env_vars() -> Result<()> {
        temp_env::with_vars(
            vec![
                ("DYN_SYSTEM_HOST", Some("127.0.0.1")),
                ("DYN_SYSTEM_PORT", Some("9090")),
            ],
            || {
                let config = RuntimeConfig::from_settings()?;
                assert_eq!(config.system_host, "127.0.0.1");
                assert_eq!(config.system_port, 9090);
                Ok(())
            },
        )
    }

    /// With `DYN_SYSTEM_ENABLED` unset the system server stays disabled.
    #[test]
    fn test_system_server_disabled_by_default() {
        temp_env::with_vars(vec![("DYN_SYSTEM_ENABLED", None::<&str>)], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert!(!config.system_server_enabled());
        });
    }

    #[test]
    fn test_system_server_disabled_explicitly() {
        temp_env::with_vars(vec![("DYN_SYSTEM_ENABLED", Some("false"))], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert!(!config.system_server_enabled());
        });
    }

    #[test]
    fn test_system_server_enabled_explicitly() {
        temp_env::with_vars(vec![("DYN_SYSTEM_ENABLED", Some("true"))], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert!(config.system_server_enabled());
        });
    }

    /// Setting only the port changes the port but does not enable the server.
    #[test]
    fn test_system_server_not_enabled_by_port() {
        temp_env::with_vars(vec![("DYN_SYSTEM_PORT", Some("8080"))], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert!(!config.system_server_enabled());
            assert_eq!(config.system_port, 8080);
        });
    }

    #[test]
    fn test_system_server_starting_health_status_ready() {
        temp_env::with_vars(
            vec![("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready"))],
            || {
                let config = RuntimeConfig::from_settings().unwrap();
                assert_eq!(config.starting_health_status, HealthStatus::Ready);
            },
        );
    }

    #[test]
    fn test_system_use_endpoint_health_status() {
        temp_env::with_vars(
            vec![("DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Some("[\"ready\"]"))],
            || {
                let config = RuntimeConfig::from_settings().unwrap();
                assert_eq!(config.use_endpoint_health_status, vec!["ready"]);
            },
        );
    }

    #[test]
    fn test_is_truthy_and_falsey() {
        // Test truthy values
        assert!(is_truthy("1"));
        assert!(is_truthy("true"));
        assert!(is_truthy("TRUE"));
        assert!(is_truthy("on"));
        assert!(is_truthy("yes"));

        // Test falsey values
        assert!(is_falsey("0"));
        assert!(is_falsey("false"));
        assert!(is_falsey("FALSE"));
        assert!(is_falsey("off"));
        assert!(is_falsey("no"));

        // Test opposite behavior
        assert!(!is_truthy("0"));
        assert!(!is_falsey("1"));

        // Test env functions
        temp_env::with_vars(vec![("TEST_TRUTHY", Some("true"))], || {
            assert!(env_is_truthy("TEST_TRUTHY"));
            assert!(!env_is_falsey("TEST_TRUTHY"));
        });

        temp_env::with_vars(vec![("TEST_FALSEY", Some("false"))], || {
            assert!(!env_is_truthy("TEST_FALSEY"));
            assert!(env_is_falsey("TEST_FALSEY"));
        });

        // Test missing env vars
        temp_env::with_vars(vec![("TEST_MISSING", None::<&str>)], || {
            assert!(!env_is_truthy("TEST_MISSING"));
            assert!(!env_is_falsey("TEST_MISSING"));
        });
    }
}