Unverified Commit 9d9fa9a5, authored by LukasBluebaum, committed by GitHub

[router] Fix short timeout for the prefill client (#9803)

parent 58d06fdc
@@ -305,10 +305,10 @@ jobs:
        # Set mean thresholds (allowing for reasonable variance)
        # These can be adjusted based on your performance requirements
-       ttft_threshold=2.0 # Max 2.0 seconds for mean TTFT
-       e2e_latency_threshold=24.0 # Max 8.0 seconds for mean E2E latency
-       input_throughput_threshold=10000 # Min 9000 tokens/s for mean input throughput
-       output_throughput_threshold=90 # Min 100 tokens/s for mean output throughput
+       ttft_threshold=4.7 # Max 4.7 seconds for mean TTFT
+       e2e_latency_threshold=35.0 # Max 35.0 seconds for mean E2E latency
+       input_throughput_threshold=12000 # Min 12000 tokens/s for mean input throughput
+       output_throughput_threshold=68 # Min 68 tokens/s for mean output throughput

        # Validate mean thresholds
...
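The collapsed "# Validate mean thresholds" block compares each benchmark mean against these bounds: the latency means must stay below their thresholds, the throughput means above theirs. The workflow does this in shell; the Rust sketch below only illustrates the direction of each comparison, and the function name and example values are made up for this note.

```rust
/// Illustrative check of benchmark means against the CI thresholds above.
fn validate_means(
    ttft_mean_s: f64,
    e2e_latency_mean_s: f64,
    input_throughput_mean: f64,
    output_throughput_mean: f64,
) -> Result<(), String> {
    if ttft_mean_s > 4.7 {
        return Err(format!("mean TTFT {ttft_mean_s:.2}s exceeds 4.7s"));
    }
    if e2e_latency_mean_s > 35.0 {
        return Err(format!("mean E2E latency {e2e_latency_mean_s:.2}s exceeds 35.0s"));
    }
    if input_throughput_mean < 12_000.0 {
        return Err(format!("mean input throughput {input_throughput_mean:.0} tok/s below 12000"));
    }
    if output_throughput_mean < 68.0 {
        return Err(format!("mean output throughput {output_throughput_mean:.0} tok/s below 68"));
    }
    Ok(())
}

fn main() {
    // Example values only.
    println!("{:?}", validate_means(3.9, 28.5, 13_500.0, 72.0));
}
```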
@@ -95,6 +95,7 @@ impl RouterFactory {
            prefill_policy,
            decode_policy,
            ctx.client.clone(),
+           ctx.router_config.request_timeout_secs,
            ctx.router_config.worker_startup_timeout_secs,
            ctx.router_config.worker_startup_check_interval_secs,
            ctx.router_config.retry.clone(),
...
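The factory now forwards the router config's request_timeout_secs into the PD router constructor in addition to the two worker-startup settings, so each timeout arrives as its own parameter. A minimal sketch of that shape, assuming nothing beyond the field names visible in the diff (the struct, function, and values here are illustrative, not the real RouterConfig or PDRouter::new):

```rust
use std::time::Duration;

/// Illustrative subset of the router configuration; the real RouterConfig
/// carries many more fields (retry, circuit breaker, health checks, ...).
struct RouterConfigSketch {
    request_timeout_secs: u64,
    worker_startup_timeout_secs: u64,
    worker_startup_check_interval_secs: u64,
}

/// Stand-in for the constructor: each timeout is a separate parameter instead
/// of a shared `timeout_secs`/`interval_secs` pair.
fn build_pd_router_sketch(
    prefill_request_timeout_secs: u64,
    worker_startup_timeout_secs: u64,
    worker_startup_check_interval_secs: u64,
) {
    println!(
        "prefill request timeout {:?}, startup timeout {:?}, check interval {:?}",
        Duration::from_secs(prefill_request_timeout_secs),
        Duration::from_secs(worker_startup_timeout_secs),
        Duration::from_secs(worker_startup_check_interval_secs),
    );
}

fn main() {
    let cfg = RouterConfigSketch {
        request_timeout_secs: 600,
        worker_startup_timeout_secs: 300,
        worker_startup_check_interval_secs: 10,
    };
    // Mirrors the factory change: request_timeout_secs is forwarded alongside
    // the two worker-startup settings.
    build_pd_router_sketch(
        cfg.request_timeout_secs,
        cfg.worker_startup_timeout_secs,
        cfg.worker_startup_check_interval_secs,
    );
}
```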
@@ -42,8 +42,8 @@ pub struct PDRouter {
    pub decode_workers: Arc<RwLock<Vec<Box<dyn Worker>>>>,
    pub prefill_policy: Arc<dyn LoadBalancingPolicy>,
    pub decode_policy: Arc<dyn LoadBalancingPolicy>,
-   pub timeout_secs: u64,
-   pub interval_secs: u64,
+   pub worker_startup_timeout_secs: u64,
+   pub worker_startup_check_interval_secs: u64,
    pub worker_loads: Arc<tokio::sync::watch::Receiver<HashMap<String, isize>>>,
    pub load_monitor_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    pub client: Client,
@@ -74,8 +74,8 @@ impl PDRouter {
    async fn wait_for_server_health(&self, url: &str) -> Result<(), PDRouterError> {
        crate::routers::http::router::Router::wait_for_healthy_workers(
            &[url.to_string()],
-           self.timeout_secs,
-           self.interval_secs,
+           self.worker_startup_timeout_secs,
+           self.worker_startup_check_interval_secs,
        )
        .await
        .map_err(|_| PDRouterError::HealthCheckFailed {
@@ -376,8 +376,9 @@ impl PDRouter {
        prefill_policy: Arc<dyn LoadBalancingPolicy>,
        decode_policy: Arc<dyn LoadBalancingPolicy>,
        client: Client,
-       timeout_secs: u64,
-       interval_secs: u64,
+       prefill_request_timeout_secs: u64,
+       worker_startup_timeout_secs: u64,
+       worker_startup_check_interval_secs: u64,
        retry_config: RetryConfig,
        circuit_breaker_config: ConfigCircuitBreakerConfig,
        health_check_config: ConfigHealthCheckConfig,
@@ -437,8 +438,8 @@ impl PDRouter {
        if !all_urls.is_empty() {
            crate::routers::http::router::Router::wait_for_healthy_workers(
                &all_urls,
-               timeout_secs,
-               interval_secs,
+               worker_startup_timeout_secs,
+               worker_startup_check_interval_secs,
            )
            .await?;
        }
@@ -465,7 +466,7 @@ impl PDRouter {
        let load_monitor_handle =
            if prefill_policy.name() == "power_of_two" || decode_policy.name() == "power_of_two" {
                let monitor_urls = all_urls.clone();
-               let monitor_interval = interval_secs;
+               let monitor_interval = worker_startup_check_interval_secs;
                let monitor_client = client.clone();
                let prefill_policy_clone = Arc::clone(&prefill_policy);
                let decode_policy_clone = Arc::clone(&decode_policy);
@@ -503,7 +504,7 @@ impl PDRouter {
            .pool_max_idle_per_host(0)
            .http1_only()
            .connect_timeout(Duration::from_millis(300))
-           .timeout(Duration::from_secs(2))
+           .timeout(Duration::from_secs(prefill_request_timeout_secs))
            .build()
            .map_err(|e| format!("Failed to build prefill client: {}", e))?;
@@ -581,8 +582,8 @@ impl PDRouter {
            decode_workers,
            prefill_policy,
            decode_policy,
-           timeout_secs,
-           interval_secs,
+           worker_startup_timeout_secs,
+           worker_startup_check_interval_secs,
            worker_loads,
            load_monitor_handle,
            client,
@@ -2104,8 +2105,8 @@ mod tests {
            decode_workers: Arc::new(RwLock::new(vec![])),
            prefill_policy,
            decode_policy,
-           timeout_secs: 5,
-           interval_secs: 1,
+           worker_startup_timeout_secs: 5,
+           worker_startup_check_interval_secs: 1,
            worker_loads: Arc::new(tokio::sync::watch::channel(HashMap::new()).1),
            load_monitor_handle: None,
            client: Client::new(),
...
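These hunks contain the fix named in the commit title: the prefill HTTP client previously used a hard-coded 2-second request timeout, which long prefill requests can easily exceed, and it now uses the configurable prefill_request_timeout_secs passed down from the router config. A minimal sketch of that builder, assuming a reqwest dependency; only the builder calls shown in the diff are from the source, and the wrapper function and example value are illustrative:

```rust
use std::time::Duration;

/// Build an HTTP client for prefill requests. The whole-request timeout is now
/// configurable instead of a hard-coded 2 seconds.
fn build_prefill_client(prefill_request_timeout_secs: u64) -> Result<reqwest::Client, String> {
    reqwest::Client::builder()
        .pool_max_idle_per_host(0)                   // do not keep idle connections around
        .http1_only()                                // force HTTP/1.1
        .connect_timeout(Duration::from_millis(300)) // connecting should still fail fast
        .timeout(Duration::from_secs(prefill_request_timeout_secs)) // whole-request timeout
        .build()
        .map_err(|e| format!("Failed to build prefill client: {}", e))
}

fn main() {
    // e.g. the router's request_timeout_secs setting; 600 is only an example value.
    let _client = build_prefill_client(600).expect("failed to build prefill client");
}
```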
@@ -34,8 +34,8 @@ pub struct Router {
    workers: Arc<RwLock<Vec<Box<dyn Worker>>>>,
    policy: Arc<dyn LoadBalancingPolicy>,
    client: Client,
-   timeout_secs: u64,
-   interval_secs: u64,
+   worker_startup_timeout_secs: u64,
+   worker_startup_check_interval_secs: u64,
    dp_aware: bool,
    api_key: Option<String>,
    retry_config: RetryConfig,
@@ -52,8 +52,8 @@ impl Router {
        worker_urls: Vec<String>,
        policy: Arc<dyn LoadBalancingPolicy>,
        client: Client,
-       timeout_secs: u64,
-       interval_secs: u64,
+       worker_startup_timeout_secs: u64,
+       worker_startup_check_interval_secs: u64,
        dp_aware: bool,
        api_key: Option<String>,
        retry_config: RetryConfig,
@@ -65,7 +65,12 @@ impl Router {
        // Wait for workers to be healthy (skip if empty - for service discovery mode)
        if !worker_urls.is_empty() {
-           Self::wait_for_healthy_workers(&worker_urls, timeout_secs, interval_secs).await?;
+           Self::wait_for_healthy_workers(
+               &worker_urls,
+               worker_startup_timeout_secs,
+               worker_startup_check_interval_secs,
+           )
+           .await?;
        }

        let worker_urls = if dp_aware {
@@ -110,7 +115,10 @@ impl Router {
        }

        let workers = Arc::new(RwLock::new(workers));
-       let health_checker = crate::core::start_health_checker(Arc::clone(&workers), interval_secs);
+       let health_checker = crate::core::start_health_checker(
+           Arc::clone(&workers),
+           worker_startup_check_interval_secs,
+       );

        // Setup load monitoring for PowerOfTwo policy
        let (tx, rx) = tokio::sync::watch::channel(HashMap::new());
@@ -118,7 +126,7 @@ impl Router {
        let load_monitor_handle = if policy.name() == "power_of_two" {
            let monitor_urls = worker_urls.clone();
-           let monitor_interval = interval_secs;
+           let monitor_interval = worker_startup_check_interval_secs;
            let policy_clone = Arc::clone(&policy);
            let client_clone = client.clone();
@@ -140,8 +148,8 @@ impl Router {
            workers,
            policy,
            client,
-           timeout_secs,
-           interval_secs,
+           worker_startup_timeout_secs,
+           worker_startup_check_interval_secs,
            dp_aware,
            api_key,
            retry_config,
@@ -164,8 +172,8 @@ impl Router {
    pub async fn wait_for_healthy_workers(
        worker_urls: &[String],
-       timeout_secs: u64,
-       interval_secs: u64,
+       worker_startup_timeout_secs: u64,
+       worker_startup_check_interval_secs: u64,
    ) -> Result<(), String> {
        if worker_urls.is_empty() {
            return Err(
@@ -174,18 +182,23 @@ impl Router {
        }

        // Perform health check asynchronously
-       Self::wait_for_healthy_workers_async(worker_urls, timeout_secs, interval_secs).await
+       Self::wait_for_healthy_workers_async(
+           worker_urls,
+           worker_startup_timeout_secs,
+           worker_startup_check_interval_secs,
+       )
+       .await
    }

    async fn wait_for_healthy_workers_async(
        worker_urls: &[String],
-       timeout_secs: u64,
-       interval_secs: u64,
+       worker_startup_timeout_secs: u64,
+       worker_startup_check_interval_secs: u64,
    ) -> Result<(), String> {
        info!(
            "Waiting for {} workers to become healthy (timeout: {}s)",
            worker_urls.len(),
-           timeout_secs
+           worker_startup_timeout_secs
        );

        let start_time = std::time::Instant::now();
@@ -195,14 +208,14 @@ impl Router {
            .map_err(|e| format!("Failed to create HTTP client: {}", e))?;

        loop {
-           if start_time.elapsed() > Duration::from_secs(timeout_secs) {
+           if start_time.elapsed() > Duration::from_secs(worker_startup_timeout_secs) {
                error!(
                    "Timeout {}s waiting for workers {:?} to become healthy. Please set --router-worker-startup-timeout-secs (sglang_router.launch_server) or --worker-startup-timeout-secs (sglang_worker.router) to a larger value",
-                   timeout_secs, worker_urls
+                   worker_startup_timeout_secs, worker_urls
                );
                return Err(format!(
                    "Timeout {}s waiting for workers {:?} to become healthy. Please set --router-worker-startup-timeout-secs (sglang_router.launch_server) or --worker-startup-timeout-secs (sglang_worker.router) to a larger value",
-                   timeout_secs, worker_urls
+                   worker_startup_timeout_secs, worker_urls
                ));
            }
@@ -262,7 +275,7 @@ impl Router {
                    unhealthy_workers.len(),
                    unhealthy_workers
                );
-               tokio::time::sleep(Duration::from_secs(interval_secs)).await;
+               tokio::time::sleep(Duration::from_secs(worker_startup_check_interval_secs)).await;
            }
        }
    }
@@ -812,19 +825,19 @@ impl Router {
    pub async fn add_worker(&self, worker_url: &str) -> Result<String, String> {
        let start_time = std::time::Instant::now();
        let client = reqwest::Client::builder()
-           .timeout(Duration::from_secs(self.timeout_secs))
+           .timeout(Duration::from_secs(self.worker_startup_timeout_secs))
            .build()
            .map_err(|e| format!("Failed to create HTTP client: {}", e))?;

        loop {
-           if start_time.elapsed() > Duration::from_secs(self.timeout_secs) {
+           if start_time.elapsed() > Duration::from_secs(self.worker_startup_timeout_secs) {
                error!(
                    "Timeout {}s waiting for worker {} to become healthy. Please set --router-worker-startup-timeout-secs (sglang_router.launch_server) or --worker-startup-timeout-secs (sglang_worker.router) to a larger value",
-                   self.timeout_secs, worker_url
+                   self.worker_startup_timeout_secs, worker_url
                );
                return Err(format!(
                    "Timeout {}s waiting for worker {} to become healthy. Please set --router-worker-startup-timeout-secs (sglang_router.launch_server) or --worker-startup-timeout-secs (sglang_worker.router) to a larger value",
-                   self.timeout_secs, worker_url
+                   self.worker_startup_timeout_secs, worker_url
                ));
            }
@@ -894,7 +907,10 @@ impl Router {
                        warn!("The worker url {} does not have http or https prefix. Please add the prefix to the url.", worker_url);
                    }

-                   tokio::time::sleep(Duration::from_secs(self.interval_secs)).await;
+                   tokio::time::sleep(Duration::from_secs(
+                       self.worker_startup_check_interval_secs,
+                   ))
+                   .await;
                    continue;
                }
            }
@@ -906,7 +922,10 @@ impl Router {
                        warn!("The worker url {} does not have http or https prefix. Please add the prefix to the url.", worker_url);
                    }

-                   tokio::time::sleep(Duration::from_secs(self.interval_secs)).await;
+                   tokio::time::sleep(Duration::from_secs(
+                       self.worker_startup_check_interval_secs,
+                   ))
+                   .await;
                    continue;
                }
            }
@@ -1324,8 +1343,8 @@ mod tests {
        Router {
            workers: Arc::new(RwLock::new(workers)),
            policy: Arc::new(RandomPolicy::new()),
-           timeout_secs: 5,
-           interval_secs: 1,
+           worker_startup_timeout_secs: 5,
+           worker_startup_check_interval_secs: 1,
            dp_aware: false,
            api_key: None,
            client: Client::new(),
...
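The remaining hunks rename the generic timeout_secs/interval_secs pair to worker_startup_timeout_secs/worker_startup_check_interval_secs, making explicit that these two values govern only startup health polling and the add_worker retry loop, not request handling. A condensed sketch of that polling pattern, driven from a Tokio runtime and assuming a /health endpoint and simplified error handling (the real logic lives in wait_for_healthy_workers_async above):

```rust
use std::time::{Duration, Instant};

/// Poll a set of worker URLs until all report healthy, or give up after
/// `worker_startup_timeout_secs`, re-checking every
/// `worker_startup_check_interval_secs` seconds.
async fn wait_until_healthy(
    worker_urls: &[String],
    worker_startup_timeout_secs: u64,
    worker_startup_check_interval_secs: u64,
) -> Result<(), String> {
    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(5))
        .build()
        .map_err(|e| format!("Failed to create HTTP client: {}", e))?;
    let start = Instant::now();

    loop {
        if start.elapsed() > Duration::from_secs(worker_startup_timeout_secs) {
            return Err(format!(
                "Timeout {}s waiting for workers {:?} to become healthy",
                worker_startup_timeout_secs, worker_urls
            ));
        }

        // A worker counts as healthy if its health endpoint answers with a 2xx status.
        let mut all_healthy = true;
        for url in worker_urls {
            match client.get(format!("{}/health", url)).send().await {
                Ok(resp) if resp.status().is_success() => {}
                _ => {
                    all_healthy = false;
                    break;
                }
            }
        }
        if all_healthy {
            return Ok(());
        }

        tokio::time::sleep(Duration::from_secs(worker_startup_check_interval_secs)).await;
    }
}
```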