Unverified Commit 860f3f75 authored by Keiven C's avatar Keiven C Committed by GitHub
Browse files

chore: metrics endpoint variables renamed from HTTP_SERVER->SYSTEM (#1934)


Co-authored-by: default avatarKeiven Chang <keivenchang@users.noreply.github.com>
parent fc402a35
...@@ -1897,6 +1897,7 @@ dependencies = [ ...@@ -1897,6 +1897,7 @@ dependencies = [
"prometheus", "prometheus",
"rand 0.9.1", "rand 0.9.1",
"regex", "regex",
"reqwest",
"rstest 0.23.0", "rstest 0.23.0",
"serde", "serde",
"serde_json", "serde_json",
...@@ -2271,6 +2272,15 @@ version = "0.1.5" ...@@ -2271,6 +2272,15 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
[[package]]
name = "foreign-types"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
dependencies = [
"foreign-types-shared 0.1.1",
]
[[package]] [[package]]
name = "foreign-types" name = "foreign-types"
version = "0.5.0" version = "0.5.0"
...@@ -2278,7 +2288,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" ...@@ -2278,7 +2288,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d737d9aa519fb7b749cbc3b962edcf310a8dd1f4b67c91c4f83975dbdd17d965" checksum = "d737d9aa519fb7b749cbc3b962edcf310a8dd1f4b67c91c4f83975dbdd17d965"
dependencies = [ dependencies = [
"foreign-types-macros", "foreign-types-macros",
"foreign-types-shared", "foreign-types-shared 0.3.1",
] ]
[[package]] [[package]]
...@@ -2292,6 +2302,12 @@ dependencies = [ ...@@ -2292,6 +2302,12 @@ dependencies = [
"syn 2.0.100", "syn 2.0.100",
] ]
[[package]]
name = "foreign-types-shared"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"
[[package]] [[package]]
name = "foreign-types-shared" name = "foreign-types-shared"
version = "0.3.1" version = "0.3.1"
...@@ -3106,6 +3122,22 @@ dependencies = [ ...@@ -3106,6 +3122,22 @@ dependencies = [
"tower-service", "tower-service",
] ]
[[package]]
name = "hyper-tls"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0"
dependencies = [
"bytes",
"http-body-util",
"hyper 1.6.0",
"hyper-util",
"native-tls",
"tokio",
"tokio-native-tls",
"tower-service",
]
[[package]] [[package]]
name = "hyper-util" name = "hyper-util"
version = "0.1.14" version = "0.1.14"
...@@ -3891,7 +3923,7 @@ dependencies = [ ...@@ -3891,7 +3923,7 @@ dependencies = [
"bitflags 2.9.0", "bitflags 2.9.0",
"block", "block",
"core-graphics-types", "core-graphics-types",
"foreign-types", "foreign-types 0.5.0",
"log", "log",
"objc", "objc",
"paste", "paste",
...@@ -4251,6 +4283,23 @@ version = "0.10.0" ...@@ -4251,6 +4283,23 @@ version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03"
[[package]]
name = "native-tls"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87de3442987e9dbec73158d5c715e7ad9072fda936bb03d19d7fa10e00520f0e"
dependencies = [
"libc",
"log",
"openssl",
"openssl-probe",
"openssl-sys",
"schannel",
"security-framework 2.11.1",
"security-framework-sys",
"tempfile",
]
[[package]] [[package]]
name = "ndarray" name = "ndarray"
version = "0.16.1" version = "0.16.1"
...@@ -4644,12 +4693,50 @@ version = "11.1.5" ...@@ -4644,12 +4693,50 @@ version = "11.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e" checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e"
[[package]]
name = "openssl"
version = "0.10.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8505734d46c8ab1e19a1dce3aef597ad87dcb4c37e7188231769bd6bd51cebf8"
dependencies = [
"bitflags 2.9.0",
"cfg-if 1.0.0",
"foreign-types 0.3.2",
"libc",
"once_cell",
"openssl-macros",
"openssl-sys",
]
[[package]]
name = "openssl-macros"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.100",
]
[[package]] [[package]]
name = "openssl-probe" name = "openssl-probe"
version = "0.1.6" version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e"
[[package]]
name = "openssl-sys"
version = "0.9.109"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90096e2e47630d78b7d1c20952dc621f957103f8bc2c8359ec81290d75238571"
dependencies = [
"cc",
"libc",
"pkg-config",
"vcpkg",
]
[[package]] [[package]]
name = "option-ext" name = "option-ext"
version = "0.2.0" version = "0.2.0"
...@@ -5581,11 +5668,13 @@ dependencies = [ ...@@ -5581,11 +5668,13 @@ dependencies = [
"http-body-util", "http-body-util",
"hyper 1.6.0", "hyper 1.6.0",
"hyper-rustls", "hyper-rustls",
"hyper-tls",
"hyper-util", "hyper-util",
"js-sys", "js-sys",
"log", "log",
"mime", "mime",
"mime_guess", "mime_guess",
"native-tls",
"percent-encoding", "percent-encoding",
"pin-project-lite", "pin-project-lite",
"quinn", "quinn",
...@@ -5597,6 +5686,7 @@ dependencies = [ ...@@ -5597,6 +5686,7 @@ dependencies = [
"serde_urlencoded", "serde_urlencoded",
"sync_wrapper 1.0.2", "sync_wrapper 1.0.2",
"tokio", "tokio",
"tokio-native-tls",
"tokio-rustls", "tokio-rustls",
"tokio-util", "tokio-util",
"tower 0.5.2", "tower 0.5.2",
...@@ -7056,6 +7146,16 @@ dependencies = [ ...@@ -7056,6 +7146,16 @@ dependencies = [
"syn 2.0.100", "syn 2.0.100",
] ]
[[package]]
name = "tokio-native-tls"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2"
dependencies = [
"native-tls",
"tokio",
]
[[package]] [[package]]
name = "tokio-rayon" name = "tokio-rayon"
version = "2.1.0" version = "2.1.0"
...@@ -7743,6 +7843,12 @@ dependencies = [ ...@@ -7743,6 +7843,12 @@ dependencies = [
"uuid 0.8.2", "uuid 0.8.2",
] ]
[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]] [[package]]
name = "vergen" name = "vergen"
version = "9.0.6" version = "9.0.6"
......
...@@ -77,5 +77,13 @@ socket2 = { version = "0.5.8" } ...@@ -77,5 +77,13 @@ socket2 = { version = "0.5.8" }
[dev-dependencies] [dev-dependencies]
assert_matches = { version = "1.5.0" } assert_matches = { version = "1.5.0" }
env_logger = { version = "0.11" } env_logger = { version = "0.11" }
reqwest = { version = "0.12.22", features = ["json"] }
rstest = { version = "0.23.0" } rstest = { version = "0.23.0" }
temp-env = { version = "0.3.6" } temp-env = { version = "0.3.6" }
# These patches are to address issues in reqwest, which is used in the HTTP server test (but not on servers).
# These are transitive dependencies to use secure versions and mitigate known vulnerabilities.
[patch.crates-io]
tokio = { version = "1.18.4" } # addresses RUSTSEC-2023-0001
h2 = { version = "0.4.4" } # addresses RUSTSEC-2024-0332
rustls = { version = "0.23.18" } # addresses RUSTSEC-2024-0399
...@@ -11,15 +11,15 @@ use serde::{Deserialize, Serialize}; ...@@ -11,15 +11,15 @@ use serde::{Deserialize, Serialize};
use std::fmt; use std::fmt;
use validator::Validate; use validator::Validate;
/// Default HTTP server host /// Default system host for health and metrics endpoints
const DEFAULT_HTTP_SERVER_HOST: &str = "0.0.0.0"; const DEFAULT_SYSTEM_HOST: &str = "0.0.0.0";
/// Default HTTP server port /// Default system port for health and metrics endpoints
const DEFAULT_HTTP_SERVER_PORT: u16 = 9090; const DEFAULT_SYSTEM_PORT: u16 = 9090;
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerConfig { pub struct WorkerConfig {
/// Grace shutdown period for http-service. /// Grace shutdown period for the system server.
pub graceful_shutdown_timeout: u64, pub graceful_shutdown_timeout: u64,
} }
...@@ -70,24 +70,24 @@ pub struct RuntimeConfig { ...@@ -70,24 +70,24 @@ pub struct RuntimeConfig {
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))] #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub max_blocking_threads: usize, pub max_blocking_threads: usize,
/// HTTP server host for health and metrics endpoints /// System server host for health and metrics endpoints
/// Set this at runtime with environment variable DYN_RUNTIME_HTTP_SERVER_HOST /// Set this at runtime with environment variable DYN_SYSTEM_HOST
#[builder(default = "DEFAULT_HTTP_SERVER_HOST.to_string()")] #[builder(default = "DEFAULT_SYSTEM_HOST.to_string()")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))] #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub http_server_host: String, pub system_host: String,
/// HTTP server port for health and metrics endpoints /// System server port for health and metrics endpoints
/// If set to 0, the system will assign a random available port /// If set to 0, the system will assign a random available port
/// Set this at runtime with environment variable DYN_RUNTIME_HTTP_SERVER_PORT /// Set this at runtime with environment variable DYN_SYSTEM_PORT
#[builder(default = "DEFAULT_HTTP_SERVER_PORT")] #[builder(default = "DEFAULT_SYSTEM_PORT")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))] #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub http_server_port: u16, pub system_port: u16,
/// Health and metrics HTTP server enabled /// Health and metrics System server enabled
/// Set this at runtime with environment variable DYN_RUNTIME_HTTP_ENABLED /// Set this at runtime with environment variable DYN_SYSTEM_ENABLED
#[builder(default = "false")] #[builder(default = "false")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))] #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub http_enabled: bool, pub system_enabled: bool,
} }
impl fmt::Display for RuntimeConfig { impl fmt::Display for RuntimeConfig {
...@@ -99,9 +99,9 @@ impl fmt::Display for RuntimeConfig { ...@@ -99,9 +99,9 @@ impl fmt::Display for RuntimeConfig {
} }
write!(f, "max_blocking_threads={}, ", self.max_blocking_threads)?; write!(f, "max_blocking_threads={}, ", self.max_blocking_threads)?;
write!(f, "http_server_host={}, ", self.http_server_host)?; write!(f, "system_host={}, ", self.system_host)?;
write!(f, "http_server_port={}, ", self.http_server_port)?; write!(f, "system_port={}, ", self.system_port)?;
write!(f, "http_enabled={}", self.http_enabled)?; write!(f, "system_enabled={}", self.system_enabled)?;
Ok(()) Ok(())
} }
...@@ -125,6 +125,23 @@ impl RuntimeConfig { ...@@ -125,6 +125,23 @@ impl RuntimeConfig {
_ => None, _ => None,
} }
})) }))
.merge(Env::prefixed("DYN_SYSTEM_").filter_map(|k| {
let full_key = format!("DYN_SYSTEM_{}", k.as_str());
// filters out empty environment variables
match std::env::var(&full_key) {
Ok(v) if !v.is_empty() => {
// Map DYN_SYSTEM_* to the correct field names
let mapped_key = match k.as_str() {
"HOST" => "system_host",
"PORT" => "system_port",
"ENABLED" => "system_enabled",
_ => k.as_str(),
};
Some(mapped_key.into())
}
_ => None,
}
}))
} }
/// Load the runtime configuration from the environment and configuration files /// Load the runtime configuration from the environment and configuration files
...@@ -141,20 +158,19 @@ impl RuntimeConfig { ...@@ -141,20 +158,19 @@ impl RuntimeConfig {
Ok(config) Ok(config)
} }
/// Check if HTTP server should be enabled /// Check if System server should be enabled
/// HTTP server is enabled by default, but can be disabled by setting DYN_RUNTIME_HTTP_ENABLED to false /// System server is disabled by default, but can be enabled by setting DYN_SYSTEM_ENABLED to true
/// If a port is explicitly provided, HTTP server will be enabled regardless pub fn system_server_enabled(&self) -> bool {
pub fn http_server_enabled(&self) -> bool { self.system_enabled
self.http_enabled
} }
pub fn single_threaded() -> Self { pub fn single_threaded() -> Self {
RuntimeConfig { RuntimeConfig {
num_worker_threads: Some(1), num_worker_threads: Some(1),
max_blocking_threads: 1, max_blocking_threads: 1,
http_server_host: DEFAULT_HTTP_SERVER_HOST.to_string(), system_host: DEFAULT_SYSTEM_HOST.to_string(),
http_server_port: DEFAULT_HTTP_SERVER_PORT, system_port: DEFAULT_SYSTEM_PORT,
http_enabled: false, system_enabled: false,
} }
} }
...@@ -177,9 +193,9 @@ impl Default for RuntimeConfig { ...@@ -177,9 +193,9 @@ impl Default for RuntimeConfig {
Self { Self {
num_worker_threads: Some(num_cores), num_worker_threads: Some(num_cores),
max_blocking_threads: num_cores, max_blocking_threads: num_cores,
http_server_host: DEFAULT_HTTP_SERVER_HOST.to_string(), system_host: DEFAULT_SYSTEM_HOST.to_string(),
http_server_port: DEFAULT_HTTP_SERVER_PORT, system_port: DEFAULT_SYSTEM_PORT,
http_enabled: false, system_enabled: false,
} }
} }
} }
...@@ -308,51 +324,51 @@ mod tests { ...@@ -308,51 +324,51 @@ mod tests {
} }
#[test] #[test]
fn test_runtime_config_http_server_env_vars() -> Result<()> { fn test_runtime_config_system_server_env_vars() -> Result<()> {
temp_env::with_vars( temp_env::with_vars(
vec![ vec![
("DYN_RUNTIME_HTTP_SERVER_HOST", Some("127.0.0.1")), ("DYN_SYSTEM_HOST", Some("127.0.0.1")),
("DYN_RUNTIME_HTTP_SERVER_PORT", Some("9090")), ("DYN_SYSTEM_PORT", Some("9090")),
], ],
|| { || {
let config = RuntimeConfig::from_settings()?; let config = RuntimeConfig::from_settings()?;
assert_eq!(config.http_server_host, "127.0.0.1"); assert_eq!(config.system_host, "127.0.0.1");
assert_eq!(config.http_server_port, 9090); assert_eq!(config.system_port, 9090);
Ok(()) Ok(())
}, },
) )
} }
#[test] #[test]
fn test_http_server_enabled_by_default() { fn test_system_server_enabled_by_default() {
temp_env::with_vars(vec![("DYN_RUNTIME_HTTP_ENABLED", None::<&str>)], || { temp_env::with_vars(vec![("DYN_SYSTEM_ENABLED", None::<&str>)], || {
let config = RuntimeConfig::from_settings().unwrap(); let config = RuntimeConfig::from_settings().unwrap();
assert!(!config.http_server_enabled()); assert!(!config.system_server_enabled());
}); });
} }
#[test] #[test]
fn test_http_server_disabled_explicitly() { fn test_system_server_disabled_explicitly() {
temp_env::with_vars(vec![("DYN_RUNTIME_HTTP_ENABLED", Some("false"))], || { temp_env::with_vars(vec![("DYN_SYSTEM_ENABLED", Some("false"))], || {
let config = RuntimeConfig::from_settings().unwrap(); let config = RuntimeConfig::from_settings().unwrap();
assert!(!config.http_server_enabled()); assert!(!config.system_server_enabled());
}); });
} }
#[test] #[test]
fn test_http_server_enabled_explicitly() { fn test_system_server_enabled_explicitly() {
temp_env::with_vars(vec![("DYN_RUNTIME_HTTP_ENABLED", Some("true"))], || { temp_env::with_vars(vec![("DYN_SYSTEM_ENABLED", Some("true"))], || {
let config = RuntimeConfig::from_settings().unwrap(); let config = RuntimeConfig::from_settings().unwrap();
assert!(config.http_server_enabled()); assert!(config.system_server_enabled());
}); });
} }
#[test] #[test]
fn test_http_server_enabled_by_port() { fn test_system_server_enabled_by_port() {
temp_env::with_vars(vec![("DYN_RUNTIME_HTTP_SERVER_PORT", Some("8080"))], || { temp_env::with_vars(vec![("DYN_SYSTEM_PORT", Some("8080"))], || {
let config = RuntimeConfig::from_settings().unwrap(); let config = RuntimeConfig::from_settings().unwrap();
assert!(!config.http_server_enabled()); assert!(!config.system_server_enabled());
assert_eq!(config.http_server_port, 8080); assert_eq!(config.system_port, 8080);
}); });
} }
......
...@@ -78,27 +78,27 @@ impl DistributedRuntime { ...@@ -78,27 +78,27 @@ impl DistributedRuntime {
// Start HTTP server for health and metrics (if enabled) // Start HTTP server for health and metrics (if enabled)
let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default(); let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default();
if config.http_server_enabled() { if config.system_server_enabled() {
let drt_arc = Arc::new(distributed_runtime.clone()); let drt_arc = Arc::new(distributed_runtime.clone());
let runtime_clone = distributed_runtime.runtime.clone(); let runtime_clone = distributed_runtime.runtime.clone();
secondary.spawn(async move { // spawn_http_server spawns its own background task:
if let Err(e) = crate::http_server::start_http_server( match crate::http_server::spawn_http_server(
&config.http_server_host, &config.system_host,
config.http_server_port, config.system_port,
runtime_clone.child_token(), runtime_clone.child_token(),
drt_arc, drt_arc,
) )
.await .await
{ {
Ok((addr, _handle)) => {
tracing::info!("HTTP server started successfully on {}", addr);
}
Err(e) => {
tracing::error!("HTTP server startup failed: {}", e); tracing::error!("HTTP server startup failed: {}", e);
} else {
tracing::debug!("HTTP server started successfully");
} }
}); }
} else { } else {
tracing::debug!( tracing::debug!("Health and metrics HTTP server is disabled via DYN_SYSTEM_ENABLED");
"Health and metrics HTTP server is disabled via DYN_RUNTIME_HTTP_ENABLED"
);
} }
Ok(distributed_runtime) Ok(distributed_runtime)
......
...@@ -70,63 +70,87 @@ impl HttpServerState { ...@@ -70,63 +70,87 @@ impl HttpServerState {
} }
/// Start HTTP server with DistributedRuntime support /// Start HTTP server with DistributedRuntime support
pub async fn start_http_server( pub async fn spawn_http_server(
host: &str, host: &str,
port: u16, port: u16,
cancel_token: CancellationToken, cancel_token: CancellationToken,
drt: Arc<crate::DistributedRuntime>, drt: Arc<crate::DistributedRuntime>,
) -> anyhow::Result<()> { ) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> {
tracing::info!(
"[spawn_http_server] called with host={}, port={}",
host,
port
);
// Create HTTP server state with pre-created metrics // Create HTTP server state with pre-created metrics
let server_state = Arc::new(HttpServerState::new(drt)?); let server_state = Arc::new(HttpServerState::new(drt)?);
let app = Router::new() let app = Router::new()
// .route( .route(
// "/health", "/health",
// get({ get({
// let state = Arc::clone(&server_state); let state = Arc::clone(&server_state);
// move || health_handler(state) move || health_handler(state.clone())
// }), }),
// ) )
.route(
"/live",
get({
let state = Arc::clone(&server_state);
move || health_handler(state)
}),
)
.route( .route(
"/metrics", "/metrics",
get({ get({
let state = Arc::clone(&server_state); let state = Arc::clone(&server_state);
move || metrics_handler(state) move || metrics_handler(state)
}), }),
); )
.fallback(|| async {
tracing::info!("[fallback handler] called");
(StatusCode::NOT_FOUND, "Route not found").into_response()
});
let address = format!("{}:{}", host, port); let address = format!("{}:{}", host, port);
tracing::debug!("Starting HTTP server on: {}", address); tracing::info!("[spawn_http_server] binding to: {}", address);
let listener = match TcpListener::bind(&address).await { let listener = match TcpListener::bind(&address).await {
Ok(listener) => { Ok(listener) => {
// get the actual address and port, print in debug level // get the actual address and port, print in debug level
let actual_address = listener.local_addr()?; let actual_address = listener.local_addr()?;
tracing::debug!("HTTP server bound to: {}", actual_address); tracing::info!(
listener "[spawn_http_server] HTTP server bound to: {}",
actual_address
);
(listener, actual_address)
} }
Err(e) => { Err(e) => {
tracing::error!("Failed to bind to address {}: {}", address, e); tracing::error!("Failed to bind to address {}: {}", address, e);
return Err(anyhow::anyhow!("Failed to bind to address: {}", e)); return Err(anyhow::anyhow!("Failed to bind to address: {}", e));
} }
}; };
let (listener, actual_address) = listener;
let observer = cancel_token.child_token(); let observer = cancel_token.child_token();
if let Err(e) = axum::serve(listener, app) // Spawn the server in the background and return the handle
.with_graceful_shutdown(observer.cancelled_owned()) let handle = tokio::spawn(async move {
.await if let Err(e) = axum::serve(listener, app)
{ .with_graceful_shutdown(observer.cancelled_owned())
tracing::error!("HTTP server error: {}", e); .await
} {
Ok(()) tracing::error!("HTTP server error: {}", e);
}
});
Ok((actual_address, handle))
} }
// /// Health handler /// Health handler
// async fn health_handler(state: Arc<HttpServerState>) -> impl IntoResponse { async fn health_handler(state: Arc<HttpServerState>) -> impl IntoResponse {
// let uptime = state.drt.uptime(); tracing::info!("[health_handler] called");
// let response = format!("OK\nUptime: {} seconds", uptime.as_secs()); let uptime = state.drt.uptime();
// (StatusCode::OK, response) let response = format!("OK\nUptime: {} seconds\n", uptime.as_secs());
// } (StatusCode::OK, response)
}
/// Metrics handler with DistributedRuntime uptime /// Metrics handler with DistributedRuntime uptime
async fn metrics_handler(state: Arc<HttpServerState>) -> impl IntoResponse { async fn metrics_handler(state: Arc<HttpServerState>) -> impl IntoResponse {
...@@ -239,4 +263,64 @@ mod tests { ...@@ -239,4 +263,64 @@ mod tests {
assert!(response.contains("dynamo_runtime_uptime_seconds")); assert!(response.contains("dynamo_runtime_uptime_seconds"));
assert!(response.contains("Total uptime of the DistributedRuntime in seconds")); assert!(response.contains("Total uptime of the DistributedRuntime in seconds"));
} }
#[tokio::test]
async fn test_spawn_http_server_endpoints() {
use std::sync::Arc;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
// use tokio::io::{AsyncReadExt, AsyncWriteExt};
// use reqwest for HTTP requests
let runtime = crate::Runtime::from_settings().unwrap();
let drt = Arc::new(
crate::DistributedRuntime::from_settings_without_discovery(runtime)
.await
.unwrap(),
);
let cancel_token = CancellationToken::new();
let (addr, server_handle) = spawn_http_server("127.0.0.1", 0, cancel_token.clone(), drt)
.await
.unwrap();
println!("[test] Waiting for server to start...");
sleep(std::time::Duration::from_millis(1000)).await;
println!("[test] Server should be up, starting requests...");
let client = reqwest::Client::new();
for (path, expect_200, expect_body) in [
("/health", true, "OK"),
("/live", true, "OK"),
("/someRandomPathNotFoundHere", false, "Route not found"),
] {
println!("[test] Sending request to {}", path);
let url = format!("http://{}{}", addr, path);
let response = client.get(&url).send().await.unwrap();
let status = response.status();
let body = response.text().await.unwrap();
println!(
"[test] Response for {}: status={}, body={:?}",
path, status, body
);
if expect_200 {
assert_eq!(status, 200, "Response: status={}, body={:?}", status, body);
} else {
assert_eq!(status, 404, "Response: status={}, body={:?}", status, body);
}
assert!(
body.contains(expect_body),
"Response: status={}, body={:?}",
status,
body
);
}
cancel_token.cancel();
match server_handle.await {
Ok(_) => println!("[test] Server shut down normally"),
Err(e) => {
if e.is_panic() {
println!("[test] Server panicked: {:?}", e);
} else {
println!("[test] Server cancelled: {:?}", e);
}
}
}
}
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment