Unverified commit 2eeb2751, authored by Simo Lin, committed by GitHub
Browse files

[router] disable rate limiter by default (#11435)

parent b36afed4
...@@ -54,8 +54,8 @@ class RouterArgs: ...@@ -54,8 +54,8 @@ class RouterArgs:
request_id_headers: Optional[List[str]] = None request_id_headers: Optional[List[str]] = None
# Request timeout in seconds # Request timeout in seconds
request_timeout_secs: int = 1800 request_timeout_secs: int = 1800
# Max concurrent requests for rate limiting # Max concurrent requests for rate limiting (-1 to disable)
max_concurrent_requests: int = 256 max_concurrent_requests: int = -1
# Queue size for pending requests when max concurrent limit reached # Queue size for pending requests when max concurrent limit reached
queue_size: int = 100 queue_size: int = 100
# Maximum time (in seconds) a request can wait in queue before timing out # Maximum time (in seconds) a request can wait in queue before timing out
...@@ -409,7 +409,7 @@ class RouterArgs: ...@@ -409,7 +409,7 @@ class RouterArgs:
f"--{prefix}max-concurrent-requests", f"--{prefix}max-concurrent-requests",
type=int, type=int,
default=RouterArgs.max_concurrent_requests, default=RouterArgs.max_concurrent_requests,
help="Maximum number of concurrent requests allowed (for rate limiting)", help="Maximum number of concurrent requests allowed (for rate limiting). Set to -1 to disable rate limiting.",
) )
parser.add_argument( parser.add_argument(
f"--{prefix}queue-size", f"--{prefix}queue-size",
......
...@@ -38,14 +38,14 @@ pub struct RouterConfig { ...@@ -38,14 +38,14 @@ pub struct RouterConfig {
pub log_level: Option<String>, pub log_level: Option<String>,
/// Custom request ID headers to check (defaults to common headers) /// Custom request ID headers to check (defaults to common headers)
pub request_id_headers: Option<Vec<String>>, pub request_id_headers: Option<Vec<String>>,
/// Maximum concurrent requests allowed (for rate limiting) /// Maximum concurrent requests allowed (for rate limiting). Set to -1 to disable rate limiting.
pub max_concurrent_requests: usize, pub max_concurrent_requests: i32,
/// Queue size for pending requests when max concurrent limit reached (0 = no queue, return 429 immediately) /// Queue size for pending requests when max concurrent limit reached (0 = no queue, return 429 immediately)
pub queue_size: usize, pub queue_size: usize,
/// Maximum time (in seconds) a request can wait in queue before timing out /// Maximum time (in seconds) a request can wait in queue before timing out
pub queue_timeout_secs: u64, pub queue_timeout_secs: u64,
/// Token bucket refill rate (tokens per second). If not set, defaults to max_concurrent_requests /// Token bucket refill rate (tokens per second). If not set, defaults to max_concurrent_requests
pub rate_limit_tokens_per_second: Option<usize>, pub rate_limit_tokens_per_second: Option<i32>,
/// CORS allowed origins /// CORS allowed origins
pub cors_allowed_origins: Vec<String>, pub cors_allowed_origins: Vec<String>,
/// Retry configuration /// Retry configuration
...@@ -436,7 +436,7 @@ impl Default for RouterConfig { ...@@ -436,7 +436,7 @@ impl Default for RouterConfig {
log_dir: None, log_dir: None,
log_level: None, log_level: None,
request_id_headers: None, request_id_headers: None,
max_concurrent_requests: 256, max_concurrent_requests: -1,
queue_size: 100, queue_size: 100,
queue_timeout_secs: 60, queue_timeout_secs: 60,
rate_limit_tokens_per_second: None, rate_limit_tokens_per_second: None,
......
...@@ -65,7 +65,7 @@ struct Router { ...@@ -65,7 +65,7 @@ struct Router {
decode_urls: Option<Vec<String>>, decode_urls: Option<Vec<String>>,
prefill_policy: Option<PolicyType>, prefill_policy: Option<PolicyType>,
decode_policy: Option<PolicyType>, decode_policy: Option<PolicyType>,
max_concurrent_requests: usize, max_concurrent_requests: i32,
cors_allowed_origins: Vec<String>, cors_allowed_origins: Vec<String>,
retry_max_retries: u32, retry_max_retries: u32,
retry_initial_backoff_ms: u64, retry_initial_backoff_ms: u64,
...@@ -86,7 +86,7 @@ struct Router { ...@@ -86,7 +86,7 @@ struct Router {
enable_igw: bool, enable_igw: bool,
queue_size: usize, queue_size: usize,
queue_timeout_secs: u64, queue_timeout_secs: u64,
rate_limit_tokens_per_second: Option<usize>, rate_limit_tokens_per_second: Option<i32>,
connection_mode: config::ConnectionMode, connection_mode: config::ConnectionMode,
model_path: Option<String>, model_path: Option<String>,
tokenizer_path: Option<String>, tokenizer_path: Option<String>,
...@@ -260,7 +260,7 @@ impl Router { ...@@ -260,7 +260,7 @@ impl Router {
decode_urls = None, decode_urls = None,
prefill_policy = None, prefill_policy = None,
decode_policy = None, decode_policy = None,
max_concurrent_requests = 256, max_concurrent_requests = -1,
cors_allowed_origins = vec![], cors_allowed_origins = vec![],
retry_max_retries = 5, retry_max_retries = 5,
retry_initial_backoff_ms = 50, retry_initial_backoff_ms = 50,
...@@ -321,7 +321,7 @@ impl Router { ...@@ -321,7 +321,7 @@ impl Router {
decode_urls: Option<Vec<String>>, decode_urls: Option<Vec<String>>,
prefill_policy: Option<PolicyType>, prefill_policy: Option<PolicyType>,
decode_policy: Option<PolicyType>, decode_policy: Option<PolicyType>,
max_concurrent_requests: usize, max_concurrent_requests: i32,
cors_allowed_origins: Vec<String>, cors_allowed_origins: Vec<String>,
retry_max_retries: u32, retry_max_retries: u32,
retry_initial_backoff_ms: u64, retry_initial_backoff_ms: u64,
...@@ -342,7 +342,7 @@ impl Router { ...@@ -342,7 +342,7 @@ impl Router {
enable_igw: bool, enable_igw: bool,
queue_size: usize, queue_size: usize,
queue_timeout_secs: u64, queue_timeout_secs: u64,
rate_limit_tokens_per_second: Option<usize>, rate_limit_tokens_per_second: Option<i32>,
model_path: Option<String>, model_path: Option<String>,
tokenizer_path: Option<String>, tokenizer_path: Option<String>,
reasoning_parser: Option<String>, reasoning_parser: Option<String>,
......
...@@ -192,8 +192,8 @@ struct CliArgs { ...@@ -192,8 +192,8 @@ struct CliArgs {
#[arg(long, default_value_t = 1800)] #[arg(long, default_value_t = 1800)]
request_timeout_secs: u64, request_timeout_secs: u64,
#[arg(long, default_value_t = 256)] #[arg(long, default_value_t = -1)]
max_concurrent_requests: usize, max_concurrent_requests: i32,
#[arg(long, num_args = 0..)] #[arg(long, num_args = 0..)]
cors_allowed_origins: Vec<String>, cors_allowed_origins: Vec<String>,
......
...@@ -424,22 +424,23 @@ pub struct ConcurrencyLimiter { ...@@ -424,22 +424,23 @@ pub struct ConcurrencyLimiter {
impl ConcurrencyLimiter { impl ConcurrencyLimiter {
/// Create new concurrency limiter with optional queue /// Create new concurrency limiter with optional queue
pub fn new( pub fn new(
token_bucket: Arc<TokenBucket>, token_bucket: Option<Arc<TokenBucket>>,
queue_size: usize, queue_size: usize,
queue_timeout: Duration, queue_timeout: Duration,
) -> (Self, Option<QueueProcessor>) { ) -> (Self, Option<QueueProcessor>) {
if queue_size > 0 { match (token_bucket, queue_size) {
let (queue_tx, queue_rx) = mpsc::channel(queue_size); (None, _) => (Self { queue_tx: None }, None),
let processor = QueueProcessor::new(token_bucket, queue_rx, queue_timeout); (Some(bucket), size) if size > 0 => {
let (queue_tx, queue_rx) = mpsc::channel(size);
let processor = QueueProcessor::new(bucket, queue_rx, queue_timeout);
( (
Self { Self {
queue_tx: Some(queue_tx), queue_tx: Some(queue_tx),
}, },
Some(processor), Some(processor),
) )
} else { }
(Self { queue_tx: None }, None) (Some(_), _) => (Self { queue_tx: None }, None),
} }
} }
} }
...@@ -450,12 +451,19 @@ pub async fn concurrency_limit_middleware( ...@@ -450,12 +451,19 @@ pub async fn concurrency_limit_middleware(
request: Request<Body>, request: Request<Body>,
next: Next, next: Next,
) -> Response { ) -> Response {
let token_bucket = match &app_state.context.rate_limiter {
Some(bucket) => bucket.clone(),
None => {
// Rate limiting disabled, pass through immediately
return next.run(request).await;
}
};
// Static counter for embeddings queue size // Static counter for embeddings queue size
static EMBEDDINGS_QUEUE_SIZE: AtomicU64 = AtomicU64::new(0); static EMBEDDINGS_QUEUE_SIZE: AtomicU64 = AtomicU64::new(0);
// Identify if this is an embeddings request based on path // Identify if this is an embeddings request based on path
let is_embeddings = request.uri().path().contains("/v1/embeddings"); let is_embeddings = request.uri().path().contains("/v1/embeddings");
let token_bucket = app_state.context.rate_limiter.clone();
// Try to acquire token immediately // Try to acquire token immediately
if token_bucket.try_acquire(1.0).await.is_ok() { if token_bucket.try_acquire(1.0).await.is_ok() {
......
...@@ -48,7 +48,7 @@ use tracing::{error, info, warn, Level}; ...@@ -48,7 +48,7 @@ use tracing::{error, info, warn, Level};
pub struct AppContext { pub struct AppContext {
pub client: Client, pub client: Client,
pub router_config: RouterConfig, pub router_config: RouterConfig,
pub rate_limiter: Arc<TokenBucket>, pub rate_limiter: Option<Arc<TokenBucket>>,
pub tokenizer: Option<Arc<dyn Tokenizer>>, pub tokenizer: Option<Arc<dyn Tokenizer>>,
pub reasoning_parser_factory: Option<ReasoningParserFactory>, pub reasoning_parser_factory: Option<ReasoningParserFactory>,
pub tool_parser_factory: Option<ToolParserFactory>, pub tool_parser_factory: Option<ToolParserFactory>,
...@@ -67,11 +67,20 @@ impl AppContext { ...@@ -67,11 +67,20 @@ impl AppContext {
pub fn new( pub fn new(
router_config: RouterConfig, router_config: RouterConfig,
client: Client, client: Client,
max_concurrent_requests: usize, max_concurrent_requests: i32,
rate_limit_tokens_per_second: Option<usize>, rate_limit_tokens_per_second: Option<i32>,
) -> Result<Self, String> { ) -> Result<Self, String> {
let rate_limit_tokens = rate_limit_tokens_per_second.unwrap_or(max_concurrent_requests); let rate_limiter = match max_concurrent_requests {
let rate_limiter = Arc::new(TokenBucket::new(max_concurrent_requests, rate_limit_tokens)); n if n <= 0 => None,
n => {
let rate_limit_tokens =
rate_limit_tokens_per_second.filter(|&t| t > 0).unwrap_or(n);
Some(Arc::new(TokenBucket::new(
n as usize,
rate_limit_tokens as usize,
)))
}
};
let (tokenizer, reasoning_parser_factory, tool_parser_factory) = let (tokenizer, reasoning_parser_factory, tool_parser_factory) =
if router_config.connection_mode == ConnectionMode::Grpc { if router_config.connection_mode == ConnectionMode::Grpc {
...@@ -916,13 +925,25 @@ pub async fn startup(config: ServerConfig) -> Result<(), Box<dyn std::error::Err ...@@ -916,13 +925,25 @@ pub async fn startup(config: ServerConfig) -> Result<(), Box<dyn std::error::Err
Duration::from_secs(config.router_config.queue_timeout_secs), Duration::from_secs(config.router_config.queue_timeout_secs),
); );
if let Some(processor) = processor { if app_context.rate_limiter.is_none() {
spawn(processor.run()); info!("Rate limiting is disabled (max_concurrent_requests = -1)");
}
match processor {
Some(proc) => {
spawn(proc.run());
info!( info!(
"Started request queue with size: {}, timeout: {}s", "Started request queue (size: {}, timeout: {}s)",
config.router_config.queue_size, config.router_config.queue_timeout_secs config.router_config.queue_size, config.router_config.queue_timeout_secs
); );
} }
None => {
info!(
"Rate limiting enabled (max_concurrent_requests = {}, queue disabled)",
config.router_config.max_concurrent_requests
);
}
}
let app_state = Arc::new(AppState { let app_state = Arc::new(AppState {
router, router,
......
...@@ -532,7 +532,7 @@ mod tests { ...@@ -532,7 +532,7 @@ mod tests {
Arc::new(AppContext { Arc::new(AppContext {
client: reqwest::Client::new(), client: reqwest::Client::new(),
router_config: router_config.clone(), router_config: router_config.clone(),
rate_limiter: Arc::new(TokenBucket::new(1000, 1000)), rate_limiter: Some(Arc::new(TokenBucket::new(1000, 1000))),
worker_registry: Arc::new(crate::core::WorkerRegistry::new()), worker_registry: Arc::new(crate::core::WorkerRegistry::new()),
policy_registry: Arc::new(crate::policies::PolicyRegistry::new( policy_registry: Arc::new(crate::policies::PolicyRegistry::new(
router_config.policy.clone(), router_config.policy.clone(),
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment