"...ssh:/git@developer.sourcefind.cn:2222/OpenDAS/dynamo.git" did not exist on "c8770464abcb5665343c0355e80abb6ab060bb2a"
Unverified Commit 7fdc742e authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

feat: allow router not assuming decode kv reuse (#5350)


Signed-off-by: PeaBrane <yanrpei@gmail.com>
parent 329752d9
...@@ -204,6 +204,13 @@ def parse_args(): ...@@ -204,6 +204,13 @@ def parse_args():
default=True, default=True,
help="KV Router: Disable tracking of active blocks (blocks being used for ongoing generation). By default, active blocks are tracked for load balancing.", help="KV Router: Disable tracking of active blocks (blocks being used for ongoing generation). By default, active blocks are tracked for load balancing.",
) )
parser.add_argument(
"--no-assume-kv-reuse",
action="store_false",
dest="router_assume_kv_reuse",
default=True,
help="KV Router: When tracking active blocks, do not assume KV cache reuse (generate random hashes instead of computing actual block hashes). Useful when KV cache reuse is not expected. By default, KV cache reuse is assumed.",
)
parser.add_argument( parser.add_argument(
"--enforce-disagg", "--enforce-disagg",
action="store_true", action="store_true",
...@@ -346,9 +353,10 @@ async def async_main(): ...@@ -346,9 +353,10 @@ async def async_main():
router_temperature=flags.router_temperature, router_temperature=flags.router_temperature,
use_kv_events=flags.use_kv_events, use_kv_events=flags.use_kv_events,
router_replica_sync=flags.router_replica_sync, router_replica_sync=flags.router_replica_sync,
router_track_active_blocks=flags.router_track_active_blocks,
router_assume_kv_reuse=flags.router_assume_kv_reuse,
router_snapshot_threshold=flags.router_snapshot_threshold, router_snapshot_threshold=flags.router_snapshot_threshold,
router_reset_states=flags.router_reset_states, router_reset_states=flags.router_reset_states,
router_track_active_blocks=flags.router_track_active_blocks,
router_ttl_secs=flags.router_ttl, router_ttl_secs=flags.router_ttl,
router_max_tree_size=flags.router_max_tree_size, router_max_tree_size=flags.router_max_tree_size,
router_prune_target_ratio=flags.router_prune_target_ratio, router_prune_target_ratio=flags.router_prune_target_ratio,
......
...@@ -31,6 +31,8 @@ The main KV-aware routing arguments: ...@@ -31,6 +31,8 @@ The main KV-aware routing arguments:
- `--no-track-active-blocks`: Disables tracking of active blocks (blocks being used for ongoing generation/decode phases). By default, the router tracks active blocks for load balancing. Disable this when routing to workers that only perform prefill (no decode phase), as tracking decode load is not relevant. This reduces router overhead and simplifies state management. - `--no-track-active-blocks`: Disables tracking of active blocks (blocks being used for ongoing generation/decode phases). By default, the router tracks active blocks for load balancing. Disable this when routing to workers that only perform prefill (no decode phase), as tracking decode load is not relevant. This reduces router overhead and simplifies state management.
- `--no-assume-kv-reuse`: When tracking active blocks, disables the assumption of KV cache reuse. By default (`router_assume_kv_reuse=true`), the router computes actual block hashes for sequence tracking to deduplicate blocks and optimize load balancing. When disabled via this flag, the router generates random hashes for sequence blocks, treating each request's blocks as unique. This is useful in disaggregated setups where prefill transfers blocks to decode workers that may already have those blocks cached, but the engine cannot coordinate transfers to avoid duplication. Without this flag, the router's load balancing heuristics would undercount decode blocks when duplicates exist.
- `--active-decode-blocks-threshold`: Initial threshold (0.0-1.0) for determining when a worker is considered busy based on KV cache block utilization. When a worker's KV cache active blocks exceed this percentage of total blocks, it will be marked as busy and excluded from routing. If not set, blocks-based busy detection is disabled. This feature works with all routing modes (`--router-mode kv|round-robin|random`) as long as backend engines emit `ForwardPassMetrics`. The threshold can be dynamically updated at runtime via the `/busy_threshold` HTTP endpoint (see [Dynamic Threshold Configuration](#dynamic-threshold-configuration)). - `--active-decode-blocks-threshold`: Initial threshold (0.0-1.0) for determining when a worker is considered busy based on KV cache block utilization. When a worker's KV cache active blocks exceed this percentage of total blocks, it will be marked as busy and excluded from routing. If not set, blocks-based busy detection is disabled. This feature works with all routing modes (`--router-mode kv|round-robin|random`) as long as backend engines emit `ForwardPassMetrics`. The threshold can be dynamically updated at runtime via the `/busy_threshold` HTTP endpoint (see [Dynamic Threshold Configuration](#dynamic-threshold-configuration)).
- `--active-prefill-tokens-threshold`: Literal token count threshold for determining when a worker is considered busy based on prefill token utilization. When active prefill tokens exceed this threshold, the worker is marked as busy. If not set, tokens-based busy detection is disabled. - `--active-prefill-tokens-threshold`: Literal token count threshold for determining when a worker is considered busy based on prefill token utilization. When active prefill tokens exceed this threshold, the worker is marked as busy. If not set, tokens-based busy detection is disabled.
......
...@@ -187,6 +187,7 @@ impl Flags { ...@@ -187,6 +187,7 @@ impl Flags {
self.router_replica_sync, self.router_replica_sync,
self.router_track_active_blocks, self.router_track_active_blocks,
// defaulting below args (no longer maintaining new flags for dynamo-run) // defaulting below args (no longer maintaining new flags for dynamo-run)
None, // assume_kv_reuse
None, None,
None, None,
None, None,
......
...@@ -465,6 +465,7 @@ pub unsafe extern "C" fn dynamo_create_worker_selection_pipeline( ...@@ -465,6 +465,7 @@ pub unsafe extern "C" fn dynamo_create_worker_selection_pipeline(
Some(use_kv_events), Some(use_kv_events),
Some(router_replica_sync), Some(router_replica_sync),
None, // track_active_blocks None, // track_active_blocks
None, // assume_kv_reuse
None, // router_snapshot_threshold None, // router_snapshot_threshold
None, // router_reset_states None, // router_reset_states
None, // router_ttl_secs None, // router_ttl_secs
......
...@@ -50,7 +50,7 @@ impl KvRouterConfig { ...@@ -50,7 +50,7 @@ impl KvRouterConfig {
#[pymethods] #[pymethods]
impl KvRouterConfig { impl KvRouterConfig {
#[new] #[new]
#[pyo3(signature = (overlap_score_weight=1.0, router_temperature=0.0, use_kv_events=true, router_replica_sync=false, router_track_active_blocks=true, router_snapshot_threshold=1000000, router_reset_states=false, router_ttl_secs=120.0, router_max_tree_size=1048576, router_prune_target_ratio=0.8))] #[pyo3(signature = (overlap_score_weight=1.0, router_temperature=0.0, use_kv_events=true, router_replica_sync=false, router_track_active_blocks=true, router_assume_kv_reuse=true, router_snapshot_threshold=1000000, router_reset_states=false, router_ttl_secs=120.0, router_max_tree_size=1048576, router_prune_target_ratio=0.8))]
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
fn new( fn new(
overlap_score_weight: f64, overlap_score_weight: f64,
...@@ -58,6 +58,7 @@ impl KvRouterConfig { ...@@ -58,6 +58,7 @@ impl KvRouterConfig {
use_kv_events: bool, use_kv_events: bool,
router_replica_sync: bool, router_replica_sync: bool,
router_track_active_blocks: bool, router_track_active_blocks: bool,
router_assume_kv_reuse: bool,
router_snapshot_threshold: Option<u32>, router_snapshot_threshold: Option<u32>,
router_reset_states: bool, router_reset_states: bool,
router_ttl_secs: f64, router_ttl_secs: f64,
...@@ -71,6 +72,7 @@ impl KvRouterConfig { ...@@ -71,6 +72,7 @@ impl KvRouterConfig {
use_kv_events, use_kv_events,
router_replica_sync, router_replica_sync,
router_track_active_blocks, router_track_active_blocks,
router_assume_kv_reuse,
router_snapshot_threshold, router_snapshot_threshold,
router_reset_states, router_reset_states,
router_ttl_secs, router_ttl_secs,
......
...@@ -1106,6 +1106,7 @@ class KvRouterConfig: ...@@ -1106,6 +1106,7 @@ class KvRouterConfig:
use_kv_events: bool = True, use_kv_events: bool = True,
router_replica_sync: bool = False, router_replica_sync: bool = False,
router_track_active_blocks: bool = True, router_track_active_blocks: bool = True,
router_assume_kv_reuse: bool = True,
router_snapshot_threshold: Optional[int] = 1000000, router_snapshot_threshold: Optional[int] = 1000000,
router_reset_states: bool = False, router_reset_states: bool = False,
router_ttl_secs: float = 120.0, router_ttl_secs: float = 120.0,
...@@ -1121,6 +1122,8 @@ class KvRouterConfig: ...@@ -1121,6 +1122,8 @@ class KvRouterConfig:
use_kv_events: Whether to use KV events from workers (default: True) use_kv_events: Whether to use KV events from workers (default: True)
router_replica_sync: Enable replica synchronization (default: False) router_replica_sync: Enable replica synchronization (default: False)
router_track_active_blocks: Track active blocks for load balancing (default: True) router_track_active_blocks: Track active blocks for load balancing (default: True)
router_assume_kv_reuse: Assume KV cache reuse when tracking active blocks (default: True).
When True, computes actual block hashes. When False, generates random hashes.
router_snapshot_threshold: Number of messages before snapshot (default: 1000000) router_snapshot_threshold: Number of messages before snapshot (default: 1000000)
router_reset_states: Reset router state on startup (default: False) router_reset_states: Reset router state on startup (default: False)
router_ttl_secs: TTL for blocks in seconds when not using KV events (default: 120.0) router_ttl_secs: TTL for blocks in seconds when not using KV events (default: 120.0)
......
...@@ -19,6 +19,7 @@ use dynamo_runtime::{ ...@@ -19,6 +19,7 @@ use dynamo_runtime::{
traits::DistributedRuntimeProvider, traits::DistributedRuntimeProvider,
}; };
use futures::stream::{self, StreamExt}; use futures::stream::{self, StreamExt};
use rand::Rng;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::json; use serde_json::json;
...@@ -137,6 +138,11 @@ pub struct KvRouterConfig { ...@@ -137,6 +138,11 @@ pub struct KvRouterConfig {
/// Whether to track active blocks in the router (default: true) /// Whether to track active blocks in the router (default: true)
pub router_track_active_blocks: bool, pub router_track_active_blocks: bool,
/// Whether to assume KV cache reuse when tracking active blocks (default: true).
/// When true, computes actual block hashes for sequence tracking.
/// When false, generates random hashes (assuming no KV cache reuse).
pub router_assume_kv_reuse: bool,
/// Threshold for triggering snapshots. If None, no snapshots will be performed. /// Threshold for triggering snapshots. If None, no snapshots will be performed.
pub router_snapshot_threshold: Option<u32>, pub router_snapshot_threshold: Option<u32>,
...@@ -161,6 +167,7 @@ impl Default for KvRouterConfig { ...@@ -161,6 +167,7 @@ impl Default for KvRouterConfig {
use_kv_events: true, use_kv_events: true,
router_replica_sync: false, router_replica_sync: false,
router_track_active_blocks: true, router_track_active_blocks: true,
router_assume_kv_reuse: true,
router_snapshot_threshold: Some(1000000), router_snapshot_threshold: Some(1000000),
router_reset_states: false, router_reset_states: false,
router_ttl_secs: 120.0, router_ttl_secs: 120.0,
...@@ -180,6 +187,7 @@ impl KvRouterConfig { ...@@ -180,6 +187,7 @@ impl KvRouterConfig {
use_kv_events: Option<bool>, use_kv_events: Option<bool>,
replica_sync: Option<bool>, replica_sync: Option<bool>,
track_active_blocks: Option<bool>, track_active_blocks: Option<bool>,
assume_kv_reuse: Option<bool>,
router_snapshot_threshold: Option<Option<u32>>, router_snapshot_threshold: Option<Option<u32>>,
router_reset_states: Option<bool>, router_reset_states: Option<bool>,
router_ttl_secs: Option<f64>, router_ttl_secs: Option<f64>,
...@@ -194,6 +202,7 @@ impl KvRouterConfig { ...@@ -194,6 +202,7 @@ impl KvRouterConfig {
router_replica_sync: replica_sync.unwrap_or(default.router_replica_sync), router_replica_sync: replica_sync.unwrap_or(default.router_replica_sync),
router_track_active_blocks: track_active_blocks router_track_active_blocks: track_active_blocks
.unwrap_or(default.router_track_active_blocks), .unwrap_or(default.router_track_active_blocks),
router_assume_kv_reuse: assume_kv_reuse.unwrap_or(default.router_assume_kv_reuse),
router_snapshot_threshold: router_snapshot_threshold router_snapshot_threshold: router_snapshot_threshold
.unwrap_or(default.router_snapshot_threshold), .unwrap_or(default.router_snapshot_threshold),
router_reset_states: router_reset_states.unwrap_or(default.router_reset_states), router_reset_states: router_reset_states.unwrap_or(default.router_reset_states),
...@@ -203,6 +212,37 @@ impl KvRouterConfig { ...@@ -203,6 +212,37 @@ impl KvRouterConfig {
.unwrap_or(default.router_prune_target_ratio), .unwrap_or(default.router_prune_target_ratio),
} }
} }
/// Produce the sequence hashes handed to the scheduler for active-block tracking.
///
/// Only complete blocks count: the number of blocks is `tokens.len() / block_size`
/// using integer division, so a trailing partial block is ignored.
///
/// Returns:
/// - `None` when `router_track_active_blocks` is false (tracking disabled)
/// - `Some(vec![])` when the tokens do not fill a single complete block
/// - the actual sequence hashes when `router_assume_kv_reuse` is true
/// - one random `u64` per complete block when `router_assume_kv_reuse` is false
pub fn compute_seq_hashes_for_tracking(
    &self,
    tokens: &[u32],
    block_size: u32,
) -> Option<Vec<u64>> {
    // Guard clause: nothing to compute when active-block tracking is off.
    if !self.router_track_active_blocks {
        return None;
    }

    let full_blocks = tokens.len() / block_size as usize;

    let hashes: Vec<u64> = match (full_blocks, self.router_assume_kv_reuse) {
        // Not enough tokens for even one block: skip hashing entirely.
        (0, _) => Vec::new(),
        // KV reuse assumed: derive the sequence hashes from the real block hashes.
        (_, true) => {
            let per_block = compute_block_hash_for_seq(tokens, block_size, None);
            compute_seq_hash_for_block(&per_block)
        }
        // No KV reuse assumed: draw a fresh random hash for every block.
        (count, false) => {
            let mut rng = rand::rng();
            std::iter::repeat_with(|| rng.random::<u64>())
                .take(count)
                .collect()
        }
    };

    Some(hashes)
}
} }
pub enum Indexer { pub enum Indexer {
...@@ -452,8 +492,7 @@ impl KvRouter { ...@@ -452,8 +492,7 @@ impl KvRouter {
// Compute seq_hashes only if scheduler needs it for active blocks tracking // Compute seq_hashes only if scheduler needs it for active blocks tracking
let maybe_seq_hashes = self let maybe_seq_hashes = self
.kv_router_config .kv_router_config
.router_track_active_blocks .compute_seq_hashes_for_tracking(tokens, self.block_size);
.then(|| compute_seq_hash_for_block(&block_hashes));
let best_worker = self let best_worker = self
.scheduler .scheduler
...@@ -487,10 +526,9 @@ impl KvRouter { ...@@ -487,10 +526,9 @@ impl KvRouter {
) { ) {
let isl_tokens = tokens.len(); let isl_tokens = tokens.len();
let maybe_seq_hashes = self.kv_router_config.router_track_active_blocks.then(|| { let maybe_seq_hashes = self
let block_hashes = compute_block_hash_for_seq(tokens, self.block_size, None); .kv_router_config
compute_seq_hash_for_block(&block_hashes) .compute_seq_hashes_for_tracking(tokens, self.block_size);
});
if let Err(e) = self if let Err(e) = self
.scheduler .scheduler
...@@ -539,8 +577,7 @@ impl KvRouter { ...@@ -539,8 +577,7 @@ impl KvRouter {
let maybe_seq_hashes = self let maybe_seq_hashes = self
.kv_router_config .kv_router_config
.router_track_active_blocks .compute_seq_hashes_for_tracking(tokens, self.block_size);
.then(|| compute_seq_hash_for_block(&block_hashes));
Ok(self Ok(self
.scheduler .scheduler
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment