Unverified Commit 22da711f authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

chore(kv-router): tighten indexer cleanup helpers (#7945)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent 5a3fa1fd
......@@ -72,6 +72,14 @@ impl Block {
block_hash: Some(block_hash),
}
}
#[inline]
fn drop_worker(&mut self, worker: WorkerWithDpRank) {
self.workers.remove(&worker);
if self.workers.is_empty() {
self.children.clear();
}
}
}
/// Thread-safe radix tree for concurrent KV cache lookups.
......@@ -458,11 +466,7 @@ impl ConcurrentRadixTree {
continue;
};
let mut guard = block.write();
guard.workers.remove(&worker);
if guard.workers.is_empty() {
guard.children.clear();
}
block.write().drop_worker(worker);
num_removed += 1;
}
......@@ -498,11 +502,7 @@ impl ConcurrentRadixTree {
for worker in workers {
if let Some(worker_lookup) = lookup.remove(&worker) {
for (_, block) in worker_lookup.into_iter() {
let mut guard = block.write();
guard.workers.remove(&worker);
if guard.workers.is_empty() {
guard.children.clear();
}
block.write().drop_worker(worker);
}
if keep_worker {
......@@ -530,11 +530,7 @@ impl ConcurrentRadixTree {
let key = WorkerWithDpRank { worker_id, dp_rank };
if let Some(worker_lookup) = lookup.remove(&key) {
for (_, block) in worker_lookup.into_iter() {
let mut guard = block.write();
guard.workers.remove(&key);
if guard.workers.is_empty() {
guard.children.clear();
}
block.write().drop_worker(key);
}
self.tree_sizes.remove(&key);
}
......
......@@ -31,6 +31,7 @@
//! - If `i < current_cutoff`: new_cutoff = `i`
//! - If new_cutoff == 0: remove worker entirely from this node
//! - Else: move worker to `worker_cutoffs[w] = new_cutoff`
//! - Worker lookup entries for the newly uncovered suffix are scrubbed eagerly
//!
//! Removal does NOT perform structural splits. Multiple workers can independently reduce
//! their match indices without fragmenting the tree, accurately tracking each worker's
......@@ -121,6 +122,105 @@ impl Node {
fn has_any_workers(&self) -> bool {
!self.full_edge_workers.is_empty() || !self.worker_cutoffs.is_empty()
}
#[inline]
fn current_cutoff(&self, worker: WorkerWithDpRank) -> usize {
if self.full_edge_workers.contains(&worker) {
self.edge.len()
} else {
self.worker_cutoffs.get(&worker).copied().unwrap_or(0)
}
}
#[inline]
fn covers_pos(&self, worker: WorkerWithDpRank, pos: usize) -> bool {
self.full_edge_workers.contains(&worker)
|| matches!(self.worker_cutoffs.get(&worker), Some(&cutoff) if pos < cutoff)
}
// Descendants are only reachable through full-edge coverage; partial workers stop in this node.
fn clear_children_if_unreachable(&mut self) {
if self.full_edge_workers.is_empty() {
self.children.clear();
}
}
// These hashes are no longer covered after a cutoff shrink and must be scrubbed from lookup.
fn uncovered_suffix_hashes(&self, cutoff: usize) -> Vec<ExternalSequenceBlockHash> {
debug_assert!(cutoff <= self.edge.len());
self.edge[cutoff..].iter().map(|&(_, hash)| hash).collect()
}
#[inline]
fn drop_worker(&mut self, worker: WorkerWithDpRank) {
self.full_edge_workers.remove(&worker);
self.worker_cutoffs.remove(&worker);
self.clear_children_if_unreachable();
}
#[inline]
fn promote_to_full(&mut self, worker: WorkerWithDpRank) {
if !self.full_edge_workers.contains(&worker) {
self.worker_cutoffs.remove(&worker);
self.full_edge_workers.insert(worker);
}
}
#[inline]
fn remove_worker_at_pos(
&mut self,
worker: WorkerWithDpRank,
pos: usize,
removed_hash: ExternalSequenceBlockHash,
) -> RemoveOutcome {
let current_cutoff = self.current_cutoff(worker);
if pos >= current_cutoff {
// Duplicate remove for an already-uncovered hash: just scrub this lookup entry.
return RemoveOutcome {
removed: 0,
stale_hashes: vec![removed_hash],
};
}
let new_cutoff = pos;
let removed = current_cutoff - new_cutoff;
let stale_hashes = self.uncovered_suffix_hashes(new_cutoff);
if new_cutoff == 0 {
self.drop_worker(worker);
} else {
self.full_edge_workers.remove(&worker);
self.worker_cutoffs.insert(worker, new_cutoff);
self.clear_children_if_unreachable();
}
RemoveOutcome {
removed,
stale_hashes,
}
}
// Used by dump/restore to ignore dead child pointers that may still exist in the live tree.
fn live_children(&self) -> Vec<SharedNode> {
self.children
.values()
.filter(|child| {
let guard = child.read();
guard.has_any_workers() || !guard.children.is_empty()
})
.cloned()
.collect()
}
// Dump-time merge for passthrough nodes with identical full-coverage worker sets.
fn can_merge_with_only_child(&self, live_children: &[SharedNode]) -> bool {
self.worker_cutoffs.is_empty() && live_children.len() == 1 && {
let child_guard = live_children[0].read();
child_guard.full_edge_workers == self.full_edge_workers
&& child_guard.worker_cutoffs.is_empty()
&& child_guard.has_any_workers()
}
}
}
/// Data returned by [`ConcurrentRadixTreeCompressed::split_node`] for deferred lookup updates.
......@@ -132,6 +232,11 @@ struct SplitLookupData {
suffix: SharedNode,
}
struct RemoveOutcome {
removed: usize,
stale_hashes: Vec<ExternalSequenceBlockHash>,
}
/// Thread-safe radix tree (compressed trie) for concurrent KV cache lookups.
pub struct ConcurrentRadixTreeCompressed {
/// The root of the radix tree. Has an empty edge and only contains children.
......@@ -535,28 +640,23 @@ impl ConcurrentRadixTreeCompressed {
// stale entry in the lookup map.
{
let guard = node.read();
if let Some(&pos) = guard.edge_index.get(&parent_hash) {
let is_full = guard.full_edge_workers.contains(&worker);
let cutoff = if is_full {
guard.edge.len()
} else {
guard.worker_cutoffs.get(&worker).copied().unwrap_or(0)
};
if pos >= cutoff {
tracing::warn!(
worker_id = worker.worker_id.to_string(),
dp_rank = worker.dp_rank,
id,
parent_hash = ?parent_hash,
pos,
cutoff,
"Stale parent: worker no longer covers parent_hash; rejecting store"
);
drop(guard);
let wl = lookup.get_mut(&worker).unwrap();
wl.remove(&parent_hash);
return Err(KvCacheEventError::ParentBlockNotFound);
}
if let Some(&pos) = guard.edge_index.get(&parent_hash)
&& !guard.covers_pos(worker, pos)
{
let cutoff = guard.current_cutoff(worker);
tracing::warn!(
worker_id = worker.worker_id.to_string(),
dp_rank = worker.dp_rank,
id,
parent_hash = ?parent_hash,
pos,
cutoff,
"Stale parent: worker no longer covers parent_hash; rejecting store"
);
drop(guard);
let wl = lookup.get_mut(&worker).unwrap();
wl.remove(&parent_hash);
return Err(KvCacheEventError::ParentBlockNotFound);
}
}
......@@ -715,10 +815,7 @@ impl ConcurrentRadixTreeCompressed {
let split = Self::split_node(&mut child_guard, match_len);
// Ensure worker has full coverage of the prefix.
if !child_guard.full_edge_workers.contains(&worker) {
child_guard.worker_cutoffs.remove(&worker);
child_guard.full_edge_workers.insert(worker);
}
child_guard.promote_to_full(worker);
let tail = &remaining[match_len..];
if !tail.is_empty() {
......@@ -775,10 +872,7 @@ impl ConcurrentRadixTreeCompressed {
}
// Full edge match: upgrade worker to full coverage if necessary.
if !child_guard.full_edge_workers.contains(&worker) {
child_guard.worker_cutoffs.remove(&worker);
child_guard.full_edge_workers.insert(worker);
}
child_guard.promote_to_full(worker);
drop(child_guard);
let wl = lookup.get_mut(&worker).unwrap();
......@@ -808,6 +902,9 @@ impl ConcurrentRadixTreeCompressed {
/// - `pos >= current_cutoff`: no-op (already beyond coverage)
/// - `pos < current_cutoff`: `new_cutoff = pos`; moves worker to `worker_cutoffs`
/// or removes entirely if `new_cutoff == 0`.
///
/// Lookup entries for the newly uncovered suffix are removed eagerly so
/// later duplicate remove events fast-path through the missing-hash case.
fn apply_removed(
&self,
lookup: &mut FxHashMap<WorkerWithDpRank, WorkerLookup>,
......@@ -842,63 +939,25 @@ impl ConcurrentRadixTreeCompressed {
};
loop {
// Returns Some(removed_count) on success, None if the node is stale
// Returns Some(remove_outcome) on success, None if the node is stale
// (hash was moved to a descendant by a concurrent split).
let update: Option<usize> = {
let update: Option<RemoveOutcome> = {
let mut guard = cur_node.write();
match guard.edge_index.get(&block_hash).copied() {
None => None, // stale: hash moved to a child
Some(pos) => {
// Determine the worker's current match index.
// Use 0 as sentinel for "not tracked" → pos >= 0 is always true → no-op.
let is_full = guard.full_edge_workers.contains(&worker);
let current_cutoff = if is_full {
guard.edge.len()
} else {
guard.worker_cutoffs.get(&worker).copied().unwrap_or(0)
};
if pos >= current_cutoff {
// Block is at or beyond current coverage — no-op.
Some(0)
} else {
let new_cutoff = pos;
let removed = current_cutoff - new_cutoff;
if new_cutoff == 0 {
// Worker loses all coverage in this node.
if is_full {
guard.full_edge_workers.remove(&worker);
} else {
guard.worker_cutoffs.remove(&worker);
}
} else {
// Worker retains coverage of edge[0..new_cutoff].
if is_full {
guard.full_edge_workers.remove(&worker);
}
guard.worker_cutoffs.insert(worker, new_cutoff);
}
if !guard.has_any_workers() {
guard.children.clear();
}
Some(removed)
}
}
}
guard
.edge_index
.get(&block_hash)
.copied()
.map(|pos| guard.remove_worker_at_pos(worker, pos, block_hash))
};
match update {
Some(removed) => {
total_removed += removed;
// Remove this specific hash from the lookup. Other hashes at
// positions > new_cutoff remain and are cleaned up lazily when
// their own remove events arrive (they will be no-ops).
Some(outcome) => {
total_removed += outcome.removed;
if let Some(wl) = lookup.get_mut(&worker) {
wl.remove(&block_hash);
for hash in outcome.stale_hashes {
wl.remove(&hash);
}
}
continue 'outer;
}
......@@ -972,11 +1031,7 @@ impl ConcurrentRadixTreeCompressed {
continue;
}
let mut guard = node.write();
guard.full_edge_workers.remove(&worker);
guard.worker_cutoffs.remove(&worker);
if !guard.has_any_workers() {
guard.children.clear();
}
guard.drop_worker(worker);
}
if keep_worker {
......@@ -1006,11 +1061,7 @@ impl ConcurrentRadixTreeCompressed {
continue;
}
let mut guard = node.write();
guard.full_edge_workers.remove(&key);
guard.worker_cutoffs.remove(&key);
if !guard.has_any_workers() {
guard.children.clear();
}
guard.drop_worker(key);
}
self.tree_sizes.remove(&key);
}
......@@ -1070,25 +1121,12 @@ impl ConcurrentRadixTreeCompressed {
merged_edge.extend_from_slice(&guard.edge);
let live_children: Vec<SharedNode> = guard
.children
.values()
.filter(|child| {
let cg = child.read();
cg.has_any_workers() || !cg.children.is_empty()
})
.cloned()
.collect();
let live_children = guard.live_children();
// Merge condition: this node is a pure passthrough that can be
// collapsed with its single child. Requires identical worker sets
// and no partial-coverage cutoffs on either side.
let can_merge = guard.worker_cutoffs.is_empty() && live_children.len() == 1 && {
let cg = live_children[0].read();
cg.full_edge_workers == guard.full_edge_workers
&& cg.worker_cutoffs.is_empty()
&& cg.has_any_workers()
};
let can_merge = guard.can_merge_with_only_child(&live_children);
if can_merge {
let next = live_children[0].clone();
......
......@@ -70,6 +70,14 @@ impl RadixBlock {
recent_uses: VecDeque::new(),
}
}
#[inline]
fn drop_worker(&mut self, worker: WorkerWithDpRank) {
self.workers.remove(&worker);
if self.workers.is_empty() {
self.children.clear();
}
}
}
pub struct RadixTree {
......@@ -445,12 +453,7 @@ impl RadixTree {
}
};
let mut guard = entry.borrow_mut();
guard.workers.remove(&worker);
if guard.workers.is_empty() {
// if no workers are using this block, that is true for all children
guard.children.clear();
}
entry.borrow_mut().drop_worker(worker);
// remove the block from the worker's lookup table
worker_lookup.remove(&block);
}
......@@ -478,11 +481,7 @@ impl RadixTree {
for worker in workers {
if let Some((worker_key, blocks)) = self.lookup.remove_entry(&worker) {
for (_, block) in blocks {
block.borrow_mut().workers.remove(&worker);
// If no workers are using this block, that is true for all children
if block.borrow().workers.is_empty() {
block.borrow_mut().children.clear();
}
block.borrow_mut().drop_worker(worker);
}
if keep_worker {
......@@ -501,10 +500,7 @@ impl RadixTree {
let key = WorkerWithDpRank { worker_id, dp_rank };
if let Some(blocks) = self.lookup.remove(&key) {
for (_, block) in blocks {
block.borrow_mut().workers.remove(&key);
if block.borrow().workers.is_empty() {
block.borrow_mut().children.clear();
}
block.borrow_mut().drop_worker(key);
}
}
}
......
......@@ -325,13 +325,13 @@ mod interface_tests {
let continuation_remove = make_remove_event_with_parent(0, &[1, 2, 3], &[4, 5]);
let prefix_remove = make_remove_event(0, &[1, 2, 3]);
// TODO: The radix-family implementations still have a broader tree-size
// accounting gap after mid-chain removes because descendant lookup entries
// are cleaned up lazily. That means "store -> partial remove -> restore
// continuation" can still miscount restored coverage in single, sharded,
// concurrent, and concurrent_compressed. This test is intentionally scoped
// to duplicate store/remove replay, which was the concrete compressed-tree
// regression fixed on this branch.
// TODO: The non-compressed radix-family implementations still have a broader
// tree-size accounting gap after mid-chain removes because descendant
// lookup entries are cleaned up lazily. That means "store -> partial
// remove -> restore continuation" can still miscount restored coverage
// in single, sharded, and concurrent. This test is intentionally scoped
// to duplicate store/remove replay so all tree-size variants share the
// same stable baseline.
index.apply_event(prefix_event.clone()).await;
flush_and_settle(index.as_ref()).await;
......@@ -414,6 +414,52 @@ mod interface_tests {
assert!(snapshot_tree(index.as_ref()).await.is_empty());
}
#[tokio::test]
async fn test_concurrent_compressed_restore_after_mid_chain_remove_updates_tree_size() {
let index = make_indexer("concurrent_compressed");
let worker = WorkerWithDpRank::new(0, 0);
index.apply_event(make_store_event(0, &[1, 2, 3])).await;
flush_and_settle(index.as_ref()).await;
assert_query_score_and_tree_size(index.as_ref(), &[1, 2, 3], worker, 3, 3).await;
index
.apply_event(make_remove_event_with_parent(0, &[1], &[2]))
.await;
flush_and_settle(index.as_ref()).await;
assert_query_score_and_tree_size(index.as_ref(), &[1, 2, 3], worker, 1, 1).await;
index
.apply_event(make_store_event_with_parent(0, &[1], &[2, 3]))
.await;
flush_and_settle(index.as_ref()).await;
assert_query_score_and_tree_size(index.as_ref(), &[1, 2, 3], worker, 3, 3).await;
}
#[tokio::test]
async fn test_concurrent_compressed_partial_node_drops_unreachable_descendants() {
let index = make_indexer("concurrent_compressed");
index.apply_event(make_store_event(0, &[1, 2, 3])).await;
index
.apply_event(make_store_event_with_parent(0, &[1, 2, 3], &[4, 5]))
.await;
flush_and_settle(index.as_ref()).await;
index
.apply_event(make_remove_event_with_parent(0, &[1], &[2]))
.await;
flush_and_settle(index.as_ref()).await;
assert_eq!(
snapshot_tree(index.as_ref()).await,
vec![make_store_event(0, &[1])]
);
}
#[tokio::test]
#[apply(indexer_template)]
async fn test_partial_match(variant: &str) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment