# test_scheduler.py 50.4 KB
# Newer Older
1
# SPDX-License-Identifier: Apache-2.0
2
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
3

4
import time
5
from collections import deque
6
from typing import Optional
7
from unittest.mock import MagicMock
8

9
import pytest  # noqa
10
import torch
11
from torch import Use  # noqa
12

13
14
from vllm.config import CacheConfig, SchedulerConfig
from vllm.config.lora import LoRAConfig
15
16
17
from vllm.core.interfaces import AllocStatus
from vllm.core.scheduler import Scheduler, SchedulingBudget
from vllm.lora.request import LoRARequest
18
from vllm.sequence import SequenceGroup, SequenceStatus
19

20
21
22
from .utils import (append_new_token, append_new_token_seq,
                    append_new_token_seq_group, create_dummy_prompt,
                    get_sequence_groups, schedule_and_update_computed_tokens)
23
24


25
def test_scheduler_add_seq_group():
    """Each added sequence group raises the unfinished-group count by one."""
    block_size = 4
    scheduler_config = SchedulerConfig(
        "generate",
        max_num_batched_tokens=100,
        max_num_seqs=64,
        max_model_len=1,
    )
    cache_config = CacheConfig(block_size, 1.0, 1, cache_dtype="auto")
    cache_config.num_cpu_blocks = 4
    cache_config.num_gpu_blocks = 4
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # Add seq group to scheduler.
    num_seq_group = 4
    for i in range(num_seq_group):
        _, seq_group = create_dummy_prompt(str(i),
                                           block_size,
                                           block_size=block_size)
        scheduler.add_seq_group(seq_group)
        assert scheduler.get_num_unfinished_seq_groups() == i + 1


48
def test_scheduler_abort_seq_group():
    """Aborting every added request id leaves no unfinished groups."""
    block_size = 4
    scheduler_config = SchedulerConfig(
        "generate",
        max_num_batched_tokens=100,
        max_num_seqs=64,
        max_model_len=1,
    )
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 4
    cache_config.num_gpu_blocks = 4
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # Add multiple seq groups to scheduler.
    num_seq_group = 4
    request_ids: set[str] = set()
    for i in range(num_seq_group):
        _, seq_group = create_dummy_prompt(str(i), block_size)
        scheduler.add_seq_group(seq_group)
        request_ids.add(str(i))

    # Abort all added seq groups.
    assert scheduler.get_num_unfinished_seq_groups() == num_seq_group
    scheduler.abort_seq_group(request_ids)
    assert scheduler.get_num_unfinished_seq_groups() == 0


75
def test_scheduler_schedule_simple():
    """Run one prompt step and one generation step for a full batch.

    Both steps must schedule every added group, batch the expected number
    of tokens, and request no block copies or swaps.
    """
    block_size = 4
    num_seq_group = 4
    max_model_len = 16
    scheduler_config = SchedulerConfig(
        "generate",
        max_num_batched_tokens=64,
        max_num_seqs=num_seq_group,
        max_model_len=max_model_len,
    )
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 8
    cache_config.num_gpu_blocks = 8
    scheduler = Scheduler(scheduler_config, cache_config, None)
    running: list[SequenceGroup] = []

    # Add seq groups to scheduler.
    for i in range(num_seq_group):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=block_size,
                                           block_size=block_size)
        scheduler.add_seq_group(seq_group)
        running.append(seq_group)

    # Schedule seq groups prompts.
    num_tokens = block_size * num_seq_group
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert set(get_sequence_groups(out)) == set(running)
    assert out.num_batched_tokens == num_tokens
    assert (not out.blocks_to_copy and not out.blocks_to_swap_in
            and not out.blocks_to_swap_out)
    assert len(seq_group_meta) == num_seq_group
    append_new_token(out, 1)

    # Schedule seq groups generation.
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert set(get_sequence_groups(out)) == set(running)
    # One token per group in the generation step.
    assert out.num_batched_tokens == num_seq_group
    assert (not out.blocks_to_copy and not out.blocks_to_swap_in
            and not out.blocks_to_swap_out)
    assert len(seq_group_meta) == num_seq_group
    append_new_token(out, 1)


119
def test_scheduler_prefill_prioritized():
    """Verify running batched tokens are not applied to prefill requests."""
    block_size = 4
    max_model_len = 30
    max_batched_num_tokens = 30
    scheduler_config = SchedulerConfig(
        "generate",
        max_num_batched_tokens=max_batched_num_tokens,
        max_num_seqs=2,
        max_model_len=max_model_len,
    )
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 16
    cache_config.num_gpu_blocks = 16
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # Add seq groups to scheduler.
    _, seq_group_a = create_dummy_prompt("1", 1, block_size=block_size)
    scheduler.add_seq_group(seq_group_a)

    # Schedule seq groups prompts.
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_a]

    # Add a new prefill request B.
    _, seq_group_b = create_dummy_prompt("2", 30, block_size=block_size)
    scheduler.add_seq_group(seq_group_b)

    # Verify prefill requests are prioritized. Request B's 30-token prompt
    # equals the whole max_batched_num_tokens budget (30), and the step is
    # expected to contain only the new prefill request.
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_b]
151
152


153
def test_scheduler_schedule_preempt_abort():
    """Preempt one of two groups, abort the other, then re-schedule.

    Only two GPU blocks are configured. The test expects that after both
    prompts are scheduled, the generation step runs group "1" alone and
    reports one preemption; after "1" is aborted, group "2" is scheduled
    again with its prompt plus the generated token.
    """
    block_size = 4
    max_model_len = 16
    scheduler_config = SchedulerConfig(
        "generate",
        max_num_batched_tokens=64,
        max_num_seqs=2,
        max_model_len=max_model_len,
    )
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 2
    cache_config.num_gpu_blocks = 2
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # Add seq groups to scheduler.
    seq_a, seq_group_a = create_dummy_prompt("1",
                                             block_size,
                                             block_size=block_size)
    seq_b, seq_group_b = create_dummy_prompt("2",
                                             block_size,
                                             block_size=block_size)
    scheduler.add_seq_group(seq_group_a)
    scheduler.add_seq_group(seq_group_b)

    # Schedule seq groups prompts.
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_a, seq_group_b]
    assert out.num_batched_tokens == block_size * 2  # seq_a and seq_b
    assert (not out.blocks_to_copy and not out.blocks_to_swap_in
            and not out.blocks_to_swap_out)
    assert len(seq_group_meta) == 2
    assert scheduler.get_num_unfinished_seq_groups() == 2

    # Append "generated" tokens, allowing the sequence to mark prompt tokens as
    # processed.
    append_new_token(out, 1)

    # Schedule seq groups generation and preempt seq group b.
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_a]
    assert out.num_batched_tokens == 1
    assert (not out.blocks_to_copy and not out.blocks_to_swap_in
            and not out.blocks_to_swap_out)
    assert len(seq_group_meta) == 1
    assert scheduler.get_num_unfinished_seq_groups() == 2
    assert out.preempted == 1

    # Abort seq group a. Re-schedule seq group b prompt with recomputation.
    scheduler.abort_seq_group("1")
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_b]
    assert out.num_batched_tokens == 5  # 4 prompt + 1 generation.
    assert (not out.blocks_to_copy and not out.blocks_to_swap_in
            and not out.blocks_to_swap_out)
    assert len(seq_group_meta) == 1
    assert scheduler.get_num_unfinished_seq_groups() == 1


211
def test_scheduler_max_seqs():
    """The scheduler admits no more than max_num_seqs groups at a time."""
    block_size = 4
    num_seq_group = 4
    max_seq_group = 2
    max_model_len = 16
    scheduler_config = SchedulerConfig(
        "generate",
        max_num_batched_tokens=64,
        max_num_seqs=max_seq_group,
        max_model_len=max_model_len,
    )
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 8
    cache_config.num_gpu_blocks = 8
    scheduler = Scheduler(scheduler_config, cache_config, None)

    all_seq_groups: list[SequenceGroup] = []
    # Create the seq groups up front; they are added to the scheduler in
    # stages below.
    for i in range(num_seq_group):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=block_size,
                                           block_size=block_size)
        all_seq_groups.append(seq_group)

    # Append 1 seq group
    scheduler.add_seq_group(all_seq_groups[0])

    # Schedule seq groups prompts.
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert set(get_sequence_groups(out)) == {all_seq_groups[0]}
    append_new_token(out, 1)

    # Schedule seq groups generation.
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert set(get_sequence_groups(out)) == {all_seq_groups[0]}
    append_new_token(out, 1)

    # Append 2 more seq group
    scheduler.add_seq_group(all_seq_groups[1])
    scheduler.add_seq_group(all_seq_groups[2])

    # Schedule seq groups prompts.
    # Only 1 seq group should be scheduled since max_seq_group is 2
    # and one is prompting.
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert set(get_sequence_groups(out)) == {all_seq_groups[1]}
257
258


259
def test_scheduler_delay_factor():
    """With delay_factor=0.5 a newly added prompt is not prefilled at once.

    The second prompt is expected to be skipped on the first scheduling
    step after it is added, and to be prefilled once a further 0.6 s has
    elapsed.
    """
    block_size = 4
    scheduler_config = SchedulerConfig(
        "generate",
        max_num_batched_tokens=100,
        max_num_seqs=64,
        max_model_len=16,
        delay_factor=0.5,
    )
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 8
    cache_config.num_gpu_blocks = 8
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # schedule first prompt
    _, seq_group = create_dummy_prompt("0",
                                       prompt_length=block_size,
                                       block_size=block_size)
    scheduler.add_seq_group(seq_group)
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert out.num_prefill_groups > 0
    assert seq_group_meta[0].request_id == '0'
    append_new_token(out, 1)

    # wait for a second before scheduling next prompt
    time.sleep(1)
    _, seq_group = create_dummy_prompt("1",
                                       prompt_length=block_size,
                                       block_size=block_size)
    scheduler.add_seq_group(seq_group)

    # second prompt should *not* be scheduled
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert out.num_prefill_groups == 0
    assert seq_group_meta[0].request_id == '0'
    append_new_token(out, 1)

    # wait for more than 0.5 second and try again
    time.sleep(0.6)
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert out.num_prefill_groups > 0
    assert seq_group_meta[0].request_id == '1'
    append_new_token(out, 1)
302
303


304
305
306
307
308
309
310
311
312
def initialize_scheduler(
    *,
    max_num_seqs=1000,
    max_token_budget=1000,
    max_model_len=1000,
    lora_config=None,
    block_size=4,
    num_cpu_blocks=8,
    num_gpu_blocks=8,
    enable_prefix_caching=False,
    enable_chunked_prefill=False,
):
    """Build a Scheduler from keyword-only test settings.

    Args:
        max_num_seqs: Passed to SchedulerConfig as ``max_num_seqs``.
        max_token_budget: Passed to SchedulerConfig as
            ``max_num_batched_tokens``.
        max_model_len: Passed to SchedulerConfig as ``max_model_len``.
        lora_config: Third argument to the Scheduler constructor.
        block_size: First positional argument to CacheConfig.
        num_cpu_blocks: Assigned to ``cache_config.num_cpu_blocks``.
        num_gpu_blocks: Assigned to ``cache_config.num_gpu_blocks``.
        enable_prefix_caching: Passed to CacheConfig.
        enable_chunked_prefill: Passed to SchedulerConfig.

    Returns:
        The constructed Scheduler.
    """
    scheduler_config = SchedulerConfig(
        "generate",
        max_num_batched_tokens=max_token_budget,
        max_num_seqs=max_num_seqs,
        max_model_len=max_model_len,
        enable_chunked_prefill=enable_chunked_prefill,
    )
    cache_config = CacheConfig(
        block_size,
        1.0,
        1,
        "auto",
        enable_prefix_caching=enable_prefix_caching,
    )
    # Block counts are set on the config object after construction.
    cache_config.num_cpu_blocks = num_cpu_blocks
    cache_config.num_gpu_blocks = num_gpu_blocks
    scheduler = Scheduler(scheduler_config, cache_config, lora_config)
    return scheduler


337
def create_token_budget(token_budget: int = 10000,
                        max_num_seqs: int = 10000) -> SchedulingBudget:
    """Return a SchedulingBudget with the given token and sequence limits."""
    return SchedulingBudget(
        token_budget=token_budget,
        max_num_seqs=max_num_seqs,
    )


345
346
347
348
349
350
351
352
353
def add_token_budget(budget: SchedulingBudget,
                     num_batched_tokens: int = 0,
                     num_curr_seqs: int = 0):
    """Record batched tokens and sequences on ``budget``.

    Both amounts are registered under the request id of a throwaway
    dummy prompt (request id '10').
    """
    _, placeholder_group = create_dummy_prompt('10', prompt_length=60)
    request_id = placeholder_group.request_id
    budget.add_num_batched_tokens(request_id, num_batched_tokens)
    budget.add_num_seqs(request_id, num_curr_seqs)


354
def test_prefill_schedule_max_prompt_len():
    """
    Test prompt longer than max_prompt_len is aborted.
    """
    block_size = 4
    scheduler = initialize_scheduler(max_model_len=30, block_size=block_size)
    # The 60-token prompt exceeds the 30-token max_model_len.
    _, seq_group = create_dummy_prompt("0",
                                       prompt_length=60,
                                       block_size=block_size)
    scheduler.add_seq_group(seq_group)
    budget = create_token_budget()
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.ignored_seq_groups) == 1
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(remaining_waiting) == 0


374
def test_prefill_schedule_token_budget():
    """
    Test token budget respected.
    """
    block_size = 4
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    budget = create_token_budget(token_budget=0)
    for i in range(2):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size)
        scheduler.add_seq_group(seq_group)

    # 0 token budget == nothing is scheduled.
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(remaining_waiting) == 2

    # 60 token budget == 1 request scheduled.
    budget = create_token_budget(token_budget=60)
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 1
    assert budget.num_batched_tokens == 60
    assert budget.num_curr_seqs == 1
    assert len(remaining_waiting) == 1

    # Test when current_batched_tokens respected.
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_cpu_blocks=16,
                                     num_gpu_blocks=16)
    budget = create_token_budget(token_budget=60)
    add_token_budget(budget, 30, 0)
    # Explicit request id ("1" is the value the loop variable above ended
    # on) so this step does not depend on a leaked loop variable.
    _, seq_group = create_dummy_prompt("1",
                                       prompt_length=60,
                                       block_size=block_size)
    # Cannot schedule a prompt that doesn't fit the budget.
    scheduler.add_seq_group(seq_group)
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 30
    assert budget.num_curr_seqs == 0
    assert len(remaining_waiting) == 1

    # With a 90-token budget the 60-token prompt fits on top of the 30
    # tokens already recorded.
    budget = create_token_budget(token_budget=90)
    add_token_budget(budget, 30, 0)
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.seq_groups) == 1
    assert budget.num_batched_tokens == 90
    assert budget.num_curr_seqs == 1
    assert len(remaining_waiting) == 0


436
def test_prefill_schedule_max_seqs():
    """
    Test max seq respected.
    """
    block_size = 4
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    budget = create_token_budget(max_num_seqs=2)
    for i in range(3):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size)
        scheduler.add_seq_group(seq_group)
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 2
    assert budget.num_batched_tokens == 120
    assert budget.num_curr_seqs == 2
    assert len(remaining_waiting) == 1

    # Verify curr_num_seqs respected.
    scheduler.waiting = deque()
    budget = create_token_budget(max_num_seqs=2)
    add_token_budget(budget, 0, 2)
    # Explicit request id ("2" is the value the loop variable above ended
    # on) so this step does not depend on a leaked loop variable.
    _, seq_group = create_dummy_prompt("2",
                                       prompt_length=60,
                                       block_size=block_size)
    scheduler.add_seq_group(seq_group)
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 2
    assert len(remaining_waiting) == 1


475
def test_prefill_schedule_max_lora():
    """
    Test max lora is respected and prioritized.
    """
    block_size = 4
    lora_config = LoRAConfig(max_lora_rank=8, max_loras=1)
    scheduler = initialize_scheduler(lora_config=lora_config,
                                     block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    budget = create_token_budget(token_budget=120)
    curr_loras: set[int] = set()
    for i in range(2):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size,
                                           lora_request=LoRARequest(
                                               lora_name=str(i),
                                               lora_int_id=i + 1,
                                               lora_path="abc"))
        scheduler.add_seq_group(seq_group)
    # Add two more requests to verify lora is prioritized.
    # 0: LoRA, 1: LoRA, 2: regular, 3: regular
    # In the first iteration, index 0, 2 is scheduled.
    # If a request is not scheduled because it hits max lora, it is
    # prioritized. Verify that.
    for i in range(2, 4):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size)
        scheduler.add_seq_group(seq_group)
    # Schedule 2 requests (0 and 2)
    output = scheduler._schedule_prefills(budget, curr_loras)
    remaining_waiting = scheduler.waiting
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 2
    assert budget.num_batched_tokens == 120
    assert budget.num_curr_seqs == 2
    assert len(remaining_waiting) == 2
    assert len(curr_loras) == 1
    # The second lora request is scheduled next as FCFS policy.
    # Reset curr_loras so that it can be scheduled.
    curr_loras = set()
    budget = create_token_budget(token_budget=60)
    output = scheduler._schedule_prefills(budget, curr_loras)
    remaining_waiting = scheduler.waiting
    assert len(output.seq_groups) == 1
    assert output.seq_groups[0].seq_group.request_id == "1"
    assert len(remaining_waiting) == 1
    assert len(curr_loras) == 1
    assert budget.num_batched_tokens == 60


528
def test_prefill_schedule_no_block_manager_capacity():
    """
    Test sequence cannot be scheduled due to block manager has no capacity.
    """
    block_size = 4
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_gpu_blocks=128,
                                     num_cpu_blocks=128)
    budget = create_token_budget()
    for i in range(3):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size)
        scheduler.add_seq_group(seq_group)
    # AllocStatus.LATER: nothing is scheduled and all requests keep waiting.
    scheduler.block_manager.can_allocate = MagicMock()
    scheduler.block_manager.can_allocate.return_value = AllocStatus.LATER
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(remaining_waiting) == 3

    scheduler = initialize_scheduler(block_size=block_size)
    budget = create_token_budget()
    for i in range(3):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size)
        scheduler.add_seq_group(seq_group)
    # AllocStatus.NEVER: every request is ignored and the queue is drained.
    scheduler.block_manager.can_allocate = MagicMock()
    scheduler.block_manager.can_allocate.return_value = AllocStatus.NEVER
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.ignored_seq_groups) == 3
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(remaining_waiting) == 0


570
def test_decode_schedule_preempted():
    """
    Test decodes cannot be scheduled and preempted.
    """
    block_size = 4
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    curr_loras = None
    for i in range(3):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size)
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._add_seq_group_to_running(seq_group)
    scheduler.block_manager.can_append_slots = MagicMock()

    def cannot_append_second_group(seq_group, num_lookahead_slots):
        # Slots can be appended for every request except request "1".
        return seq_group.request_id != "1"

    scheduler.block_manager.can_append_slots.side_effect = (
        cannot_append_second_group)

    # 1 cannot be scheduled, and the lowest priority (request 2)
    # should be preempted. 1 will also be preempted.
    budget = create_token_budget()
    output = scheduler._schedule_running(budget, curr_loras)
    remaining_running = scheduler.running
    assert len(remaining_running) == 0
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    assert output.decode_seq_groups[0].seq_group.request_id == "0"
    assert len(output.preempted) == 2
    # Verify budgets are updated.
    assert budget.num_batched_tokens == 1
    # NOTE: When enable_chunk is False, num_seqs budget is not updated.
    # assert budget.num_curr_seqs == 1
    # Both should be preempted, not swapped.
    assert output.blocks_to_swap_out == []
    # Nothing is copied.
    assert output.blocks_to_copy == []
612
613


614
def test_schedule_decode_blocks_to_copy_update():
    """
    Verify blocks_to_copy is updated.
    """
    block_size = 4
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_cpu_blocks=16,
                                     num_gpu_blocks=16)
    _, seq_group = create_dummy_prompt("1",
                                       prompt_length=60,
                                       block_size=block_size)
    curr_loras = None
    scheduler._allocate_and_set_running(seq_group)
    append_new_token_seq_group(60, seq_group, 1)
    scheduler._add_seq_group_to_running(seq_group)

    # Make append_slots report a single (2, 3) block pair to copy.
    scheduler.block_manager.append_slots = MagicMock()
    scheduler.block_manager.append_slots.return_value = [(2, 3)]

    budget = create_token_budget()
    output = scheduler._schedule_running(budget, curr_loras)
    remaining_running = scheduler.running
    assert len(remaining_running) == 0
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    assert len(output.preempted) == 0
    assert len(output.swapped_out) == 0
    # Nothing is preempted.
    assert output.blocks_to_swap_out == []
    # Since append_slot returns the source -> dist mapping, it should
    # be applied.
    assert output.blocks_to_copy == [(2, 3)]
647
648


649
def test_schedule_swapped_max_loras():
    """With max_loras=1, one of two swapped LoRA requests is scheduled."""
    block_size = 4
    lora_config = LoRAConfig(max_lora_rank=8, max_loras=1)
    scheduler = initialize_scheduler(lora_config=lora_config,
                                     block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras: set[int] = set()
    blocks_to_swap_out: list[tuple[int, int]] = []
    # Put two requests, each with its own LoRA id, into the swapped queue.
    for i in range(2):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size,
                                           lora_request=LoRARequest(
                                               lora_name=str(i),
                                               lora_int_id=i + 1,
                                               lora_path="abc"))
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 1
    assert budget.num_batched_tokens == 1
    assert budget.num_curr_seqs == 1
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    assert len(curr_loras) == 1


682
def test_schedule_swapped_cannot_swap_in():
    """Swapped requests stay swapped while can_swap_in returns LATER."""
    block_size = 4
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras = None
    blocks_to_swap_out: list[tuple[int, int]] = []
    for i in range(2):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size)
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    # Make the block manager answer every swap-in query with LATER.
    scheduler.block_manager.can_swap_in = MagicMock()
    scheduler.block_manager.can_swap_in.return_value = AllocStatus.LATER
    # Since we cannot swap in, none of the requests are swapped in.
    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 2
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(output.decode_seq_groups) == 0
    assert len(output.prefill_seq_groups) == 0


712
def test_infeasible_swap():
    """Swapped requests are reported infeasible when can_swap_in is NEVER."""
    block_size = 4
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras = None
    blocks_to_swap_out: list[tuple[int, int]] = []
    for i in range(2):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size)
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    # Make the block manager answer every swap-in query with NEVER.
    scheduler.block_manager.can_swap_in = MagicMock()
    scheduler.block_manager.can_swap_in.return_value = AllocStatus.NEVER
    # Neither request is scheduled; both leave the swapped queue and are
    # reported as infeasible.
    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 0
    assert len(output.infeasible_seq_groups) == 2
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(output.decode_seq_groups) == 0
    assert len(output.prefill_seq_groups) == 0
741
742


743
def test_schedule_swapped_blocks_to_copy():
    """Block copies returned by ``append_slots`` reach the swap-in output.

    A single sequence group is swapped out, ``append_slots`` is mocked to
    return one ``(src, dst)`` pair, and ``_schedule_swapped`` is expected to
    swap the group back in as a decode and expose that pair through
    ``blocks_to_copy``.
    """
    block_size = 4
    expected_copies = [(2, 3)]
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)

    # Allocate one group, advance it by a token, then swap it out.
    _, group = create_dummy_prompt("1",
                                   prompt_length=60,
                                   block_size=block_size)
    scheduler._allocate_and_set_running(group)
    append_new_token_seq_group(60, group, 1)
    swap_out_mapping: list[tuple[int, int]] = []
    scheduler._swap_out(group, swap_out_mapping)
    scheduler._add_seq_group_to_swapped(group)

    # Stub slot appending so it reports a known block copy.
    scheduler.block_manager.append_slots = MagicMock(return_value=[(2, 3)])

    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, None)

    assert len(scheduler.swapped) == 0
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    assert output.blocks_to_copy == expected_copies
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812


def test_scheduling_budget():
    """Check SchedulingBudget capacity queries and per-request bookkeeping.

    Covers ``can_schedule`` against the token and sequence limits, and
    verifies that adding or subtracting tokens/seqs twice for the same
    request id only takes effect once.
    """
    token_limit = 4
    seq_limit = 4
    budget = SchedulingBudget(token_budget=token_limit,
                              max_num_seqs=seq_limit)

    # Requests within both limits fit; exceeding either limit does not.
    for tokens, seqs in ((1, 1), (4, 4)):
        assert budget.can_schedule(num_new_tokens=tokens, num_new_seqs=seqs)
    for tokens, seqs in ((1, 5), (5, 1), (5, 5)):
        assert not budget.can_schedule(num_new_tokens=tokens,
                                       num_new_seqs=seqs)
    assert budget.remaining_token_budget() == token_limit

    # --- Batched-token accounting ---
    _, seq_group = create_dummy_prompt("1", 3)
    budget.add_num_batched_tokens(seq_group.request_id, 2)
    assert budget.remaining_token_budget() == 2
    assert budget.num_batched_tokens == 2
    assert budget.can_schedule(num_new_tokens=2, num_new_seqs=1)
    assert not budget.can_schedule(num_new_tokens=3, num_new_seqs=1)

    # Adding again for the same request id must not change anything.
    budget.add_num_batched_tokens(seq_group.request_id, 2)
    assert budget.remaining_token_budget() == 2
    assert budget.num_batched_tokens == 2

    # The first subtraction restores the budget; the second changes nothing.
    for _ in range(2):
        budget.subtract_num_batched_tokens(seq_group.request_id, 2)
        assert budget.remaining_token_budget() == 4
        assert budget.num_batched_tokens == 0

    # --- Sequence-count accounting ---
    _, seq_group = create_dummy_prompt("1", 3)
    budget.add_num_seqs(seq_group.request_id, 2)
    assert budget.can_schedule(num_new_tokens=1, num_new_seqs=2)
    assert not budget.can_schedule(num_new_tokens=1, num_new_seqs=3)
    assert budget.num_curr_seqs == 2

    # Adding again for the same request id must not change anything.
    budget.add_num_seqs(seq_group.request_id, 2)
    assert budget.num_curr_seqs == 2

    # The first subtraction clears the count; the second changes nothing.
    for _ in range(2):
        budget.subtract_num_seqs(seq_group.request_id, 2)
        assert budget.num_curr_seqs == 0
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974


@pytest.mark.parametrize("enable_prefix_caching", [True, False])
def test_prefix_caching_aware_prefills(enable_prefix_caching):
    """
    Prefill scheduling should account for prefix-cache hits.

    Three sequences (A, B, C) share their first block as a common prefix:

    1.  A is prefilled and then decoded on its own.
    2.  B and C are added together. With prefix caching enabled both are
    expected to be prefilled in one scheduling round, although the token
    budget would not cover both if the shared prefix were not taken into
    account. With prefix caching disabled only one of them is scheduled.
    """

    block_size = 4
    token_budget = 12
    max_groups = 3
    scheduler = initialize_scheduler(
        block_size=block_size,
        num_cpu_blocks=16,
        num_gpu_blocks=16,
        max_token_budget=token_budget,
        max_num_seqs=max_groups,
        max_model_len=token_budget,
        enable_prefix_caching=enable_prefix_caching,
    )

    # B and C reuse the first 4 tokens of A and then diverge.
    num_shared_tokens = 4
    tokens_a = list(range(8))
    shared_prefix = tokens_a[:num_shared_tokens]
    tokens_b = shared_prefix + list(range(12, 16))
    tokens_c = shared_prefix + list(range(16, 20))

    _, group_a = create_dummy_prompt("0",
                                     prompt_tokens=tokens_a,
                                     block_size=block_size)
    _, group_b = create_dummy_prompt("1",
                                     prompt_tokens=tokens_b,
                                     block_size=block_size)
    _, group_c = create_dummy_prompt("2",
                                     prompt_tokens=tokens_c,
                                     block_size=block_size)

    # Round 1: prefill A in full.
    scheduler.add_seq_group(group_a)
    seq_metas, out, _ = scheduler.schedule()
    assert len(out.scheduled_seq_groups) == 1
    assert out.scheduled_seq_groups[0].seq_group == group_a
    assert out.scheduled_seq_groups[0].token_chunk_size == len(tokens_a)

    # Round 2: A decodes a single token.
    append_new_token_seq_group(len(tokens_a), group_a, 999)
    seq_metas, out, _ = scheduler.schedule()

    assert len(out.scheduled_seq_groups) == 1
    assert out.scheduled_seq_groups[0].seq_group == group_a
    assert out.scheduled_seq_groups[0].token_chunk_size == 1

    # Round 3: B and C arrive together.
    scheduler.add_seq_group(group_b)
    scheduler.add_seq_group(group_c)
    seq_metas, out, _ = scheduler.schedule()

    if not enable_prefix_caching:
        # Without prefix caching a single prefill is scheduled and no
        # blocks are reported as already computed.
        assert len(out.scheduled_seq_groups) == 1
        assert len(seq_metas) == 1
        assert seq_metas[0].token_chunk_size == 8
        assert len(seq_metas[0].computed_block_nums) == 0
        return

    # With prefix caching both prefills are scheduled in the same round.
    assert len(out.scheduled_seq_groups) == 2
    scheduled_groups = {
        out.scheduled_seq_groups[0].seq_group,
        out.scheduled_seq_groups[1].seq_group,
    }
    assert scheduled_groups == {group_b, group_c}
    assert len(seq_metas) == 2
    shared_blocks = num_shared_tokens // block_size
    for seq_meta in seq_metas:
        assert seq_meta.token_chunk_size == 8
        # Only the shared prefix block counts as already computed.
        assert len(seq_meta.computed_block_nums) == shared_blocks


def test_no_multiple_partial_prefills_with_chunked_prefill_and_prefix_caching(
):
    """
    This test verifies that we don't schedule new prefills if there's already
    a continuous prefill in progress even though the new prefills with shared
    prefix can fit in the token budget:

    - SeqA is being chunked prefill (two rounds of 4 tokens each).
    - SeqB with the same prompt shouldn't be scheduled for prefill even though
    there's enough token budget to prefill the cached tokens.

    - When seqA is in decoding phase, seqB and the newly added seqC can be
    scheduled.
        - Entire seqB should be prefilled since it's a full prefix cache hit.
        - SeqC would be partially prefilled with the prefix shared, and the
        remaining unique tokens would be prefilled (rounded down to be
        block-size aligned).
    """

    block_size = 2
    max_num_batched_tokens = 4
    max_seq_group = 3
    scheduler = initialize_scheduler(
        block_size=block_size,
        num_cpu_blocks=16,
        num_gpu_blocks=16,
        max_token_budget=max_num_batched_tokens,
        max_num_seqs=max_seq_group,
        max_model_len=100,
        enable_prefix_caching=True,
        enable_chunked_prefill=True,
    )

    # seqB repeats seqA's prompt; seqC shares only the first 4 tokens.
    seqA_tokens = list(range(8))
    seqB_tokens = seqA_tokens
    seqC_shared_prefix_len = 4
    seqC_tokens = seqA_tokens[:seqC_shared_prefix_len] + list(range(12, 20))

    seqA, seqA_group = create_dummy_prompt("0",
                                           prompt_tokens=seqA_tokens,
                                           block_size=block_size)
    seqB, seqB_group = create_dummy_prompt("1",
                                           prompt_tokens=seqB_tokens,
                                           block_size=block_size)

    # Chunked prefill seqA: the first chunk uses the whole 4-token budget.
    scheduler.add_seq_group(seqA_group)
    metas, out = schedule_and_update_computed_tokens(scheduler)
    assert len(out.scheduled_seq_groups) == 1
    assert out.scheduled_seq_groups[0].seq_group == seqA_group
    assert out.scheduled_seq_groups[0].token_chunk_size == 4

    # seqB should not be scheduled with ongoing prefills; only seqA's second
    # chunk is scheduled in this round.
    scheduler.add_seq_group(seqB_group)
    metas, out = schedule_and_update_computed_tokens(scheduler)
    assert len(out.scheduled_seq_groups) == 1
    assert out.scheduled_seq_groups[0].seq_group == seqA_group
    assert out.scheduled_seq_groups[0].token_chunk_size == 4

    # Both seqB and seqC can now be scheduled since seqA's prefill is over
    # and seqA is in the decoding phase.
    append_new_token_seq(seqA, 999)
    seqC, seqC_group = create_dummy_prompt("2",
                                           prompt_tokens=seqC_tokens,
                                           block_size=block_size)
    scheduler.add_seq_group(seqC_group)
    metas, out = schedule_and_update_computed_tokens(scheduler)
    assert len(out.scheduled_seq_groups) == 3

    # Index the per-sequence-group metadata by request id for lookup.
    metas_by_request_id = {meta.request_id: meta for meta in metas}
    assert metas_by_request_id[
        seqA_group.request_id].token_chunk_size == 1  # Decode
    assert (metas_by_request_id[seqB_group.request_id].token_chunk_size == 8
            )  # Fully cached prefill
    # The message is parenthesised so that all three literals are
    # concatenated into the assertion message; without the parentheses only
    # the first literal belongs to the assert statement.
    assert metas_by_request_id[seqC_group.request_id].token_chunk_size == 6, (
        "A partial prefix of C (4 tokens) should be prefilled, with the "
        "remaining tokens fit into 3 token budget (4-1 from the seqA). It will "
        "then be rounded down to 2 tokens on block size, thus 6 tokens in total."
    )
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044


def test_no_batches_mixed_with_prompt_tokens_and_prompt_embeds():
    """
    Test that the scheduler never co-mingles prompt-token requests and
    prompt-embedding requests within a single scheduled batch.
    """
    block_size = 2
    max_seq_group = 3
    scheduler = initialize_scheduler(
        block_size=block_size,
        num_cpu_blocks=16,
        num_gpu_blocks=16,
        max_num_seqs=max_seq_group,
        max_model_len=100,
        enable_prefix_caching=True,
    )

    # Even-indexed inputs carry prompt embeddings (with placeholder token
    # ids); odd-indexed inputs carry plain token ids and no embeddings.
    seq_length = 7
    embedding_size = 5
    num_seqs = 11
    seq_tokens: list[list[int]] = []
    seq_embeds: list[Optional[torch.Tensor]] = []
    for idx in range(num_seqs):
        if idx % 2 == 0:
            seq_tokens.append([0] * seq_length)
            seq_embeds.append(torch.rand(embedding_size))
        else:
            seq_tokens.append(list(range(seq_length)))
            seq_embeds.append(None)

    seq_and_seq_groups = [
        create_dummy_prompt(str(idx),
                            prompt_tokens=tokens,
                            prompt_embeds=embeds,
                            block_size=block_size)
        for idx, (tokens, embeds) in enumerate(zip(seq_tokens, seq_embeds))
    ]

    for _, group in seq_and_seq_groups:
        scheduler.add_seq_group(group)

    # Keep scheduling until every sequence has been marked finished.
    while any(not seq.is_finished() for seq, _ in seq_and_seq_groups):
        # Snapshot the pending groups before this scheduling round.
        pending_groups = [
            group for _, group in seq_and_seq_groups
            if not group.is_finished()
        ]
        _, out = schedule_and_update_computed_tokens(scheduler)
        scheduled = out.scheduled_seq_groups
        assert len(scheduled) > 0

        # The first scheduled group determines the kind of this batch.
        batch_is_prompt_embeds = scheduled[0].seq_group.uses_prompt_embeds()
        same_kind_pending = [
            group for group in pending_groups
            if group.uses_prompt_embeds() == batch_is_prompt_embeds
        ]

        # We should have as many scheduled groups as possible, without mixing
        expected_count = min(max_seq_group, len(same_kind_pending))
        assert len(scheduled) == expected_count
        assert all(
            entry.seq_group.uses_prompt_embeds() == batch_is_prompt_embeds
            for entry in scheduled)

        # Mark everything scheduled in this round as finished and free it.
        for entry in scheduled:
            for seq in entry.seq_group.seqs:
                seq.status = SequenceStatus.FINISHED_STOPPED
        scheduler.free_finished_seq_groups()
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338


def test_remove_seq_from_computed_blocks_tracker():
    """
    Test that computed_blocks_tracker correctly removes stale sequences
    during scheduling.

    The test covers 9 scheduling branches where stale seqs are removed:
    - 1 in _schedule_swapped
    - 1 in _schedule_priority_preemption
    - 7 in _schedule_prefills

    Each branch is tested to ensure proper cleanup of
    _seq_id_to_num_tokens_computed.

    Every scenario below builds a fresh scheduler, enqueues sequence groups,
    runs one scheduling entry point, and then asserts that the tracker holds
    no entry for a chosen seq id.
    NOTE(review): the ids passed to ``.get(...)`` presumably match the
    integer form of the request id given to create_dummy_prompt -- confirm
    against the helper in utils.
    """
    # Budget can not schedule in swapped
    block_size = 2
    max_seq_group = 3
    seq_tokens_with_swapped: list[list[int]] = []
    blocks_to_swap_out: list[tuple[int, int]] = []
    # NOTE: this set is reused by the LoRA prefill scenario further below.
    curr_loras: set[int] = set()

    scheduler = initialize_scheduler(
        block_size=block_size,
        num_cpu_blocks=64,
        num_gpu_blocks=16,
        max_num_seqs=max_seq_group,
        enable_prefix_caching=True,
    )
    # A 15-token budget is smaller than a single 16-token prompt.
    budget = create_token_budget(token_budget=15)

    seq_length = 16
    num_seqs = 3
    for i in range(num_seqs):
        seq_tokens_with_swapped.append([i] * seq_length)

    seq_and_seq_groups = [
        create_dummy_prompt(f"{i}",
                            prompt_tokens=seq_tokens_with_swapped[i],
                            block_size=block_size)
        for i in range(len(seq_tokens_with_swapped))
    ]

    # Put every group into the swapped queue before scheduling.
    for _, seq_group in seq_and_seq_groups:
        scheduler._allocate_and_set_running(seq_group)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    scheduler._schedule_swapped(budget, curr_loras)
    seq_id_to_num_tokens_computed = (
        scheduler.block_manager._computed_blocks_tracker.
        _seq_id_to_num_tokens_computed.get(1))
    assert seq_id_to_num_tokens_computed is None

    # Prefill scheduling has no room for another LoRA (max_loras=1), so
    # the request using a second LoRA is ignored for now.
    block_size = 4
    lora_config = LoRAConfig(max_lora_rank=8, max_loras=1)
    scheduler = initialize_scheduler(lora_config=lora_config,
                                     block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64,
                                     enable_prefix_caching=True)
    budget = create_token_budget(token_budget=120)
    num_seqs = 2
    # seq_length is still 16 from the previous scenario; each request uses
    # a distinct LoRA id (1 and 2).
    for i in range(num_seqs):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=seq_length,
                                           block_size=block_size,
                                           lora_request=LoRARequest(
                                               lora_name=str(i),
                                               lora_int_id=i + 1,
                                               lora_path="abc"))
        scheduler.add_seq_group(seq_group)

    scheduler._schedule_prefills(budget, curr_loras)
    seq_id_to_num_tokens_computed = (
        scheduler.block_manager._computed_blocks_tracker.
        _seq_id_to_num_tokens_computed.get(1))
    assert seq_id_to_num_tokens_computed is None

    # Priority preemption schedule (reuses the scheduler and budget from the
    # LoRA scenario above).
    scheduler._schedule_priority_preemption(budget)
    seq_id_to_num_tokens_computed = (
        scheduler.block_manager._computed_blocks_tracker.
        _seq_id_to_num_tokens_computed.get(1))
    assert seq_id_to_num_tokens_computed is None

    # Prefill scheduler does not schedule batches with prompt tokens and
    # prompt embeddings co-mingled.
    block_size = 2
    max_seq_group = 3
    scheduler = initialize_scheduler(
        block_size=block_size,
        num_cpu_blocks=16,
        num_gpu_blocks=16,
        max_num_seqs=max_seq_group,
        max_model_len=100,
        enable_prefix_caching=True,
    )
    seq_length = 7
    embedding_size = 5
    seq_tokens_with_embedding: list[list[int]] = []
    seq_embeds: list[Optional[torch.Tensor]] = []

    # Request 0 uses plain token ids; request 1 uses prompt embeddings.
    seq_tokens_with_embedding.append(list(range(seq_length)))
    seq_embeds.append(None)
    seq_tokens_with_embedding.append([0] * seq_length)
    seq_embeds.append(torch.rand(embedding_size))

    seq_and_seq_groups = [
        create_dummy_prompt(f"{i}",
                            prompt_tokens=seq_tokens_with_embedding[i],
                            prompt_embeds=seq_embeds[i],
                            block_size=block_size)
        for i in range(len(seq_tokens_with_embedding))
    ]

    for _, seq_group in seq_and_seq_groups:
        scheduler.add_seq_group(seq_group)

    scheduler._schedule_default()
    seq_id_to_num_tokens_computed = (
        scheduler.block_manager._computed_blocks_tracker.
        _seq_id_to_num_tokens_computed.get(1))
    assert seq_id_to_num_tokens_computed is None

    #  Prefill scheduler budget num_batched_tokens
    #  >= scheduler_config max_num_batched_tokens
    block_size = 2
    max_seq_group = 3
    seq_tokens_prefill_budget: list[list[int]] = []

    scheduler = initialize_scheduler(
        block_size=block_size,
        max_token_budget=8,
        num_cpu_blocks=16,
        num_gpu_blocks=16,
        max_num_seqs=max_seq_group,
        max_model_len=5,
        enable_prefix_caching=True,
    )
    # Three 4-token prompts against an 8-token budget: the check below
    # targets the third one (seq id 2).
    seq_length = 4
    num_seqs = 3
    for i in range(num_seqs):
        seq_tokens_prefill_budget.append([i] * seq_length)

    seq_and_seq_groups = [
        create_dummy_prompt(f"{i}",
                            prompt_tokens=seq_tokens_prefill_budget[i],
                            block_size=block_size)
        for i in range(len(seq_tokens_prefill_budget))
    ]

    for _, seq_group in seq_and_seq_groups:
        scheduler.add_seq_group(seq_group)

    scheduler._schedule_default()
    seq_id_to_num_tokens_computed = (
        scheduler.block_manager._computed_blocks_tracker.
        _seq_id_to_num_tokens_computed.get(2))
    assert seq_id_to_num_tokens_computed is None

    # Budget can not schedule in waiting
    block_size = 2
    max_seq_group = 3

    scheduler = initialize_scheduler(
        block_size=block_size,
        max_token_budget=30,
        num_cpu_blocks=16,
        num_gpu_blocks=16,
        max_num_seqs=max_seq_group,
        max_model_len=30,
        enable_prefix_caching=True,
    )
    # Three identical 16-token prompts against a 30-token budget.
    seq_length = 16
    num_seqs = 3
    seq_tokens_prefill_budget_waiting: list[list[int]] = []

    for i in range(num_seqs):
        seq_tokens_prefill_budget_waiting.append(list(range(seq_length)))

    seq_and_seq_groups = [
        create_dummy_prompt(f"{i}",
                            prompt_tokens=seq_tokens_prefill_budget_waiting[i],
                            block_size=block_size)
        for i in range(len(seq_tokens_prefill_budget_waiting))
    ]

    for _, seq_group in seq_and_seq_groups:
        scheduler.add_seq_group(seq_group)

    scheduler._schedule_default()
    seq_id_to_num_tokens_computed = (
        scheduler.block_manager._computed_blocks_tracker.
        _seq_id_to_num_tokens_computed.get(1))
    assert seq_id_to_num_tokens_computed is None

    # Sequence num_new_tokens > prompt_limit marked FINISHED_IGNORED
    block_size = 2
    max_seq_group = 3
    scheduler = initialize_scheduler(
        block_size=block_size,
        num_cpu_blocks=16,
        num_gpu_blocks=16,
        max_num_seqs=max_seq_group,
        max_model_len=30,
        enable_prefix_caching=True,
    )

    # A 31-token prompt is one token longer than max_model_len=30.
    seq_length = 31
    seq_tokens_prompt_limit: list[list[int]] = []
    seq_tokens_prompt_limit.append(list(range(seq_length)))
    seq_and_seq_groups = [
        create_dummy_prompt("0",
                            prompt_tokens=seq_tokens_prompt_limit[0],
                            block_size=block_size)
    ]
    for _, seq_group in seq_and_seq_groups:
        scheduler.add_seq_group(seq_group)
    scheduler._schedule_default()
    seq_id_to_num_tokens_computed = (
        scheduler.block_manager._computed_blocks_tracker.
        _seq_id_to_num_tokens_computed.get(0))
    assert seq_id_to_num_tokens_computed is None

    # Budget can not allocate, AllocStatus is NEVER marked FINISHED_IGNORED
    block_size = 2
    max_seq_group = 3
    scheduler = initialize_scheduler(
        block_size=block_size,
        num_cpu_blocks=160,
        num_gpu_blocks=160,
        max_num_seqs=max_seq_group,
        max_model_len=320,
        enable_prefix_caching=True,
    )

    # A single 320-token prompt needs 160 blocks of size 2, i.e. as many
    # blocks as the GPU pool is configured with.
    seq_length = 320
    num_seqs = 1
    seq_tokens_never: list[list[int]] = []
    for i in range(num_seqs):
        seq_tokens_never.append(list(range(seq_length)))

    seq_and_seq_groups = [
        create_dummy_prompt(f"{i}",
                            prompt_tokens=seq_tokens_never[i],
                            block_size=block_size)
        for i in range(len(seq_tokens_never))
    ]

    for _, seq_group in seq_and_seq_groups:
        scheduler.add_seq_group(seq_group)

    scheduler._schedule_default()
    seq_id_to_num_tokens_computed = (
        scheduler.block_manager._computed_blocks_tracker.
        _seq_id_to_num_tokens_computed.get(0))
    assert seq_id_to_num_tokens_computed is None

    # Budget can not allocate, AllocStatus is LATER
    block_size = 2
    max_seq_group = 3
    scheduler = initialize_scheduler(
        block_size=block_size,
        num_cpu_blocks=160,
        num_gpu_blocks=160,
        max_num_seqs=max_seq_group,
        max_model_len=320,
        enable_prefix_caching=True,
    )

    # Two identical 160-token prompts; the check below targets the second
    # one (seq id 1).
    seq_length = 160
    num_seqs = 2
    seq_tokens_later: list[list[int]] = []
    for i in range(num_seqs):
        seq_tokens_later.append(list(range(seq_length)))

    seq_and_seq_groups = [
        create_dummy_prompt(f"{i}",
                            prompt_tokens=seq_tokens_later[i],
                            block_size=block_size)
        for i in range(len(seq_tokens_later))
    ]

    for _, seq_group in seq_and_seq_groups:
        scheduler.add_seq_group(seq_group)

    scheduler._schedule_default()
    seq_id_to_num_tokens_computed = (
        scheduler.block_manager._computed_blocks_tracker.
        _seq_id_to_num_tokens_computed.get(1))
    assert seq_id_to_num_tokens_computed is None