Unverified Commit b55277c9 authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

fix(kv-router): compressed tree sizes accounting (#7800)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent 1bbdff2d
......@@ -595,15 +595,16 @@ impl ConcurrentRadixTreeCompressed {
None => self.root.clone(),
};
let num_blocks = op.blocks.len();
self.insert_blocks_from(lookup, worker, &parent, op.parent_hash, &op.blocks);
let num_blocks_added =
self.insert_blocks_from(lookup, worker, &parent, op.parent_hash, &op.blocks);
match self.tree_sizes.get(&worker) {
Some(size) => {
size.fetch_add(num_blocks, Ordering::Relaxed);
size.fetch_add(num_blocks_added, Ordering::Relaxed);
}
None => {
self.tree_sizes.insert(worker, AtomicUsize::new(num_blocks));
self.tree_sizes
.insert(worker, AtomicUsize::new(num_blocks_added));
}
}
......@@ -617,9 +618,10 @@ impl ConcurrentRadixTreeCompressed {
parent: &SharedNode,
seed_hash: Option<ExternalSequenceBlockHash>,
blocks: &[KvCacheStoredBlockData],
) {
) -> usize {
let mut current_parent = parent.clone();
let mut remaining = blocks;
let mut num_blocks_added = 0usize;
// Track the last ExternalSequenceBlockHash we matched to detect if
// `current_parent` was split by a concurrent thread between iterations.
// A split shortens `current_parent`'s edge and moves our last-matched
......@@ -681,9 +683,11 @@ impl ConcurrentRadixTreeCompressed {
let wl = lookup.get_mut(&worker).unwrap();
for b in remaining {
wl.insert(b.block_hash, new_node.clone());
if wl.insert(b.block_hash, new_node.clone()).is_none() {
num_blocks_added += 1;
}
}
return;
return num_blocks_added;
}
}
};
......@@ -753,10 +757,14 @@ impl ConcurrentRadixTreeCompressed {
let wl = lookup.get_mut(&worker).unwrap();
for b in &remaining[..match_len] {
wl.insert(b.block_hash, child.clone());
if wl.insert(b.block_hash, child.clone()).is_none() {
num_blocks_added += 1;
}
}
for b in tail {
wl.insert(b.block_hash, new_node.clone());
if wl.insert(b.block_hash, new_node.clone()).is_none() {
num_blocks_added += 1;
}
}
} else {
drop(child_guard);
......@@ -764,10 +772,12 @@ impl ConcurrentRadixTreeCompressed {
let wl = lookup.get_mut(&worker).unwrap();
for b in &remaining[..match_len] {
wl.insert(b.block_hash, child.clone());
if wl.insert(b.block_hash, child.clone()).is_none() {
num_blocks_added += 1;
}
}
}
return;
return num_blocks_added;
}
// Full edge match: upgrade worker to full coverage if necessary.
......@@ -779,7 +789,9 @@ impl ConcurrentRadixTreeCompressed {
let wl = lookup.get_mut(&worker).unwrap();
for b in &remaining[..edge_len] {
wl.insert(b.block_hash, child.clone());
if wl.insert(b.block_hash, child.clone()).is_none() {
num_blocks_added += 1;
}
}
last_ext_hash = Some(remaining[edge_len - 1].block_hash);
......@@ -787,6 +799,8 @@ impl ConcurrentRadixTreeCompressed {
current_parent = child;
}
}
num_blocks_added
}
// ------------------------------------------------------------------
......
......@@ -210,6 +210,13 @@ fn indexer_template(
) {
}
#[template]
#[rstest]
fn tree_size_indexer_template(
#[values("single", "sharded", "concurrent", "concurrent_compressed")] variant: &str,
) {
}
fn make_indexer(variant: &str) -> Box<dyn KvIndexerInterface> {
let token = CancellationToken::new();
let metrics = Arc::new(KvIndexerMetrics::new_unregistered());
......@@ -245,6 +252,22 @@ async fn flush_and_settle(index: &dyn KvIndexerInterface) {
tokio::time::sleep(Duration::from_millis(100)).await;
}
async fn assert_query_score_and_tree_size(
index: &dyn KvIndexerInterface,
query: &[u64],
worker: WorkerWithDpRank,
expected_score: u32,
expected_tree_size: usize,
) {
let scores = index
.find_matches(query.iter().copied().map(LocalBlockHash).collect())
.await
.unwrap();
assert_eq!(scores.scores.get(&worker), Some(&expected_score));
assert_eq!(scores.tree_sizes.get(&worker), Some(&expected_tree_size));
}
mod interface_tests {
use super::*;
use rstest_reuse::apply;
......@@ -273,38 +296,102 @@ mod interface_tests {
}
#[tokio::test]
async fn test_concurrent_duplicate_store_does_not_inflate_tree_size() {
let index = make_indexer("concurrent");
let sequence = vec![LocalBlockHash(1), LocalBlockHash(2), LocalBlockHash(3)];
#[apply(tree_size_indexer_template)]
async fn test_tree_size_accounting_stays_stable(variant: &str) {
let index = make_indexer(variant);
let worker = WorkerWithDpRank::new(0, 0);
let event = make_store_event(0, &[1, 2, 3]);
let prefix_event = make_store_event(0, &[1, 2, 3]);
let continuation_event = make_store_event_with_parent(0, &[1, 2, 3], &[4, 5]);
let continuation_remove = make_remove_event_with_parent(0, &[1, 2, 3], &[4, 5]);
let prefix_remove = make_remove_event(0, &[1, 2, 3]);
// TODO: The radix-family implementations still have a broader tree-size
// accounting gap after mid-chain removes because descendant lookup entries
// are cleaned up lazily. That means "store -> partial remove -> restore
// continuation" can still miscount restored coverage in single, sharded,
// concurrent, and concurrent_compressed. This test is intentionally scoped
// to duplicate store/remove replay, which was the concrete compressed-tree
// regression fixed on this branch.
index.apply_event(prefix_event.clone()).await;
flush_and_settle(index.as_ref()).await;
assert_query_score_and_tree_size(index.as_ref(), &[1, 2, 3], worker, 3, 3).await;
let prefix_snapshot = snapshot_tree(index.as_ref()).await;
index.apply_event(event.clone()).await;
index.apply_event(prefix_event).await;
flush_and_settle(index.as_ref()).await;
let initial_snapshot = snapshot_tree(index.as_ref()).await;
let initial_scores = index.find_matches(sequence.clone()).await.unwrap();
assert_eq!(
initial_scores.tree_sizes.get(&worker),
Some(&3),
"initial store should count all three blocks"
prefix_snapshot,
snapshot_tree(index.as_ref()).await,
"replaying the same store event should not change the tree structure"
);
assert_query_score_and_tree_size(index.as_ref(), &[1, 2, 3], worker, 3, 3).await;
index.apply_event(event).await;
index.apply_event(continuation_event.clone()).await;
flush_and_settle(index.as_ref()).await;
let duplicate_snapshot = snapshot_tree(index.as_ref()).await;
let duplicate_scores = index.find_matches(sequence).await.unwrap();
assert_query_score_and_tree_size(index.as_ref(), &[1, 2, 3, 4, 5], worker, 5, 5).await;
let full_snapshot = snapshot_tree(index.as_ref()).await;
index.apply_event(continuation_event).await;
flush_and_settle(index.as_ref()).await;
assert_eq!(
initial_snapshot, duplicate_snapshot,
"replaying the same store event should not change the tree structure"
full_snapshot,
snapshot_tree(index.as_ref()).await,
"replaying the same continuation store should not change the tree structure"
);
assert_query_score_and_tree_size(index.as_ref(), &[1, 2, 3, 4, 5], worker, 5, 5).await;
index.apply_event(continuation_remove.clone()).await;
flush_and_settle(index.as_ref()).await;
assert_query_score_and_tree_size(index.as_ref(), &[1, 2, 3, 4, 5], worker, 3, 3).await;
let trimmed_snapshot = snapshot_tree(index.as_ref()).await;
index.apply_event(continuation_remove).await;
flush_and_settle(index.as_ref()).await;
assert_eq!(
duplicate_scores.tree_sizes.get(&worker),
Some(&3),
"replaying the same store event should not increase the per-worker tree size"
trimmed_snapshot,
snapshot_tree(index.as_ref()).await,
"replaying the same remove event should not change the tree structure"
);
assert_query_score_and_tree_size(index.as_ref(), &[1, 2, 3, 4, 5], worker, 3, 3).await;
index.apply_event(prefix_remove.clone()).await;
flush_and_settle(index.as_ref()).await;
let empty_scores = index
.find_matches(vec![
LocalBlockHash(1),
LocalBlockHash(2),
LocalBlockHash(3),
LocalBlockHash(4),
LocalBlockHash(5),
])
.await
.unwrap();
assert!(empty_scores.scores.is_empty());
assert!(snapshot_tree(index.as_ref()).await.is_empty());
index.apply_event(prefix_remove).await;
flush_and_settle(index.as_ref()).await;
let duplicate_empty_scores = index
.find_matches(vec![
LocalBlockHash(1),
LocalBlockHash(2),
LocalBlockHash(3),
LocalBlockHash(4),
LocalBlockHash(5),
])
.await
.unwrap();
assert!(duplicate_empty_scores.scores.is_empty());
assert!(snapshot_tree(index.as_ref()).await.is_empty());
}
#[tokio::test]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment