Unverified Commit 1f10ba55 authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

refactor(kv-router): tidy indexer test query helpers (#7801)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent 3d8e85e2
...@@ -252,6 +252,23 @@ async fn flush_and_settle(index: &dyn KvIndexerInterface) { ...@@ -252,6 +252,23 @@ async fn flush_and_settle(index: &dyn KvIndexerInterface) {
tokio::time::sleep(Duration::from_millis(100)).await; tokio::time::sleep(Duration::from_millis(100)).await;
} }
async fn query_scores(index: &dyn KvIndexerInterface, query: &[u64]) -> OverlapScores {
index
.find_matches(query.iter().copied().map(LocalBlockHash).collect())
.await
.unwrap()
}
async fn assert_score(
index: &dyn KvIndexerInterface,
query: &[u64],
worker: WorkerWithDpRank,
expected_score: u32,
) {
let scores = query_scores(index, query).await;
assert_eq!(scores.scores.get(&worker), Some(&expected_score));
}
async fn assert_query_score_and_tree_size( async fn assert_query_score_and_tree_size(
index: &dyn KvIndexerInterface, index: &dyn KvIndexerInterface,
query: &[u64], query: &[u64],
...@@ -259,15 +276,28 @@ async fn assert_query_score_and_tree_size( ...@@ -259,15 +276,28 @@ async fn assert_query_score_and_tree_size(
expected_score: u32, expected_score: u32,
expected_tree_size: usize, expected_tree_size: usize,
) { ) {
let scores = index let scores = query_scores(index, query).await;
.find_matches(query.iter().copied().map(LocalBlockHash).collect())
.await
.unwrap();
assert_eq!(scores.scores.get(&worker), Some(&expected_score)); assert_eq!(scores.scores.get(&worker), Some(&expected_score));
assert_eq!(scores.tree_sizes.get(&worker), Some(&expected_tree_size)); assert_eq!(scores.tree_sizes.get(&worker), Some(&expected_tree_size));
} }
async fn assert_no_scores(index: &dyn KvIndexerInterface, query: &[u64]) {
let scores = query_scores(index, query).await;
assert!(scores.scores.is_empty());
}
async fn assert_exact_scores(
index: &dyn KvIndexerInterface,
query: &[u64],
expected_scores: &[(WorkerWithDpRank, u32)],
) {
let scores = query_scores(index, query).await;
assert_eq!(scores.scores.len(), expected_scores.len());
for (worker, expected_score) in expected_scores {
assert_eq!(scores.scores.get(worker), Some(expected_score));
}
}
mod interface_tests { mod interface_tests {
use super::*; use super::*;
use rstest_reuse::apply; use rstest_reuse::apply;
...@@ -282,17 +312,7 @@ mod interface_tests { ...@@ -282,17 +312,7 @@ mod interface_tests {
flush_and_settle(index.as_ref()).await; flush_and_settle(index.as_ref()).await;
// Find matches using local hashes assert_score(index.as_ref(), &[1, 2, 3], WorkerWithDpRank::new(0, 0), 3).await;
let scores = index
.find_matches(vec![
LocalBlockHash(1),
LocalBlockHash(2),
LocalBlockHash(3),
])
.await
.unwrap();
assert_eq!(scores.scores.len(), 1);
assert_eq!(*scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(), 3);
} }
#[tokio::test] #[tokio::test]
...@@ -404,16 +424,7 @@ mod interface_tests { ...@@ -404,16 +424,7 @@ mod interface_tests {
flush_and_settle(index.as_ref()).await; flush_and_settle(index.as_ref()).await;
// Find matches for [1, 2, 999] - should match first 2 then stop assert_score(index.as_ref(), &[1, 2, 999], WorkerWithDpRank::new(0, 0), 2).await;
let scores = index
.find_matches(vec![
LocalBlockHash(1),
LocalBlockHash(2),
LocalBlockHash(999),
])
.await
.unwrap();
assert_eq!(*scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(), 2);
} }
#[tokio::test] #[tokio::test]
...@@ -429,16 +440,7 @@ mod interface_tests { ...@@ -429,16 +440,7 @@ mod interface_tests {
flush_and_settle(index.as_ref()).await; flush_and_settle(index.as_ref()).await;
// Find should return nothing assert_no_scores(index.as_ref(), &[1, 2, 3]).await;
let scores = index
.find_matches(vec![
LocalBlockHash(1),
LocalBlockHash(2),
LocalBlockHash(3),
])
.await
.unwrap();
assert!(scores.scores.is_empty());
} }
#[tokio::test] #[tokio::test]
...@@ -454,20 +456,25 @@ mod interface_tests { ...@@ -454,20 +456,25 @@ mod interface_tests {
flush_and_settle(index.as_ref()).await; flush_and_settle(index.as_ref()).await;
// Query [1] - both workers should match assert_exact_scores(
let scores = index.find_matches(vec![LocalBlockHash(1)]).await.unwrap(); index.as_ref(),
assert_eq!(scores.scores.len(), 2); &[1],
assert_eq!(*scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(), 1); &[
assert_eq!(*scores.scores.get(&WorkerWithDpRank::new(1, 0)).unwrap(), 1); (WorkerWithDpRank::new(0, 0), 1),
(WorkerWithDpRank::new(1, 0), 1),
// Query [1, 2] - worker 0 matches both, worker 1 matches only first block ],
let scores = index )
.find_matches(vec![LocalBlockHash(1), LocalBlockHash(2)]) .await;
.await
.unwrap(); assert_exact_scores(
assert_eq!(scores.scores.len(), 2); index.as_ref(),
assert_eq!(*scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(), 2); &[1, 2],
assert_eq!(*scores.scores.get(&WorkerWithDpRank::new(1, 0)).unwrap(), 1); &[
(WorkerWithDpRank::new(0, 0), 2),
(WorkerWithDpRank::new(1, 0), 1),
],
)
.await;
} }
#[tokio::test] #[tokio::test]
...@@ -486,16 +493,12 @@ mod interface_tests { ...@@ -486,16 +493,12 @@ mod interface_tests {
// Allow time for async remove_worker processing // Allow time for async remove_worker processing
flush_and_settle(index.as_ref()).await; flush_and_settle(index.as_ref()).await;
let scores = index assert_exact_scores(
.find_matches(vec![ index.as_ref(),
LocalBlockHash(1), &[1, 2, 3],
LocalBlockHash(2), &[(WorkerWithDpRank::new(1, 0), 3)],
LocalBlockHash(3), )
]) .await;
.await
.unwrap();
assert_eq!(scores.scores.len(), 1);
assert!(scores.scores.contains_key(&WorkerWithDpRank::new(1, 0)));
} }
#[tokio::test] #[tokio::test]
...@@ -588,9 +591,7 @@ mod interface_tests { ...@@ -588,9 +591,7 @@ mod interface_tests {
flush_and_settle(index.as_ref()).await; flush_and_settle(index.as_ref()).await;
// Empty query should return empty scores assert_no_scores(index.as_ref(), &[]).await;
let scores = index.find_matches(vec![]).await.unwrap();
assert!(scores.scores.is_empty());
} }
#[tokio::test] #[tokio::test]
...@@ -602,12 +603,7 @@ mod interface_tests { ...@@ -602,12 +603,7 @@ mod interface_tests {
flush_and_settle(index.as_ref()).await; flush_and_settle(index.as_ref()).await;
// Query for non-existent blocks assert_no_scores(index.as_ref(), &[999, 998]).await;
let scores = index
.find_matches(vec![LocalBlockHash(999), LocalBlockHash(998)])
.await
.unwrap();
assert!(scores.scores.is_empty());
} }
#[tokio::test] #[tokio::test]
...@@ -692,15 +688,16 @@ mod interface_tests { ...@@ -692,15 +688,16 @@ mod interface_tests {
flush_and_settle(index.as_ref()).await; flush_and_settle(index.as_ref()).await;
// Query for full sequence [1, 2, 3, 4, 5] should match all 5 blocks // Query for full sequence [1, 2, 3, 4, 5] should match all 5 blocks
let full_seq: Vec<LocalBlockHash> = (1..=5).map(LocalBlockHash).collect(); assert_score(
let scores = index.find_matches(full_seq).await.unwrap(); index.as_ref(),
assert_eq!(scores.scores.len(), 1); &[1, 2, 3, 4, 5],
assert_eq!(*scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(), 5); WorkerWithDpRank::new(0, 0),
5,
)
.await;
// Query for just [1, 2, 3] should match 3 blocks // Query for just [1, 2, 3] should match 3 blocks
let prefix_seq: Vec<LocalBlockHash> = (1..=3).map(LocalBlockHash).collect(); assert_score(index.as_ref(), &[1, 2, 3], WorkerWithDpRank::new(0, 0), 3).await;
let scores = index.find_matches(prefix_seq).await.unwrap();
assert_eq!(*scores.scores.get(&WorkerWithDpRank::new(0, 0)).unwrap(), 3);
} }
#[tokio::test] #[tokio::test]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment