Unverified Commit 21b88460 authored by Simo Lin's avatar Simo Lin Committed by GitHub
Browse files

[router] allow more health check configuration (#9198)

parent 0c8594e6
......@@ -42,14 +42,14 @@ class RouterArgs:
policy: str = "cache_aware"
prefill_policy: Optional[str] = None # Specific policy for prefill nodes in PD mode
decode_policy: Optional[str] = None # Specific policy for decode nodes in PD mode
worker_startup_timeout_secs: int = 300
worker_startup_check_interval: int = 10
cache_threshold: float = 0.5
balance_abs_threshold: int = 32
balance_rel_threshold: float = 1.0001
eviction_interval: int = 60
max_tree_size: int = 2**24
max_payload_size: int = 256 * 1024 * 1024 # 256MB default for large batches
worker_startup_timeout_secs: int = 600
worker_startup_check_interval: int = 30
cache_threshold: float = 0.3
balance_abs_threshold: int = 64
balance_rel_threshold: float = 1.5
eviction_interval: int = 120
max_tree_size: int = 2**26
max_payload_size: int = 512 * 1024 * 1024 # 512MB default for large batches
dp_aware: bool = False
api_key: Optional[str] = None
log_dir: Optional[str] = None
......@@ -69,23 +69,29 @@ class RouterArgs:
# Request ID headers configuration
request_id_headers: Optional[List[str]] = None
# Request timeout in seconds
request_timeout_secs: int = 600
request_timeout_secs: int = 1800
# Max concurrent requests for rate limiting
max_concurrent_requests: int = 64
max_concurrent_requests: int = 256
# CORS allowed origins
cors_allowed_origins: List[str] = dataclasses.field(default_factory=list)
# Retry configuration
retry_max_retries: int = 3
retry_initial_backoff_ms: int = 100
retry_max_backoff_ms: int = 10_000
retry_backoff_multiplier: float = 2.0
retry_jitter_factor: float = 0.1
retry_max_retries: int = 5
retry_initial_backoff_ms: int = 50
retry_max_backoff_ms: int = 30_000
retry_backoff_multiplier: float = 1.5
retry_jitter_factor: float = 0.2
disable_retries: bool = False
# Health check configuration
health_failure_threshold: int = 3
health_success_threshold: int = 2
health_check_timeout_secs: int = 5
health_check_interval_secs: int = 60
health_check_endpoint: str = "/health"
# Circuit breaker configuration
cb_failure_threshold: int = 5
cb_success_threshold: int = 2
cb_timeout_duration_secs: int = 30
cb_window_duration_secs: int = 60
cb_failure_threshold: int = 10
cb_success_threshold: int = 3
cb_timeout_duration_secs: int = 60
cb_window_duration_secs: int = 120
disable_circuit_breaker: bool = False
@staticmethod
......@@ -359,6 +365,37 @@ class RouterArgs:
action="store_true",
help="Disable circuit breaker (equivalent to setting cb_failure_threshold to u32::MAX)",
)
# Health check configuration
parser.add_argument(
f"--{prefix}health-failure-threshold",
type=int,
default=RouterArgs.health_failure_threshold,
help="Number of consecutive health check failures before marking worker unhealthy",
)
parser.add_argument(
f"--{prefix}health-success-threshold",
type=int,
default=RouterArgs.health_success_threshold,
help="Number of consecutive health check successes before marking worker healthy",
)
parser.add_argument(
f"--{prefix}health-check-timeout-secs",
type=int,
default=RouterArgs.health_check_timeout_secs,
help="Timeout in seconds for health check requests",
)
parser.add_argument(
f"--{prefix}health-check-interval-secs",
type=int,
default=RouterArgs.health_check_interval_secs,
help="Interval in seconds between runtime health checks",
)
parser.add_argument(
f"--{prefix}health-check-endpoint",
type=str,
default=RouterArgs.health_check_endpoint,
help="Health check endpoint path",
)
parser.add_argument(
f"--{prefix}max-concurrent-requests",
type=int,
......@@ -455,6 +492,29 @@ class RouterArgs:
disable_circuit_breaker=getattr(
args, f"{prefix}disable_circuit_breaker", False
),
health_failure_threshold=getattr(
args,
f"{prefix}health_failure_threshold",
RouterArgs.health_failure_threshold,
),
health_success_threshold=getattr(
args,
f"{prefix}health_success_threshold",
RouterArgs.health_success_threshold,
),
health_check_timeout_secs=getattr(
args,
f"{prefix}health_check_timeout_secs",
RouterArgs.health_check_timeout_secs,
),
health_check_interval_secs=getattr(
args,
f"{prefix}health_check_interval_secs",
RouterArgs.health_check_interval_secs,
),
health_check_endpoint=getattr(
args, f"{prefix}health_check_endpoint", RouterArgs.health_check_endpoint
),
)
@staticmethod
......@@ -652,6 +712,11 @@ def launch_router(args: argparse.Namespace) -> Optional[Router]:
cb_window_duration_secs=router_args.cb_window_duration_secs,
disable_retries=router_args.disable_retries,
disable_circuit_breaker=router_args.disable_circuit_breaker,
health_failure_threshold=router_args.health_failure_threshold,
health_success_threshold=router_args.health_success_threshold,
health_check_timeout_secs=router_args.health_check_timeout_secs,
health_check_interval_secs=router_args.health_check_interval_secs,
health_check_endpoint=router_args.health_check_endpoint,
)
router.start()
......
......@@ -66,6 +66,11 @@ class Router:
request_timeout_secs: Request timeout in seconds. Default: 600
max_concurrent_requests: Maximum number of concurrent requests allowed for rate limiting. Default: 64
cors_allowed_origins: List of allowed origins for CORS. Empty list allows all origins. Default: []
health_failure_threshold: Number of consecutive health check failures before marking worker unhealthy. Default: 3
health_success_threshold: Number of consecutive health check successes before marking worker healthy. Default: 2
health_check_timeout_secs: Timeout in seconds for health check requests. Default: 5
health_check_interval_secs: Interval in seconds between runtime health checks. Default: 60
health_check_endpoint: Health check endpoint path. Default: '/health'
"""
def __init__(
......@@ -74,14 +79,14 @@ class Router:
policy: PolicyType = PolicyType.RoundRobin,
host: str = "127.0.0.1",
port: int = 3001,
worker_startup_timeout_secs: int = 300,
worker_startup_check_interval: int = 10,
cache_threshold: float = 0.50,
balance_abs_threshold: int = 32,
balance_rel_threshold: float = 1.0001,
eviction_interval_secs: int = 60,
max_tree_size: int = 2**24,
max_payload_size: int = 256 * 1024 * 1024, # 256MB
worker_startup_timeout_secs: int = 600,
worker_startup_check_interval: int = 30,
cache_threshold: float = 0.3,
balance_abs_threshold: int = 64,
balance_rel_threshold: float = 1.5,
eviction_interval_secs: int = 120,
max_tree_size: int = 2**26,
max_payload_size: int = 512 * 1024 * 1024, # 512MB
dp_aware: bool = False,
api_key: Optional[str] = None,
log_dir: Optional[str] = None,
......@@ -95,26 +100,31 @@ class Router:
bootstrap_port_annotation: str = "sglang.ai/bootstrap-port",
prometheus_port: Optional[int] = None,
prometheus_host: Optional[str] = None,
request_timeout_secs: int = 600,
request_timeout_secs: int = 1800,
request_id_headers: Optional[List[str]] = None,
pd_disaggregation: bool = False,
prefill_urls: Optional[List[tuple]] = None,
decode_urls: Optional[List[str]] = None,
prefill_policy: Optional[PolicyType] = None,
decode_policy: Optional[PolicyType] = None,
max_concurrent_requests: int = 64,
max_concurrent_requests: int = 256,
cors_allowed_origins: List[str] = None,
retry_max_retries: int = 3,
retry_initial_backoff_ms: int = 100,
retry_max_backoff_ms: int = 10_000,
retry_backoff_multiplier: float = 2.0,
retry_jitter_factor: float = 0.1,
cb_failure_threshold: int = 5,
cb_success_threshold: int = 2,
cb_timeout_duration_secs: int = 30,
cb_window_duration_secs: int = 60,
retry_max_retries: int = 5,
retry_initial_backoff_ms: int = 50,
retry_max_backoff_ms: int = 30_000,
retry_backoff_multiplier: float = 1.5,
retry_jitter_factor: float = 0.2,
cb_failure_threshold: int = 10,
cb_success_threshold: int = 3,
cb_timeout_duration_secs: int = 60,
cb_window_duration_secs: int = 120,
disable_retries: bool = False,
disable_circuit_breaker: bool = False,
health_failure_threshold: int = 3,
health_success_threshold: int = 2,
health_check_timeout_secs: int = 5,
health_check_interval_secs: int = 60,
health_check_endpoint: str = "/health",
):
if selector is None:
selector = {}
......@@ -171,6 +181,11 @@ class Router:
cb_window_duration_secs=cb_window_duration_secs,
disable_retries=disable_retries,
disable_circuit_breaker=disable_circuit_breaker,
health_failure_threshold=health_failure_threshold,
health_success_threshold=health_success_threshold,
health_check_timeout_secs=health_check_timeout_secs,
health_check_interval_secs=health_check_interval_secs,
health_check_endpoint=health_check_endpoint,
)
def start(self) -> None:
......
......@@ -49,6 +49,8 @@ pub struct RouterConfig {
/// Disable circuit breaker (overrides circuit_breaker.failure_threshold to u32::MAX when true)
#[serde(default)]
pub disable_circuit_breaker: bool,
/// Health check configuration
pub health_check: HealthCheckConfig,
}
/// Routing mode configuration
......@@ -183,7 +185,7 @@ impl Default for DiscoveryConfig {
enabled: false,
namespace: None,
port: 8000,
check_interval_secs: 60,
check_interval_secs: 120,
selector: HashMap::new(),
prefill_selector: HashMap::new(),
decode_selector: HashMap::new(),
......@@ -212,17 +214,44 @@ pub struct RetryConfig {
impl Default for RetryConfig {
fn default() -> Self {
Self {
max_retries: 3,
initial_backoff_ms: 100,
max_backoff_ms: 10000,
backoff_multiplier: 2.0,
jitter_factor: 0.1,
max_retries: 5,
initial_backoff_ms: 50,
max_backoff_ms: 30000,
backoff_multiplier: 1.5,
jitter_factor: 0.2,
}
}
}
fn default_retry_jitter_factor() -> f32 {
0.1
0.2
}
/// Health check configuration for worker monitoring
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthCheckConfig {
/// Number of consecutive failures before marking unhealthy
pub failure_threshold: u32,
/// Number of consecutive successes before marking healthy
pub success_threshold: u32,
/// Timeout for health check requests in seconds
pub timeout_secs: u64,
/// Interval between health checks in seconds
pub check_interval_secs: u64,
/// Health check endpoint path
pub endpoint: String,
}
impl Default for HealthCheckConfig {
fn default() -> Self {
Self {
failure_threshold: 3,
success_threshold: 2,
timeout_secs: 5,
check_interval_secs: 60,
endpoint: "/health".to_string(),
}
}
}
/// Circuit breaker configuration for worker reliability
......@@ -241,10 +270,10 @@ pub struct CircuitBreakerConfig {
impl Default for CircuitBreakerConfig {
fn default() -> Self {
Self {
failure_threshold: 5,
success_threshold: 2,
timeout_duration_secs: 30,
window_duration_secs: 60,
failure_threshold: 10,
success_threshold: 3,
timeout_duration_secs: 60,
window_duration_secs: 120,
}
}
}
......@@ -276,10 +305,10 @@ impl Default for RouterConfig {
policy: PolicyConfig::Random,
host: "127.0.0.1".to_string(),
port: 3001,
max_payload_size: 268_435_456, // 256MB
request_timeout_secs: 3600, // 1 hour to match Python mini LB
worker_startup_timeout_secs: 300,
worker_startup_check_interval_secs: 10,
max_payload_size: 536_870_912, // 512MB
request_timeout_secs: 1800, // 30 minutes
worker_startup_timeout_secs: 600,
worker_startup_check_interval_secs: 30,
dp_aware: false,
api_key: None,
discovery: None,
......@@ -287,12 +316,13 @@ impl Default for RouterConfig {
log_dir: None,
log_level: None,
request_id_headers: None,
max_concurrent_requests: 64,
max_concurrent_requests: 256,
cors_allowed_origins: vec![],
retry: RetryConfig::default(),
circuit_breaker: CircuitBreakerConfig::default(),
disable_retries: false,
disable_circuit_breaker: false,
health_check: HealthCheckConfig::default(),
}
}
}
......@@ -365,10 +395,10 @@ mod tests {
assert!(matches!(config.policy, PolicyConfig::Random));
assert_eq!(config.host, "127.0.0.1");
assert_eq!(config.port, 3001);
assert_eq!(config.max_payload_size, 268_435_456);
assert_eq!(config.request_timeout_secs, 3600);
assert_eq!(config.worker_startup_timeout_secs, 300);
assert_eq!(config.worker_startup_check_interval_secs, 10);
assert_eq!(config.max_payload_size, 536_870_912);
assert_eq!(config.request_timeout_secs, 1800);
assert_eq!(config.worker_startup_timeout_secs, 600);
assert_eq!(config.worker_startup_check_interval_secs, 30);
assert!(config.discovery.is_none());
assert!(config.metrics.is_none());
assert!(config.log_dir.is_none());
......@@ -425,6 +455,7 @@ mod tests {
circuit_breaker: CircuitBreakerConfig::default(),
disable_retries: false,
disable_circuit_breaker: false,
health_check: HealthCheckConfig::default(),
};
let json = serde_json::to_string(&config).unwrap();
......@@ -614,7 +645,7 @@ mod tests {
assert!(!config.enabled);
assert!(config.namespace.is_none());
assert_eq!(config.port, 8000);
assert_eq!(config.check_interval_secs, 60);
assert_eq!(config.check_interval_secs, 120);
assert!(config.selector.is_empty());
assert!(config.prefill_selector.is_empty());
assert!(config.decode_selector.is_empty());
......@@ -856,6 +887,7 @@ mod tests {
circuit_breaker: CircuitBreakerConfig::default(),
disable_retries: false,
disable_circuit_breaker: false,
health_check: HealthCheckConfig::default(),
};
assert!(config.mode.is_pd_mode());
......@@ -911,6 +943,7 @@ mod tests {
circuit_breaker: CircuitBreakerConfig::default(),
disable_retries: false,
disable_circuit_breaker: false,
health_check: HealthCheckConfig::default(),
};
assert!(!config.mode.is_pd_mode());
......@@ -962,6 +995,7 @@ mod tests {
circuit_breaker: CircuitBreakerConfig::default(),
disable_retries: false,
disable_circuit_breaker: false,
health_check: HealthCheckConfig::default(),
};
assert!(config.has_service_discovery());
......
......@@ -18,6 +18,6 @@ pub use circuit_breaker::{
pub use error::{WorkerError, WorkerResult};
pub use retry::{is_retryable_status, BackoffCalculator, RetryError, RetryExecutor};
pub use worker::{
start_health_checker, BasicWorker, DPAwareWorker, HealthChecker, Worker, WorkerCollection,
WorkerFactory, WorkerLoadGuard, WorkerType,
start_health_checker, BasicWorker, DPAwareWorker, HealthChecker, HealthConfig, Worker,
WorkerCollection, WorkerFactory, WorkerLoadGuard, WorkerType,
};
......@@ -182,6 +182,10 @@ pub struct HealthConfig {
pub check_interval_secs: u64,
/// Health check endpoint path
pub endpoint: String,
/// Number of consecutive failures before marking unhealthy
pub failure_threshold: u32,
/// Number of consecutive successes before marking healthy
pub success_threshold: u32,
}
impl Default for HealthConfig {
......@@ -190,6 +194,8 @@ impl Default for HealthConfig {
timeout_secs: 5,
check_interval_secs: 30,
endpoint: "/health".to_string(),
failure_threshold: 3,
success_threshold: 2,
}
}
}
......@@ -214,6 +220,8 @@ pub struct BasicWorker {
load_counter: Arc<AtomicUsize>,
processed_counter: Arc<AtomicUsize>,
healthy: Arc<AtomicBool>,
consecutive_failures: Arc<AtomicUsize>,
consecutive_successes: Arc<AtomicUsize>,
circuit_breaker: CircuitBreaker,
}
......@@ -231,6 +239,8 @@ impl BasicWorker {
load_counter: Arc::new(AtomicUsize::new(0)),
processed_counter: Arc::new(AtomicUsize::new(0)),
healthy: Arc::new(AtomicBool::new(true)),
consecutive_failures: Arc::new(AtomicUsize::new(0)),
consecutive_successes: Arc::new(AtomicUsize::new(0)),
circuit_breaker: CircuitBreaker::new(),
}
}
......@@ -300,26 +310,47 @@ impl Worker for BasicWorker {
let timeout = Duration::from_secs(self.metadata.health_config.timeout_secs);
// Use the shared client with a custom timeout for this request
match WORKER_CLIENT.get(&health_url).timeout(timeout).send().await {
let health_result = match WORKER_CLIENT.get(&health_url).timeout(timeout).send().await {
Ok(response) => {
if response.status().is_success() {
self.set_healthy(true);
Ok(())
true
} else {
self.set_healthy(false);
Err(WorkerError::HealthCheckFailed {
url: url.to_string(),
reason: format!("Health check returned status: {}", response.status()),
})
false
}
}
Err(e) => {
Err(_) => false,
};
if health_result {
// Health check succeeded
self.consecutive_failures.store(0, Ordering::Release);
let successes = self.consecutive_successes.fetch_add(1, Ordering::AcqRel) + 1;
// Mark healthy if we've reached the success threshold
if !self.is_healthy()
&& successes >= self.metadata.health_config.success_threshold as usize
{
self.set_healthy(true);
self.consecutive_successes.store(0, Ordering::Release);
}
Ok(())
} else {
// Health check failed
self.consecutive_successes.store(0, Ordering::Release);
let failures = self.consecutive_failures.fetch_add(1, Ordering::AcqRel) + 1;
// Mark unhealthy if we've reached the failure threshold
if self.is_healthy()
&& failures >= self.metadata.health_config.failure_threshold as usize
{
self.set_healthy(false);
Err(WorkerError::HealthCheckFailed {
url: url.to_string(),
reason: format!("Health check request failed: {}", e),
})
self.consecutive_failures.store(0, Ordering::Release);
}
Err(WorkerError::HealthCheckFailed {
url: url.to_string(),
reason: format!("Health check failed (consecutive failures: {})", failures),
})
}
}
......@@ -408,43 +439,8 @@ impl Worker for DPAwareWorker {
}
async fn check_health_async(&self) -> WorkerResult<()> {
// Use base URL for health checks
let health_url = format!("{}/health", self.base_url);
let timeout =
std::time::Duration::from_secs(self.base_worker.metadata.health_config.timeout_secs);
let health_result = async {
let response = WORKER_CLIENT
.get(&health_url)
.timeout(timeout)
.send()
.await
.map_err(|e| format!("Health check request failed: {}", e))?;
if response.status().is_success() {
Ok(())
} else {
Err(format!(
"Health check returned status: {}",
response.status()
))
}
}
.await;
match health_result {
Ok(()) => {
self.set_healthy(true);
Ok(())
}
Err(reason) => {
self.set_healthy(false);
Err(WorkerError::HealthCheckFailed {
url: self.base_url.clone(),
reason,
})
}
}
// Delegate to the base worker's health check logic
self.base_worker.check_health_async().await
}
fn load(&self) -> usize {
......@@ -951,6 +947,8 @@ mod tests {
assert_eq!(config.timeout_secs, 5);
assert_eq!(config.check_interval_secs, 30);
assert_eq!(config.endpoint, "/health");
assert_eq!(config.failure_threshold, 3);
assert_eq!(config.success_threshold, 2);
}
#[test]
......@@ -959,10 +957,14 @@ mod tests {
timeout_secs: 10,
check_interval_secs: 60,
endpoint: "/healthz".to_string(),
failure_threshold: 5,
success_threshold: 3,
};
assert_eq!(config.timeout_secs, 10);
assert_eq!(config.check_interval_secs, 60);
assert_eq!(config.endpoint, "/healthz");
assert_eq!(config.failure_threshold, 5);
assert_eq!(config.success_threshold, 3);
}
// Test BasicWorker
......@@ -994,6 +996,8 @@ mod tests {
timeout_secs: 15,
check_interval_secs: 45,
endpoint: "/custom-health".to_string(),
failure_threshold: 4,
success_threshold: 2,
};
let worker = BasicWorker::new("http://test:8080".to_string(), WorkerType::Regular)
......
......@@ -72,6 +72,12 @@ struct Router {
cb_timeout_duration_secs: u64,
cb_window_duration_secs: u64,
disable_circuit_breaker: bool,
// Health check configuration
health_failure_threshold: u32,
health_success_threshold: u32,
health_check_timeout_secs: u64,
health_check_interval_secs: u64,
health_check_endpoint: String,
}
impl Router {
......@@ -174,6 +180,13 @@ impl Router {
},
disable_retries: false,
disable_circuit_breaker: false,
health_check: config::HealthCheckConfig {
failure_threshold: self.health_failure_threshold,
success_threshold: self.health_success_threshold,
timeout_secs: self.health_check_timeout_secs,
check_interval_secs: self.health_check_interval_secs,
endpoint: self.health_check_endpoint.clone(),
},
})
}
}
......@@ -186,14 +199,14 @@ impl Router {
policy = PolicyType::RoundRobin,
host = String::from("127.0.0.1"),
port = 3001,
worker_startup_timeout_secs = 300,
worker_startup_check_interval = 10,
cache_threshold = 0.50,
balance_abs_threshold = 32,
balance_rel_threshold = 1.0001,
eviction_interval_secs = 60,
max_tree_size = 2usize.pow(24),
max_payload_size = 256 * 1024 * 1024, // 256MB default for large batches
worker_startup_timeout_secs = 600,
worker_startup_check_interval = 30,
cache_threshold = 0.3,
balance_abs_threshold = 64,
balance_rel_threshold = 1.5,
eviction_interval_secs = 120,
max_tree_size = 2usize.pow(26),
max_payload_size = 512 * 1024 * 1024, // 512MB default for large batches
dp_aware = false,
api_key = None,
log_dir = None,
......@@ -207,28 +220,34 @@ impl Router {
bootstrap_port_annotation = String::from("sglang.ai/bootstrap-port"),
prometheus_port = None,
prometheus_host = None,
request_timeout_secs = 600, // Add configurable request timeout
request_timeout_secs = 1800, // Add configurable request timeout
request_id_headers = None, // Custom request ID headers
pd_disaggregation = false, // New flag for PD mode
prefill_urls = None,
decode_urls = None,
prefill_policy = None,
decode_policy = None,
max_concurrent_requests = 64,
max_concurrent_requests = 256,
cors_allowed_origins = vec![],
// Retry defaults
retry_max_retries = 3,
retry_initial_backoff_ms = 100,
retry_max_backoff_ms = 10_000,
retry_backoff_multiplier = 2.0,
retry_jitter_factor = 0.1,
retry_max_retries = 5,
retry_initial_backoff_ms = 50,
retry_max_backoff_ms = 30_000,
retry_backoff_multiplier = 1.5,
retry_jitter_factor = 0.2,
disable_retries = false,
// Circuit breaker defaults
cb_failure_threshold = 5,
cb_success_threshold = 2,
cb_timeout_duration_secs = 30,
cb_window_duration_secs = 60,
cb_failure_threshold = 10,
cb_success_threshold = 3,
cb_timeout_duration_secs = 60,
cb_window_duration_secs = 120,
disable_circuit_breaker = false,
// Health check defaults
health_failure_threshold = 3,
health_success_threshold = 2,
health_check_timeout_secs = 5,
health_check_interval_secs = 60,
health_check_endpoint = String::from("/health"),
))]
fn new(
worker_urls: Vec<String>,
......@@ -276,6 +295,11 @@ impl Router {
cb_timeout_duration_secs: u64,
cb_window_duration_secs: u64,
disable_circuit_breaker: bool,
health_failure_threshold: u32,
health_success_threshold: u32,
health_check_timeout_secs: u64,
health_check_interval_secs: u64,
health_check_endpoint: String,
) -> PyResult<Self> {
Ok(Router {
host,
......@@ -323,6 +347,11 @@ impl Router {
cb_timeout_duration_secs,
cb_window_duration_secs,
disable_circuit_breaker,
health_failure_threshold,
health_success_threshold,
health_check_timeout_secs,
health_check_interval_secs,
health_check_endpoint,
})
}
......
use clap::{ArgAction, Parser};
use sglang_router_rs::config::{
CircuitBreakerConfig, ConfigError, ConfigResult, DiscoveryConfig, MetricsConfig, PolicyConfig,
RetryConfig, RouterConfig, RoutingMode,
CircuitBreakerConfig, ConfigError, ConfigResult, DiscoveryConfig, HealthCheckConfig,
MetricsConfig, PolicyConfig, RetryConfig, RouterConfig, RoutingMode,
};
use sglang_router_rs::metrics::PrometheusConfig;
use sglang_router_rs::server::{self, ServerConfig};
......@@ -105,35 +105,35 @@ struct CliArgs {
decode_policy: Option<String>,
/// Timeout in seconds for worker startup
#[arg(long, default_value_t = 300)]
#[arg(long, default_value_t = 600)]
worker_startup_timeout_secs: u64,
/// Interval in seconds between checks for worker startup
#[arg(long, default_value_t = 10)]
#[arg(long, default_value_t = 30)]
worker_startup_check_interval: u64,
/// Cache threshold (0.0-1.0) for cache-aware routing
#[arg(long, default_value_t = 0.5)]
#[arg(long, default_value_t = 0.3)]
cache_threshold: f32,
/// Absolute threshold for load balancing
#[arg(long, default_value_t = 32)]
#[arg(long, default_value_t = 64)]
balance_abs_threshold: usize,
/// Relative threshold for load balancing
#[arg(long, default_value_t = 1.0001)]
#[arg(long, default_value_t = 1.5)]
balance_rel_threshold: f32,
/// Interval in seconds between cache eviction operations
#[arg(long, default_value_t = 60)]
#[arg(long, default_value_t = 120)]
eviction_interval: u64,
/// Maximum size of the approximation tree for cache-aware routing
#[arg(long, default_value_t = 16777216)] // 2^24
#[arg(long, default_value_t = 67108864)] // 2^26
max_tree_size: usize,
/// Maximum payload size in bytes
#[arg(long, default_value_t = 268435456)] // 256MB
#[arg(long, default_value_t = 536870912)] // 512MB
max_payload_size: usize,
/// Enable data parallelism aware schedule
......@@ -189,11 +189,11 @@ struct CliArgs {
request_id_headers: Vec<String>,
/// Request timeout in seconds
#[arg(long, default_value_t = 600)]
#[arg(long, default_value_t = 1800)]
request_timeout_secs: u64,
/// Maximum number of concurrent requests allowed
#[arg(long, default_value_t = 64)]
#[arg(long, default_value_t = 256)]
max_concurrent_requests: usize,
/// CORS allowed origins
......@@ -202,23 +202,23 @@ struct CliArgs {
// Retry configuration
/// Maximum number of retries
#[arg(long, default_value_t = 3)]
#[arg(long, default_value_t = 5)]
retry_max_retries: u32,
/// Initial backoff in milliseconds for retries
#[arg(long, default_value_t = 100)]
#[arg(long, default_value_t = 50)]
retry_initial_backoff_ms: u64,
/// Maximum backoff in milliseconds for retries
#[arg(long, default_value_t = 10000)]
#[arg(long, default_value_t = 30000)]
retry_max_backoff_ms: u64,
/// Backoff multiplier for exponential backoff
#[arg(long, default_value_t = 2.0)]
#[arg(long, default_value_t = 1.5)]
retry_backoff_multiplier: f32,
/// Jitter factor for retry backoff
#[arg(long, default_value_t = 0.1)]
#[arg(long, default_value_t = 0.2)]
retry_jitter_factor: f32,
/// Disable retries
......@@ -227,24 +227,45 @@ struct CliArgs {
// Circuit breaker configuration
/// Number of failures before circuit breaker opens
#[arg(long, default_value_t = 5)]
#[arg(long, default_value_t = 10)]
cb_failure_threshold: u32,
/// Number of successes before circuit breaker closes
#[arg(long, default_value_t = 2)]
#[arg(long, default_value_t = 3)]
cb_success_threshold: u32,
/// Timeout duration in seconds for circuit breaker
#[arg(long, default_value_t = 30)]
#[arg(long, default_value_t = 60)]
cb_timeout_duration_secs: u64,
/// Window duration in seconds for circuit breaker
#[arg(long, default_value_t = 60)]
#[arg(long, default_value_t = 120)]
cb_window_duration_secs: u64,
/// Disable circuit breaker
#[arg(long, default_value_t = false)]
disable_circuit_breaker: bool,
// Health check configuration
/// Number of consecutive health check failures before marking worker unhealthy
#[arg(long, default_value_t = 3)]
health_failure_threshold: u32,
/// Number of consecutive health check successes before marking worker healthy
#[arg(long, default_value_t = 2)]
health_success_threshold: u32,
/// Timeout in seconds for health check requests
#[arg(long, default_value_t = 5)]
health_check_timeout_secs: u64,
/// Interval in seconds between runtime health checks
#[arg(long, default_value_t = 60)]
health_check_interval_secs: u64,
/// Health check endpoint path
#[arg(long, default_value = "/health")]
health_check_endpoint: String,
}
impl CliArgs {
......@@ -378,6 +399,13 @@ impl CliArgs {
},
disable_retries: self.disable_retries,
disable_circuit_breaker: self.disable_circuit_breaker,
health_check: HealthCheckConfig {
failure_threshold: self.health_failure_threshold,
success_threshold: self.health_success_threshold,
timeout_secs: self.health_check_timeout_secs,
check_interval_secs: self.health_check_interval_secs,
endpoint: self.health_check_endpoint.clone(),
},
})
}
......
......@@ -55,6 +55,7 @@ impl RouterFactory {
ctx.router_config.api_key.clone(),
ctx.router_config.retry.clone(),
ctx.router_config.circuit_breaker.clone(),
ctx.router_config.health_check.clone(),
)
.await?;
......@@ -87,6 +88,7 @@ impl RouterFactory {
ctx.router_config.worker_startup_check_interval_secs,
ctx.router_config.retry.clone(),
ctx.router_config.circuit_breaker.clone(),
ctx.router_config.health_check.clone(),
)
.await?;
......
// PD (Prefill-Decode) Router Implementation
// This module handles routing for disaggregated prefill-decode systems
use super::pd_types::{api_path, PDRouterError};
use crate::config::types::{CircuitBreakerConfig as ConfigCircuitBreakerConfig, RetryConfig};
use crate::config::types::{
CircuitBreakerConfig as ConfigCircuitBreakerConfig,
HealthCheckConfig as ConfigHealthCheckConfig, RetryConfig,
};
use crate::core::{
is_retryable_status, CircuitBreakerConfig, HealthChecker, RetryExecutor, Worker, WorkerFactory,
WorkerLoadGuard,
is_retryable_status, BasicWorker, CircuitBreakerConfig, HealthChecker, HealthConfig,
RetryExecutor, Worker, WorkerFactory, WorkerLoadGuard, WorkerType,
};
use crate::metrics::RouterMetrics;
use crate::openai_api_types::{ChatCompletionRequest, CompletionRequest, GenerateRequest};
......@@ -360,6 +363,7 @@ impl PDRouter {
interval_secs: u64,
retry_config: RetryConfig,
circuit_breaker_config: ConfigCircuitBreakerConfig,
health_check_config: ConfigHealthCheckConfig,
) -> Result<Self, String> {
// Convert config CircuitBreakerConfig to core CircuitBreakerConfig
let core_cb_config = CircuitBreakerConfig {
......@@ -369,17 +373,42 @@ impl PDRouter {
window_duration: Duration::from_secs(circuit_breaker_config.window_duration_secs),
};
// Convert URLs to Worker trait objects
// Convert URLs to Worker trait objects with health check config
let prefill_workers: Vec<Box<dyn Worker>> = prefill_urls
.into_iter()
.map(|(url, port)| {
WorkerFactory::create_prefill_with_config(url, port, core_cb_config.clone())
let worker = BasicWorker::new(
url,
WorkerType::Prefill {
bootstrap_port: port,
},
)
.with_circuit_breaker_config(core_cb_config.clone())
.with_health_config(HealthConfig {
timeout_secs: health_check_config.timeout_secs,
check_interval_secs: health_check_config.check_interval_secs,
endpoint: health_check_config.endpoint.clone(),
failure_threshold: health_check_config.failure_threshold,
success_threshold: health_check_config.success_threshold,
});
Box::new(worker) as Box<dyn Worker>
})
.collect();
let decode_workers: Vec<Box<dyn Worker>> = decode_urls
.into_iter()
.map(|url| WorkerFactory::create_decode_with_config(url, core_cb_config.clone()))
.map(|url| {
let worker = BasicWorker::new(url, WorkerType::Decode)
.with_circuit_breaker_config(core_cb_config.clone())
.with_health_config(HealthConfig {
timeout_secs: health_check_config.timeout_secs,
check_interval_secs: health_check_config.check_interval_secs,
endpoint: health_check_config.endpoint.clone(),
failure_threshold: health_check_config.failure_threshold,
success_threshold: health_check_config.success_threshold,
});
Box::new(worker) as Box<dyn Worker>
})
.collect();
// Wait for PD workers to be healthy (skip if empty - for service discovery mode)
......@@ -443,10 +472,14 @@ impl PDRouter {
let decode_workers = Arc::new(RwLock::new(decode_workers));
// Start health checkers for both worker pools
let prefill_health_checker =
crate::core::start_health_checker(Arc::clone(&prefill_workers), interval_secs);
let decode_health_checker =
crate::core::start_health_checker(Arc::clone(&decode_workers), interval_secs);
let prefill_health_checker = crate::core::start_health_checker(
Arc::clone(&prefill_workers),
health_check_config.check_interval_secs,
);
let decode_health_checker = crate::core::start_health_checker(
Arc::clone(&decode_workers),
health_check_config.check_interval_secs,
);
// Build a dedicated prefill client for fire-and-forget semantics
let prefill_client = reqwest::Client::builder()
......
use crate::config::types::{CircuitBreakerConfig as ConfigCircuitBreakerConfig, RetryConfig};
use crate::config::types::{
CircuitBreakerConfig as ConfigCircuitBreakerConfig,
HealthCheckConfig as ConfigHealthCheckConfig, RetryConfig,
};
use crate::core::{
is_retryable_status, CircuitBreakerConfig, HealthChecker, RetryExecutor, Worker, WorkerFactory,
is_retryable_status, BasicWorker, CircuitBreakerConfig, HealthChecker, HealthConfig,
RetryExecutor, Worker, WorkerFactory, WorkerType,
};
use crate::metrics::RouterMetrics;
use crate::openai_api_types::{ChatCompletionRequest, CompletionRequest, GenerateRequest};
......@@ -61,6 +65,7 @@ impl Router {
api_key: Option<String>,
retry_config: RetryConfig,
circuit_breaker_config: ConfigCircuitBreakerConfig,
health_check_config: ConfigHealthCheckConfig,
) -> Result<Self, String> {
// Update active workers gauge
RouterMetrics::set_active_workers(worker_urls.len());
......@@ -86,11 +91,20 @@ impl Router {
window_duration: Duration::from_secs(circuit_breaker_config.window_duration_secs),
};
// Create Worker trait objects from URLs
// Create Worker trait objects from URLs with health check config
let workers: Vec<Box<dyn Worker>> = worker_urls
.iter()
.map(|url| {
WorkerFactory::create_regular_with_config(url.clone(), core_cb_config.clone())
let worker = BasicWorker::new(url.clone(), WorkerType::Regular)
.with_circuit_breaker_config(core_cb_config.clone())
.with_health_config(HealthConfig {
timeout_secs: health_check_config.timeout_secs,
check_interval_secs: health_check_config.check_interval_secs,
endpoint: health_check_config.endpoint.clone(),
failure_threshold: health_check_config.failure_threshold,
success_threshold: health_check_config.success_threshold,
});
Box::new(worker) as Box<dyn Worker>
})
.collect();
......
......@@ -592,6 +592,7 @@ mod tests {
None,
crate::config::types::RetryConfig::default(),
crate::config::types::CircuitBreakerConfig::default(),
crate::config::types::HealthCheckConfig::default(),
)
.await
.unwrap();
......
......@@ -50,6 +50,7 @@ impl TestContext {
circuit_breaker: CircuitBreakerConfig::default(),
disable_retries: false,
disable_circuit_breaker: false,
health_check: sglang_router_rs::config::HealthCheckConfig::default(),
};
Self::new_with_config(config, worker_configs).await
......@@ -1091,6 +1092,7 @@ mod error_tests {
circuit_breaker: CircuitBreakerConfig::default(),
disable_retries: false,
disable_circuit_breaker: false,
health_check: sglang_router_rs::config::HealthCheckConfig::default(),
};
let ctx = TestContext::new_with_config(
......@@ -1441,6 +1443,7 @@ mod pd_mode_tests {
circuit_breaker: CircuitBreakerConfig::default(),
disable_retries: false,
disable_circuit_breaker: false,
health_check: sglang_router_rs::config::HealthCheckConfig::default(),
};
// Create app context
......@@ -1595,6 +1598,7 @@ mod request_id_tests {
circuit_breaker: CircuitBreakerConfig::default(),
disable_retries: false,
disable_circuit_breaker: false,
health_check: sglang_router_rs::config::HealthCheckConfig::default(),
};
let ctx = TestContext::new_with_config(
......
......@@ -41,6 +41,7 @@ impl TestContext {
circuit_breaker: CircuitBreakerConfig::default(),
disable_retries: false,
disable_circuit_breaker: false,
health_check: sglang_router_rs::config::HealthCheckConfig::default(),
};
let mut workers = Vec::new();
......
......@@ -42,6 +42,7 @@ impl TestContext {
circuit_breaker: CircuitBreakerConfig::default(),
disable_retries: false,
disable_circuit_breaker: false,
health_check: sglang_router_rs::config::HealthCheckConfig::default(),
};
let mut workers = Vec::new();
......
......@@ -184,6 +184,7 @@ mod test_pd_routing {
circuit_breaker: CircuitBreakerConfig::default(),
disable_retries: false,
disable_circuit_breaker: false,
health_check: sglang_router_rs::config::HealthCheckConfig::default(),
};
// Router creation will fail due to health checks, but config should be valid
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment