Unverified Commit cb9b1cd5 authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

test(kv-router): use flush-and-settle helper in indexer tests (#7347)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent 608405e0
...@@ -262,6 +262,14 @@ fn make_indexer(variant: &str) -> Box<dyn KvIndexerInterface> { ...@@ -262,6 +262,14 @@ fn make_indexer(variant: &str) -> Box<dyn KvIndexerInterface> {
} }
} }
/// Ensure queued indexer work is drained, then give a short settle window.
/// This is intentionally conservative for tests that assert immediately
/// after asynchronous event ingestion.
async fn flush_and_settle(index: &dyn KvIndexerInterface) {
index.flush().await;
tokio::time::sleep(Duration::from_millis(100)).await;
}
#[tokio::test] #[tokio::test]
#[apply(indexer_template)] #[apply(indexer_template)]
async fn test_store_and_find(variant: &str) { async fn test_store_and_find(variant: &str) {
...@@ -270,7 +278,7 @@ async fn test_store_and_find(variant: &str) { ...@@ -270,7 +278,7 @@ async fn test_store_and_find(variant: &str) {
// Store a sequence for worker 0 // Store a sequence for worker 0
index.apply_event(make_store_event(0, &[1, 2, 3])).await; index.apply_event(make_store_event(0, &[1, 2, 3])).await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Find matches using local hashes // Find matches using local hashes
let scores = index let scores = index
...@@ -293,7 +301,7 @@ async fn test_partial_match(variant: &str) { ...@@ -293,7 +301,7 @@ async fn test_partial_match(variant: &str) {
// Store [1, 2, 3] for worker 0 // Store [1, 2, 3] for worker 0
index.apply_event(make_store_event(0, &[1, 2, 3])).await; index.apply_event(make_store_event(0, &[1, 2, 3])).await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Find matches for [1, 2, 999] - should match first 2 then stop // Find matches for [1, 2, 999] - should match first 2 then stop
let scores = index let scores = index
...@@ -318,7 +326,7 @@ async fn test_remove(variant: &str) { ...@@ -318,7 +326,7 @@ async fn test_remove(variant: &str) {
// Remove all blocks // Remove all blocks
index.apply_event(make_remove_event(0, &[1, 2, 3])).await; index.apply_event(make_remove_event(0, &[1, 2, 3])).await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Find should return nothing // Find should return nothing
let scores = index let scores = index
...@@ -343,7 +351,7 @@ async fn test_multiple_workers_shared_prefix(variant: &str) { ...@@ -343,7 +351,7 @@ async fn test_multiple_workers_shared_prefix(variant: &str) {
index.apply_event(make_store_event(0, &[1, 2])).await; index.apply_event(make_store_event(0, &[1, 2])).await;
index.apply_event(make_store_event(1, &[1, 3])).await; index.apply_event(make_store_event(1, &[1, 3])).await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Query [1] - both workers should match // Query [1] - both workers should match
let scores = index.find_matches(vec![LocalBlockHash(1)]).await.unwrap(); let scores = index.find_matches(vec![LocalBlockHash(1)]).await.unwrap();
...@@ -370,12 +378,12 @@ async fn test_remove_worker(variant: &str) { ...@@ -370,12 +378,12 @@ async fn test_remove_worker(variant: &str) {
index.apply_event(make_store_event(1, &[1, 2, 3])).await; index.apply_event(make_store_event(1, &[1, 2, 3])).await;
// Allow time for async event processing // Allow time for async event processing
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
index.remove_worker(0).await; index.remove_worker(0).await;
// Allow time for async remove_worker processing // Allow time for async remove_worker processing
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
let scores = index let scores = index
.find_matches(vec![ .find_matches(vec![
...@@ -404,7 +412,7 @@ async fn test_large_stores(variant: &str) { ...@@ -404,7 +412,7 @@ async fn test_large_stores(variant: &str) {
.await; .await;
} }
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Verify we can find matches for the last stored sequence // Verify we can find matches for the last stored sequence
let last_seq: Vec<LocalBlockHash> = (1..=512u64) let last_seq: Vec<LocalBlockHash> = (1..=512u64)
...@@ -424,7 +432,7 @@ async fn test_dump_and_restore(variant: &str) { ...@@ -424,7 +432,7 @@ async fn test_dump_and_restore(variant: &str) {
index.apply_event(make_store_event(1, &[1, 2, 4])).await; index.apply_event(make_store_event(1, &[1, 2, 4])).await;
// Allow background worker threads to process events. // Allow background worker threads to process events.
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Dump the tree as events and replay into a new index // Dump the tree as events and replay into a new index
let events = index.dump_events().await.unwrap(); let events = index.dump_events().await.unwrap();
...@@ -435,7 +443,7 @@ async fn test_dump_and_restore(variant: &str) { ...@@ -435,7 +443,7 @@ async fn test_dump_and_restore(variant: &str) {
restored.apply_event(event).await; restored.apply_event(event).await;
} }
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(restored.as_ref()).await;
assert_eq!( assert_eq!(
snapshot_tree(index.as_ref()).await, snapshot_tree(index.as_ref()).await,
...@@ -455,7 +463,7 @@ async fn test_clear_all_blocks(variant: &str) { ...@@ -455,7 +463,7 @@ async fn test_clear_all_blocks(variant: &str) {
// Clear worker 0's blocks using the Cleared event // Clear worker 0's blocks using the Cleared event
index.apply_event(make_clear_event(0)).await; index.apply_event(make_clear_event(0)).await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Worker 0's blocks should be gone, worker 1's remain // Worker 0's blocks should be gone, worker 1's remain
let scores = index let scores = index
...@@ -477,7 +485,7 @@ async fn test_empty_query(variant: &str) { ...@@ -477,7 +485,7 @@ async fn test_empty_query(variant: &str) {
index.apply_event(make_store_event(0, &[1, 2, 3])).await; index.apply_event(make_store_event(0, &[1, 2, 3])).await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Empty query should return empty scores // Empty query should return empty scores
let scores = index.find_matches(vec![]).await.unwrap(); let scores = index.find_matches(vec![]).await.unwrap();
...@@ -491,7 +499,7 @@ async fn test_miss_query(variant: &str) { ...@@ -491,7 +499,7 @@ async fn test_miss_query(variant: &str) {
index.apply_event(make_store_event(0, &[1, 2, 3])).await; index.apply_event(make_store_event(0, &[1, 2, 3])).await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Query for non-existent blocks // Query for non-existent blocks
let scores = index let scores = index
...@@ -513,7 +521,7 @@ async fn test_shutdown(variant: &str) { ...@@ -513,7 +521,7 @@ async fn test_shutdown(variant: &str) {
async fn test_shutdown_idempotent(variant: &str) { async fn test_shutdown_idempotent(variant: &str) {
let index = make_indexer(variant); let index = make_indexer(variant);
index.apply_event(make_store_event(0, &[1, 2, 3])).await; index.apply_event(make_store_event(0, &[1, 2, 3])).await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
index.shutdown(); index.shutdown();
index.shutdown(); index.shutdown();
} }
...@@ -532,7 +540,7 @@ async fn test_find_matches_for_request(variant: &str) { ...@@ -532,7 +540,7 @@ async fn test_find_matches_for_request(variant: &str) {
index.apply_event(make_store_event(0, &[1, 2, 3])).await; index.apply_event(make_store_event(0, &[1, 2, 3])).await;
// Allow time for async processing // Allow time for async processing
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Note: find_matches_for_request computes block hashes from tokens, // Note: find_matches_for_request computes block hashes from tokens,
// so we need tokens that hash to the same LocalBlockHash values. // so we need tokens that hash to the same LocalBlockHash values.
...@@ -574,7 +582,7 @@ async fn test_parent_hash_chains(variant: &str) { ...@@ -574,7 +582,7 @@ async fn test_parent_hash_chains(variant: &str) {
.apply_event(make_store_event_with_parent(0, &[1, 2, 3], &[4, 5])) .apply_event(make_store_event_with_parent(0, &[1, 2, 3], &[4, 5]))
.await; .await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Query for full sequence [1, 2, 3, 4, 5] should match all 5 blocks // Query for full sequence [1, 2, 3, 4, 5] should match all 5 blocks
let full_seq: Vec<LocalBlockHash> = (1..=5).map(LocalBlockHash).collect(); let full_seq: Vec<LocalBlockHash> = (1..=5).map(LocalBlockHash).collect();
...@@ -604,7 +612,7 @@ async fn test_multiple_dp_ranks(variant: &str) { ...@@ -604,7 +612,7 @@ async fn test_multiple_dp_ranks(variant: &str) {
.apply_event(make_store_event_with_dp_rank(0, &[1, 2, 3], 2)) .apply_event(make_store_event_with_dp_rank(0, &[1, 2, 3], 2))
.await; .await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Query should return all 3 dp_ranks as separate entries // Query should return all 3 dp_ranks as separate entries
let seq: Vec<LocalBlockHash> = (1..=3).map(LocalBlockHash).collect(); let seq: Vec<LocalBlockHash> = (1..=3).map(LocalBlockHash).collect();
...@@ -624,7 +632,7 @@ async fn test_partial_block_removal(variant: &str) { ...@@ -624,7 +632,7 @@ async fn test_partial_block_removal(variant: &str) {
// Store [1, 2, 3] // Store [1, 2, 3]
index.apply_event(make_store_event(0, &[1, 2, 3])).await; index.apply_event(make_store_event(0, &[1, 2, 3])).await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Verify all 3 blocks match // Verify all 3 blocks match
let seq: Vec<LocalBlockHash> = (1..=3).map(LocalBlockHash).collect(); let seq: Vec<LocalBlockHash> = (1..=3).map(LocalBlockHash).collect();
...@@ -650,7 +658,7 @@ async fn test_partial_block_removal(variant: &str) { ...@@ -650,7 +658,7 @@ async fn test_partial_block_removal(variant: &str) {
}; };
index.apply_event(remove_event).await; index.apply_event(remove_event).await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Query [1, 2, 3] - should only match 2 blocks now (block 3 is removed) // Query [1, 2, 3] - should only match 2 blocks now (block 3 is removed)
let scores = index.find_matches(seq).await.unwrap(); let scores = index.find_matches(seq).await.unwrap();
...@@ -678,7 +686,7 @@ async fn test_remove_mid_chain_block(variant: &str) { ...@@ -678,7 +686,7 @@ async fn test_remove_mid_chain_block(variant: &str) {
.apply_event(make_store_event(0, &[1, 2, 3, 4, 5])) .apply_event(make_store_event(0, &[1, 2, 3, 4, 5]))
.await; .await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Verify all 5 blocks match // Verify all 5 blocks match
let seq: Vec<LocalBlockHash> = (1..=5).map(LocalBlockHash).collect(); let seq: Vec<LocalBlockHash> = (1..=5).map(LocalBlockHash).collect();
...@@ -702,7 +710,7 @@ async fn test_remove_mid_chain_block(variant: &str) { ...@@ -702,7 +710,7 @@ async fn test_remove_mid_chain_block(variant: &str) {
}; };
index.apply_event(remove_event).await; index.apply_event(remove_event).await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Query [1, 2, 3, 4, 5] — only first 2 positions reachable (block 3 removed, orphaning 4 & 5) // Query [1, 2, 3, 4, 5] — only first 2 positions reachable (block 3 removed, orphaning 4 & 5)
let scores = index.find_matches(seq.clone()).await.unwrap(); let scores = index.find_matches(seq.clone()).await.unwrap();
...@@ -718,7 +726,7 @@ async fn test_remove_mid_chain_block(variant: &str) { ...@@ -718,7 +726,7 @@ async fn test_remove_mid_chain_block(variant: &str) {
.apply_event(make_store_event_with_parent(0, &[1, 2], &[3])) .apply_event(make_store_event_with_parent(0, &[1, 2], &[3]))
.await; .await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Query [1, 2, 3, 4, 5] — block 3 is back but 4 & 5 were orphaned, so score = 3 // Query [1, 2, 3, 4, 5] — block 3 is back but 4 & 5 were orphaned, so score = 3
let scores = index.find_matches(seq).await.unwrap(); let scores = index.find_matches(seq).await.unwrap();
...@@ -733,13 +741,13 @@ async fn test_remove_nonexistent_worker(variant: &str) { ...@@ -733,13 +741,13 @@ async fn test_remove_nonexistent_worker(variant: &str) {
// Store data for worker 0 // Store data for worker 0
index.apply_event(make_store_event(0, &[1, 2, 3])).await; index.apply_event(make_store_event(0, &[1, 2, 3])).await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Remove non-existent worker 999 - should not error or affect worker 0 // Remove non-existent worker 999 - should not error or affect worker 0
index.remove_worker(999).await; index.remove_worker(999).await;
// Allow time for async processing // Allow time for async processing
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Worker 0's data should still be there // Worker 0's data should still be there
let seq: Vec<LocalBlockHash> = (1..=3).map(LocalBlockHash).collect(); let seq: Vec<LocalBlockHash> = (1..=3).map(LocalBlockHash).collect();
...@@ -759,7 +767,7 @@ async fn test_remove_nonexistent_blocks(variant: &str) { ...@@ -759,7 +767,7 @@ async fn test_remove_nonexistent_blocks(variant: &str) {
// Try to remove blocks [999, 998] that don't exist - should not error // Try to remove blocks [999, 998] that don't exist - should not error
index.apply_event(make_remove_event(0, &[999, 998])).await; index.apply_event(make_remove_event(0, &[999, 998])).await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Original data should still be there // Original data should still be there
let seq: Vec<LocalBlockHash> = (1..=3).map(LocalBlockHash).collect(); let seq: Vec<LocalBlockHash> = (1..=3).map(LocalBlockHash).collect();
...@@ -778,7 +786,7 @@ async fn test_clear_then_reuse(variant: &str) { ...@@ -778,7 +786,7 @@ async fn test_clear_then_reuse(variant: &str) {
// Clear the worker // Clear the worker
index.apply_event(make_clear_event(0)).await; index.apply_event(make_clear_event(0)).await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Verify data is gone // Verify data is gone
let seq: Vec<LocalBlockHash> = (1..=3).map(LocalBlockHash).collect(); let seq: Vec<LocalBlockHash> = (1..=3).map(LocalBlockHash).collect();
...@@ -788,7 +796,7 @@ async fn test_clear_then_reuse(variant: &str) { ...@@ -788,7 +796,7 @@ async fn test_clear_then_reuse(variant: &str) {
// Store new data for the same worker // Store new data for the same worker
index.apply_event(make_store_event(0, &[1, 2, 3])).await; index.apply_event(make_store_event(0, &[1, 2, 3])).await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Verify new data is accessible // Verify new data is accessible
let scores = index.find_matches(seq).await.unwrap(); let scores = index.find_matches(seq).await.unwrap();
...@@ -809,7 +817,7 @@ async fn test_multiple_sequences_per_worker(variant: &str) { ...@@ -809,7 +817,7 @@ async fn test_multiple_sequences_per_worker(variant: &str) {
.apply_event(make_store_event(0, &[100, 101, 102])) .apply_event(make_store_event(0, &[100, 101, 102]))
.await; .await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Query first sequence // Query first sequence
let seq1: Vec<LocalBlockHash> = (1..=3).map(LocalBlockHash).collect(); let seq1: Vec<LocalBlockHash> = (1..=3).map(LocalBlockHash).collect();
...@@ -841,7 +849,7 @@ async fn test_clear_clears_all_dp_ranks(variant: &str) { ...@@ -841,7 +849,7 @@ async fn test_clear_clears_all_dp_ranks(variant: &str) {
.apply_event(make_store_event_with_dp_rank(0, &[1, 2, 3], 1)) .apply_event(make_store_event_with_dp_rank(0, &[1, 2, 3], 1))
.await; .await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Verify both dp_ranks are present // Verify both dp_ranks are present
let seq: Vec<LocalBlockHash> = (1..=3).map(LocalBlockHash).collect(); let seq: Vec<LocalBlockHash> = (1..=3).map(LocalBlockHash).collect();
...@@ -851,7 +859,7 @@ async fn test_clear_clears_all_dp_ranks(variant: &str) { ...@@ -851,7 +859,7 @@ async fn test_clear_clears_all_dp_ranks(variant: &str) {
// Clear event clears ALL blocks for the worker_id, regardless of dp_rank // Clear event clears ALL blocks for the worker_id, regardless of dp_rank
index.apply_event(make_clear_event_with_dp_rank(0, 0)).await; index.apply_event(make_clear_event_with_dp_rank(0, 0)).await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Both dp_ranks should be cleared // Both dp_ranks should be cleared
let scores = index.find_matches(seq).await.unwrap(); let scores = index.find_matches(seq).await.unwrap();
...@@ -930,7 +938,7 @@ async fn test_lora_and_base_model_blocks_do_not_conflict(variant: &str) { ...@@ -930,7 +938,7 @@ async fn test_lora_and_base_model_blocks_do_not_conflict(variant: &str) {
}; };
index.apply_event(lora_event).await; index.apply_event(lora_event).await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Query with base-model hashes → only worker 0 // Query with base-model hashes → only worker 0
let base_scores = index.find_matches(base_hashes.clone()).await.unwrap(); let base_scores = index.find_matches(base_hashes.clone()).await.unwrap();
...@@ -1040,7 +1048,7 @@ async fn test_lora_base_same_tokens_no_seq_hash_mismatch(variant: &str) { ...@@ -1040,7 +1048,7 @@ async fn test_lora_base_same_tokens_no_seq_hash_mismatch(variant: &str) {
}) })
.await; .await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Base query finds only worker 0 // Base query finds only worker 0
let base_scores = index.find_matches(base_local.clone()).await.unwrap(); let base_scores = index.find_matches(base_local.clone()).await.unwrap();
...@@ -1130,7 +1138,7 @@ async fn test_different_lora_adapters_do_not_conflict(variant: &str) { ...@@ -1130,7 +1138,7 @@ async fn test_different_lora_adapters_do_not_conflict(variant: &str) {
}) })
.await; .await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Query adapter-a → only worker 0 // Query adapter-a → only worker 0
let scores_a = index.find_matches(hashes_a.clone()).await.unwrap(); let scores_a = index.find_matches(hashes_a.clone()).await.unwrap();
...@@ -1159,7 +1167,7 @@ async fn test_long_sequence_single_store(variant: &str) { ...@@ -1159,7 +1167,7 @@ async fn test_long_sequence_single_store(variant: &str) {
let sequence: Vec<u64> = (1..=seq_len).collect(); let sequence: Vec<u64> = (1..=seq_len).collect();
index.apply_event(make_store_event(0, &sequence)).await; index.apply_event(make_store_event(0, &sequence)).await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Query full sequence - should match all blocks // Query full sequence - should match all blocks
let full_query: Vec<LocalBlockHash> = sequence.iter().map(|&i| LocalBlockHash(i)).collect(); let full_query: Vec<LocalBlockHash> = sequence.iter().map(|&i| LocalBlockHash(i)).collect();
...@@ -1211,7 +1219,7 @@ async fn test_long_sequence_multiple_continuations(variant: &str) { ...@@ -1211,7 +1219,7 @@ async fn test_long_sequence_multiple_continuations(variant: &str) {
.apply_event(make_store_event_with_parent(0, &prefix_1_2, &third_chunk)) .apply_event(make_store_event_with_parent(0, &prefix_1_2, &third_chunk))
.await; .await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Query full sequence - should match all 150 blocks // Query full sequence - should match all 150 blocks
let full_query: Vec<LocalBlockHash> = (1..=150).map(LocalBlockHash).collect(); let full_query: Vec<LocalBlockHash> = (1..=150).map(LocalBlockHash).collect();
...@@ -1254,7 +1262,7 @@ async fn test_long_sequence_branching_continuations(variant: &str) { ...@@ -1254,7 +1262,7 @@ async fn test_long_sequence_branching_continuations(variant: &str) {
.apply_event(make_store_event_with_parent(1, &common_prefix, &branch_b)) .apply_event(make_store_event_with_parent(1, &common_prefix, &branch_b))
.await; .await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Query common prefix - both workers should match // Query common prefix - both workers should match
let prefix_query: Vec<LocalBlockHash> = (1..=30).map(LocalBlockHash).collect(); let prefix_query: Vec<LocalBlockHash> = (1..=30).map(LocalBlockHash).collect();
...@@ -1291,7 +1299,7 @@ async fn test_long_sequence_partial_removal(variant: &str) { ...@@ -1291,7 +1299,7 @@ async fn test_long_sequence_partial_removal(variant: &str) {
let sequence: Vec<u64> = (1..=100).collect(); let sequence: Vec<u64> = (1..=100).collect();
index.apply_event(make_store_event(0, &sequence)).await; index.apply_event(make_store_event(0, &sequence)).await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Verify full match // Verify full match
let full_query: Vec<LocalBlockHash> = sequence.iter().map(|&i| LocalBlockHash(i)).collect(); let full_query: Vec<LocalBlockHash> = sequence.iter().map(|&i| LocalBlockHash(i)).collect();
...@@ -1321,7 +1329,7 @@ async fn test_long_sequence_partial_removal(variant: &str) { ...@@ -1321,7 +1329,7 @@ async fn test_long_sequence_partial_removal(variant: &str) {
}; };
index.apply_event(remove_event).await; index.apply_event(remove_event).await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Query should now only match first 79 blocks // Query should now only match first 79 blocks
let scores = index.find_matches(full_query).await.unwrap(); let scores = index.find_matches(full_query).await.unwrap();
...@@ -1352,7 +1360,7 @@ async fn test_long_sequence_interleaved_workers(variant: &str) { ...@@ -1352,7 +1360,7 @@ async fn test_long_sequence_interleaved_workers(variant: &str) {
index.apply_event(make_store_event(2, &seq_50)).await; index.apply_event(make_store_event(2, &seq_50)).await;
index.apply_event(make_store_event(3, &seq_25)).await; index.apply_event(make_store_event(3, &seq_25)).await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Query for 60 blocks - workers 0,1 match 60, worker 2 matches 50, worker 3 matches 25 // Query for 60 blocks - workers 0,1 match 60, worker 2 matches 50, worker 3 matches 25
let query_60: Vec<LocalBlockHash> = (1..=60).map(LocalBlockHash).collect(); let query_60: Vec<LocalBlockHash> = (1..=60).map(LocalBlockHash).collect();
...@@ -1396,7 +1404,7 @@ async fn test_long_sequence_exact_jump_size_boundaries(variant: &str) { ...@@ -1396,7 +1404,7 @@ async fn test_long_sequence_exact_jump_size_boundaries(variant: &str) {
let seq_96: Vec<u64> = (2001..=2096).collect(); let seq_96: Vec<u64> = (2001..=2096).collect();
index.apply_event(make_store_event(2, &seq_96)).await; index.apply_event(make_store_event(2, &seq_96)).await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Verify all sequences match correctly // Verify all sequences match correctly
let query_32: Vec<LocalBlockHash> = seq_32.iter().map(|&i| LocalBlockHash(i)).collect(); let query_32: Vec<LocalBlockHash> = seq_32.iter().map(|&i| LocalBlockHash(i)).collect();
...@@ -1437,7 +1445,7 @@ async fn test_long_sequence_off_by_one_jump_boundaries(variant: &str) { ...@@ -1437,7 +1445,7 @@ async fn test_long_sequence_off_by_one_jump_boundaries(variant: &str) {
index.apply_event(make_store_event(2, &seq_63)).await; index.apply_event(make_store_event(2, &seq_63)).await;
index.apply_event(make_store_event(3, &seq_65)).await; index.apply_event(make_store_event(3, &seq_65)).await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Verify all sequences match correctly // Verify all sequences match correctly
let query_31: Vec<LocalBlockHash> = seq_31.iter().map(|&i| LocalBlockHash(i)).collect(); let query_31: Vec<LocalBlockHash> = seq_31.iter().map(|&i| LocalBlockHash(i)).collect();
...@@ -1478,7 +1486,7 @@ async fn test_long_sequence_divergence_at_jump_boundaries(variant: &str) { ...@@ -1478,7 +1486,7 @@ async fn test_long_sequence_divergence_at_jump_boundaries(variant: &str) {
let sequence: Vec<u64> = (1..=128).collect(); let sequence: Vec<u64> = (1..=128).collect();
index.apply_event(make_store_event(0, &sequence)).await; index.apply_event(make_store_event(0, &sequence)).await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Test divergence exactly at jump boundaries (position 31, 32, 33, 63, 64, 65) // Test divergence exactly at jump boundaries (position 31, 32, 33, 63, 64, 65)
for diverge_pos in [31usize, 32, 33, 63, 64, 65, 95, 96, 97] { for diverge_pos in [31usize, 32, 33, 63, 64, 65, 95, 96, 97] {
...@@ -1525,7 +1533,7 @@ async fn test_long_sequence_deep_continuation_chain(variant: &str) { ...@@ -1525,7 +1533,7 @@ async fn test_long_sequence_deep_continuation_chain(variant: &str) {
full_prefix.extend(&chunk); full_prefix.extend(&chunk);
} }
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Query full sequence // Query full sequence
let full_query: Vec<LocalBlockHash> = (1..=200).map(LocalBlockHash).collect(); let full_query: Vec<LocalBlockHash> = (1..=200).map(LocalBlockHash).collect();
...@@ -1553,7 +1561,7 @@ async fn test_long_sequence_clear_and_rebuild(variant: &str) { ...@@ -1553,7 +1561,7 @@ async fn test_long_sequence_clear_and_rebuild(variant: &str) {
let sequence: Vec<u64> = (1..=100).collect(); let sequence: Vec<u64> = (1..=100).collect();
index.apply_event(make_store_event(0, &sequence)).await; index.apply_event(make_store_event(0, &sequence)).await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Verify it's stored // Verify it's stored
let query: Vec<LocalBlockHash> = sequence.iter().map(|&i| LocalBlockHash(i)).collect(); let query: Vec<LocalBlockHash> = sequence.iter().map(|&i| LocalBlockHash(i)).collect();
...@@ -1566,7 +1574,7 @@ async fn test_long_sequence_clear_and_rebuild(variant: &str) { ...@@ -1566,7 +1574,7 @@ async fn test_long_sequence_clear_and_rebuild(variant: &str) {
// Clear the worker // Clear the worker
index.apply_event(make_clear_event(0)).await; index.apply_event(make_clear_event(0)).await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Verify it's cleared // Verify it's cleared
let scores = index.find_matches(query.clone()).await.unwrap(); let scores = index.find_matches(query.clone()).await.unwrap();
...@@ -1576,7 +1584,7 @@ async fn test_long_sequence_clear_and_rebuild(variant: &str) { ...@@ -1576,7 +1584,7 @@ async fn test_long_sequence_clear_and_rebuild(variant: &str) {
let new_sequence: Vec<u64> = (1001..=1100).collect(); let new_sequence: Vec<u64> = (1001..=1100).collect();
index.apply_event(make_store_event(0, &new_sequence)).await; index.apply_event(make_store_event(0, &new_sequence)).await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Verify new sequence works // Verify new sequence works
let new_query: Vec<LocalBlockHash> = new_sequence.iter().map(|&i| LocalBlockHash(i)).collect(); let new_query: Vec<LocalBlockHash> = new_sequence.iter().map(|&i| LocalBlockHash(i)).collect();
...@@ -1632,7 +1640,7 @@ async fn test_long_sequence_multiple_workers_diverging(variant: &str) { ...@@ -1632,7 +1640,7 @@ async fn test_long_sequence_multiple_workers_diverging(variant: &str) {
)) ))
.await; .await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Query 1-100 - worker 0 matches 100, workers 1&2 match 40 // Query 1-100 - worker 0 matches 100, workers 1&2 match 40
let query: Vec<LocalBlockHash> = worker_0_full.iter().map(|&i| LocalBlockHash(i)).collect(); let query: Vec<LocalBlockHash> = worker_0_full.iter().map(|&i| LocalBlockHash(i)).collect();
...@@ -1671,7 +1679,7 @@ async fn test_long_sequence_staggered_lengths(variant: &str) { ...@@ -1671,7 +1679,7 @@ async fn test_long_sequence_staggered_lengths(variant: &str) {
.await; .await;
} }
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Query for 100 blocks - each worker should match their stored length // Query for 100 blocks - each worker should match their stored length
let query: Vec<LocalBlockHash> = (1..=100).map(LocalBlockHash).collect(); let query: Vec<LocalBlockHash> = (1..=100).map(LocalBlockHash).collect();
...@@ -1709,7 +1717,7 @@ async fn test_very_long_sequence(variant: &str) { ...@@ -1709,7 +1717,7 @@ async fn test_very_long_sequence(variant: &str) {
let sequence: Vec<u64> = (1..=seq_len).collect(); let sequence: Vec<u64> = (1..=seq_len).collect();
index.apply_event(make_store_event(0, &sequence)).await; index.apply_event(make_store_event(0, &sequence)).await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
// Full match // Full match
let full_query: Vec<LocalBlockHash> = sequence.iter().map(|&i| LocalBlockHash(i)).collect(); let full_query: Vec<LocalBlockHash> = sequence.iter().map(|&i| LocalBlockHash(i)).collect();
...@@ -2102,7 +2110,7 @@ async fn test_apply_events_idempotent(variant: &str) { ...@@ -2102,7 +2110,7 @@ async fn test_apply_events_idempotent(variant: &str) {
index index
.apply_event(make_store_event_with_parent(0, &[1, 2, 3], &[7, 8])) .apply_event(make_store_event_with_parent(0, &[1, 2, 3], &[7, 8]))
.await; .await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
let s0 = snapshot_tree(index.as_ref()).await; let s0 = snapshot_tree(index.as_ref()).await;
// Mutation events: each add paired with its remove // Mutation events: each add paired with its remove
...@@ -2120,7 +2128,7 @@ async fn test_apply_events_idempotent(variant: &str) { ...@@ -2120,7 +2128,7 @@ async fn test_apply_events_idempotent(variant: &str) {
index.apply_event(removes[0].clone()).await; index.apply_event(removes[0].clone()).await;
index.apply_event(adds[1].clone()).await; index.apply_event(adds[1].clone()).await;
index.apply_event(removes[1].clone()).await; index.apply_event(removes[1].clone()).await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
let s1 = snapshot_tree(index.as_ref()).await; let s1 = snapshot_tree(index.as_ref()).await;
assert_eq!( assert_eq!(
s0, s1, s0, s1,
...@@ -2132,7 +2140,7 @@ async fn test_apply_events_idempotent(variant: &str) { ...@@ -2132,7 +2140,7 @@ async fn test_apply_events_idempotent(variant: &str) {
index.apply_event(removes[0].clone()).await; index.apply_event(removes[0].clone()).await;
index.apply_event(adds[1].clone()).await; index.apply_event(adds[1].clone()).await;
index.apply_event(removes[1].clone()).await; index.apply_event(removes[1].clone()).await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
let s2 = snapshot_tree(index.as_ref()).await; let s2 = snapshot_tree(index.as_ref()).await;
assert_eq!(s1, s2, "Phase 2: repeated cycle should be idempotent"); assert_eq!(s1, s2, "Phase 2: repeated cycle should be idempotent");
...@@ -2141,7 +2149,7 @@ async fn test_apply_events_idempotent(variant: &str) { ...@@ -2141,7 +2149,7 @@ async fn test_apply_events_idempotent(variant: &str) {
index.apply_event(adds[1].clone()).await; index.apply_event(adds[1].clone()).await;
index.apply_event(removes[0].clone()).await; index.apply_event(removes[0].clone()).await;
index.apply_event(removes[1].clone()).await; index.apply_event(removes[1].clone()).await;
tokio::time::sleep(Duration::from_millis(100)).await; flush_and_settle(index.as_ref()).await;
let s3 = snapshot_tree(index.as_ref()).await; let s3 = snapshot_tree(index.as_ref()).await;
assert_eq!( assert_eq!(
s2, s3, s2, s3,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment