Unverified Commit 7c8f8fdc authored by Yingge He's avatar Yingge He Committed by GitHub
Browse files

feat: Parameterize health and live HTTP endpoint paths (#2230)

parent 433f6012
...@@ -17,6 +17,10 @@ const DEFAULT_SYSTEM_HOST: &str = "0.0.0.0"; ...@@ -17,6 +17,10 @@ const DEFAULT_SYSTEM_HOST: &str = "0.0.0.0";
/// Default system port for health and metrics endpoints /// Default system port for health and metrics endpoints
const DEFAULT_SYSTEM_PORT: u16 = 9090; const DEFAULT_SYSTEM_PORT: u16 = 9090;
/// Default health endpoint paths
const DEFAULT_SYSTEM_HEALTH_PATH: &str = "/health";
const DEFAULT_SYSTEM_LIVE_PATH: &str = "/live";
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerConfig { pub struct WorkerConfig {
/// Grace shutdown period for the system server. /// Grace shutdown period for the system server.
...@@ -110,6 +114,16 @@ pub struct RuntimeConfig { ...@@ -110,6 +114,16 @@ pub struct RuntimeConfig {
#[builder(default = "vec![]")] #[builder(default = "vec![]")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))] #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub use_endpoint_health_status: Vec<String>, pub use_endpoint_health_status: Vec<String>,
/// Health endpoint paths
/// Set this at runtime with environment variable DYN_SYSTEM_HEALTH_PATH
#[builder(default = "DEFAULT_SYSTEM_HEALTH_PATH.to_string()")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub system_health_path: String,
/// Set this at runtime with environment variable DYN_SYSTEM_LIVE_PATH
#[builder(default = "DEFAULT_SYSTEM_LIVE_PATH.to_string()")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub system_live_path: String,
} }
impl fmt::Display for RuntimeConfig { impl fmt::Display for RuntimeConfig {
...@@ -134,6 +148,8 @@ impl fmt::Display for RuntimeConfig { ...@@ -134,6 +148,8 @@ impl fmt::Display for RuntimeConfig {
"starting_health_status={:?}", "starting_health_status={:?}",
self.starting_health_status self.starting_health_status
)?; )?;
write!(f, ", system_health_path={}", self.system_health_path)?;
write!(f, ", system_live_path={}", self.system_live_path)?;
Ok(()) Ok(())
} }
...@@ -169,6 +185,8 @@ impl RuntimeConfig { ...@@ -169,6 +185,8 @@ impl RuntimeConfig {
"ENABLED" => "system_enabled", "ENABLED" => "system_enabled",
"USE_ENDPOINT_HEALTH_STATUS" => "use_endpoint_health_status", "USE_ENDPOINT_HEALTH_STATUS" => "use_endpoint_health_status",
"STARTING_HEALTH_STATUS" => "starting_health_status", "STARTING_HEALTH_STATUS" => "starting_health_status",
"HEALTH_PATH" => "system_health_path",
"LIVE_PATH" => "system_live_path",
_ => k.as_str(), _ => k.as_str(),
}; };
Some(mapped_key.into()) Some(mapped_key.into())
...@@ -207,6 +225,8 @@ impl RuntimeConfig { ...@@ -207,6 +225,8 @@ impl RuntimeConfig {
system_enabled: false, system_enabled: false,
starting_health_status: HealthStatus::NotReady, starting_health_status: HealthStatus::NotReady,
use_endpoint_health_status: vec![], use_endpoint_health_status: vec![],
system_health_path: DEFAULT_SYSTEM_HEALTH_PATH.to_string(),
system_live_path: DEFAULT_SYSTEM_LIVE_PATH.to_string(),
} }
} }
...@@ -234,6 +254,8 @@ impl Default for RuntimeConfig { ...@@ -234,6 +254,8 @@ impl Default for RuntimeConfig {
system_enabled: false, system_enabled: false,
starting_health_status: HealthStatus::NotReady, starting_health_status: HealthStatus::NotReady,
use_endpoint_health_status: vec![], use_endpoint_health_status: vec![],
system_health_path: DEFAULT_SYSTEM_HEALTH_PATH.to_string(),
system_live_path: DEFAULT_SYSTEM_LIVE_PATH.to_string(),
} }
} }
} }
...@@ -432,6 +454,41 @@ mod tests { ...@@ -432,6 +454,41 @@ mod tests {
); );
} }
#[test]
fn test_system_health_endpoint_path_default() {
temp_env::with_vars(vec![("DYN_SYSTEM_HEALTH_PATH", None::<&str>)], || {
let config = RuntimeConfig::from_settings().unwrap();
assert_eq!(
config.system_health_path,
DEFAULT_SYSTEM_HEALTH_PATH.to_string()
);
});
temp_env::with_vars(vec![("DYN_SYSTEM_LIVE_PATH", None::<&str>)], || {
let config = RuntimeConfig::from_settings().unwrap();
assert_eq!(
config.system_live_path,
DEFAULT_SYSTEM_LIVE_PATH.to_string()
);
});
}
#[test]
fn test_system_health_endpoint_path_custom() {
temp_env::with_vars(
vec![("DYN_SYSTEM_HEALTH_PATH", Some("/custom/health"))],
|| {
let config = RuntimeConfig::from_settings().unwrap();
assert_eq!(config.system_health_path, "/custom/health");
},
);
temp_env::with_vars(vec![("DYN_SYSTEM_LIVE_PATH", Some("/custom/live"))], || {
let config = RuntimeConfig::from_settings().unwrap();
assert_eq!(config.system_live_path, "/custom/live");
});
}
#[test] #[test]
fn test_is_truthy_and_falsey() { fn test_is_truthy_and_falsey() {
// Test truthy values // Test truthy values
......
...@@ -88,9 +88,13 @@ impl DistributedRuntime { ...@@ -88,9 +88,13 @@ impl DistributedRuntime {
}; };
let starting_health_status = config.starting_health_status.clone(); let starting_health_status = config.starting_health_status.clone();
let use_endpoint_health_status = config.use_endpoint_health_status.clone(); let use_endpoint_health_status = config.use_endpoint_health_status.clone();
let system_health = Arc::new(Mutex::new(SystemHealth::new( let health_endpoint_path = config.system_health_path.clone();
let live_endpoint_path = config.system_live_path.clone();
let system_health = Arc::new(std::sync::Mutex::new(SystemHealth::new(
starting_health_status, starting_health_status,
use_endpoint_health_status, use_endpoint_health_status,
health_endpoint_path,
live_endpoint_path,
))); )));
let distributed_runtime = Self { let distributed_runtime = Self {
......
...@@ -130,6 +130,20 @@ pub async fn spawn_http_server( ...@@ -130,6 +130,20 @@ pub async fn spawn_http_server(
) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> { ) -> anyhow::Result<(std::net::SocketAddr, tokio::task::JoinHandle<()>)> {
// Create HTTP server state with the provided metrics registry // Create HTTP server state with the provided metrics registry
let server_state = Arc::new(HttpServerState::new(drt)?); let server_state = Arc::new(HttpServerState::new(drt)?);
let health_path = server_state
.drt()
.system_health
.lock()
.unwrap()
.health_path
.clone();
let live_path = server_state
.drt()
.system_health
.lock()
.unwrap()
.live_path
.clone();
// Initialize the start time // Initialize the start time
server_state server_state
...@@ -138,14 +152,14 @@ pub async fn spawn_http_server( ...@@ -138,14 +152,14 @@ pub async fn spawn_http_server(
let app = Router::new() let app = Router::new()
.route( .route(
"/health", &health_path,
get({ get({
let state = Arc::clone(&server_state); let state = Arc::clone(&server_state);
move |tracing_ctx| health_handler(state, "health", tracing_ctx) move |tracing_ctx| health_handler(state, "health", tracing_ctx)
}), }),
) )
.route( .route(
"/live", &live_path,
get({ get({
let state = Arc::clone(&server_state); let state = Arc::clone(&server_state);
move |tracing_ctx| health_handler(state, "live", tracing_ctx) move |tracing_ctx| health_handler(state, "live", tracing_ctx)
...@@ -216,8 +230,12 @@ async fn health_handler( ...@@ -216,8 +230,12 @@ async fn health_handler(
route: &'static str, // Used for tracing only route: &'static str, // Used for tracing only
trace_parent: TraceParent, // Used for tracing only trace_parent: TraceParent, // Used for tracing only
) -> impl IntoResponse { ) -> impl IntoResponse {
let system_health = state.drt().system_health.lock().await; let (mut healthy, endpoints) = state
let (mut healthy, endpoints) = system_health.get_health_status(); .drt()
.system_health
.lock()
.unwrap()
.get_health_status();
let uptime = match state.uptime() { let uptime = match state.uptime() {
Ok(uptime_state) => Some(uptime_state), Ok(uptime_state) => Some(uptime_state),
Err(e) => { Err(e) => {
...@@ -373,14 +391,26 @@ dynamo_component_dynamo_uptime_seconds 42 ...@@ -373,14 +391,26 @@ dynamo_component_dynamo_uptime_seconds 42
} }
#[rstest] #[rstest]
#[case("ready", 200, "ready")] #[case("ready", 200, "ready", None, None, 3)]
#[case("notready", 503, "notready")] #[case("notready", 503, "notready", None, None, 3)]
#[case("ready", 200, "ready", Some("/custom/health"), Some("/custom/live"), 5)]
#[case(
"notready",
503,
"notready",
Some("/custom/health"),
Some("/custom/live"),
5
)]
#[tokio::test] #[tokio::test]
#[cfg(feature = "integration")] #[cfg(feature = "integration")]
async fn test_health_endpoints( async fn test_health_endpoints(
#[case] starting_health_status: &'static str, #[case] starting_health_status: &'static str,
#[case] expected_status: u16, #[case] expected_status: u16,
#[case] expected_body: &'static str, #[case] expected_body: &'static str,
#[case] custom_health_path: Option<&'static str>,
#[case] custom_live_path: Option<&'static str>,
#[case] expected_num_tests: usize,
) { ) {
use std::sync::Arc; use std::sync::Arc;
use tokio::time::sleep; use tokio::time::sleep;
...@@ -394,10 +424,14 @@ dynamo_component_dynamo_uptime_seconds 42 ...@@ -394,10 +424,14 @@ dynamo_component_dynamo_uptime_seconds 42
#[allow(clippy::redundant_closure_call)] #[allow(clippy::redundant_closure_call)]
temp_env::async_with_vars( temp_env::async_with_vars(
[( [
"DYN_SYSTEM_STARTING_HEALTH_STATUS", (
Some(starting_health_status), "DYN_SYSTEM_STARTING_HEALTH_STATUS",
)], Some(starting_health_status),
),
("DYN_SYSTEM_HEALTH_PATH", custom_health_path),
("DYN_SYSTEM_LIVE_PATH", custom_live_path),
],
(async || { (async || {
let runtime = crate::Runtime::from_settings().unwrap(); let runtime = crate::Runtime::from_settings().unwrap();
let drt = Arc::new( let drt = Arc::new(
...@@ -413,20 +447,30 @@ dynamo_component_dynamo_uptime_seconds 42 ...@@ -413,20 +447,30 @@ dynamo_component_dynamo_uptime_seconds 42
sleep(std::time::Duration::from_millis(1000)).await; sleep(std::time::Duration::from_millis(1000)).await;
println!("[test] Server should be up, starting requests..."); println!("[test] Server should be up, starting requests...");
let client = reqwest::Client::new(); let client = reqwest::Client::new();
for (path, expect_status, expect_body) in [
("/health", expected_status, expected_body), // Prepare test cases
("/live", expected_status, expected_body), let mut test_cases = vec![];
("/someRandomPathNotFoundHere", 404, "Route not found"), if custom_health_path.is_none() {
] { // When using default paths, test the default paths
test_cases.push(("/health", expected_status, expected_body));
} else {
// When using custom paths, default paths should not exist
test_cases.push(("/health", 404, "Route not found"));
test_cases.push((custom_health_path.unwrap(), expected_status, expected_body));
}
if custom_live_path.is_none() {
// When using default paths, test the default paths
test_cases.push(("/live", expected_status, expected_body));
} else {
// When using custom paths, default paths should not exist
test_cases.push(("/live", 404, "Route not found"));
test_cases.push((custom_live_path.unwrap(), expected_status, expected_body));
}
test_cases.push(("/someRandomPathNotFoundHere", 404, "Route not found"));
assert_eq!(test_cases.len(), expected_num_tests);
for (path, expect_status, expect_body) in test_cases {
println!("[test] Sending request to {}", path); println!("[test] Sending request to {}", path);
let traceparent_value =
"00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
let tracestate_value = "vendor1=opaqueValue1,vendor2=opaqueValue2";
let mut headers = reqwest::header::HeaderMap::new();
headers.insert(
reqwest::header::HeaderName::from_static("traceparent"),
reqwest::header::HeaderValue::from_str(traceparent_value).unwrap(),
);
let url = format!("http://{}{}", addr, path); let url = format!("http://{}{}", addr, path);
let response = client.get(&url).send().await.unwrap(); let response = client.get(&url).send().await.unwrap();
let status = response.status(); let status = response.status();
......
...@@ -87,12 +87,16 @@ pub struct SystemHealth { ...@@ -87,12 +87,16 @@ pub struct SystemHealth {
system_health: HealthStatus, system_health: HealthStatus,
endpoint_health: HashMap<String, HealthStatus>, endpoint_health: HashMap<String, HealthStatus>,
use_endpoint_health_status: Vec<String>, use_endpoint_health_status: Vec<String>,
health_path: String,
live_path: String,
} }
impl SystemHealth { impl SystemHealth {
pub fn new( pub fn new(
starting_health_status: HealthStatus, starting_health_status: HealthStatus,
use_endpoint_health_status: Vec<String>, use_endpoint_health_status: Vec<String>,
health_path: String,
live_path: String,
) -> Self { ) -> Self {
let mut endpoint_health = HashMap::new(); let mut endpoint_health = HashMap::new();
for endpoint in &use_endpoint_health_status { for endpoint in &use_endpoint_health_status {
...@@ -102,6 +106,8 @@ impl SystemHealth { ...@@ -102,6 +106,8 @@ impl SystemHealth {
system_health: starting_health_status, system_health: starting_health_status,
endpoint_health, endpoint_health,
use_endpoint_health_status, use_endpoint_health_status,
health_path,
live_path,
} }
} }
pub fn set_health_status(&mut self, status: HealthStatus) { pub fn set_health_status(&mut self, status: HealthStatus) {
...@@ -167,7 +173,7 @@ pub struct DistributedRuntime { ...@@ -167,7 +173,7 @@ pub struct DistributedRuntime {
instance_sources: Arc<Mutex<HashMap<Endpoint, Weak<InstanceSource>>>>, instance_sources: Arc<Mutex<HashMap<Endpoint, Weak<InstanceSource>>>>,
// Health Status // Health Status
system_health: Arc<Mutex<SystemHealth>>, system_health: Arc<std::sync::Mutex<SystemHealth>>,
// This map associates metric prefixes with their corresponding Prometheus registries. // This map associates metric prefixes with their corresponding Prometheus registries.
prometheus_registries_by_prefix: Arc<std::sync::Mutex<HashMap<String, prometheus::Registry>>>, prometheus_registries_by_prefix: Arc<std::sync::Mutex<HashMap<String, prometheus::Registry>>>,
......
...@@ -23,7 +23,7 @@ use anyhow::Result; ...@@ -23,7 +23,7 @@ use anyhow::Result;
use async_nats::service::endpoint::Endpoint; use async_nats::service::endpoint::Endpoint;
use derive_builder::Builder; use derive_builder::Builder;
use std::collections::HashMap; use std::collections::HashMap;
use tokio::sync::Mutex; use std::sync::Mutex;
use tokio::sync::Notify; use tokio::sync::Notify;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
...@@ -54,7 +54,7 @@ impl PushEndpoint { ...@@ -54,7 +54,7 @@ impl PushEndpoint {
system_health system_health
.lock() .lock()
.await .unwrap()
.set_endpoint_health_status(endpoint_name.clone(), HealthStatus::Ready); .set_endpoint_health_status(endpoint_name.clone(), HealthStatus::Ready);
loop { loop {
...@@ -113,7 +113,7 @@ impl PushEndpoint { ...@@ -113,7 +113,7 @@ impl PushEndpoint {
system_health system_health
.lock() .lock()
.await .unwrap()
.set_endpoint_health_status(endpoint_name.clone(), HealthStatus::NotReady); .set_endpoint_health_status(endpoint_name.clone(), HealthStatus::NotReady);
// await for all inflight requests to complete // await for all inflight requests to complete
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment