Unverified Commit 6365a015 authored by jthomson04's avatar jthomson04 Committed by GitHub
Browse files

fix: Fix main (#1712)

parent aaf283bb
...@@ -535,13 +535,13 @@ impl ApproxKvIndexer { ...@@ -535,13 +535,13 @@ impl ApproxKvIndexer {
let ttl = tokio::time::Duration::from_secs_f64(ttl_secs); let ttl = tokio::time::Duration::from_secs_f64(ttl_secs);
let inner = Arc::new(llm_rs::kv_router::approx::ApproxKvIndexer::new( let inner = Arc::new(llm_rs::kv_router::approx::ApproxKvIndexer::new(
component.inner.drt().runtime().child_token(), component.inner.drt().runtime().child_token(),
kv_block_size, kv_block_size as u32,
ttl, ttl,
)); ));
Ok(Self { inner }) Ok(Self { inner })
} }
fn block_size(&self) -> usize { fn block_size(&self) -> u32 {
self.inner.block_size() self.inner.block_size()
} }
......
...@@ -166,11 +166,11 @@ pub struct ApproxKvIndexer { ...@@ -166,11 +166,11 @@ pub struct ApproxKvIndexer {
/// A handle to the background task managing the KV store. /// A handle to the background task managing the KV store.
task: OnceLock<std::thread::JoinHandle<()>>, task: OnceLock<std::thread::JoinHandle<()>>,
/// The size of the KV block this indexer can handle. /// The size of the KV block this indexer can handle.
kv_block_size: usize, kv_block_size: u32,
} }
impl ApproxKvIndexer { impl ApproxKvIndexer {
pub fn new(token: CancellationToken, kv_block_size: usize, ttl: Duration) -> Self { pub fn new(token: CancellationToken, kv_block_size: u32, ttl: Duration) -> Self {
let (match_tx, mut match_rx) = mpsc::channel::<MatchRequest>(2048); let (match_tx, mut match_rx) = mpsc::channel::<MatchRequest>(2048);
let (route_tx, mut route_rx) = mpsc::channel::<RouterResult>(2048); let (route_tx, mut route_rx) = mpsc::channel::<RouterResult>(2048);
let (remove_worker_tx, mut remove_worker_rx) = mpsc::channel::<WorkerId>(16); let (remove_worker_tx, mut remove_worker_rx) = mpsc::channel::<WorkerId>(16);
...@@ -273,7 +273,7 @@ impl ApproxKvIndexer { ...@@ -273,7 +273,7 @@ impl ApproxKvIndexer {
} }
} }
pub fn block_size(&self) -> usize { pub fn block_size(&self) -> u32 {
self.kv_block_size self.kv_block_size
} }
...@@ -367,6 +367,8 @@ mod tests { ...@@ -367,6 +367,8 @@ mod tests {
use tokio::time::{self, Duration, Instant}; use tokio::time::{self, Duration, Instant};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
const KV_BLOCK_SIZE: u32 = 4;
impl<T: Clone + Hash + Eq + Ord> TimerManager<T> { impl<T: Clone + Hash + Eq + Ord> TimerManager<T> {
pub fn get_expiry(&self, key: &T) -> Option<&Instant> { pub fn get_expiry(&self, key: &T) -> Option<&Instant> {
self.timers.get(key) self.timers.get(key)
...@@ -455,7 +457,6 @@ mod tests { ...@@ -455,7 +457,6 @@ mod tests {
/// 3. Matches disappear after TTL expiry /// 3. Matches disappear after TTL expiry
#[tokio::test] #[tokio::test]
async fn test_approx_kv_indexer_basic_flow() { async fn test_approx_kv_indexer_basic_flow() {
const KV_BLOCK_SIZE: usize = 4;
const TTL: Duration = Duration::from_millis(200); const TTL: Duration = Duration::from_millis(200);
let cancel = CancellationToken::new(); let cancel = CancellationToken::new();
let indexer = ApproxKvIndexer::new(cancel.clone(), KV_BLOCK_SIZE, TTL); let indexer = ApproxKvIndexer::new(cancel.clone(), KV_BLOCK_SIZE, TTL);
...@@ -492,7 +493,6 @@ mod tests { ...@@ -492,7 +493,6 @@ mod tests {
/// Verify that `remove_worker` clears all entries for the specified worker. /// Verify that `remove_worker` clears all entries for the specified worker.
#[tokio::test] #[tokio::test]
async fn test_remove_worker() { async fn test_remove_worker() {
const KV_BLOCK_SIZE: usize = 4;
const TTL: Duration = Duration::from_secs(5); // Large enough to avoid expiry during test const TTL: Duration = Duration::from_secs(5); // Large enough to avoid expiry during test
let cancel = CancellationToken::new(); let cancel = CancellationToken::new();
let mut indexer = ApproxKvIndexer::new(cancel.clone(), KV_BLOCK_SIZE, TTL); let mut indexer = ApproxKvIndexer::new(cancel.clone(), KV_BLOCK_SIZE, TTL);
...@@ -526,7 +526,6 @@ mod tests { ...@@ -526,7 +526,6 @@ mod tests {
/// After removing one of multiple workers that share the same block, the remaining worker's entries should persist. /// After removing one of multiple workers that share the same block, the remaining worker's entries should persist.
#[tokio::test] #[tokio::test]
async fn test_remove_worker_preserves_other_workers() { async fn test_remove_worker_preserves_other_workers() {
const KV_BLOCK_SIZE: usize = 4;
const TTL: Duration = Duration::from_secs(5); // Large enough to avoid expiry during test const TTL: Duration = Duration::from_secs(5); // Large enough to avoid expiry during test
let cancel = CancellationToken::new(); let cancel = CancellationToken::new();
...@@ -568,7 +567,6 @@ mod tests { ...@@ -568,7 +567,6 @@ mod tests {
/// Two sequences with a shared prefix should yield overlap scores reflecting the common blocks. /// Two sequences with a shared prefix should yield overlap scores reflecting the common blocks.
#[tokio::test] #[tokio::test]
async fn test_common_prefix_overlap() { async fn test_common_prefix_overlap() {
const KV_BLOCK_SIZE: usize = 4;
const TTL: Duration = Duration::from_secs(5); const TTL: Duration = Duration::from_secs(5);
let cancel = CancellationToken::new(); let cancel = CancellationToken::new();
...@@ -604,7 +602,6 @@ mod tests { ...@@ -604,7 +602,6 @@ mod tests {
/// When the same block resides on multiple workers, all should appear in the overlap scores. /// When the same block resides on multiple workers, all should appear in the overlap scores.
#[tokio::test] #[tokio::test]
async fn test_multiple_workers_same_block() { async fn test_multiple_workers_same_block() {
const KV_BLOCK_SIZE: usize = 4;
const TTL: Duration = Duration::from_secs(5); const TTL: Duration = Duration::from_secs(5);
let cancel = CancellationToken::new(); let cancel = CancellationToken::new();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment