Unverified Commit 6f87f932 authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

fix(mocker): cancel replay router scheduler tasks (#8429)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent e6ce4db3
...@@ -111,6 +111,7 @@ pub(crate) struct KvReplayRouter { ...@@ -111,6 +111,7 @@ pub(crate) struct KvReplayRouter {
config: KvRouterConfig, config: KvRouterConfig,
block_size: u32, block_size: u32,
scheduler: Arc<ReplayScheduler>, scheduler: Arc<ReplayScheduler>,
scheduler_cancel: CancellationToken,
event_tx: Mutex<Option<mpsc::UnboundedSender<RouterEvent>>>, event_tx: Mutex<Option<mpsc::UnboundedSender<RouterEvent>>>,
event_task: Mutex<Option<tokio::task::JoinHandle<()>>>, event_task: Mutex<Option<tokio::task::JoinHandle<()>>>,
indexer: ReplayIndexer, indexer: ReplayIndexer,
...@@ -132,6 +133,7 @@ impl KvReplayRouter { ...@@ -132,6 +133,7 @@ impl KvReplayRouter {
tokio::sync::watch::channel(workers_with_configs); tokio::sync::watch::channel(workers_with_configs);
let selector = replay_selector(&config); let selector = replay_selector(&config);
let policy = replay_policy(&config, args); let policy = replay_policy(&config, args);
let scheduler_cancel = CancellationToken::new();
let scheduler = Arc::new(dynamo_kv_router::LocalScheduler::new( let scheduler = Arc::new(dynamo_kv_router::LocalScheduler::new(
slots, slots,
worker_config_rx, worker_config_rx,
...@@ -142,7 +144,7 @@ impl KvReplayRouter { ...@@ -142,7 +144,7 @@ impl KvReplayRouter {
prefill_load_estimator, prefill_load_estimator,
config.router_queue_recheck_interval(), config.router_queue_recheck_interval(),
config.router_track_prefill_tokens, config.router_track_prefill_tokens,
CancellationToken::new(), scheduler_cancel.clone(),
"replay", "replay",
false, false,
)); ));
...@@ -159,6 +161,7 @@ impl KvReplayRouter { ...@@ -159,6 +161,7 @@ impl KvReplayRouter {
config, config,
block_size: args.block_size as u32, block_size: args.block_size as u32,
scheduler, scheduler,
scheduler_cancel,
event_tx: Mutex::new(Some(event_tx)), event_tx: Mutex::new(Some(event_tx)),
event_task: Mutex::new(Some(event_task)), event_task: Mutex::new(Some(event_task)),
indexer, indexer,
...@@ -232,6 +235,7 @@ impl KvReplayRouter { ...@@ -232,6 +235,7 @@ impl KvReplayRouter {
} }
async fn shutdown(&self) -> Result<()> { async fn shutdown(&self) -> Result<()> {
self.scheduler_cancel.cancel();
self.event_tx.lock().unwrap().take(); self.event_tx.lock().unwrap().take();
let Some(event_task) = self.event_task.lock().unwrap().take() else { let Some(event_task) = self.event_task.lock().unwrap().take() else {
return Ok(()); return Ok(());
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment