Unverified Commit 52382c91 authored by jthomson04's avatar jthomson04 Committed by GitHub
Browse files

fix: Don't double-count duplicate store events in the tree size. (#7549)


Signed-off-by: default avatarjthomson04 <jwillthomson19@gmail.com>
parent d4e30a59
......@@ -345,8 +345,13 @@ impl ConcurrentRadixTree {
let mut needs_worker_insert = false;
let num_blocks_added = op.blocks.len();
let mut num_blocks_added = 0;
// In each iteration, we lock the parent block and insert the worker into it from
// the previous iteration. This avoids locking a block twice.
//
// Track tree size from worker_lookup insertions so it matches the single-threaded
// radix tree's `lookup.len()` semantics and naturally includes the tail block.
for block_data in op.blocks {
let child = {
let mut parent_guard = current.write();
......@@ -392,11 +397,22 @@ impl ConcurrentRadixTree {
};
// Update lookup
worker_lookup.insert(block_data.block_hash, child.clone());
if worker_lookup
.insert(block_data.block_hash, child.clone())
.is_none()
{
num_blocks_added += 1;
}
current = child;
}
// Insert worker into the last child (not yet handled since there is
// no subsequent iteration to pick it up).
if needs_worker_insert {
current.write().workers.insert(worker);
}
match self.tree_sizes.get(&worker) {
Some(size) => {
size.fetch_add(num_blocks_added, Ordering::Relaxed);
......@@ -407,10 +423,6 @@ impl ConcurrentRadixTree {
}
}
if needs_worker_insert {
current.write().workers.insert(worker);
}
Ok(())
}
......
......@@ -272,6 +272,41 @@ mod interface_tests {
assert_eq!(*scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(), 3);
}
#[tokio::test]
async fn test_concurrent_duplicate_store_does_not_inflate_tree_size() {
let index = make_indexer("concurrent");
let sequence = vec![LocalBlockHash(1), LocalBlockHash(2), LocalBlockHash(3)];
let worker = WorkerWithDpRank::new(0, 0);
let event = make_store_event(0, &[1, 2, 3]);
index.apply_event(event.clone()).await;
flush_and_settle(index.as_ref()).await;
let initial_snapshot = snapshot_tree(index.as_ref()).await;
let initial_scores = index.find_matches(sequence.clone()).await.unwrap();
assert_eq!(
initial_scores.tree_sizes.get(&worker),
Some(&3),
"initial store should count all three blocks"
);
index.apply_event(event).await;
flush_and_settle(index.as_ref()).await;
let duplicate_snapshot = snapshot_tree(index.as_ref()).await;
let duplicate_scores = index.find_matches(sequence).await.unwrap();
assert_eq!(
initial_snapshot, duplicate_snapshot,
"replaying the same store event should not change the tree structure"
);
assert_eq!(
duplicate_scores.tree_sizes.get(&worker),
Some(&3),
"replaying the same store event should not increase the per-worker tree size"
);
}
#[tokio::test]
#[apply(indexer_template)]
async fn test_partial_match(variant: &str) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment