config.rs 14.3 KB
Newer Older
1
2
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
Ryan Olson's avatar
Ryan Olson committed
3
4
5
6
7
8
9
10

use super::Result;
use derive_builder::Builder;
use figment::{
    providers::{Env, Format, Serialized, Toml},
    Figment,
};
use serde::{Deserialize, Serialize};
11
use std::fmt;
Ryan Olson's avatar
Ryan Olson committed
12
13
use validator::Validate;

14
15
/// Default system host for health and metrics endpoints.
///
/// Used as the initial `system_host` value wherever a `RuntimeConfig` is created
/// without an explicit host.
const DEFAULT_SYSTEM_HOST: &str = "0.0.0.0";

/// Default system port for health and metrics endpoints.
///
/// Used as the initial `system_port` value wherever a `RuntimeConfig` is created
/// without an explicit port.
const DEFAULT_SYSTEM_PORT: u16 = 9090;
19

Ryan Olson's avatar
Ryan Olson committed
20
21
/// Worker-level settings.
///
/// Values start from [`Default`] and can be overridden through environment
/// variables prefixed with `DYN_WORKER_` (see `WorkerConfig::from_settings`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerConfig {
    /// Grace shutdown period for the system server, in seconds.
    pub graceful_shutdown_timeout: u64,
}

impl WorkerConfig {
27
28
    /// Instantiates and reads server configurations from appropriate sources.
    /// Panics on invalid configuration.
Ryan Olson's avatar
Ryan Olson committed
29
30
31
32
    pub fn from_settings() -> Self {
        // All calls should be global and thread safe.
        Figment::new()
            .merge(Serialized::defaults(Self::default()))
33
            .merge(Env::prefixed("DYN_WORKER_"))
Ryan Olson's avatar
Ryan Olson committed
34
            .extract()
35
            .unwrap() // safety: Called on startup, so panic is reasonable
Ryan Olson's avatar
Ryan Olson committed
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
    }
}

impl Default for WorkerConfig {
    fn default() -> Self {
        WorkerConfig {
            graceful_shutdown_timeout: if cfg!(debug_assertions) {
                1 // Debug build: 1 second
            } else {
                30 // Release build: 30 seconds
            },
        }
    }
}

/// Runtime configuration
/// Defines the configuration for Tokio runtimes
#[derive(Serialize, Deserialize, Validate, Debug, Builder, Clone)]
#[builder(build_fn(private, name = "build_internal"), derive(Debug, Serialize))]
pub struct RuntimeConfig {
56
    /// Number of async worker threads
Ryan Olson's avatar
Ryan Olson committed
57
    /// If set to 1, the runtime will run in single-threaded mode
58
59
    /// Set this at runtime with environment variable DYN_RUNTIME_NUM_WORKER_THREADS. Defaults to
    /// number of cores.
Ryan Olson's avatar
Ryan Olson committed
60
61
    #[validate(range(min = 1))]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
62
    pub num_worker_threads: Option<usize>,
Ryan Olson's avatar
Ryan Olson committed
63
64
65

    /// Maximum number of blocking threads
    /// Blocking threads are used for blocking operations, this value must be greater than 0.
66
67
    /// Set this at runtime with environment variable DYN_RUNTIME_MAX_BLOCKING_THREADS. Defaults to
    /// 512.
Ryan Olson's avatar
Ryan Olson committed
68
    #[validate(range(min = 1))]
69
    #[builder(default = "512")]
Ryan Olson's avatar
Ryan Olson committed
70
71
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
    pub max_blocking_threads: usize,
72

73
74
75
    /// System server host for health and metrics endpoints
    /// Set this at runtime with environment variable DYN_SYSTEM_HOST
    #[builder(default = "DEFAULT_SYSTEM_HOST.to_string()")]
76
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
77
    pub system_host: String,
78

79
    /// System server port for health and metrics endpoints
80
    /// If set to 0, the system will assign a random available port
81
82
    /// Set this at runtime with environment variable DYN_SYSTEM_PORT
    #[builder(default = "DEFAULT_SYSTEM_PORT")]
83
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
84
    pub system_port: u16,
85

86
87
    /// Health and metrics System server enabled
    /// Set this at runtime with environment variable DYN_SYSTEM_ENABLED
88
89
    #[builder(default = "false")]
    #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
90
    pub system_enabled: bool,
Ryan Olson's avatar
Ryan Olson committed
91
92
}

93
94
95
96
97
98
99
100
101
impl fmt::Display for RuntimeConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // If None, it defaults to "number of cores", so we indicate that.
        match self.num_worker_threads {
            Some(val) => write!(f, "num_worker_threads={val}, ")?,
            None => write!(f, "num_worker_threads=default (num_cores), ")?,
        }

        write!(f, "max_blocking_threads={}, ", self.max_blocking_threads)?;
102
103
104
        write!(f, "system_host={}, ", self.system_host)?;
        write!(f, "system_port={}, ", self.system_port)?;
        write!(f, "system_enabled={}", self.system_enabled)?;
105
106
107
108
109

        Ok(())
    }
}

Ryan Olson's avatar
Ryan Olson committed
110
111
112
113
114
115
116
117
impl RuntimeConfig {
    pub fn builder() -> RuntimeConfigBuilder {
        RuntimeConfigBuilder::default()
    }

    pub(crate) fn figment() -> Figment {
        Figment::new()
            .merge(Serialized::defaults(RuntimeConfig::default()))
Neelay Shah's avatar
Neelay Shah committed
118
119
            .merge(Toml::file("/opt/dynamo/defaults/runtime.toml"))
            .merge(Toml::file("/opt/dynamo/etc/runtime.toml"))
120
121
            .merge(Env::prefixed("DYN_RUNTIME_").filter_map(|k| {
                let full_key = format!("DYN_RUNTIME_{}", k.as_str());
122
123
124
125
126
127
                // filters out empty environment variables
                match std::env::var(&full_key) {
                    Ok(v) if !v.is_empty() => Some(k.into()),
                    _ => None,
                }
            }))
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
            .merge(Env::prefixed("DYN_SYSTEM_").filter_map(|k| {
                let full_key = format!("DYN_SYSTEM_{}", k.as_str());
                // filters out empty environment variables
                match std::env::var(&full_key) {
                    Ok(v) if !v.is_empty() => {
                        // Map DYN_SYSTEM_* to the correct field names
                        let mapped_key = match k.as_str() {
                            "HOST" => "system_host",
                            "PORT" => "system_port",
                            "ENABLED" => "system_enabled",
                            _ => k.as_str(),
                        };
                        Some(mapped_key.into())
                    }
                    _ => None,
                }
            }))
Ryan Olson's avatar
Ryan Olson committed
145
146
147
148
149
    }

    /// Load the runtime configuration from the environment and configuration files
    /// Configuration is priorities in the following order, where the last has the lowest priority:
    /// 1. Environment variables (top priority)
150
    ///    TO DO: Add documentation for configuration files. Paths should be configurable.
Neelay Shah's avatar
Neelay Shah committed
151
152
    /// 2. /opt/dynamo/etc/runtime.toml
    /// 3. /opt/dynamo/defaults/runtime.toml (lowest priority)
Ryan Olson's avatar
Ryan Olson committed
153
    ///
154
    /// Environment variables are prefixed with `DYN_RUNTIME_`
Ryan Olson's avatar
Ryan Olson committed
155
156
157
158
159
160
    pub fn from_settings() -> Result<RuntimeConfig> {
        let config: RuntimeConfig = Self::figment().extract()?;
        config.validate()?;
        Ok(config)
    }

161
162
163
164
    /// Check if System server should be enabled
    /// System server is disabled by default, but can be enabled by setting DYN_SYSTEM_ENABLED to true
    pub fn system_server_enabled(&self) -> bool {
        self.system_enabled
165
166
    }

Ryan Olson's avatar
Ryan Olson committed
167
168
    pub fn single_threaded() -> Self {
        RuntimeConfig {
169
            num_worker_threads: Some(1),
Ryan Olson's avatar
Ryan Olson committed
170
            max_blocking_threads: 1,
171
172
173
            system_host: DEFAULT_SYSTEM_HOST.to_string(),
            system_port: DEFAULT_SYSTEM_PORT,
            system_enabled: false,
Ryan Olson's avatar
Ryan Olson committed
174
175
176
177
        }
    }

    /// Create a new default runtime configuration
178
179
180
181
182
183
    pub(crate) fn create_runtime(&self) -> std::io::Result<tokio::runtime::Runtime> {
        tokio::runtime::Builder::new_multi_thread()
            .worker_threads(
                self.num_worker_threads
                    .unwrap_or_else(|| std::thread::available_parallelism().unwrap().get()),
            )
Ryan Olson's avatar
Ryan Olson committed
184
185
            .max_blocking_threads(self.max_blocking_threads)
            .enable_all()
186
            .build()
Ryan Olson's avatar
Ryan Olson committed
187
188
189
190
191
    }
}

impl Default for RuntimeConfig {
    fn default() -> Self {
192
        let num_cores = std::thread::available_parallelism().unwrap().get();
193
        Self {
194
195
            num_worker_threads: Some(num_cores),
            max_blocking_threads: num_cores,
196
197
198
            system_host: DEFAULT_SYSTEM_HOST.to_string(),
            system_port: DEFAULT_SYSTEM_PORT,
            system_enabled: false,
199
        }
Ryan Olson's avatar
Ryan Olson committed
200
201
202
203
204
205
206
207
208
209
210
    }
}

impl RuntimeConfigBuilder {
    /// Build and validate the runtime configuration.
    ///
    /// Runs the generated (private) `build_internal` step first, then applies the
    /// field validation rules to the resulting [`RuntimeConfig`].
    ///
    /// # Errors
    /// Returns an error if the builder cannot produce a configuration or if the
    /// produced configuration fails validation.
    pub fn build(&self) -> Result<RuntimeConfig> {
        let candidate = self.build_internal()?;
        match candidate.validate() {
            Ok(()) => Ok(candidate),
            Err(violations) => Err(violations.into()),
        }
    }
}
211

212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
/// Check if a string is truthy.
///
/// Intended for environment variables and other user-supplied settings that
/// should be read as a boolean. The comparison ignores case; the accepted
/// spellings are `1`, `true`, `on` and `yes`. The input is not trimmed, so
/// surrounding whitespace makes a value non-truthy.
pub fn is_truthy(val: &str) -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "on", "yes"];
    let normalized = val.to_lowercase();
    TRUTHY.contains(&normalized.as_str())
}

/// Check if a string is falsey.
///
/// Counterpart of `is_truthy` for environment variables and other user-supplied
/// settings that should be read as a boolean. The comparison ignores case; the
/// accepted spellings are `0`, `false`, `off` and `no`. The input is not
/// trimmed, and a value that is not truthy is not automatically falsey.
pub fn is_falsey(val: &str) -> bool {
    const FALSEY: [&str; 4] = ["0", "false", "off", "no"];
    let normalized = val.to_lowercase();
    FALSEY.contains(&normalized.as_str())
}

228
229
230
231
232
233
234
235
/// Check if an environment variable is truthy.
///
/// Returns `false` when the variable cannot be read (for example, when it is
/// not set); otherwise the result of [`is_truthy`] on its value.
pub fn env_is_truthy(env: &str) -> bool {
    matches!(std::env::var(env), Ok(value) if is_truthy(&value))
}

236
237
238
239
240
241
/// Check if an environment variable is falsey
pub fn env_is_falsey(env: &str) -> bool {
    match std::env::var(env) {
        Ok(val) => is_falsey(val.as_str()),
        Err(_) => false,
    }
242
243
244
}

/// Check whether JSONL logging enabled
245
/// Set the `DYN_LOGGING_JSONL` environment variable a [`is_truthy`] value
246
pub fn jsonl_logging_enabled() -> bool {
247
    env_is_truthy("DYN_LOGGING_JSONL")
248
249
250
}

/// Check whether logging with ANSI terminal escape codes and colors is disabled.
251
/// Set the `DYN_SDK_DISABLE_ANSI_LOGGING` environment variable a [`is_truthy`] value
252
pub fn disable_ansi_logging() -> bool {
253
    env_is_truthy("DYN_SDK_DISABLE_ANSI_LOGGING")
254
}
255

Ryan Olson's avatar
Ryan Olson committed
256
257
258
259
260
261
/// Check whether logging timestamps should use the local timezone (default is UTC).
///
/// Set the `DYN_LOG_USE_LOCAL_TZ` environment variable to a [`is_truthy`] value
/// to opt in; this returns `false` when the variable is unset or not truthy.
pub fn use_local_timezone() -> bool {
    const LOCAL_TZ_ENV: &str = "DYN_LOG_USE_LOCAL_TZ";
    env_is_truthy(LOCAL_TZ_ENV)
}

262
263
264
265
266
267
268
269
#[cfg(test)]
mod tests {
    use super::*;

    /// Explicit `DYN_RUNTIME_*` values override the built-in defaults.
    #[test]
    fn test_runtime_config_with_env_vars() -> Result<()> {
        temp_env::with_vars(
            vec![
                ("DYN_RUNTIME_NUM_WORKER_THREADS", Some("24")),
                ("DYN_RUNTIME_MAX_BLOCKING_THREADS", Some("32")),
            ],
            || {
                let config = RuntimeConfig::from_settings()?;
                assert_eq!(config.num_worker_threads, Some(24));
                assert_eq!(config.max_blocking_threads, 32);
                Ok(())
            },
        )
    }

    /// Unset and empty variables both leave the defaults in place.
    #[test]
    fn test_runtime_config_defaults() -> Result<()> {
        temp_env::with_vars(
            vec![
                ("DYN_RUNTIME_NUM_WORKER_THREADS", None::<&str>),
                // Set but empty: must be filtered out rather than parsed.
                ("DYN_RUNTIME_MAX_BLOCKING_THREADS", Some("")),
            ],
            || {
                let config = RuntimeConfig::from_settings()?;

                let default_config = RuntimeConfig::default();
                assert_eq!(config.num_worker_threads, default_config.num_worker_threads);
                assert_eq!(
                    config.max_blocking_threads,
                    default_config.max_blocking_threads
                );
                Ok(())
            },
        )
    }

    /// A thread count of zero violates the `min = 1` validation on both fields.
    #[test]
    fn test_runtime_config_rejects_invalid_thread_count() -> Result<()> {
        temp_env::with_vars(
            vec![
                ("DYN_RUNTIME_NUM_WORKER_THREADS", Some("0")),
                ("DYN_RUNTIME_MAX_BLOCKING_THREADS", Some("0")),
            ],
            || {
                let message = RuntimeConfig::from_settings()
                    .expect_err("zero thread counts must be rejected")
                    .to_string();
                assert!(message.contains("num_worker_threads: Validation error"));
                assert!(message.contains("max_blocking_threads: Validation error"));
                Ok(())
            },
        )
    }

    /// `DYN_SYSTEM_HOST` / `DYN_SYSTEM_PORT` are mapped onto the system server fields.
    #[test]
    fn test_runtime_config_system_server_env_vars() -> Result<()> {
        temp_env::with_vars(
            vec![
                ("DYN_SYSTEM_HOST", Some("127.0.0.1")),
                // Deliberately not the default port, so an ignored override is detected.
                ("DYN_SYSTEM_PORT", Some("9191")),
            ],
            || {
                let config = RuntimeConfig::from_settings()?;
                assert_eq!(config.system_host, "127.0.0.1");
                assert_eq!(config.system_port, 9191);
                Ok(())
            },
        )
    }

    /// Despite the name, this pins the default state: with `DYN_SYSTEM_ENABLED`
    /// unset the system server is disabled.
    #[test]
    fn test_system_server_enabled_by_default() {
        temp_env::with_vars(vec![("DYN_SYSTEM_ENABLED", None::<&str>)], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert!(!config.system_server_enabled());
        });
    }

    /// `DYN_SYSTEM_ENABLED=false` keeps the system server disabled.
    #[test]
    fn test_system_server_disabled_explicitly() {
        temp_env::with_vars(vec![("DYN_SYSTEM_ENABLED", Some("false"))], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert!(!config.system_server_enabled());
        });
    }

    /// `DYN_SYSTEM_ENABLED=true` turns the system server on.
    #[test]
    fn test_system_server_enabled_explicitly() {
        temp_env::with_vars(vec![("DYN_SYSTEM_ENABLED", Some("true"))], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert!(config.system_server_enabled());
        });
    }

    /// Despite the name, setting only a port does NOT enable the system server;
    /// the port is recorded while the server stays disabled.
    #[test]
    fn test_system_server_enabled_by_port() {
        temp_env::with_vars(vec![("DYN_SYSTEM_PORT", Some("8080"))], || {
            let config = RuntimeConfig::from_settings().unwrap();
            assert!(!config.system_server_enabled());
            assert_eq!(config.system_port, 8080);
        });
    }

    #[test]
    fn test_is_truthy_and_falsey() {
        // Test truthy values
        assert!(is_truthy("1"));
        assert!(is_truthy("true"));
        assert!(is_truthy("TRUE"));
        assert!(is_truthy("on"));
        assert!(is_truthy("yes"));

        // Test falsey values
        assert!(is_falsey("0"));
        assert!(is_falsey("false"));
        assert!(is_falsey("FALSE"));
        assert!(is_falsey("off"));
        assert!(is_falsey("no"));

        // Test opposite behavior
        assert!(!is_truthy("0"));
        assert!(!is_falsey("1"));

        // Test env functions
        temp_env::with_vars(vec![("TEST_TRUTHY", Some("true"))], || {
            assert!(env_is_truthy("TEST_TRUTHY"));
            assert!(!env_is_falsey("TEST_TRUTHY"));
        });

        temp_env::with_vars(vec![("TEST_FALSEY", Some("false"))], || {
            assert!(!env_is_truthy("TEST_FALSEY"));
            assert!(env_is_falsey("TEST_FALSEY"));
        });

        // Test missing env vars: an unset variable is neither truthy nor falsey.
        temp_env::with_vars(vec![("TEST_MISSING", None::<&str>)], || {
            assert!(!env_is_truthy("TEST_MISSING"));
            assert!(!env_is_falsey("TEST_MISSING"));
        });
    }
}