local.rs 31.5 KB
Newer Older
1
2
3
4
5
6
7
// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

8
9
use rustc_hash::{FxHashMap, FxHashSet};
use tokio::sync::watch;
10
use tokio::time::Instant;
11
12
13
use tokio_util::sync::CancellationToken;

use super::policy::{RouterSchedulingPolicy, SchedulingPolicy};
14
use super::prefill_load::PrefillLoadEstimator;
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
use super::queue::SchedulerQueue;
use super::selector::{DefaultWorkerSelector, WorkerSelector};
use super::types::{KvSchedulerError, PotentialLoad, SchedulingRequest, SchedulingResponse};
use crate::protocols::{OverlapScores, WorkerConfigLike, WorkerId, WorkerWithDpRank};
use crate::sequences::{
    ActiveSequencesMultiWorker, SequenceError, SequencePublisher, SequenceRequest,
};
use dynamo_tokens::SequenceHash;

pub struct LocalScheduler<P, C, S = RouterSchedulingPolicy, Sel = DefaultWorkerSelector>
where
    P: SequencePublisher,
    C: WorkerConfigLike,
    S: SchedulingPolicy,
    Sel: WorkerSelector<C>,
{
    slots: Arc<ActiveSequencesMultiWorker<P>>,
    queue: Arc<SchedulerQueue<P, C, S, Sel>>,
33
    queue_updates: watch::Sender<()>,
34
    track_prefill_tokens_default: bool,
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
    worker_type: &'static str,
}

impl<P, C, S, Sel> LocalScheduler<P, C, S, Sel>
where
    P: SequencePublisher + 'static,
    C: WorkerConfigLike + Clone + PartialEq + Send + Sync + 'static,
    S: SchedulingPolicy + 'static,
    Sel: WorkerSelector<C> + Send + Sync + 'static,
{
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        slots: Arc<ActiveSequencesMultiWorker<P>>,
        workers_with_configs: watch::Receiver<HashMap<WorkerId, C>>,
        threshold_frac: Option<f64>,
        block_size: u32,
        selector: Sel,
        policy: S,
53
54
        prefill_load_estimator: Option<Arc<dyn PrefillLoadEstimator>>,
        recheck_interval: Duration,
55
        track_prefill_tokens_default: bool,
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
        cancellation_token: CancellationToken,
        worker_type: &'static str,
        monitor_worker_configs: bool,
    ) -> Self {
        if monitor_worker_configs {
            let slots_monitor = Arc::clone(&slots);
            let mut monitor_rx = workers_with_configs.clone();
            let mut last_workers = monitor_rx.borrow().clone();
            let monitor_cancel_token = cancellation_token.clone();
            tokio::spawn(async move {
                tracing::trace!("LocalScheduler workers monitoring task started");

                loop {
                    tokio::select! {
                        _ = monitor_cancel_token.cancelled() => {
                            tracing::trace!("LocalScheduler workers monitoring task shutting down");
                            break;
                        }
                        result = monitor_rx.changed() => {
                            if result.is_err() {
                                tracing::warn!("LocalScheduler worker config watch dropped, shutting down");
                                break;
                            }
                        }
                    }

                    let current_workers = monitor_rx.borrow_and_update().clone();
                    if current_workers == last_workers {
                        continue;
                    }

                    let dp_range: HashMap<WorkerId, (u32, u32)> = current_workers
                        .iter()
                        .map(|(&id, cfg)| {
                            (
                                id,
                                (cfg.data_parallel_start_rank(), cfg.data_parallel_size()),
                            )
                        })
                        .collect();
                    slots_monitor.update_workers(&dp_range);
                    last_workers = current_workers;
                }
            });
        }

        let queue = Arc::new(SchedulerQueue::new(
            Arc::clone(&slots),
            workers_with_configs,
            threshold_frac,
            block_size,
            selector,
            policy,
109
            prefill_load_estimator,
110
        ));
111
112
        let (queue_updates, _) = watch::channel(());
        let queue_remote_updates = Arc::clone(&queue);
113
        let queue_periodic_updates = Arc::clone(&queue);
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
        let mut remote_state_updates = slots.subscribe_remote_state_changes();
        let remote_update_cancel_token = cancellation_token.clone();
        let queue_updates_remote = queue_updates.clone();

        tokio::spawn(async move {
            tracing::trace!("LocalScheduler remote state listener started");

            loop {
                tokio::select! {
                    _ = remote_update_cancel_token.cancelled() => {
                        tracing::trace!("LocalScheduler remote state listener shutting down");
                        break;
                    }
                    result = remote_state_updates.changed() => {
                        if result.is_err() {
                            tracing::trace!("LocalScheduler remote state listener shutting down");
                            break;
                        }
                        queue_remote_updates.update().await;
                        let _ = queue_updates_remote.send(());
                    }
                }
            }
        });
138
139

        tokio::spawn(async move {
140
            let mut recheck_interval = tokio::time::interval(recheck_interval);
141
            tracing::trace!("LocalScheduler periodic queue update task started");
142
143
144
145

            loop {
                tokio::select! {
                    _ = cancellation_token.cancelled() => {
146
                        tracing::trace!("LocalScheduler periodic queue update task shutting down");
147
148
149
                        break;
                    }
                    _ = recheck_interval.tick() => {
150
                        queue_periodic_updates.update().await;
151
152
153
154
155
156
157
158
                    }
                }
            }
        });

        Self {
            slots,
            queue,
159
            queue_updates,
160
            track_prefill_tokens_default,
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
            worker_type,
        }
    }

    #[expect(clippy::too_many_arguments)]
    pub async fn schedule(
        &self,
        maybe_request_id: Option<String>,
        isl_tokens: usize,
        token_seq: Option<Vec<SequenceHash>>,
        overlaps: OverlapScores,
        router_config_override: Option<&super::config::RouterConfigOverride>,
        update_states: bool,
        lora_name: Option<String>,
        priority_jump: f64,
        expected_output_tokens: Option<u32>,
177
        pinned_worker: Option<WorkerWithDpRank>,
178
        allowed_worker_ids: Option<HashSet<WorkerId>>,
179
        shared_cache_hits: Option<crate::SharedCacheHits>,
180
181
    ) -> Result<SchedulingResponse, KvSchedulerError> {
        let (resp_tx, resp_rx) = tokio::sync::oneshot::channel();
182
183
184
        let track_prefill_tokens = router_config_override
            .and_then(|cfg| cfg.track_prefill_tokens)
            .unwrap_or(self.track_prefill_tokens_default);
185
186
187
188
189
        let request = SchedulingRequest {
            maybe_request_id,
            token_seq,
            isl_tokens,
            overlaps,
190
191
            decode_blocks: FxHashMap::default(),
            prefill_tokens: FxHashMap::default(),
192
            track_prefill_tokens,
193
194
195
196
197
            router_config_override: router_config_override.cloned(),
            update_states,
            lora_name,
            priority_jump,
            expected_output_tokens,
198
            pinned_worker,
199
            allowed_worker_ids,
200
            shared_cache_hits,
201
202
203
            resp_tx: Some(resp_tx),
        };

204
        self.queue.enqueue(request).await;
205
206
207
208
209
210
211
212
213
214
215

        resp_rx
            .await
            .map_err(|_| KvSchedulerError::SubscriberShutdown)?
    }

    pub fn register_workers(&self, worker_ids: &HashSet<WorkerId>) {
        self.queue.register_workers(worker_ids);
    }

    pub async fn add_request(&self, req: SequenceRequest) -> Result<(), SequenceError> {
216
        self.slots.add_request(req, Instant::now())
217
218
219
    }

    pub async fn mark_prefill_completed(&self, request_id: &str) -> Result<(), SequenceError> {
220
221
        self.slots
            .mark_prefill_completed(&request_id.to_string(), Instant::now())?;
222
223
224
225
226
        self.queue.update().await;
        Ok(())
    }

    pub async fn free(&self, request_id: &str) -> Result<(), SequenceError> {
227
        self.slots.free(&request_id.to_string(), Instant::now())?;
228
229
230
231
232
233
234
235
        self.queue.update().await;
        Ok(())
    }

    pub fn pending_count(&self) -> usize {
        self.queue.pending_count()
    }

236
237
238
239
    pub fn pending_isl_tokens(&self) -> usize {
        self.queue.pending_isl_tokens()
    }

240
241
242
243
    pub fn worker_type(&self) -> &'static str {
        self.worker_type
    }

244
245
246
247
    pub fn subscribe_queue_updates(&self) -> watch::Receiver<()> {
        self.queue_updates.subscribe()
    }

248
249
250
251
252
253
254
255
256
257
258
259
260
261
    pub fn add_output_block(
        &self,
        request_id: &str,
        decay_fraction: Option<f64>,
    ) -> Result<(), SequenceError> {
        self.slots
            .add_output_block(&request_id.to_string(), decay_fraction)
    }

    pub fn get_potential_loads(
        &self,
        token_seq: Option<Vec<SequenceHash>>,
        isl_tokens: usize,
        overlaps: OverlapScores,
262
        track_prefill_tokens: bool,
263
    ) -> Vec<PotentialLoad> {
264
        let decay_now = Instant::now();
265
266
267
268
269
270
271
        let (decode_blocks, prefill_tokens) = self
            .slots
            .potential_blocks_and_tokens_with_prefill_tracking(
                token_seq.as_deref(),
                isl_tokens,
                overlaps,
                track_prefill_tokens,
272
                decay_now,
273
            );
274

275
        let mut workers: FxHashSet<WorkerWithDpRank> = FxHashSet::default();
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
        workers.extend(decode_blocks.keys().copied());
        workers.extend(prefill_tokens.keys().copied());

        let mut loads = Vec::with_capacity(workers.len());
        for worker in workers {
            loads.push(PotentialLoad {
                worker_id: worker.worker_id,
                dp_rank: worker.dp_rank,
                potential_prefill_tokens: prefill_tokens
                    .get(&worker)
                    .copied()
                    .unwrap_or(isl_tokens),
                potential_decode_blocks: decode_blocks.get(&worker).copied().unwrap_or(0),
            });
        }

        loads
    }

    pub fn get_active_lora_counts(&self) -> HashMap<String, usize> {
        self.slots.get_active_lora_counts()
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::time::Duration;

306
    use tokio::sync::{mpsc, watch};
307
308

    use super::*;
309
    use crate::protocols::{ActiveSequenceEvent, ActiveSequenceEventData, OverlapScores};
310
    use crate::scheduling::PrefillLoadEstimator;
311
312
    use crate::scheduling::policy::FcfsPolicy;
    use crate::scheduling::selector::DefaultWorkerSelector;
313
    use crate::sequences::SequenceSubscriber;
314
315
    use crate::test_utils::{NoopSequencePublisher, SimpleWorkerConfig};

316
317
318
319
320
321
322
323
324
325
    struct TestSequenceSubscriber {
        rx: mpsc::UnboundedReceiver<ActiveSequenceEvent>,
    }

    impl SequenceSubscriber for TestSequenceSubscriber {
        async fn next_event(&mut self) -> Option<anyhow::Result<ActiveSequenceEvent>> {
            self.rx.recv().await.map(Ok)
        }
    }

326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
    struct FixedPrefillLoadEstimator {
        duration: Duration,
    }

    impl PrefillLoadEstimator for FixedPrefillLoadEstimator {
        fn predict_prefill_duration(
            &self,
            _batch_size: usize,
            _effective_isl: usize,
            _prefix: usize,
        ) -> anyhow::Result<Duration> {
            Ok(self.duration)
        }
    }

341
342
343
344
345
    #[allow(clippy::type_complexity)]
    fn make_scheduler(
        workers: HashMap<WorkerId, SimpleWorkerConfig>,
        threshold_frac: Option<f64>,
        monitor_worker_configs: bool,
346
        prefill_load_estimator: Option<Arc<dyn PrefillLoadEstimator>>,
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
    ) -> (
        Arc<LocalScheduler<NoopSequencePublisher, SimpleWorkerConfig, FcfsPolicy>>,
        Arc<ActiveSequencesMultiWorker<NoopSequencePublisher>>,
        watch::Sender<HashMap<WorkerId, SimpleWorkerConfig>>,
        CancellationToken,
    ) {
        let dp_range = workers
            .iter()
            .map(|(&id, cfg)| (id, (cfg.data_parallel_start_rank, cfg.data_parallel_size)))
            .collect();
        let slots = Arc::new(ActiveSequencesMultiWorker::new(
            NoopSequencePublisher,
            64,
            dp_range,
            false,
            0,
            "test",
        ));
        let (cfg_tx, cfg_rx) = watch::channel(workers);
        let cancel_token = CancellationToken::new();
        let scheduler = Arc::new(LocalScheduler::new(
            Arc::clone(&slots),
            cfg_rx,
            threshold_frac,
            64,
            DefaultWorkerSelector::new(None, "test"),
            FcfsPolicy,
374
375
            prefill_load_estimator,
            Duration::from_secs(60),
376
            true,
377
378
379
380
381
382
383
            cancel_token.clone(),
            "test",
            monitor_worker_configs,
        ));
        (scheduler, slots, cfg_tx, cancel_token)
    }

384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
    fn start_replica_sync(
        slots: &Arc<ActiveSequencesMultiWorker<NoopSequencePublisher>>,
        cancel_token: &CancellationToken,
    ) -> mpsc::UnboundedSender<ActiveSequenceEvent> {
        let (tx, rx) = mpsc::unbounded_channel();
        slots.start_replica_sync(TestSequenceSubscriber { rx }, cancel_token.clone());
        tx
    }

    async fn wait_for_pending_count(
        scheduler: &Arc<LocalScheduler<NoopSequencePublisher, SimpleWorkerConfig, FcfsPolicy>>,
        expected: usize,
    ) {
        tokio::time::timeout(Duration::from_millis(250), async {
            loop {
                if scheduler.pending_count() == expected {
                    break;
                }
                tokio::time::sleep(Duration::from_millis(5)).await;
            }
        })
        .await
        .unwrap();
    }

409
410
411
412
413
414
415
416
417
418
    #[tokio::test]
    async fn test_schedule_books_request_into_active_sequences() {
        let mut workers = HashMap::new();
        workers.insert(
            0,
            SimpleWorkerConfig {
                max_num_batched_tokens: Some(64),
                ..Default::default()
            },
        );
419
        let (scheduler, _slots, _cfg_tx, cancel_token) = make_scheduler(workers, None, true, None);
420
421
422
423
424
425
426
427
428
429
430
431
432

        let response = scheduler
            .schedule(
                Some("req-1".to_string()),
                64,
                Some(vec![1, 2, 3, 4]),
                OverlapScores::default(),
                None,
                true,
                Some("adapter-a".to_string()),
                0.0,
                None,
                None,
433
                None,
434
                None,
435
436
437
438
439
440
441
442
443
444
445
446
447
            )
            .await
            .unwrap();

        assert_eq!(response.best_worker.worker_id, 0);
        assert_eq!(
            scheduler.get_active_lora_counts(),
            HashMap::from([(String::from("adapter-a"), 1)])
        );

        cancel_token.cancel();
    }

448
449
450
451
452
453
454
455
456
457
    #[tokio::test]
    async fn test_schedule_override_can_disable_prefill_tracking() {
        let mut workers = HashMap::new();
        workers.insert(
            0,
            SimpleWorkerConfig {
                max_num_batched_tokens: Some(64),
                ..Default::default()
            },
        );
458
        let (scheduler, slots, _cfg_tx, cancel_token) = make_scheduler(workers, None, true, None);
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474

        scheduler
            .schedule(
                Some("req-1".to_string()),
                64,
                Some(vec![1, 2, 3, 4]),
                OverlapScores::default(),
                Some(&crate::config::RouterConfigOverride {
                    track_prefill_tokens: Some(false),
                    ..Default::default()
                }),
                true,
                None,
                0.0,
                None,
                None,
475
                None,
476
                None,
477
478
479
480
481
482
            )
            .await
            .unwrap();

        assert_eq!(
            slots
483
                .active_tokens(Instant::now())
484
485
486
487
488
489
490
491
                .get(&WorkerWithDpRank::new(0, 0))
                .copied(),
            Some(0)
        );

        cancel_token.cancel();
    }

492
493
494
495
496
497
498
499
500
501
    #[tokio::test]
    async fn test_mark_prefill_completed_drains_pending_queue() {
        let mut workers = HashMap::new();
        workers.insert(
            0,
            SimpleWorkerConfig {
                max_num_batched_tokens: Some(64),
                ..Default::default()
            },
        );
502
503
        let (scheduler, _slots, _cfg_tx, cancel_token) =
            make_scheduler(workers, Some(0.5), true, None);
504
505
506
507
508
509
510
511
512
513
514
515
516

        scheduler
            .schedule(
                Some("req-1".to_string()),
                64,
                Some(vec![1, 2, 3, 4]),
                OverlapScores::default(),
                None,
                true,
                None,
                0.0,
                None,
                None,
517
                None,
518
                None,
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
            )
            .await
            .unwrap();

        let queued = {
            let scheduler = Arc::clone(&scheduler);
            tokio::spawn(async move {
                scheduler
                    .schedule(
                        Some("req-2".to_string()),
                        64,
                        Some(vec![5, 6, 7, 8]),
                        OverlapScores::default(),
                        None,
                        true,
                        None,
                        0.0,
                        None,
                        None,
538
                        None,
539
                        None,
540
541
542
543
544
                    )
                    .await
            })
        };

545
        wait_for_pending_count(&scheduler, 1).await;
546
547
548
549
550
551
552
553

        scheduler.mark_prefill_completed("req-1").await.unwrap();
        queued.await.unwrap().unwrap();
        assert_eq!(scheduler.pending_count(), 0);

        cancel_token.cancel();
    }

554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
    #[tokio::test]
    async fn test_remote_mark_prefill_completed_drains_pending_queue() {
        let mut workers = HashMap::new();
        workers.insert(
            0,
            SimpleWorkerConfig {
                max_num_batched_tokens: Some(64),
                ..Default::default()
            },
        );
        let (scheduler, slots, _cfg_tx, cancel_token) =
            make_scheduler(workers, Some(0.5), true, None);
        let event_tx = start_replica_sync(&slots, &cancel_token);

        scheduler
            .schedule(
                Some("req-1".to_string()),
                64,
                Some(vec![1, 2, 3, 4]),
                OverlapScores::default(),
                None,
                true,
                None,
                0.0,
                None,
                None,
580
                None,
581
                None,
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
            )
            .await
            .unwrap();

        let queued = {
            let scheduler = Arc::clone(&scheduler);
            tokio::spawn(async move {
                scheduler
                    .schedule(
                        Some("req-2".to_string()),
                        64,
                        Some(vec![5, 6, 7, 8]),
                        OverlapScores::default(),
                        None,
                        true,
                        None,
                        0.0,
                        None,
                        None,
601
                        None,
602
                        None,
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
                    )
                    .await
            })
        };

        wait_for_pending_count(&scheduler, 1).await;

        event_tx
            .send(ActiveSequenceEvent {
                request_id: "req-1".to_string(),
                worker: WorkerWithDpRank::new(0, 0),
                data: ActiveSequenceEventData::MarkPrefillCompleted,
                router_id: 1,
                lora_name: None,
            })
            .unwrap();

        tokio::time::timeout(Duration::from_millis(250), async {
            queued.await.unwrap().unwrap();
        })
        .await
        .unwrap();
        assert_eq!(scheduler.pending_count(), 0);

        cancel_token.cancel();
    }

    #[tokio::test]
    async fn test_remote_queue_update_notification_fires_after_drain() {
        let mut workers = HashMap::new();
        workers.insert(
            0,
            SimpleWorkerConfig {
                max_num_batched_tokens: Some(64),
                ..Default::default()
            },
        );
        let (scheduler, slots, _cfg_tx, cancel_token) =
            make_scheduler(workers, Some(0.5), true, None);
        let event_tx = start_replica_sync(&slots, &cancel_token);
        let mut queue_updates = scheduler.subscribe_queue_updates();

        scheduler
            .schedule(
                Some("req-1".to_string()),
                64,
                Some(vec![1, 2, 3, 4]),
                OverlapScores::default(),
                None,
                true,
                None,
                0.0,
                None,
                None,
657
                None,
658
                None,
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
            )
            .await
            .unwrap();

        let queued = {
            let scheduler = Arc::clone(&scheduler);
            tokio::spawn(async move {
                scheduler
                    .schedule(
                        Some("req-2".to_string()),
                        64,
                        Some(vec![5, 6, 7, 8]),
                        OverlapScores::default(),
                        None,
                        true,
                        None,
                        0.0,
                        None,
                        None,
678
                        None,
679
                        None,
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
                    )
                    .await
            })
        };

        wait_for_pending_count(&scheduler, 1).await;

        event_tx
            .send(ActiveSequenceEvent {
                request_id: "req-1".to_string(),
                worker: WorkerWithDpRank::new(0, 0),
                data: ActiveSequenceEventData::Free,
                router_id: 1,
                lora_name: None,
            })
            .unwrap();

        tokio::time::timeout(Duration::from_millis(250), queue_updates.changed())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(scheduler.pending_count(), 0);
        queued.await.unwrap().unwrap();

        cancel_token.cancel();
    }

    #[tokio::test]
    async fn test_remote_free_drains_pending_queue() {
        let mut workers = HashMap::new();
        workers.insert(
            0,
            SimpleWorkerConfig {
                max_num_batched_tokens: Some(64),
                ..Default::default()
            },
        );
        let (scheduler, slots, _cfg_tx, cancel_token) =
            make_scheduler(workers, Some(0.5), true, None);
        let event_tx = start_replica_sync(&slots, &cancel_token);

        scheduler
            .schedule(
                Some("req-1".to_string()),
                64,
                Some(vec![1, 2, 3, 4]),
                OverlapScores::default(),
                None,
                true,
                None,
                0.0,
                None,
                None,
733
                None,
734
                None,
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
            )
            .await
            .unwrap();

        let queued = {
            let scheduler = Arc::clone(&scheduler);
            tokio::spawn(async move {
                scheduler
                    .schedule(
                        Some("req-2".to_string()),
                        64,
                        Some(vec![5, 6, 7, 8]),
                        OverlapScores::default(),
                        None,
                        true,
                        None,
                        0.0,
                        None,
                        None,
754
                        None,
755
                        None,
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
                    )
                    .await
            })
        };

        wait_for_pending_count(&scheduler, 1).await;

        event_tx
            .send(ActiveSequenceEvent {
                request_id: "req-1".to_string(),
                worker: WorkerWithDpRank::new(0, 0),
                data: ActiveSequenceEventData::Free,
                router_id: 1,
                lora_name: None,
            })
            .unwrap();

        tokio::time::timeout(Duration::from_millis(250), async {
            queued.await.unwrap().unwrap();
        })
        .await
        .unwrap();
        assert_eq!(scheduler.pending_count(), 0);

        cancel_token.cancel();
    }

783
784
785
786
787
788
789
790
791
792
    #[tokio::test]
    async fn test_free_updates_active_state() {
        let mut workers = HashMap::new();
        workers.insert(
            0,
            SimpleWorkerConfig {
                max_num_batched_tokens: Some(64),
                ..Default::default()
            },
        );
793
        let (scheduler, _slots, _cfg_tx, cancel_token) = make_scheduler(workers, None, true, None);
794
795
796
797
798
799
800
801
802
803
804
805
806

        scheduler
            .schedule(
                Some("req-1".to_string()),
                64,
                Some(vec![1, 2, 3, 4]),
                OverlapScores::default(),
                None,
                true,
                Some("adapter-a".to_string()),
                0.0,
                None,
                None,
807
                None,
808
                None,
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
            )
            .await
            .unwrap();
        assert_eq!(
            scheduler.get_active_lora_counts(),
            HashMap::from([(String::from("adapter-a"), 1)])
        );

        scheduler.free("req-1").await.unwrap();
        assert!(scheduler.get_active_lora_counts().is_empty());

        cancel_token.cancel();
    }

    #[tokio::test]
    async fn test_get_potential_loads_matches_slots() {
        let mut workers = HashMap::new();
        workers.insert(
            0,
            SimpleWorkerConfig {
                max_num_batched_tokens: Some(256),
                ..Default::default()
            },
        );
        workers.insert(
            1,
            SimpleWorkerConfig {
                max_num_batched_tokens: Some(256),
                ..Default::default()
            },
        );
840
        let (scheduler, slots, _cfg_tx, cancel_token) = make_scheduler(workers, None, true, None);
841
842
843
        let token_seq = vec![11, 22, 33, 44];
        let overlaps = OverlapScores::default();

844
845
846
847
848
849
        let (decode_blocks, prefill_tokens) = slots.potential_blocks_and_tokens(
            Some(&token_seq),
            128,
            overlaps.clone(),
            Instant::now(),
        );
850
851
852
853
854
855
856
857
858
859
860
        let mut expected: Vec<_> = decode_blocks
            .keys()
            .map(|worker| PotentialLoad {
                worker_id: worker.worker_id,
                dp_rank: worker.dp_rank,
                potential_prefill_tokens: prefill_tokens.get(worker).copied().unwrap_or(128),
                potential_decode_blocks: decode_blocks.get(worker).copied().unwrap_or(0),
            })
            .collect();
        expected.sort_by_key(|load| (load.worker_id, load.dp_rank));

861
        let mut actual = scheduler.get_potential_loads(Some(token_seq), 128, overlaps, true);
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
        actual.sort_by_key(|load| (load.worker_id, load.dp_rank));

        assert_eq!(actual.len(), expected.len());
        for (actual, expected) in actual.iter().zip(expected.iter()) {
            assert_eq!(actual.worker_id, expected.worker_id);
            assert_eq!(actual.dp_rank, expected.dp_rank);
            assert_eq!(
                actual.potential_prefill_tokens,
                expected.potential_prefill_tokens
            );
            assert_eq!(
                actual.potential_decode_blocks,
                expected.potential_decode_blocks
            );
        }

        cancel_token.cancel();
    }

881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
    #[tokio::test(start_paused = true)]
    async fn test_get_potential_loads_uses_decayed_prefill_tokens() {
        let mut workers = HashMap::new();
        workers.insert(
            0,
            SimpleWorkerConfig {
                max_num_batched_tokens: Some(256),
                ..Default::default()
            },
        );
        let estimator: Arc<dyn PrefillLoadEstimator> = Arc::new(FixedPrefillLoadEstimator {
            duration: Duration::from_secs(10),
        });
        let (scheduler, _slots, _cfg_tx, cancel_token) =
            make_scheduler(workers, None, true, Some(estimator));

        scheduler
            .schedule(
                Some("req-1".to_string()),
                100,
                Some(vec![1, 2, 3, 4]),
                OverlapScores::default(),
                None,
                true,
                None,
                0.0,
                None,
                None,
909
                None,
910
                None,
911
912
913
914
915
916
917
918
919
920
921
922
923
            )
            .await
            .unwrap();

        tokio::time::advance(Duration::from_secs(6)).await;

        let loads = scheduler.get_potential_loads(None, 0, OverlapScores::default(), true);
        assert_eq!(loads.len(), 1);
        assert_eq!(loads[0].potential_prefill_tokens, 40);

        cancel_token.cancel();
    }

924
925
926
    #[tokio::test]
    async fn test_register_workers_uses_default_dp_fallback() {
        let (scheduler, _slots, _cfg_tx, cancel_token) =
927
            make_scheduler(HashMap::new(), None, false, None);
928
929

        scheduler.register_workers(&HashSet::from([42]));
930
        let loads = scheduler.get_potential_loads(None, 64, OverlapScores::default(), true);
931
932
933
934
935
936
937
938
939
940
941
942

        assert_eq!(loads.len(), 1);
        assert_eq!(loads[0].worker_id, 42);
        assert_eq!(loads[0].dp_rank, 0);

        cancel_token.cancel();
    }

    #[tokio::test]
    async fn test_worker_watch_updates_slot_ranges() {
        let mut workers = HashMap::new();
        workers.insert(0, SimpleWorkerConfig::default());
943
        let (scheduler, _slots, cfg_tx, cancel_token) = make_scheduler(workers, None, true, None);
944
945
946

        assert_eq!(
            scheduler
947
                .get_potential_loads(None, 64, OverlapScores::default(), true)
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
                .len(),
            1
        );

        let mut updated_workers = HashMap::new();
        updated_workers.insert(
            0,
            SimpleWorkerConfig {
                data_parallel_size: 2,
                ..Default::default()
            },
        );
        updated_workers.insert(1, SimpleWorkerConfig::default());
        cfg_tx.send(updated_workers).unwrap();

        tokio::time::timeout(Duration::from_secs(1), async {
            loop {
                if scheduler
966
                    .get_potential_loads(None, 64, OverlapScores::default(), true)
967
968
969
970
971
972
973
974
975
976
977
978
979
                    .len()
                    == 3
                {
                    break;
                }
                tokio::task::yield_now().await;
            }
        })
        .await
        .unwrap();

        cancel_token.cancel();
    }
980
981
982
983
984
985
986
987
988
989
990

    #[tokio::test]
    async fn test_get_potential_loads_can_ignore_prefill_tokens() {
        let mut workers = HashMap::new();
        workers.insert(
            0,
            SimpleWorkerConfig {
                max_num_batched_tokens: Some(256),
                ..Default::default()
            },
        );
991
        let (scheduler, _slots, _cfg_tx, cancel_token) = make_scheduler(workers, None, true, None);
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004

        scheduler
            .schedule(
                Some("req-1".to_string()),
                64,
                Some(vec![11, 22]),
                OverlapScores::default(),
                None,
                true,
                None,
                0.0,
                None,
                None,
1005
                None,
1006
                None,
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
            )
            .await
            .unwrap();

        let loads = scheduler.get_potential_loads(None, 64, OverlapScores::default(), false);
        assert_eq!(loads.len(), 1);
        assert_eq!(loads[0].potential_prefill_tokens, 64);

        cancel_token.cancel();
    }
1017
}