Unverified Commit d24cfbe1 authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

perf(kv-router): linear scan improvements [DYN-2164] (#6363)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
Co-authored-by: default avatarCursor <cursoragent@cursor.com>
Co-authored-by: default avatarClaude Opus 4.6 <noreply@anthropic.com>
parent 1bc913ef
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
//! //!
//! Unlike `RadixTree` which uses `Rc<RefCell<>>` and requires single-threaded access, //! Unlike `RadixTree` which uses `Rc<RefCell<>>` and requires single-threaded access,
//! `ConcurrentRadixTree` uses `Arc<RwLock<>>` per node and a //! `ConcurrentRadixTree` uses `Arc<RwLock<>>` per node and a
//! `RwLock<FxHashMap<..., RwLock<FxHashMap<...>>>>` for the lookup table. //! `DashMap<..., RwLock<FxHashMap<...>>>` for the lookup table.
//! //!
//! # Limitations vs RadixTree //! # Limitations vs RadixTree
//! //!
...@@ -20,9 +20,8 @@ ...@@ -20,9 +20,8 @@
//! //!
//! - Multiple `find_matches` can run in parallel (read locks only) //! - Multiple `find_matches` can run in parallel (read locks only)
//! - Write operations (`apply_event`, `remove_worker`) acquire write locks //! - Write operations (`apply_event`, `remove_worker`) acquire write locks
//! - Outer `RwLock` is read-locked on the hot path; structural mutations //! - Outer `DashMap` provides shard-level locking for per-worker access.
//! (adding/removing workers) are rare. Inner `RwLock` per worker allows //! Inner `RwLock` per worker allows per-worker write concurrency.
//! per-worker write concurrency.
//! - Deadlock prevention: always lock parent before child, hand-over-hand locking //! - Deadlock prevention: always lock parent before child, hand-over-hand locking
use std::sync::Arc; use std::sync::Arc;
...@@ -79,7 +78,7 @@ impl Block { ...@@ -79,7 +78,7 @@ impl Block {
/// ///
/// Unlike `RadixTree` which uses `Rc<RefCell<>>` and requires single-threaded access, /// Unlike `RadixTree` which uses `Rc<RefCell<>>` and requires single-threaded access,
/// `ConcurrentRadixTree` uses `Arc<RwLock<>>` per node and a /// `ConcurrentRadixTree` uses `Arc<RwLock<>>` per node and a
/// `RwLock<FxHashMap<..., RwLock<FxHashMap<...>>>>` for the lookup table, /// `DashMap<..., RwLock<FxHashMap<...>>>` for the lookup table,
/// enabling concurrent `find_matches` operations. /// enabling concurrent `find_matches` operations.
/// ///
/// # Limitations vs RadixTree /// # Limitations vs RadixTree
...@@ -92,8 +91,7 @@ impl Block { ...@@ -92,8 +91,7 @@ impl Block {
/// ///
/// - Multiple `find_matches` can run in parallel (read locks only) /// - Multiple `find_matches` can run in parallel (read locks only)
/// - Write operations (`apply_event`, `remove_worker`) acquire write locks /// - Write operations (`apply_event`, `remove_worker`) acquire write locks
/// - Outer RwLock is read-locked on the hot path; structural mutations /// - Outer `DashMap` provides shard-level locking for per-worker access.
/// (adding/removing workers) are rare and take a write lock.
/// - Inner `RwLock` per worker allows per-worker write concurrency. /// - Inner `RwLock` per worker allows per-worker write concurrency.
/// - Deadlock prevention: always lock parent before child, hand-over-hand locking /// - Deadlock prevention: always lock parent before child, hand-over-hand locking
pub struct ConcurrentRadixTree { pub struct ConcurrentRadixTree {
......
...@@ -438,6 +438,18 @@ impl PositionalIndexer { ...@@ -438,6 +438,18 @@ impl PositionalIndexer {
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
impl PositionalIndexer { impl PositionalIndexer {
/// Score all active workers at the given position and clear the active set.
#[inline]
fn drain_active(
active: &mut FxHashSet<WorkerWithDpRank>,
scores: &mut OverlapScores,
pos: usize,
) {
for worker in active.drain() {
scores.scores.insert(worker, pos as u32);
}
}
/// Compute sequence hash incrementally from previous hash and current local hash. /// Compute sequence hash incrementally from previous hash and current local hash.
#[inline] #[inline]
fn compute_next_seq_hash(prev_seq_hash: u64, current_local_hash: u64) -> u64 { fn compute_next_seq_hash(prev_seq_hash: u64, current_local_hash: u64) -> u64 {
...@@ -516,9 +528,6 @@ impl PositionalIndexer { ...@@ -516,9 +528,6 @@ impl PositionalIndexer {
} }
/// Scan positions sequentially, updating active set and recording drain scores. /// Scan positions sequentially, updating active set and recording drain scores.
///
/// Inlines the DashMap lookup so the guard lives for each iteration,
/// avoiding a per-position `FxHashSet` clone.
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
fn linear_scan_drain( fn linear_scan_drain(
&self, &self,
...@@ -534,38 +543,34 @@ impl PositionalIndexer { ...@@ -534,38 +543,34 @@ impl PositionalIndexer {
return; return;
} }
for pos in lo..hi { for pos in lo..hi {
if active.is_empty() {
break;
}
let Some(entry) = self.index.get(&(pos, sequence[pos])) else { let Some(entry) = self.index.get(&(pos, sequence[pos])) else {
for worker in active.iter() { Self::drain_active(active, scores, pos);
scores.scores.insert(*worker, pos as u32);
}
active.clear();
break; break;
}; };
Self::ensure_seq_hash_computed(seq_hashes, pos, sequence); Self::ensure_seq_hash_computed(seq_hashes, pos, sequence);
let seq_hash = seq_hashes[pos]; let Some(workers) = entry.get(seq_hashes[pos]) else {
Self::drain_active(active, scores, pos);
match entry.get(seq_hash) { break;
Some(workers) => { };
active.retain(|w| {
if workers.contains(w) { if workers.len() < active.len() {
true active.retain(|w| {
} else { if workers.contains(w) {
scores.scores.insert(*w, pos as u32); true
false } else {
} scores.scores.insert(*w, pos as u32);
}); false
if early_exit && !active.is_empty() {
break;
}
}
None => {
for worker in active.iter() {
scores.scores.insert(*worker, pos as u32);
} }
active.clear(); });
break; }
}
if early_exit && !active.is_empty() {
break;
} }
} }
} }
...@@ -602,7 +607,7 @@ impl PositionalIndexer { ...@@ -602,7 +607,7 @@ impl PositionalIndexer {
} }
// Lazily computed sequence hashes // Lazily computed sequence hashes
let mut seq_hashes: Vec<ExternalSequenceBlockHash> = Vec::new(); let mut seq_hashes: Vec<ExternalSequenceBlockHash> = Vec::with_capacity(local_hashes.len());
// Check first position to initialize active set // Check first position to initialize active set
let Some(initial_workers) = let Some(initial_workers) =
...@@ -623,7 +628,6 @@ impl PositionalIndexer { ...@@ -623,7 +628,6 @@ impl PositionalIndexer {
scores.scores.insert(*worker, 1); scores.scores.insert(*worker, 1);
} }
// Populate tree_sizes // Populate tree_sizes
for worker in scores.scores.keys() { for worker in scores.scores.keys() {
if let Some(worker_tree_size) = self.tree_sizes.get(worker) { if let Some(worker_tree_size) = self.tree_sizes.get(worker) {
scores scores
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment