Unverified Commit dc14ce05 authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

test(kv-router): add mid-chain removal, idempotency tests and flush_wait helpers (#6948)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent 6be2a9b1
......@@ -146,6 +146,75 @@ fn make_remove_event_with_dp_rank(
}
}
/// Create a remove event with parent hash for continuation sequences.
/// `prefix_hashes` are the hashes of the prefix (to compute parent_hash and full seq context).
/// `local_hashes` are the blocks being removed.
fn make_remove_event_with_parent(
worker_id: u64,
prefix_hashes: &[u64],
local_hashes: &[u64],
) -> RouterEvent {
let full_hashes: Vec<u64> = prefix_hashes
.iter()
.chain(local_hashes.iter())
.copied()
.collect();
let full_block_hashes: Vec<LocalBlockHash> =
full_hashes.iter().map(|&h| LocalBlockHash(h)).collect();
let full_seq_hashes = compute_seq_hash_for_block(&full_block_hashes);
let suffix_seq_hashes = &full_seq_hashes[prefix_hashes.len()..];
RouterEvent {
worker_id,
event: KvCacheEvent {
event_id: 0,
data: KvCacheEventData::Removed(KvCacheRemoveData {
block_hashes: suffix_seq_hashes
.iter()
.map(|&h| ExternalSequenceBlockHash(h))
.collect(),
}),
dp_rank: 0,
},
}
}
/// Snapshot the tree state for deterministic comparison.
/// Dumps all events, zeros out `event_id`, and sorts by `(worker_id, dp_rank, block_hash)`.
async fn snapshot_tree(index: &dyn KvIndexerInterface) -> Vec<RouterEvent> {
let mut events = index.dump_events().await.unwrap();
for ev in &mut events {
ev.event.event_id = 0;
}
events.sort_by(|a, b| {
a.worker_id.cmp(&b.worker_id).then_with(|| {
a.event.dp_rank.cmp(&b.event.dp_rank).then_with(|| {
let hash_a = match &a.event.data {
KvCacheEventData::Stored(s) => {
s.blocks.first().map(|b| b.block_hash.0).unwrap_or(0)
}
KvCacheEventData::Removed(r) => {
r.block_hashes.first().map(|h| h.0).unwrap_or(0)
}
KvCacheEventData::Cleared => 0,
};
let hash_b = match &b.event.data {
KvCacheEventData::Stored(s) => {
s.blocks.first().map(|b| b.block_hash.0).unwrap_or(0)
}
KvCacheEventData::Removed(r) => {
r.block_hashes.first().map(|h| h.0).unwrap_or(0)
}
KvCacheEventData::Cleared => 0,
};
hash_a.cmp(&hash_b)
})
})
});
events
}
/// Create a clear event for a worker.
fn make_clear_event(worker_id: u64) -> RouterEvent {
make_clear_event_with_dp_rank(worker_id, 0)
......@@ -357,29 +426,21 @@ async fn test_dump_and_restore(variant: &str) {
// Allow background worker threads to process events.
tokio::time::sleep(Duration::from_millis(100)).await;
// Dump the tree as events
// Dump the tree as events and replay into a new index
let events = index.dump_events().await.unwrap();
assert!(!events.is_empty());
// Create a new index and replay events
let restored = make_indexer(variant);
for event in events {
restored.apply_event(event).await;
}
// Allow background worker threads to process replayed events.
tokio::time::sleep(Duration::from_millis(100)).await;
// Verify find_matches produces same results
let original_scores = index
.find_matches(vec![LocalBlockHash(1), LocalBlockHash(2)])
.await
.unwrap();
let restored_scores = restored
.find_matches(vec![LocalBlockHash(1), LocalBlockHash(2)])
.await
.unwrap();
assert_eq!(original_scores.scores, restored_scores.scores);
assert_eq!(
snapshot_tree(index.as_ref()).await,
snapshot_tree(restored.as_ref()).await
);
}
#[tokio::test]
......@@ -416,7 +477,7 @@ async fn test_empty_query(variant: &str) {
index.apply_event(make_store_event(0, &[1, 2, 3])).await;
index.flush().await;
tokio::time::sleep(Duration::from_millis(100)).await;
// Empty query should return empty scores
let scores = index.find_matches(vec![]).await.unwrap();
......@@ -430,7 +491,7 @@ async fn test_miss_query(variant: &str) {
index.apply_event(make_store_event(0, &[1, 2, 3])).await;
index.flush().await;
tokio::time::sleep(Duration::from_millis(100)).await;
// Query for non-existent blocks
let scores = index
......@@ -471,7 +532,7 @@ async fn test_find_matches_for_request(variant: &str) {
index.apply_event(make_store_event(0, &[1, 2, 3])).await;
// Allow time for async processing
index.flush().await;
tokio::time::sleep(Duration::from_millis(100)).await;
// Note: find_matches_for_request computes block hashes from tokens,
// so we need tokens that hash to the same LocalBlockHash values.
......@@ -513,7 +574,7 @@ async fn test_parent_hash_chains(variant: &str) {
.apply_event(make_store_event_with_parent(0, &[1, 2, 3], &[4, 5]))
.await;
index.flush().await;
tokio::time::sleep(Duration::from_millis(100)).await;
// Query for full sequence [1, 2, 3, 4, 5] should match all 5 blocks
let full_seq: Vec<LocalBlockHash> = (1..=5).map(LocalBlockHash).collect();
......@@ -543,7 +604,7 @@ async fn test_multiple_dp_ranks(variant: &str) {
.apply_event(make_store_event_with_dp_rank(0, &[1, 2, 3], 2))
.await;
index.flush().await;
tokio::time::sleep(Duration::from_millis(100)).await;
// Query should return all 3 dp_ranks as separate entries
let seq: Vec<LocalBlockHash> = (1..=3).map(LocalBlockHash).collect();
......@@ -601,6 +662,69 @@ async fn test_partial_block_removal(variant: &str) {
assert_eq!(*scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(), 2);
}
#[tokio::test]
#[apply(indexer_template)]
async fn test_remove_mid_chain_block(variant: &str) {
// TODO: positional indexer has no parent-child structure, so mid-chain removal
// doesn't invalidate later positions — jump search skips over the gap and over-counts.
if variant == "flat" {
return;
}
let index = make_indexer(variant);
// Store [1, 2, 3, 4, 5]
index
.apply_event(make_store_event(0, &[1, 2, 3, 4, 5]))
.await;
tokio::time::sleep(Duration::from_millis(100)).await;
// Verify all 5 blocks match
let seq: Vec<LocalBlockHash> = (1..=5).map(LocalBlockHash).collect();
let scores = index.find_matches(seq.clone()).await.unwrap();
assert_eq!(*scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(), 5);
// Remove only block 3 (index 2) — the middle of the chain
let full_hashes: Vec<LocalBlockHash> = (1..=5).map(LocalBlockHash).collect();
let seq_hashes = compute_seq_hash_for_block(&full_hashes);
let block_3_seq_hash = ExternalSequenceBlockHash(seq_hashes[2]);
let remove_event = RouterEvent {
worker_id: 0,
event: KvCacheEvent {
event_id: 0,
data: KvCacheEventData::Removed(KvCacheRemoveData {
block_hashes: vec![block_3_seq_hash],
}),
dp_rank: 0,
},
};
index.apply_event(remove_event).await;
tokio::time::sleep(Duration::from_millis(100)).await;
// Query [1, 2, 3, 4, 5] — only first 2 positions reachable (block 3 removed, orphaning 4 & 5)
let scores = index.find_matches(seq.clone()).await.unwrap();
assert_eq!(*scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(), 2);
// Query [1, 2] — prefix before the gap is still intact
let prefix_seq: Vec<LocalBlockHash> = (1..=2).map(LocalBlockHash).collect();
let scores = index.find_matches(prefix_seq).await.unwrap();
assert_eq!(*scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(), 2);
// Re-store block 3 as a continuation of [1, 2]
index
.apply_event(make_store_event_with_parent(0, &[1, 2], &[3]))
.await;
tokio::time::sleep(Duration::from_millis(100)).await;
// Query [1, 2, 3, 4, 5] — block 3 is back but 4 & 5 were orphaned, so score = 3
let scores = index.find_matches(seq).await.unwrap();
assert_eq!(*scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(), 3);
}
#[tokio::test]
#[apply(indexer_template)]
async fn test_remove_nonexistent_worker(variant: &str) {
......@@ -635,7 +759,7 @@ async fn test_remove_nonexistent_blocks(variant: &str) {
// Try to remove blocks [999, 998] that don't exist - should not error
index.apply_event(make_remove_event(0, &[999, 998])).await;
index.flush().await;
tokio::time::sleep(Duration::from_millis(100)).await;
// Original data should still be there
let seq: Vec<LocalBlockHash> = (1..=3).map(LocalBlockHash).collect();
......@@ -685,7 +809,7 @@ async fn test_multiple_sequences_per_worker(variant: &str) {
.apply_event(make_store_event(0, &[100, 101, 102]))
.await;
index.flush().await;
tokio::time::sleep(Duration::from_millis(100)).await;
// Query first sequence
let seq1: Vec<LocalBlockHash> = (1..=3).map(LocalBlockHash).collect();
......@@ -806,9 +930,7 @@ async fn test_lora_and_base_model_blocks_do_not_conflict(variant: &str) {
};
index.apply_event(lora_event).await;
// flush + settle time for thread-pool variants
index.flush().await;
tokio::time::sleep(Duration::from_millis(50)).await;
tokio::time::sleep(Duration::from_millis(100)).await;
// Query with base-model hashes → only worker 0
let base_scores = index.find_matches(base_hashes.clone()).await.unwrap();
......@@ -918,8 +1040,7 @@ async fn test_lora_base_same_tokens_no_seq_hash_mismatch(variant: &str) {
})
.await;
index.flush().await;
tokio::time::sleep(Duration::from_millis(50)).await;
tokio::time::sleep(Duration::from_millis(100)).await;
// Base query finds only worker 0
let base_scores = index.find_matches(base_local.clone()).await.unwrap();
......@@ -1009,8 +1130,7 @@ async fn test_different_lora_adapters_do_not_conflict(variant: &str) {
})
.await;
index.flush().await;
tokio::time::sleep(Duration::from_millis(50)).await;
tokio::time::sleep(Duration::from_millis(100)).await;
// Query adapter-a → only worker 0
let scores_a = index.find_matches(hashes_a.clone()).await.unwrap();
......@@ -1134,7 +1254,7 @@ async fn test_long_sequence_branching_continuations(variant: &str) {
.apply_event(make_store_event_with_parent(1, &common_prefix, &branch_b))
.await;
index.flush().await;
tokio::time::sleep(Duration::from_millis(100)).await;
// Query common prefix - both workers should match
let prefix_query: Vec<LocalBlockHash> = (1..=30).map(LocalBlockHash).collect();
......@@ -1883,7 +2003,7 @@ async fn test_local_indexer_get_events_in_id_range_all_cases() {
}
// Wait for events to be processed
tokio::time::sleep(Duration::from_millis(100)).await;
indexer.flush().await;
let extract_events = |resp: WorkerKvQueryResponse| -> Vec<RouterEvent> {
match resp {
......@@ -1952,7 +2072,7 @@ async fn test_local_indexer_buffer_and_serialization() {
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
local_indexer.flush().await;
let buffered_events = local_indexer.get_all_events_in_buffer();
assert_eq!(buffered_events.len(), 1);
......@@ -1970,3 +2090,61 @@ async fn test_local_indexer_buffer_and_serialization() {
assert_eq!(events.len(), 1);
assert_eq!(events[0].worker_id, worker_id);
}
#[tokio::test]
#[apply(indexer_template)]
async fn test_apply_events_idempotent(variant: &str) {
let index = make_indexer(variant);
// Setup: build initial tree
index.apply_event(make_store_event(0, &[1, 2, 3])).await;
index.apply_event(make_store_event(1, &[4, 5, 6])).await;
index
.apply_event(make_store_event_with_parent(0, &[1, 2, 3], &[7, 8]))
.await;
tokio::time::sleep(Duration::from_millis(100)).await;
let s0 = snapshot_tree(index.as_ref()).await;
// Mutation events: each add paired with its remove
let adds = [
make_store_event(2, &[1, 2, 9]),
make_store_event_with_parent(1, &[4, 5, 6], &[10, 11, 12]),
];
let removes = [
make_remove_event(2, &[1, 2, 9]),
make_remove_event_with_parent(1, &[4, 5, 6], &[10, 11, 12]),
];
// Phase 1: interleaved add/remove
index.apply_event(adds[0].clone()).await;
index.apply_event(removes[0].clone()).await;
index.apply_event(adds[1].clone()).await;
index.apply_event(removes[1].clone()).await;
tokio::time::sleep(Duration::from_millis(100)).await;
let s1 = snapshot_tree(index.as_ref()).await;
assert_eq!(
s0, s1,
"Phase 1: interleaved add/remove should restore tree"
);
// Phase 2: same interleaved again (idempotence of the full cycle)
index.apply_event(adds[0].clone()).await;
index.apply_event(removes[0].clone()).await;
index.apply_event(adds[1].clone()).await;
index.apply_event(removes[1].clone()).await;
tokio::time::sleep(Duration::from_millis(100)).await;
let s2 = snapshot_tree(index.as_ref()).await;
assert_eq!(s1, s2, "Phase 2: repeated cycle should be idempotent");
// Phase 3: non-interleaved (all adds then all removes)
index.apply_event(adds[0].clone()).await;
index.apply_event(adds[1].clone()).await;
index.apply_event(removes[0].clone()).await;
index.apply_event(removes[1].clone()).await;
tokio::time::sleep(Duration::from_millis(100)).await;
let s3 = snapshot_tree(index.as_ref()).await;
assert_eq!(
s2, s3,
"Phase 3: non-interleaved ordering should restore tree"
);
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment