Unverified commit 0e77d344, authored by Keiven C, committed by GitHub
Browse files

refactor: centralize environment variable constants (#4083)


Signed-off-by: Keiven Chang <keivenchang@users.noreply.github.com>
Co-authored-by: Keiven Chang <keivenchang@users.noreply.github.com>
parent cf97c0dc
......@@ -2,6 +2,9 @@
// SPDX-License-Identifier: Apache-2.0
use crate::config::HealthStatus;
use crate::config::environment_names::logging as env_logging;
use crate::config::environment_names::runtime::canary as env_canary;
use crate::config::environment_names::runtime::system as env_system;
use crate::logging::make_request_span;
use crate::metrics::MetricsHierarchy;
use crate::metrics::prometheus_names::{nats_client, nats_service};
......@@ -307,6 +310,8 @@ mod tests {
#[cfg(all(test, feature = "integration"))]
mod integration_tests {
use super::*;
use crate::config::environment_names::logging as env_logging;
use crate::config::environment_names::runtime::canary as env_canary;
use crate::distributed::distributed_test_utils::create_test_drt_async;
use crate::metrics::MetricsHierarchy;
use anyhow::Result;
......@@ -317,7 +322,7 @@ mod integration_tests {
#[tokio::test]
async fn test_uptime_from_system_health() {
// Test that uptime is available from SystemHealth
temp_env::async_with_vars([("DYN_SYSTEM_PORT", None::<&str>)], async {
temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
let drt = create_test_drt_async().await;
// Get uptime from SystemHealth
......@@ -336,7 +341,7 @@ mod integration_tests {
#[tokio::test]
async fn test_runtime_metrics_initialization_and_namespace() {
// Test that metrics have correct namespace
temp_env::async_with_vars([("DYN_SYSTEM_PORT", None::<&str>)], async {
temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
let drt = create_test_drt_async().await;
// SystemStatusState is already created in distributed.rs
// so we don't need to create it again here
......@@ -374,7 +379,7 @@ mod integration_tests {
#[tokio::test]
async fn test_uptime_gauge_updates() {
// Test that the uptime gauge is properly updated and increases over time
temp_env::async_with_vars([("DYN_SYSTEM_PORT", None::<&str>)], async {
temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
let drt = create_test_drt_async().await;
// Get initial uptime
......@@ -406,7 +411,7 @@ mod integration_tests {
#[tokio::test]
async fn test_http_requests_fail_when_system_disabled() {
// Test that system status server is not running when disabled
temp_env::async_with_vars([("DYN_SYSTEM_PORT", None::<&str>)], async {
temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
let drt = create_test_drt_async().await;
// Verify that system status server info is None when disabled
......@@ -460,13 +465,13 @@ mod integration_tests {
#[allow(clippy::redundant_closure_call)]
temp_env::async_with_vars(
[
("DYN_SYSTEM_PORT", Some("0")),
(env_system::DYN_SYSTEM_PORT, Some("0")),
(
"DYN_SYSTEM_STARTING_HEALTH_STATUS",
env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS,
Some(starting_health_status),
),
("DYN_SYSTEM_HEALTH_PATH", custom_health_path),
("DYN_SYSTEM_LIVE_PATH", custom_live_path),
(env_system::DYN_SYSTEM_HEALTH_PATH, custom_health_path),
(env_system::DYN_SYSTEM_LIVE_PATH, custom_live_path),
],
(async || {
let drt = Arc::new(create_test_drt_async().await);
......@@ -542,10 +547,10 @@ mod integration_tests {
#[allow(clippy::redundant_closure_call)]
let _ = temp_env::async_with_vars(
[
("DYN_SYSTEM_PORT", Some("0")),
("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready")),
("DYN_LOGGING_JSONL", Some("1")),
("DYN_LOG", Some("trace")),
(env_system::DYN_SYSTEM_PORT, Some("0")),
(env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS, Some("ready")),
(env_logging::DYN_LOGGING_JSONL, Some("1")),
(env_logging::DYN_LOG, Some("trace")),
],
(async || {
// TODO Add proper testing for
......@@ -596,9 +601,9 @@ mod integration_tests {
const ENDPOINT_HEALTH_CONFIG: &str = "[\"generate\"]";
temp_env::async_with_vars(
[
("DYN_SYSTEM_PORT", Some("0")),
("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("notready")),
("DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Some(ENDPOINT_HEALTH_CONFIG)),
(env_system::DYN_SYSTEM_PORT, Some("0")),
(env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS, Some("notready")),
(env_system::DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS, Some(ENDPOINT_HEALTH_CONFIG)),
],
async {
let drt = Arc::new(create_test_drt_async().await);
......@@ -610,7 +615,7 @@ mod integration_tests {
assert!(
system_info_opt.is_some(),
"System status server was not spawned by DRT. Expected DRT to spawn server when DYN_SYSTEM_PORT is set to a positive value, but system_status_server_info() returned None. Environment: DYN_SYSTEM_PORT={:?}",
std::env::var("DYN_SYSTEM_PORT")
std::env::var(env_system::DYN_SYSTEM_PORT)
);
// Get the system status server info from DRT - this should never fail now due to above check
......@@ -704,8 +709,8 @@ mod integration_tests {
// use reqwest for HTTP requests
temp_env::async_with_vars(
[
("DYN_SYSTEM_PORT", Some("0")),
("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready")),
(env_system::DYN_SYSTEM_PORT, Some("0")),
(env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS, Some("ready")),
],
async {
let drt = Arc::new(create_test_drt_async().await);
......@@ -756,15 +761,18 @@ mod integration_tests {
temp_env::async_with_vars(
[
("DYN_SYSTEM_PORT", Some("0")),
("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("notready")),
(env_system::DYN_SYSTEM_PORT, Some("0")),
(
"DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS",
env_system::DYN_SYSTEM_STARTING_HEALTH_STATUS,
Some("notready"),
),
(
env_system::DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS,
Some("[\"test.endpoint\"]"),
),
// Enable health check with short intervals for testing
("DYN_HEALTH_CHECK_ENABLED", Some("true")),
("DYN_CANARY_WAIT_TIME", Some("1")), // Send canary after 1 second of inactivity
(env_canary::DYN_CANARY_WAIT_TIME, Some("1")), // Send canary after 1 second of inactivity
("DYN_HEALTH_CHECK_REQUEST_TIMEOUT", Some("1")), // Immediately timeout to mimic unresponsiveness
("RUST_LOG", Some("info")), // Enable logging for test
],
......
......@@ -33,6 +33,7 @@ pub use lock::*;
pub use path::*;
use super::utils::build_in_runtime;
use crate::config::environment_names::etcd as env_etcd;
/// ETCD Client
#[derive(Clone)]
......@@ -567,15 +568,15 @@ impl Default for ClientOptions {
let mut connect_options = None;
if let (Ok(username), Ok(password)) = (
std::env::var("ETCD_AUTH_USERNAME"),
std::env::var("ETCD_AUTH_PASSWORD"),
std::env::var(env_etcd::auth::ETCD_AUTH_USERNAME),
std::env::var(env_etcd::auth::ETCD_AUTH_PASSWORD),
) {
// username and password are set
connect_options = Some(ConnectOptions::new().with_user(username, password));
} else if let (Ok(ca), Ok(cert), Ok(key)) = (
std::env::var("ETCD_AUTH_CA"),
std::env::var("ETCD_AUTH_CLIENT_CERT"),
std::env::var("ETCD_AUTH_CLIENT_KEY"),
std::env::var(env_etcd::auth::ETCD_AUTH_CA),
std::env::var(env_etcd::auth::ETCD_AUTH_CLIENT_CERT),
std::env::var(env_etcd::auth::ETCD_AUTH_CLIENT_KEY),
) {
// TLS is set
connect_options = Some(
......@@ -596,7 +597,7 @@ impl Default for ClientOptions {
}
fn default_servers() -> Vec<String> {
match std::env::var("ETCD_ENDPOINTS") {
match std::env::var(env_etcd::ETCD_ENDPOINTS) {
Ok(possible_list_of_urls) => possible_list_of_urls
.split(',')
.map(|s| s.to_string())
......
......@@ -37,6 +37,7 @@ use tokio::time;
use url::Url;
use validator::{Validate, ValidationError};
use crate::config::environment_names::nats as env_nats;
use crate::metrics::prometheus_names::nats_client as nats_metrics;
pub use crate::slug::Slug;
use tracing as log;
......@@ -283,7 +284,7 @@ pub struct ClientOptions {
}
fn default_server() -> String {
if let Ok(server) = std::env::var("NATS_SERVER") {
if let Ok(server) = std::env::var(env_nats::NATS_SERVER) {
return server;
}
......@@ -378,21 +379,21 @@ impl std::fmt::Debug for NatsAuth {
impl Default for NatsAuth {
fn default() -> Self {
if let (Ok(username), Ok(password)) = (
std::env::var("NATS_AUTH_USERNAME"),
std::env::var("NATS_AUTH_PASSWORD"),
std::env::var(env_nats::auth::NATS_AUTH_USERNAME),
std::env::var(env_nats::auth::NATS_AUTH_PASSWORD),
) {
return NatsAuth::UserPass(username, password);
}
if let Ok(token) = std::env::var("NATS_AUTH_TOKEN") {
if let Ok(token) = std::env::var(env_nats::auth::NATS_AUTH_TOKEN) {
return NatsAuth::Token(token);
}
if let Ok(nkey) = std::env::var("NATS_AUTH_NKEY") {
if let Ok(nkey) = std::env::var(env_nats::auth::NATS_AUTH_NKEY) {
return NatsAuth::NKey(nkey);
}
if let Ok(path) = std::env::var("NATS_AUTH_CREDENTIALS_FILE") {
if let Ok(path) = std::env::var(env_nats::auth::NATS_AUTH_CREDENTIALS_FILE) {
return NatsAuth::CredentialsFile(PathBuf::from(path));
}
......@@ -516,7 +517,7 @@ impl NatsQueue {
let client = client_options.connect().await?;
// messages older than an hour in the stream will be automatically purged
let max_age = std::env::var("DYN_NATS_STREAM_MAX_AGE")
let max_age = std::env::var(env_nats::stream::DYN_NATS_STREAM_MAX_AGE)
.ok()
.and_then(|s| s.parse::<u64>().ok())
.map(time::Duration::from_secs)
......@@ -1012,9 +1013,9 @@ mod tests {
});
Jail::expect_with(|jail| {
jail.set_env("NATS_SERVER", "nats://localhost:5222");
jail.set_env("NATS_AUTH_USERNAME", "user");
jail.set_env("NATS_AUTH_PASSWORD", "pass");
jail.set_env(env_nats::NATS_SERVER, "nats://localhost:5222");
jail.set_env(env_nats::auth::NATS_AUTH_USERNAME, "user");
jail.set_env(env_nats::auth::NATS_AUTH_PASSWORD, "pass");
let opts = ClientOptions::builder().build();
assert!(opts.is_ok());
......@@ -1030,9 +1031,9 @@ mod tests {
});
Jail::expect_with(|jail| {
jail.set_env("NATS_SERVER", "nats://localhost:5222");
jail.set_env("NATS_AUTH_USERNAME", "user");
jail.set_env("NATS_AUTH_PASSWORD", "pass");
jail.set_env(env_nats::NATS_SERVER, "nats://localhost:5222");
jail.set_env(env_nats::auth::NATS_AUTH_USERNAME, "user");
jail.set_env(env_nats::auth::NATS_AUTH_PASSWORD, "pass");
let opts = ClientOptions::builder()
.server("nats://localhost:6222")
......
......@@ -32,14 +32,13 @@ static RT: OnceCell<tokio::runtime::Runtime> = OnceCell::new();
static RTHANDLE: OnceCell<tokio::runtime::Handle> = OnceCell::new();
static INIT: OnceCell<Mutex<Option<tokio::task::JoinHandle<anyhow::Result<()>>>>> = OnceCell::new();
use crate::config::environment_names::worker as env_worker;
const SHUTDOWN_MESSAGE: &str =
"Application received shutdown signal; attempting to gracefully shutdown";
const SHUTDOWN_TIMEOUT_MESSAGE: &str =
"Use DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT to control the graceful shutdown timeout";
/// Environment variable to control the graceful shutdown timeout
pub const DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT: &str = "DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT";
/// Default graceful shutdown timeout in seconds in debug mode
pub const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_DEBUG: u64 = 5;
......@@ -130,7 +129,7 @@ impl Worker {
let primary = runtime.primary();
let secondary = runtime.secondary();
let timeout = std::env::var(DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT)
let timeout = std::env::var(env_worker::DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT)
.ok()
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or({
......
......@@ -18,7 +18,9 @@ mod integration {
pub const DEFAULT_NAMESPACE: &str = "dynamo";
use dynamo_runtime::{
DistributedRuntime, Runtime, Worker, logging,
DistributedRuntime, ErrorContext, Result, Runtime, Worker,
config::environment_names::testing as env_testing,
logging,
pipeline::{
AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, PushRouter, ResponseStream,
SingleIn, async_trait, network::Ingress,
......@@ -109,7 +111,7 @@ mod integration {
async fn backend(runtime: DistributedRuntime) -> Result<Arc<RequestHandler>> {
// get the queued up processing setting from env (not delayed)
let queued_up_processing =
std::env::var("DYN_QUEUED_UP_PROCESSING").unwrap_or("false".to_string());
std::env::var(env_testing::DYN_QUEUED_UP_PROCESSING).unwrap_or("false".to_string());
let queued_up_processing: bool = queued_up_processing.parse().unwrap_or(false);
// attach an ingress to an engine
......@@ -132,11 +134,13 @@ mod integration {
async fn client(runtime: DistributedRuntime) -> Result<()> {
// get the run duration from env
let run_duration = std::env::var("DYN_SOAK_RUN_DURATION").unwrap_or("3s".to_string());
let run_duration =
std::env::var(env_testing::DYN_SOAK_RUN_DURATION).unwrap_or("3s".to_string());
let run_duration =
humantime::parse_duration(&run_duration).unwrap_or(Duration::from_secs(3));
let batch_load = std::env::var("DYN_SOAK_BATCH_LOAD").unwrap_or("100".to_string());
let batch_load =
std::env::var(env_testing::DYN_SOAK_BATCH_LOAD).unwrap_or("100".to_string());
let batch_load: usize = batch_load.parse().unwrap_or(100);
let client = runtime
......
......@@ -62,7 +62,10 @@ sglang_configs = {
model="Qwen/Qwen3-0.6B",
env={},
models_port=8000,
request_payloads=[chat_payload_default(), completion_payload_default()],
request_payloads=[
chat_payload_default(),
completion_payload_default(),
],
),
"disaggregated_same_gpu": SGLangConfig(
# Uses disagg_same_gpu.sh for single-GPU disaggregated testing
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment