Unverified commit 6387098f, authored by Byron Hsu and committed by GitHub
Browse files

[router] add health checking in router init (#2393)

parent 2a717c50
...@@ -851,6 +851,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" ...@@ -851,6 +851,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-sink",
] ]
[[package]] [[package]]
...@@ -1986,6 +1987,7 @@ dependencies = [ ...@@ -1986,6 +1987,7 @@ dependencies = [
"base64 0.22.1", "base64 0.22.1",
"bytes", "bytes",
"encoding_rs", "encoding_rs",
"futures-channel",
"futures-core", "futures-core",
"futures-util", "futures-util",
"h2 0.4.6", "h2 0.4.6",
......
...@@ -19,7 +19,7 @@ serde = { version = "1.0", features = ["derive"] } ...@@ -19,7 +19,7 @@ serde = { version = "1.0", features = ["derive"] }
clap = { version = "4.4", features = ["derive"] } clap = { version = "4.4", features = ["derive"] }
bytes = "1.8.0" bytes = "1.8.0"
rand = "0.8.5" rand = "0.8.5"
reqwest = { version = "0.12.8", features = ["stream"] } reqwest = { version = "0.12.8", features = ["stream", "blocking"] }
futures-util = "0.3" futures-util = "0.3"
serde_json = "1.0" serde_json = "1.0"
pyo3 = { version = "0.22.5", features = ["extension-module"] } pyo3 = { version = "0.22.5", features = ["extension-module"] }
......
...@@ -167,13 +167,6 @@ def main(): ...@@ -167,13 +167,6 @@ def main():
signal.SIGQUIT, lambda sig, frame: cleanup_processes(server_processes) signal.SIGQUIT, lambda sig, frame: cleanup_processes(server_processes)
) )
for port in worker_ports:
if not wait_for_server_health(server_args.host, port):
logger.error(f"Server on port {port} failed to become healthy")
break
logger.info("All servers are healthy. Starting router...")
# Update router args with worker URLs # Update router args with worker URLs
router_args.worker_urls = [ router_args.worker_urls = [
f"http://{server_args.host}:{port}" for port in worker_ports f"http://{server_args.host}:{port}" for port in worker_ports
......
...@@ -93,7 +93,7 @@ pub enum Router { ...@@ -93,7 +93,7 @@ pub enum Router {
}, },
} }
#[derive(Debug)] #[derive(Debug, Clone)]
pub enum PolicyConfig { pub enum PolicyConfig {
RandomConfig, RandomConfig,
RoundRobinConfig, RoundRobinConfig,
...@@ -127,9 +127,14 @@ fn get_text_from_request(body: &Bytes, route: &str) -> String { ...@@ -127,9 +127,14 @@ fn get_text_from_request(body: &Bytes, route: &str) -> String {
return "".to_string(); return "".to_string();
} }
impl Router { impl Router {
pub fn new(worker_urls: Vec<String>, policy_config: PolicyConfig) -> Self { pub fn new(worker_urls: Vec<String>, policy_config: PolicyConfig) -> Result<Self, String> {
match policy_config { // Wait until all workers are healthy
Self::wait_for_healthy_workers(&worker_urls, 300, 10)?;
// Create router based on policy...
Ok(match policy_config {
PolicyConfig::RandomConfig => Router::Random { PolicyConfig::RandomConfig => Router::Random {
worker_urls: Arc::new(RwLock::new(worker_urls)), worker_urls: Arc::new(RwLock::new(worker_urls)),
}, },
...@@ -196,7 +201,7 @@ impl Router { ...@@ -196,7 +201,7 @@ impl Router {
_eviction_thread: Some(eviction_thread), _eviction_thread: Some(eviction_thread),
} }
} }
} })
} }
pub fn get_first(&self) -> Option<String> { pub fn get_first(&self) -> Option<String> {
...@@ -213,6 +218,59 @@ impl Router { ...@@ -213,6 +218,59 @@ impl Router {
} }
} }
/// Blocks the calling thread until every worker answers `GET {url}/health` with a
/// success status.
///
/// Workers are probed in rounds with a blocking `reqwest` client. In each round every URL
/// in `worker_urls` is queried once; a worker that returns a non-success status or whose
/// request fails is logged and recorded as pending. If any worker is pending at the end of
/// a round, the pending list is logged and the thread sleeps for `interval_secs` seconds
/// before starting the next round.
///
/// # Errors
///
/// Returns a timeout message once more than `timeout_secs` seconds have elapsed. The
/// elapsed time is only checked at the start of a round, not while a round is in progress
/// or during the sleep between rounds.
fn wait_for_healthy_workers(
    worker_urls: &[String],
    timeout_secs: u64,
    interval_secs: u64,
) -> Result<(), String> {
    let started = std::time::Instant::now();
    let probe_client = reqwest::blocking::Client::new();
    let deadline = Duration::from_secs(timeout_secs);
    let poll_interval = Duration::from_secs(interval_secs);

    loop {
        if started.elapsed() > deadline {
            return Err(format!(
                "Timeout {}s waiting for workers to become healthy",
                timeout_secs
            ));
        }

        // (worker url, reason) for every worker that did not pass this round.
        let mut pending: Vec<(&String, String)> = Vec::new();

        for url in worker_urls {
            let endpoint = format!("{}/health", url);
            match probe_client.get(&endpoint).send() {
                // Healthy worker: nothing to record.
                Ok(res) if res.status().is_success() => {}
                Ok(res) => {
                    info!(
                        "Worker {} health check is pending with status: {}.",
                        url,
                        res.status()
                    );
                    pending.push((url, format!("Status: {}", res.status())));
                }
                Err(e) => {
                    info!("Worker {} health check is pending with error: {}", url, e);
                    pending.push((url, format!("Error: {}", e)));
                }
            }
        }

        // An empty pending list means every probe in this round succeeded.
        if pending.is_empty() {
            info!("All workers are healthy");
            return Ok(());
        }

        info!("Unhealthy workers:");
        for (url, reason) in &pending {
            info!(" {} - {}", url, reason);
        }
        thread::sleep(poll_interval);
    }
}
pub async fn dispatch( pub async fn dispatch(
&self, &self,
client: &reqwest::Client, client: &reqwest::Client,
...@@ -386,7 +444,7 @@ impl Router { ...@@ -386,7 +444,7 @@ impl Router {
} }
} }
pub async fn add_worker(&self, worker_url: String) -> HttpResponse { pub async fn add_worker(&self, worker_url: String) -> Result<String, String> {
let interval_secs = 10; // check every 10 seconds let interval_secs = 10; // check every 10 seconds
let timeout_secs = 300; // 5 minutes let timeout_secs = 300; // 5 minutes
...@@ -395,7 +453,7 @@ impl Router { ...@@ -395,7 +453,7 @@ impl Router {
loop { loop {
if start_time.elapsed() > Duration::from_secs(timeout_secs) { if start_time.elapsed() > Duration::from_secs(timeout_secs) {
return HttpResponse::InternalServerError().body(format!( return Err(format!(
"Timeout {}s waiting for worker {} to become healthy", "Timeout {}s waiting for worker {} to become healthy",
timeout_secs, worker_url timeout_secs, worker_url
)); ));
...@@ -411,19 +469,40 @@ impl Router { ...@@ -411,19 +469,40 @@ impl Router {
info!("Worker {} health check passed", worker_url); info!("Worker {} health check passed", worker_url);
let mut urls = worker_urls.write().unwrap(); let mut urls = worker_urls.write().unwrap();
if urls.contains(&worker_url) { if urls.contains(&worker_url) {
return HttpResponse::BadRequest() return Err(format!("Worker {} already exists", worker_url));
.body(format!("Worker {} already exists", worker_url));
} }
info!("Added worker: {}", worker_url); info!("Added worker: {}", worker_url);
urls.push(worker_url.clone()); urls.push(worker_url.clone());
} }
} }
return HttpResponse::Ok()
.body(format!("Successfully added worker: {}", worker_url)); // If cache aware, initialize the queues for the new worker
if let Router::CacheAware {
running_queue,
processed_queue,
tree,
..
} = self
{
// Add worker to running queue with initial count of 0
running_queue.lock().unwrap().insert(worker_url.clone(), 0);
// Add worker to processed queue with initial count of 0
processed_queue
.lock()
.unwrap()
.insert(worker_url.clone(), 0);
// Add worker to tree
tree.lock().unwrap().insert(&"".to_string(), &worker_url);
}
return Ok(format!("Successfully added worker: {}", worker_url));
} else { } else {
info!( info!(
"Worker {} health check failed with status: {}. The worker might still be starting up.", "Worker {} health check is pending with status: {}.",
worker_url, res.status() worker_url,
res.status()
); );
// if the url does not have http or https prefix, warn users // if the url does not have http or https prefix, warn users
if !worker_url.starts_with("http://") && !worker_url.starts_with("https://") if !worker_url.starts_with("http://") && !worker_url.starts_with("https://")
...@@ -436,7 +515,10 @@ impl Router { ...@@ -436,7 +515,10 @@ impl Router {
} }
} }
Err(e) => { Err(e) => {
info!("Worker {} health check failed: {}", worker_url, e); info!(
"Worker {} health check is pending with error: {}",
worker_url, e
);
// if the url does not have http or https prefix, warn users // if the url does not have http or https prefix, warn users
if !worker_url.starts_with("http://") && !worker_url.starts_with("https://") { if !worker_url.starts_with("http://") && !worker_url.starts_with("https://") {
...@@ -463,9 +545,20 @@ impl Router { ...@@ -463,9 +545,20 @@ impl Router {
} }
// if cache aware, remove the worker from the tree // if cache aware, remove the worker from the tree
if let Router::CacheAware { tree, .. } = self { if let Router::CacheAware {
tree,
running_queue,
processed_queue,
..
} = self
{
tree.lock().unwrap().remove_tenant(&worker_url); tree.lock().unwrap().remove_tenant(&worker_url);
info!("Removed worker from tree: {}", worker_url); running_queue.lock().unwrap().remove(&worker_url);
processed_queue.lock().unwrap().remove(&worker_url);
info!(
"Removed worker from tree and cleaned up queues: {}",
worker_url
);
} }
} }
} }
...@@ -20,7 +20,10 @@ impl AppState { ...@@ -20,7 +20,10 @@ impl AppState {
policy_config: PolicyConfig, policy_config: PolicyConfig,
) -> Self { ) -> Self {
// Create router based on policy // Create router based on policy
let router = Router::new(worker_urls, policy_config); let router = match Router::new(worker_urls, policy_config) {
Ok(router) => router,
Err(error) => panic!("Failed to create router: {}", error),
};
Self { router, client } Self { router, client }
} }
...@@ -141,7 +144,11 @@ async fn add_worker( ...@@ -141,7 +144,11 @@ async fn add_worker(
.body("Worker URL required. Provide 'url' query parameter") .body("Worker URL required. Provide 'url' query parameter")
} }
}; };
data.router.add_worker(worker_url).await
match data.router.add_worker(worker_url).await {
Ok(message) => HttpResponse::Ok().body(message),
Err(error) => HttpResponse::BadRequest().body(error),
}
} }
#[post("/remove_worker")] #[post("/remove_worker")]
...@@ -187,20 +194,20 @@ pub async fn startup(config: ServerConfig) -> std::io::Result<()> { ...@@ -187,20 +194,20 @@ pub async fn startup(config: ServerConfig) -> std::io::Result<()> {
) )
.init(); .init();
info!("Starting server on {}:{}", config.host, config.port);
info!("Worker URLs: {:?}", config.worker_urls);
info!("Policy Config: {:?}", config.policy_config);
let client = reqwest::Client::builder() let client = reqwest::Client::builder()
.build() .build()
.expect("Failed to create HTTP client"); .expect("Failed to create HTTP client");
let app_state = web::Data::new(AppState::new( let app_state = web::Data::new(AppState::new(
config.worker_urls, config.worker_urls.clone(),
client, client,
config.policy_config, config.policy_config.clone(),
)); ));
info!("✅ Starting router on {}:{}", config.host, config.port);
info!("✅ Serving Worker URLs: {:?}", config.worker_urls);
info!("✅ Policy Config: {:?}", config.policy_config);
HttpServer::new(move || { HttpServer::new(move || {
App::new() App::new()
.app_data(app_state.clone()) .app_data(app_state.clone())
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment