Unverified Commit b127d95f authored by Neelay Shah's avatar Neelay Shah Committed by GitHub
Browse files

feat: health check changes based on endpoint served (#1996)

parent 22e6c96f
...@@ -6893,6 +6893,7 @@ version = "0.3.6" ...@@ -6893,6 +6893,7 @@ version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96374855068f47402c3121c6eed88d29cb1de8f3ab27090e273e420bdabcf050" checksum = "96374855068f47402c3121c6eed88d29cb1de8f3ab27090e273e420bdabcf050"
dependencies = [ dependencies = [
"futures",
"parking_lot", "parking_lot",
] ]
......
...@@ -69,4 +69,4 @@ assert_matches = { version = "1.5.0" } ...@@ -69,4 +69,4 @@ assert_matches = { version = "1.5.0" }
env_logger = { version = "0.11" } env_logger = { version = "0.11" }
reqwest = { workspace = true } reqwest = { workspace = true }
rstest = { version = "0.23.0" } rstest = { version = "0.23.0" }
temp-env = { version = "0.3.6" } temp-env = { version = "0.3.6" , features=["async_closure"] }
...@@ -30,7 +30,8 @@ ...@@ -30,7 +30,8 @@
//! TODO: Top-level Overview of Endpoints/Functions //! TODO: Top-level Overview of Endpoints/Functions
use crate::{ use crate::{
discovery::Lease, metrics::MetricsRegistry, service::ServiceSet, transports::etcd::EtcdPath, config::HealthStatus, discovery::Lease, metrics::MetricsRegistry, service::ServiceSet,
transports::etcd::EtcdPath,
}; };
use super::{ use super::{
......
...@@ -110,7 +110,11 @@ impl EndpointConfigBuilder { ...@@ -110,7 +110,11 @@ impl EndpointConfigBuilder {
.map_err(|e| anyhow::anyhow!("Failed to build push endpoint: {e}"))?; .map_err(|e| anyhow::anyhow!("Failed to build push endpoint: {e}"))?;
// launch in primary runtime // launch in primary runtime
let task = tokio::spawn(push_endpoint.start(service_endpoint)); let task = tokio::spawn(push_endpoint.start(
service_endpoint,
endpoint.name.clone(),
endpoint.drt().system_health.clone(),
));
// make the components service endpoint discovery in etcd // make the components service endpoint discovery in etcd
......
...@@ -48,6 +48,13 @@ impl Default for WorkerConfig { ...@@ -48,6 +48,13 @@ impl Default for WorkerConfig {
} }
} }
#[derive(Debug, Deserialize, Serialize, PartialEq, Clone)]
#[serde(rename_all = "lowercase")]
pub enum HealthStatus {
Ready,
NotReady,
}
/// Runtime configuration /// Runtime configuration
/// Defines the configuration for Tokio runtimes /// Defines the configuration for Tokio runtimes
#[derive(Serialize, Deserialize, Validate, Debug, Builder, Clone)] #[derive(Serialize, Deserialize, Validate, Debug, Builder, Clone)]
...@@ -88,6 +95,21 @@ pub struct RuntimeConfig { ...@@ -88,6 +95,21 @@ pub struct RuntimeConfig {
#[builder(default = "false")] #[builder(default = "false")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))] #[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub system_enabled: bool, pub system_enabled: bool,
/// Starting Health Status
/// Set this at runtime with environment variable DYN_SYSTEM_STARTING_HEALTH_STATUS
#[builder(default = "HealthStatus::NotReady")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub starting_health_status: HealthStatus,
/// Use Endpoint Health Status
/// When using endpoint health status, health status
/// is the AND of individual endpoint health
/// Set this at runtime with environment variable DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS
/// with the list of endpoints to consider for system health
#[builder(default = "vec![]")]
#[builder_field_attr(serde(skip_serializing_if = "Option::is_none"))]
pub use_endpoint_health_status: Vec<String>,
} }
impl fmt::Display for RuntimeConfig { impl fmt::Display for RuntimeConfig {
...@@ -102,6 +124,16 @@ impl fmt::Display for RuntimeConfig { ...@@ -102,6 +124,16 @@ impl fmt::Display for RuntimeConfig {
write!(f, "system_host={}, ", self.system_host)?; write!(f, "system_host={}, ", self.system_host)?;
write!(f, "system_port={}, ", self.system_port)?; write!(f, "system_port={}, ", self.system_port)?;
write!(f, "system_enabled={}", self.system_enabled)?; write!(f, "system_enabled={}", self.system_enabled)?;
write!(
f,
"use_endpoint_health_status={:?}",
self.use_endpoint_health_status
)?;
write!(
f,
"starting_health_status={:?}",
self.starting_health_status
)?;
Ok(()) Ok(())
} }
...@@ -135,6 +167,8 @@ impl RuntimeConfig { ...@@ -135,6 +167,8 @@ impl RuntimeConfig {
"HOST" => "system_host", "HOST" => "system_host",
"PORT" => "system_port", "PORT" => "system_port",
"ENABLED" => "system_enabled", "ENABLED" => "system_enabled",
"USE_ENDPOINT_HEALTH_STATUS" => "use_endpoint_health_status",
"STARTING_HEALTH_STATUS" => "starting_health_status",
_ => k.as_str(), _ => k.as_str(),
}; };
Some(mapped_key.into()) Some(mapped_key.into())
...@@ -151,7 +185,7 @@ impl RuntimeConfig { ...@@ -151,7 +185,7 @@ impl RuntimeConfig {
/// 2. /opt/dynamo/etc/runtime.toml /// 2. /opt/dynamo/etc/runtime.toml
/// 3. /opt/dynamo/defaults/runtime.toml (lowest priority) /// 3. /opt/dynamo/defaults/runtime.toml (lowest priority)
/// ///
/// Environment variables are prefixed with `DYN_RUNTIME_` /// Environment variables are prefixed with `DYN_RUNTIME_` and `DYN_SYSTEM`
pub fn from_settings() -> Result<RuntimeConfig> { pub fn from_settings() -> Result<RuntimeConfig> {
let config: RuntimeConfig = Self::figment().extract()?; let config: RuntimeConfig = Self::figment().extract()?;
config.validate()?; config.validate()?;
...@@ -171,6 +205,8 @@ impl RuntimeConfig { ...@@ -171,6 +205,8 @@ impl RuntimeConfig {
system_host: DEFAULT_SYSTEM_HOST.to_string(), system_host: DEFAULT_SYSTEM_HOST.to_string(),
system_port: DEFAULT_SYSTEM_PORT, system_port: DEFAULT_SYSTEM_PORT,
system_enabled: false, system_enabled: false,
starting_health_status: HealthStatus::NotReady,
use_endpoint_health_status: vec![],
} }
} }
...@@ -196,6 +232,8 @@ impl Default for RuntimeConfig { ...@@ -196,6 +232,8 @@ impl Default for RuntimeConfig {
system_host: DEFAULT_SYSTEM_HOST.to_string(), system_host: DEFAULT_SYSTEM_HOST.to_string(),
system_port: DEFAULT_SYSTEM_PORT, system_port: DEFAULT_SYSTEM_PORT,
system_enabled: false, system_enabled: false,
starting_health_status: HealthStatus::NotReady,
use_endpoint_health_status: vec![],
} }
} }
} }
...@@ -372,6 +410,28 @@ mod tests { ...@@ -372,6 +410,28 @@ mod tests {
}); });
} }
#[test]
fn test_system_server_starting_health_status_ready() {
temp_env::with_vars(
vec![("DYN_SYSTEM_STARTING_HEALTH_STATUS", Some("ready"))],
|| {
let config = RuntimeConfig::from_settings().unwrap();
assert!(config.starting_health_status == HealthStatus::Ready);
},
);
}
#[test]
fn test_system_use_endpoint_health_status() {
temp_env::with_vars(
vec![("DYN_SYSTEM_USE_ENDPOINT_HEALTH_STATUS", Some("[\"ready\"]"))],
|| {
let config = RuntimeConfig::from_settings().unwrap();
assert!(config.use_endpoint_health_status == vec!["ready"]);
},
);
}
#[test] #[test]
fn test_is_truthy_and_falsey() { fn test_is_truthy_and_falsey() {
// Test truthy values // Test truthy values
......
...@@ -23,7 +23,7 @@ use crate::{ ...@@ -23,7 +23,7 @@ use crate::{
ErrorContext, ErrorContext,
}; };
use super::{error, Arc, DistributedRuntime, OnceCell, Result, Runtime, Weak, OK}; use super::{error, Arc, DistributedRuntime, OnceCell, Result, Runtime, SystemHealth, Weak, OK};
use derive_getters::Dissolve; use derive_getters::Dissolve;
use figment::error; use figment::error;
...@@ -85,6 +85,12 @@ impl DistributedRuntime { ...@@ -85,6 +85,12 @@ impl DistributedRuntime {
} else { } else {
None None
}; };
let starting_health_status = config.starting_health_status.clone();
let use_endpoint_health_status = config.use_endpoint_health_status.clone();
let system_health = Arc::new(Mutex::new(SystemHealth::new(
starting_health_status,
use_endpoint_health_status,
)));
let distributed_runtime = Self { let distributed_runtime = Self {
runtime, runtime,
...@@ -98,6 +104,7 @@ impl DistributedRuntime { ...@@ -98,6 +104,7 @@ impl DistributedRuntime {
String, String,
prometheus::Registry, prometheus::Registry,
>::new())), >::new())),
system_health,
}; };
// Start HTTP server if enabled // Start HTTP server if enabled
......
...@@ -13,9 +13,12 @@ ...@@ -13,9 +13,12 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use crate::config::HealthStatus;
use crate::metrics::MetricsRegistry; use crate::metrics::MetricsRegistry;
use crate::traits::DistributedRuntimeProvider; use crate::traits::DistributedRuntimeProvider;
use axum::{body, http::StatusCode, response::IntoResponse, routing::get, Router}; use axum::{body, http::StatusCode, response::IntoResponse, routing::get, Router};
use serde_json::json;
use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use std::sync::OnceLock; use std::sync::OnceLock;
use std::time::Instant; use std::time::Instant;
...@@ -174,20 +177,35 @@ pub async fn spawn_http_server( ...@@ -174,20 +177,35 @@ pub async fn spawn_http_server(
} }
/// Health handler /// Health handler
#[tracing::instrument(skip_all, level = "trace")]
async fn health_handler(state: Arc<HttpServerState>) -> impl IntoResponse { async fn health_handler(state: Arc<HttpServerState>) -> impl IntoResponse {
match state.uptime() { let system_health = state.drt().system_health.lock().await;
Ok(uptime) => { let (mut healthy, endpoints) = system_health.get_health_status();
let response = format!("OK\nUptime: {} seconds\n", uptime.as_secs()); let uptime = match state.uptime() {
(StatusCode::OK, response) Ok(uptime_state) => Some(uptime_state),
}
Err(e) => { Err(e) => {
tracing::error!("Failed to get uptime: {}", e); tracing::error!("Failed to get uptime: {}", e);
( healthy = false;
StatusCode::INTERNAL_SERVER_ERROR, None
"Failed to get uptime".to_string(),
)
} }
} };
let healthy_string = if healthy { "ready" } else { "notready" };
let status_code = if healthy {
StatusCode::OK
} else {
StatusCode::SERVICE_UNAVAILABLE
};
let response = json!({
"status": healthy_string,
"uptime": uptime,
"endpoints": endpoints
});
tracing::trace!("Response {}", response.to_string());
(status_code, response.to_string())
} }
/// Metrics handler with DistributedRuntime uptime /// Metrics handler with DistributedRuntime uptime
...@@ -225,6 +243,7 @@ async fn create_test_drt_async() -> crate::DistributedRuntime { ...@@ -225,6 +243,7 @@ async fn create_test_drt_async() -> crate::DistributedRuntime {
mod tests { mod tests {
use super::*; use super::*;
use crate::metrics::MetricsRegistry; use crate::metrics::MetricsRegistry;
use rstest::rstest;
use std::sync::Arc; use std::sync::Arc;
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
...@@ -299,6 +318,76 @@ uptime_seconds{namespace=\"http_server\"} 42 ...@@ -299,6 +318,76 @@ uptime_seconds{namespace=\"http_server\"} 42
// If we get here, uptime calculation works correctly // If we get here, uptime calculation works correctly
} }
#[rstest]
#[cfg(feature = "integration")]
#[case("ready", 200, "ready")]
#[case("notready", 503, "notready")]
#[tokio::test]
async fn test_health_endpoints(
#[case] starting_health_status: &'static str,
#[case] expected_status: u16,
#[case] expected_body: &'static str,
) {
use std::sync::Arc;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
// use tokio::io::{AsyncReadExt, AsyncWriteExt};
// use reqwest for HTTP requests
// Closure call is needed here to satisfy async_with_vars
#[allow(clippy::redundant_closure_call)]
temp_env::async_with_vars(
[(
"DYN_SYSTEM_STARTING_HEALTH_STATUS",
Some(starting_health_status),
)],
(async || {
let runtime = crate::Runtime::from_settings().unwrap();
let drt = Arc::new(
crate::DistributedRuntime::from_settings_without_discovery(runtime)
.await
.unwrap(),
);
let cancel_token = CancellationToken::new();
let (addr, _) = spawn_http_server("127.0.0.1", 0, cancel_token.clone(), drt)
.await
.unwrap();
println!("[test] Waiting for server to start...");
sleep(std::time::Duration::from_millis(1000)).await;
println!("[test] Server should be up, starting requests...");
let client = reqwest::Client::new();
for (path, expect_status, expect_body) in [
("/health", expected_status, expected_body),
("/live", expected_status, expected_body),
("/someRandomPathNotFoundHere", 404, "Route not found"),
] {
println!("[test] Sending request to {}", path);
let url = format!("http://{}{}", addr, path);
let response = client.get(&url).send().await.unwrap();
let status = response.status();
let body = response.text().await.unwrap();
println!(
"[test] Response for {}: status={}, body={:?}",
path, status, body
);
assert_eq!(
status, expect_status,
"Response: status={}, body={:?}",
status, body
);
assert!(
body.contains(expect_body),
"Response: status={}, body={:?}",
status,
body
);
}
})(),
)
.await;
}
#[cfg(feature = "integration")] #[cfg(feature = "integration")]
#[tokio::test] #[tokio::test]
async fn test_uptime_without_initialization() { async fn test_uptime_without_initialization() {
......
...@@ -59,6 +59,8 @@ pub use worker::Worker; ...@@ -59,6 +59,8 @@ pub use worker::Worker;
use component::{Endpoint, InstanceSource}; use component::{Endpoint, InstanceSource};
use config::HealthStatus;
/// Types of Tokio runtimes that can be used to construct a Dynamo [Runtime]. /// Types of Tokio runtimes that can be used to construct a Dynamo [Runtime].
#[derive(Clone)] #[derive(Clone)]
enum RuntimeType { enum RuntimeType {
...@@ -75,6 +77,68 @@ pub struct Runtime { ...@@ -75,6 +77,68 @@ pub struct Runtime {
cancellation_token: CancellationToken, cancellation_token: CancellationToken,
} }
/// Current Health Status
/// If use_endpoint_health_status is set then
/// initialize the endpoint_health hashmap to the
/// starting health status
#[derive(Clone)]
pub struct SystemHealth {
system_health: HealthStatus,
endpoint_health: HashMap<String, HealthStatus>,
use_endpoint_health_status: Vec<String>,
}
impl SystemHealth {
pub fn new(
starting_health_status: HealthStatus,
use_endpoint_health_status: Vec<String>,
) -> Self {
let mut endpoint_health = HashMap::new();
for endpoint in &use_endpoint_health_status {
endpoint_health.insert(endpoint.clone(), starting_health_status.clone());
}
SystemHealth {
system_health: starting_health_status,
endpoint_health,
use_endpoint_health_status,
}
}
pub fn set_health_status(&mut self, status: HealthStatus) {
self.system_health = status;
}
pub fn set_endpoint_health_status(&mut self, endpoint: String, status: HealthStatus) {
self.endpoint_health.insert(endpoint, status);
}
/// Returns the overall health status and endpoint health statuses
pub fn get_health_status(&self) -> (bool, HashMap<String, String>) {
let mut endpoints: HashMap<String, String> = HashMap::new();
for (endpoint, ready) in &self.endpoint_health {
endpoints.insert(
endpoint.clone(),
if *ready == HealthStatus::Ready {
"ready".to_string()
} else {
"notready".to_string()
},
);
}
let healthy = if !self.use_endpoint_health_status.is_empty() {
self.use_endpoint_health_status.iter().all(|endpoint| {
self.endpoint_health
.get(endpoint)
.is_some_and(|status| *status == HealthStatus::Ready)
})
} else {
self.system_health == HealthStatus::Ready
};
(healthy, endpoints)
}
}
/// Distributed [Runtime] which provides access to shared resources across the cluster, this includes /// Distributed [Runtime] which provides access to shared resources across the cluster, this includes
/// communication protocols and transports. /// communication protocols and transports.
#[derive(Clone)] #[derive(Clone)]
...@@ -100,6 +164,9 @@ pub struct DistributedRuntime { ...@@ -100,6 +164,9 @@ pub struct DistributedRuntime {
instance_sources: Arc<Mutex<HashMap<Endpoint, Weak<InstanceSource>>>>, instance_sources: Arc<Mutex<HashMap<Endpoint, Weak<InstanceSource>>>>,
// Health Status
system_health: Arc<Mutex<SystemHealth>>,
// This map associates metric prefixes with their corresponding Prometheus registries. // This map associates metric prefixes with their corresponding Prometheus registries.
prometheus_registries_by_prefix: Arc<std::sync::Mutex<HashMap<String, prometheus::Registry>>>, prometheus_registries_by_prefix: Arc<std::sync::Mutex<HashMap<String, prometheus::Registry>>>,
} }
...@@ -16,9 +16,14 @@ ...@@ -16,9 +16,14 @@
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
use super::*; use super::*;
use crate::config::HealthStatus;
use crate::protocols::LeaseId;
use crate::SystemHealth;
use anyhow::Result; use anyhow::Result;
use async_nats::service::endpoint::Endpoint; use async_nats::service::endpoint::Endpoint;
use derive_builder::Builder; use derive_builder::Builder;
use std::collections::HashMap;
use tokio::sync::Mutex;
use tokio::sync::Notify; use tokio::sync::Notify;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
...@@ -36,12 +41,22 @@ impl PushEndpoint { ...@@ -36,12 +41,22 @@ impl PushEndpoint {
PushEndpointBuilder::default() PushEndpointBuilder::default()
} }
pub async fn start(self, endpoint: Endpoint) -> Result<()> { pub async fn start(
self,
endpoint: Endpoint,
endpoint_name: String,
system_health: Arc<Mutex<SystemHealth>>,
) -> Result<()> {
let mut endpoint = endpoint; let mut endpoint = endpoint;
let inflight = Arc::new(AtomicU64::new(0)); let inflight = Arc::new(AtomicU64::new(0));
let notify = Arc::new(Notify::new()); let notify = Arc::new(Notify::new());
system_health
.lock()
.await
.set_endpoint_health_status(endpoint_name.clone(), HealthStatus::Ready);
loop { loop {
let req = tokio::select! { let req = tokio::select! {
biased; biased;
...@@ -96,6 +111,11 @@ impl PushEndpoint { ...@@ -96,6 +111,11 @@ impl PushEndpoint {
} }
} }
system_health
.lock()
.await
.set_endpoint_health_status(endpoint_name.clone(), HealthStatus::NotReady);
// await for all inflight requests to complete // await for all inflight requests to complete
tracing::info!( tracing::info!(
"Waiting for {} inflight requests to complete", "Waiting for {} inflight requests to complete",
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment