Unverified Commit 55c0a769 authored by Yan Ru Pei, committed by GitHub
Browse files

feat: default router_event_threads to 4 (#6672)


Signed-off-by: PeaBrane <yanrpei@gmail.com>
parent a110abfb
...@@ -357,9 +357,10 @@ class FrontendArgGroup(ArgGroup): ...@@ -357,9 +357,10 @@ class FrontendArgGroup(ArgGroup):
g, g,
flag_name="--router-event-threads", flag_name="--router-event-threads",
env_var="DYN_ROUTER_EVENT_THREADS", env_var="DYN_ROUTER_EVENT_THREADS",
default=1, default=4,
help=( help=(
"KV Router: Number of event processing threads. When > 1, uses a concurrent radix tree with a thread pool for higher throughput." "KV Router: Number of event processing threads. When > 1, uses a concurrent radix tree with a thread pool for higher throughput. "
"Ignored when --no-router-kv-events is set (approximate mode always uses single-threaded indexer with TTL/pruning)."
), ),
arg_type=int, arg_type=int,
) )
......
...@@ -195,7 +195,7 @@ class DynamoRouterArgGroup(ArgGroup): ...@@ -195,7 +195,7 @@ class DynamoRouterArgGroup(ArgGroup):
g, g,
flag_name="--router-event-threads", flag_name="--router-event-threads",
env_var="DYN_ROUTER_EVENT_THREADS", env_var="DYN_ROUTER_EVENT_THREADS",
default=1, default=4,
help="KV Router: Number of event processing threads. >1 uses concurrent radix tree and thread pool for higher throughput.", help="KV Router: Number of event processing threads. >1 uses concurrent radix tree and thread pool for higher throughput. Ignored when --no-router-kv-events is set (approximate mode always uses single-threaded indexer with TTL/pruning).",
arg_type=int, arg_type=int,
) )
...@@ -176,7 +176,7 @@ The main KV-aware routing arguments (frontend uses the same `--router-*` flag na ...@@ -176,7 +176,7 @@ The main KV-aware routing arguments (frontend uses the same `--router-*` flag na
- `--router-prune-target-ratio`: Target size ratio to prune down to when `--router-max-tree-size` is exceeded. For example, with a value of 0.8 (default) and max tree size of 1048576, the router will prune down to approximately 838860 blocks when the threshold is exceeded. Defaults to 0.8 when `--no-router-kv-events` is used. This creates headroom before the next pruning cycle. - `--router-prune-target-ratio`: Target size ratio to prune down to when `--router-max-tree-size` is exceeded. For example, with a value of 0.8 (default) and max tree size of 1048576, the router will prune down to approximately 838860 blocks when the threshold is exceeded. Defaults to 0.8 when `--no-router-kv-events` is used. This creates headroom before the next pruning cycle.
- `--router-event-threads`: Number of event processing threads for the KV indexer. When set to 1 (default), the router uses a single-threaded radix tree with channel-based event processing, which supports TTL-based expiration and pruning. When set to a value greater than 1, the router uses a concurrent radix tree with a thread pool of the specified size for higher event throughput. Note: the concurrent indexer does not support TTL/pruning (`--router-ttl-secs`, `--router-max-tree-size`, `--router-prune-target-ratio` are ignored when `--router-event-threads > 1`). Can be set via `DYN_ROUTER_EVENT_THREADS` env var. For details on the underlying index data structures (`RadixTree`, `ConcurrentRadixTree`, `PositionalIndexer`) and their concurrency model (inline reads, sticky-routed writes via thread pool), see the [KV Router Index documentation](../../../../lib/kv-router/README.md). - `--router-event-threads`: Number of event processing threads for the KV indexer (default: 4). When set to 1, the router uses a single-threaded radix tree with channel-based event processing. When set to a value greater than 1 (the default), the router uses a concurrent radix tree with a thread pool of the specified size for higher event throughput. This setting only applies when KV events are enabled (the default). When `--no-router-kv-events` is set (approximate mode), the router always uses a single-threaded indexer with TTL-based expiration and pruning regardless of this setting. Can be set via `DYN_ROUTER_EVENT_THREADS` env var. For details on the underlying index data structures (`RadixTree`, `ConcurrentRadixTree`, `PositionalIndexer`) and their concurrency model (inline reads, sticky-routed writes via thread pool), see the [KV Router Index documentation](../../../../lib/kv-router/README.md).
<Note> <Note>
......
...@@ -131,9 +131,9 @@ The KVIndexer has a method `find_matches_for_request`, which takes in tokens and ...@@ -131,9 +131,9 @@ The KVIndexer has a method `find_matches_for_request`, which takes in tokens and
The KVIndexer supports two backend implementations, selected via `--router-event-threads`: The KVIndexer supports two backend implementations, selected via `--router-event-threads`:
- **Single-threaded RadixTree** (default, `--router-event-threads 1`): Events are processed in a dedicated single-threaded tokio runtime via channel-based dispatch. Supports TTL-based expiration and size-based pruning (for `--no-kv-events` approximate mode). - **Single-threaded RadixTree** (`--router-event-threads 1`): Events are processed in a dedicated single-threaded tokio runtime via channel-based dispatch. Supports TTL-based expiration and size-based pruning (for `--no-kv-events` approximate mode).
- **ConcurrentRadixTree** (`--router-event-threads N` where N > 1): A thread-safe radix tree with a pool of N worker threads for event processing. Uses sticky worker routing (events for the same worker always go to the same thread) to ensure per-worker event serialization. Read operations (`find_matches`) execute concurrently with writes. Does not support TTL/pruning. - **ConcurrentRadixTree** (default, `--router-event-threads N` where N > 1): A thread-safe radix tree with a pool of N worker threads for event processing (default: 4). Uses sticky worker routing (events for the same worker always go to the same thread) to ensure per-worker event serialization. Read operations (`find_matches`) execute concurrently with writes. Does not support TTL/pruning.
### Inter-Router Communication ### Inter-Router Communication
......
...@@ -52,7 +52,7 @@ impl KvRouterConfig { ...@@ -52,7 +52,7 @@ impl KvRouterConfig {
#[pymethods] #[pymethods]
impl KvRouterConfig { impl KvRouterConfig {
#[new] #[new]
#[pyo3(signature = (overlap_score_weight=1.0, router_temperature=0.0, use_kv_events=true, durable_kv_events=false, router_replica_sync=false, router_track_active_blocks=true, router_track_output_blocks=false, router_assume_kv_reuse=true, router_snapshot_threshold=1000000, router_reset_states=false, router_ttl_secs=120.0, router_max_tree_size=1048576, router_prune_target_ratio=0.8, router_queue_threshold=None, router_event_threads=1, router_enable_cache_control=false))] #[pyo3(signature = (overlap_score_weight=1.0, router_temperature=0.0, use_kv_events=true, durable_kv_events=false, router_replica_sync=false, router_track_active_blocks=true, router_track_output_blocks=false, router_assume_kv_reuse=true, router_snapshot_threshold=1000000, router_reset_states=false, router_ttl_secs=120.0, router_max_tree_size=1048576, router_prune_target_ratio=0.8, router_queue_threshold=None, router_event_threads=4, router_enable_cache_control=false))]
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
fn new( fn new(
overlap_score_weight: f64, overlap_score_weight: f64,
......
...@@ -981,7 +981,7 @@ class KvRouterConfig: ...@@ -981,7 +981,7 @@ class KvRouterConfig:
router_max_tree_size: int = 1048576, router_max_tree_size: int = 1048576,
router_prune_target_ratio: float = 0.8, router_prune_target_ratio: float = 0.8,
router_queue_threshold: Optional[float] = None, router_queue_threshold: Optional[float] = None,
router_event_threads: int = 1, router_event_threads: int = 4,
) -> None: ) -> None:
""" """
Create a KV router configuration. Create a KV router configuration.
...@@ -1010,7 +1010,7 @@ class KvRouterConfig: ...@@ -1010,7 +1010,7 @@ class KvRouterConfig:
When set, requests are queued if all workers exceed this fraction of When set, requests are queued if all workers exceed this fraction of
max_num_batched_tokens. Enables priority scheduling via latency_sensitivity hints. max_num_batched_tokens. Enables priority scheduling via latency_sensitivity hints.
If None, queueing is disabled and all requests go directly to the scheduler. If None, queueing is disabled and all requests go directly to the scheduler.
router_event_threads: Number of event processing threads (default: 1). router_event_threads: Number of event processing threads (default: 4).
When > 1, uses a concurrent radix tree with a thread pool. When > 1, uses a concurrent radix tree with a thread pool.
""" """
... ...
......
...@@ -155,6 +155,26 @@ impl Indexer { ...@@ -155,6 +155,26 @@ impl Indexer {
return Indexer::None; return Indexer::None;
} }
// Approximate mode (--no-kv-events): always use single-threaded KvIndexer
// with TTL/pruning regardless of event_threads, since updates come from
// routing decisions only, not live KV events from workers.
if !kv_router_config.use_kv_events {
let kv_indexer_metrics = indexer::KvIndexerMetrics::from_component(component);
let cancellation_token = component.drt().primary_token();
let prune_config = Some(PruneConfig {
ttl: Duration::from_secs_f64(kv_router_config.router_ttl_secs),
max_tree_size: kv_router_config.router_max_tree_size,
prune_target_ratio: kv_router_config.router_prune_target_ratio,
});
return Indexer::KvIndexer(KvIndexer::new_with_frequency(
cancellation_token,
None,
block_size,
kv_indexer_metrics,
prune_config,
));
}
if kv_router_config.router_event_threads > 1 { if kv_router_config.router_event_threads > 1 {
return Indexer::Concurrent(Arc::new(ThreadPoolIndexer::new( return Indexer::Concurrent(Arc::new(ThreadPoolIndexer::new(
ConcurrentRadixTree::new(), ConcurrentRadixTree::new(),
...@@ -166,23 +186,12 @@ impl Indexer { ...@@ -166,23 +186,12 @@ impl Indexer {
let kv_indexer_metrics = indexer::KvIndexerMetrics::from_component(component); let kv_indexer_metrics = indexer::KvIndexerMetrics::from_component(component);
let cancellation_token = component.drt().primary_token(); let cancellation_token = component.drt().primary_token();
// If use_kv_events is false, enable TTL and pruning for approximate behavior
let prune_config = if !kv_router_config.use_kv_events {
Some(PruneConfig {
ttl: Duration::from_secs_f64(kv_router_config.router_ttl_secs),
max_tree_size: kv_router_config.router_max_tree_size,
prune_target_ratio: kv_router_config.router_prune_target_ratio,
})
} else {
None
};
Indexer::KvIndexer(KvIndexer::new_with_frequency( Indexer::KvIndexer(KvIndexer::new_with_frequency(
cancellation_token, cancellation_token,
None, // expiration_duration for frequency tracking None, // expiration_duration for frequency tracking
block_size, block_size,
kv_indexer_metrics, kv_indexer_metrics,
prune_config, None,
)) ))
} }
......
...@@ -82,7 +82,7 @@ pub struct KvRouterConfig { ...@@ -82,7 +82,7 @@ pub struct KvRouterConfig {
/// Number of event processing threads for the KV indexer. /// Number of event processing threads for the KV indexer.
/// When > 1, uses ConcurrentRadixTree with a thread pool instead of the /// When > 1, uses ConcurrentRadixTree with a thread pool instead of the
/// single-threaded RadixTree. Default: 1. /// single-threaded RadixTree. Default: 4.
#[validate(range(min = 1))] #[validate(range(min = 1))]
pub router_event_threads: u32, pub router_event_threads: u32,
...@@ -110,7 +110,7 @@ impl Default for KvRouterConfig { ...@@ -110,7 +110,7 @@ impl Default for KvRouterConfig {
router_max_tree_size: 2usize.pow(20), // 2^20 = 1048576, matches PruneConfig::default() router_max_tree_size: 2usize.pow(20), // 2^20 = 1048576, matches PruneConfig::default()
router_prune_target_ratio: 0.8, router_prune_target_ratio: 0.8,
router_queue_threshold: None, router_queue_threshold: None,
router_event_threads: 1, router_event_threads: 4,
router_enable_cache_control: false, router_enable_cache_control: false,
} }
} }
...@@ -128,11 +128,6 @@ fn validate_kv_router_config(config: &KvRouterConfig) -> Result<(), ValidationEr ...@@ -128,11 +128,6 @@ fn validate_kv_router_config(config: &KvRouterConfig) -> Result<(), ValidationEr
"durable_kv_events requires use_kv_events=true", "durable_kv_events requires use_kv_events=true",
)); ));
} }
if !config.use_kv_events && config.router_event_threads > 1 {
return Err(ValidationError::new(
"router_event_threads > 1 requires use_kv_events=true",
));
}
if config.router_track_output_blocks && !config.router_track_active_blocks { if config.router_track_output_blocks && !config.router_track_active_blocks {
return Err(ValidationError::new( return Err(ValidationError::new(
"router_track_output_blocks requires router_track_active_blocks=true", "router_track_output_blocks requires router_track_active_blocks=true",
......
...@@ -1353,7 +1353,7 @@ def _test_router_indexers_sync( ...@@ -1353,7 +1353,7 @@ def _test_router_indexers_sync(
test_nats_interruption: bool = False, test_nats_interruption: bool = False,
nats_server: Optional["NatsServer"] = None, nats_server: Optional["NatsServer"] = None,
durable_kv_events: bool = False, durable_kv_events: bool = False,
router_event_threads: int = 1, router_event_threads: int = 4,
): ):
"""Test that two KV routers have synchronized indexer states after processing requests. """Test that two KV routers have synchronized indexer states after processing requests.
...@@ -1920,7 +1920,7 @@ def _test_router_decisions( ...@@ -1920,7 +1920,7 @@ def _test_router_decisions(
block_size: int = 8, block_size: int = 8,
use_kv_events: bool = True, use_kv_events: bool = True,
durable_kv_events: bool = False, durable_kv_events: bool = False,
router_event_threads: int = 1, router_event_threads: int = 4,
): ):
"""Validate cross-worker routing decisions based on longest prefix match and tree-size tiebreaking. """Validate cross-worker routing decisions based on longest prefix match and tree-size tiebreaking.
......
...@@ -548,18 +548,16 @@ def test_kv_router_bindings( ...@@ -548,18 +548,16 @@ def test_kv_router_bindings(
@pytest.mark.parametrize( @pytest.mark.parametrize(
"store_backend,durable_kv_events,request_plane,router_event_threads", "store_backend,durable_kv_events,request_plane",
[ [
("etcd", True, "nats", 1), # JetStream mode - uses JetStream ("etcd", True, "nats"), # JetStream mode - uses JetStream
("etcd", False, "tcp", 1), # NATS core mode (with gap detection) - no JetStream ("etcd", False, "tcp"), # NATS core mode (with gap detection) - no JetStream
("file", True, "nats", 1), # File backend - uses JetStream ("file", True, "nats"), # File backend - uses JetStream
("etcd", False, "tcp", 2), # NATS core mode - multi-threaded indexer
], ],
ids=[ ids=[
"jetstream", "jetstream",
"nats_core", "nats_core",
"file", "file",
"nats_core_multi_thread",
], ],
indirect=["request_plane", "durable_kv_events"], indirect=["request_plane", "durable_kv_events"],
) )
...@@ -572,7 +570,6 @@ def test_indexers_sync( ...@@ -572,7 +570,6 @@ def test_indexers_sync(
store_backend, store_backend,
durable_kv_events, durable_kv_events,
request_plane, request_plane,
router_event_threads,
): ):
""" """
Test that two KV routers have synchronized indexer states after processing requests. Test that two KV routers have synchronized indexer states after processing requests.
...@@ -625,7 +622,6 @@ def test_indexers_sync( ...@@ -625,7 +622,6 @@ def test_indexers_sync(
test_nats_interruption=not durable_kv_events, test_nats_interruption=not durable_kv_events,
nats_server=nats_process if not durable_kv_events else None, nats_server=nats_process if not durable_kv_events else None,
durable_kv_events=durable_kv_events, durable_kv_events=durable_kv_events,
router_event_threads=router_event_threads,
) )
logger.info("Indexers sync test completed successfully") logger.info("Indexers sync test completed successfully")
...@@ -670,15 +666,14 @@ def test_query_instance_id_returns_worker_and_tokens( ...@@ -670,15 +666,14 @@ def test_query_instance_id_returns_worker_and_tokens(
@pytest.mark.timeout(90) # bumped for xdist contention (was 29s; ~9.55s serial avg) @pytest.mark.timeout(90) # bumped for xdist contention (was 29s; ~9.55s serial avg)
@pytest.mark.parametrize("request_plane", ["tcp"], indirect=True) @pytest.mark.parametrize("request_plane", ["tcp"], indirect=True)
@pytest.mark.parametrize( @pytest.mark.parametrize(
"durable_kv_events,use_kv_events,router_event_threads,zmq_kv_events", "durable_kv_events,use_kv_events,zmq_kv_events",
[ [
(True, True, 1, False), # JetStream mode with KV events (True, True, False), # JetStream mode with KV events
(False, True, 1, False), # NATS Core mode with local indexer (default) (False, True, False), # NATS Core mode with local indexer (default)
(False, False, 1, False), # Approximate mode (--no-kv-events) - no KV events (False, False, False), # Approximate mode (--no-kv-events) - no KV events
(False, True, 2, False), # NATS Core mode - multi-threaded indexer (False, True, True), # ZMQ mode: mocker → ZMQ PUB → relay → NATS
(False, True, 1, True), # ZMQ mode: mocker → ZMQ PUB → relay → NATS
], ],
ids=["jetstream", "nats_core", "no_kv_events", "nats_core_multi_thread", "zmq"], ids=["jetstream", "nats_core", "no_kv_events", "zmq"],
indirect=["durable_kv_events"], indirect=["durable_kv_events"],
) )
def test_router_decisions( def test_router_decisions(
...@@ -688,7 +683,6 @@ def test_router_decisions( ...@@ -688,7 +683,6 @@ def test_router_decisions(
durable_kv_events, durable_kv_events,
use_kv_events, use_kv_events,
request_plane, request_plane,
router_event_threads,
zmq_kv_events, zmq_kv_events,
): ):
"""Validate KV cache prefix reuse and dp_rank routing by sending progressive requests with overlapping prefixes. """Validate KV cache prefix reuse and dp_rank routing by sending progressive requests with overlapping prefixes.
...@@ -736,7 +730,6 @@ def test_router_decisions( ...@@ -736,7 +730,6 @@ def test_router_decisions(
test_dp_rank=True, test_dp_rank=True,
use_kv_events=use_kv_events, use_kv_events=use_kv_events,
durable_kv_events=durable_kv_events, durable_kv_events=durable_kv_events,
router_event_threads=router_event_threads,
) )
......
...@@ -387,18 +387,12 @@ def test_sglang_kv_router_basic( ...@@ -387,18 +387,12 @@ def test_sglang_kv_router_basic(
@pytest.mark.pre_merge @pytest.mark.pre_merge
@pytest.mark.gpu_1 @pytest.mark.gpu_1
@pytest.mark.parametrize("request_plane", ["tcp"], indirect=True) @pytest.mark.parametrize("request_plane", ["tcp"], indirect=True)
@pytest.mark.parametrize(
"router_event_threads",
[1, 2],
ids=["single_thread", "multi_thread"],
)
def test_router_decisions_sglang_multiple_workers( def test_router_decisions_sglang_multiple_workers(
request, request,
runtime_services_dynamic_ports, runtime_services_dynamic_ports,
predownload_models, predownload_models,
set_ucx_tls_no_mm, set_ucx_tls_no_mm,
request_plane, request_plane,
router_event_threads,
): ):
# runtime_services starts etcd and nats # runtime_services starts etcd and nats
logger.info("Starting SGLang router prefix reuse test with two workers") logger.info("Starting SGLang router prefix reuse test with two workers")
...@@ -425,7 +419,6 @@ def test_router_decisions_sglang_multiple_workers( ...@@ -425,7 +419,6 @@ def test_router_decisions_sglang_multiple_workers(
request, request,
test_dp_rank=False, test_dp_rank=False,
block_size=PAGE_SIZE, block_size=PAGE_SIZE,
router_event_threads=router_event_threads,
) )
......
...@@ -423,11 +423,6 @@ def test_router_decisions_trtllm_attention_dp( ...@@ -423,11 +423,6 @@ def test_router_decisions_trtllm_attention_dp(
@pytest.mark.pre_merge @pytest.mark.pre_merge
@pytest.mark.gpu_1 @pytest.mark.gpu_1
@pytest.mark.parametrize("request_plane", ["tcp"], indirect=True) @pytest.mark.parametrize("request_plane", ["tcp"], indirect=True)
@pytest.mark.parametrize(
"router_event_threads",
[1, 2],
ids=["single_thread", "multi_thread"],
)
@pytest.mark.timeout(150) # ~3x average (~45s/test), rounded up @pytest.mark.timeout(150) # ~3x average (~45s/test), rounded up
def test_router_decisions_trtllm_multiple_workers( def test_router_decisions_trtllm_multiple_workers(
request, request,
...@@ -435,7 +430,6 @@ def test_router_decisions_trtllm_multiple_workers( ...@@ -435,7 +430,6 @@ def test_router_decisions_trtllm_multiple_workers(
predownload_models, predownload_models,
set_ucx_tls_no_mm, set_ucx_tls_no_mm,
request_plane, request_plane,
router_event_threads,
): ):
# runtime_services starts etcd and nats # runtime_services starts etcd and nats
logger.info("Starting TRT-LLM router prefix reuse test with two workers") logger.info("Starting TRT-LLM router prefix reuse test with two workers")
...@@ -464,7 +458,6 @@ def test_router_decisions_trtllm_multiple_workers( ...@@ -464,7 +458,6 @@ def test_router_decisions_trtllm_multiple_workers(
request, request,
test_dp_rank=False, test_dp_rank=False,
block_size=TRTLLM_BLOCK_SIZE, block_size=TRTLLM_BLOCK_SIZE,
router_event_threads=router_event_threads,
) )
......
...@@ -415,18 +415,12 @@ def test_vllm_kv_router_basic( ...@@ -415,18 +415,12 @@ def test_vllm_kv_router_basic(
@pytest.mark.gpu_1 @pytest.mark.gpu_1
@pytest.mark.timeout(150) # ~3x average (~43s/test), rounded up @pytest.mark.timeout(150) # ~3x average (~43s/test), rounded up
@pytest.mark.parametrize("request_plane", ["tcp"], indirect=True) @pytest.mark.parametrize("request_plane", ["tcp"], indirect=True)
@pytest.mark.parametrize(
"router_event_threads",
[1, 2],
ids=["single_thread", "multi_thread"],
)
def test_router_decisions_vllm_multiple_workers( def test_router_decisions_vllm_multiple_workers(
request, request,
runtime_services_dynamic_ports, runtime_services_dynamic_ports,
predownload_models, predownload_models,
set_ucx_tls_no_mm, set_ucx_tls_no_mm,
request_plane, request_plane,
router_event_threads,
): ):
# runtime_services starts etcd and nats # runtime_services starts etcd and nats
logger.info("Starting vLLM router prefix reuse test with two workers") logger.info("Starting vLLM router prefix reuse test with two workers")
...@@ -454,7 +448,6 @@ def test_router_decisions_vllm_multiple_workers( ...@@ -454,7 +448,6 @@ def test_router_decisions_vllm_multiple_workers(
request, request,
test_dp_rank=False, test_dp_rank=False,
block_size=BLOCK_SIZE, block_size=BLOCK_SIZE,
router_event_threads=router_event_threads,
) )
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment