"vscode:/vscode.git/clone" did not exist on "f231e5bc21d5c972bffe3aae286c0102c5623e48"
test_scheduler.py 43 KB
Newer Older
1
import time
2
from collections import deque
3
from typing import List, Set, Tuple
4
from unittest.mock import MagicMock
5

6
7
import pytest
from torch import Use  # noqa
8

9
10
11
12
from vllm.config import CacheConfig, LoRAConfig, SchedulerConfig
from vllm.core.interfaces import AllocStatus
from vllm.core.scheduler import Scheduler, SchedulingBudget
from vllm.lora.request import LoRARequest
13
from vllm.sequence import SequenceGroup, SequenceStatus
14

15
from ..utils import check_deprecated_block_manager_usage
16
17
18
from .utils import (append_new_token, append_new_token_seq_group,
                    create_dummy_prompt, get_sequence_groups,
                    schedule_and_update_computed_tokens)
19
20


21
22
23
24
25
26
@pytest.fixture(scope="module", autouse=True)
def check_deprecated_block_manager():
    """Module-wide guard around use of the deprecated v1 block manager.

    Many tests here are parametrized with ``use_v2_block_manager=False``.
    The path handed to ``check_deprecated_block_manager_usage`` must name
    this module (tests/core/test_scheduler.py), not a sibling test file.
    """
    check_deprecated_block_manager_usage("tests/core/test_scheduler.py")


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_add_seq_group(use_v2_block_manager: bool):
    """Every added sequence group bumps the unfinished count by one."""
    block_size = 4
    scheduler_config = SchedulerConfig(
        100, 64, 1, use_v2_block_manager=use_v2_block_manager)
    cache_config = CacheConfig(block_size, 1.0, 1, cache_dtype="auto")
    cache_config.num_cpu_blocks = 4
    cache_config.num_gpu_blocks = 4
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # Register groups "0".."3" one at a time, checking the running total.
    num_seq_group = 4
    for added, request_id in enumerate(map(str, range(num_seq_group)),
                                       start=1):
        _, new_group = create_dummy_prompt(request_id,
                                           block_size,
                                           block_size=block_size)
        scheduler.add_seq_group(new_group)
        assert scheduler.get_num_unfinished_seq_groups() == added


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_abort_seq_group(use_v2_block_manager: bool):
    """Aborting all request ids at once leaves nothing unfinished."""
    block_size = 4
    scheduler_config = SchedulerConfig(
        100, 64, 1, use_v2_block_manager=use_v2_block_manager)
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 4
    cache_config.num_gpu_blocks = 4
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # Register several groups and remember their ids for the bulk abort.
    num_seq_group = 4
    request_ids: Set[str] = set()
    for request_id in (str(idx) for idx in range(num_seq_group)):
        group = create_dummy_prompt(request_id, block_size)[1]
        scheduler.add_seq_group(group)
        request_ids.add(request_id)

    assert scheduler.get_num_unfinished_seq_groups() == num_seq_group
    scheduler.abort_seq_group(request_ids)
    assert scheduler.get_num_unfinished_seq_groups() == 0


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_schedule_simple(use_v2_block_manager: bool):
    """All groups prefill together in one step, then decode together."""
    block_size = 4
    num_seq_group = 4
    max_model_len = 16
    scheduler_config = SchedulerConfig(
        64,
        num_seq_group,
        max_model_len,
        use_v2_block_manager=use_v2_block_manager)
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 8
    cache_config.num_gpu_blocks = 8
    scheduler = Scheduler(scheduler_config, cache_config, None)

    running: List[SequenceGroup] = []
    for idx in range(num_seq_group):
        _, group = create_dummy_prompt(str(idx),
                                       prompt_length=block_size,
                                       block_size=block_size)
        scheduler.add_seq_group(group)
        running.append(group)

    def schedule_and_check(expected_num_tokens: int) -> None:
        # One scheduler step: every group is scheduled, no block moves.
        seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
        assert set(get_sequence_groups(out)) == set(running)
        assert out.num_batched_tokens == expected_num_tokens
        assert not (out.blocks_to_copy or out.blocks_to_swap_in
                    or out.blocks_to_swap_out)
        assert len(seq_group_meta) == num_seq_group
        append_new_token(out, 1)

    # Prefill: every prompt token of every group is batched.
    schedule_and_check(block_size * num_seq_group)
    # Decode: one new token per group.
    schedule_and_check(num_seq_group)


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_prefill_prioritized(use_v2_block_manager: bool):
    """Verify running batched tokens are not applied to prefill requests."""
    block_size = 4
    max_model_len = 30
    max_batched_num_tokens = 30
    scheduler_config = SchedulerConfig(
        max_batched_num_tokens,
        2,
        max_model_len,
        use_v2_block_manager=use_v2_block_manager)
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 16
    cache_config.num_gpu_blocks = 16
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # Add request A with a prompt of length 1 and schedule its prefill.
    _, seq_group_a = create_dummy_prompt("1", 1, block_size=block_size)
    scheduler.add_seq_group(seq_group_a)
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_a]

    # Add a new prefill request B whose 30-token prompt alone consumes the
    # whole max_batched_num_tokens (30) budget.
    _, seq_group_b = create_dummy_prompt("2", 30, block_size=block_size)
    scheduler.add_seq_group(seq_group_b)

    # Verify prefill requests are prioritized: although A was scheduled in
    # the previous step, only the new prefill B is scheduled now.
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_b]


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_schedule_preempt_abort(use_v2_block_manager: bool):
    """With only 2 GPU blocks, B is preempted; aborting A reschedules B.

    Both 4-token prompts fill one 4-slot block each, so only one of them
    can continue decoding.
    """
    block_size = 4
    max_model_len = 16
    scheduler_config = SchedulerConfig(
        64, 2, max_model_len, use_v2_block_manager=use_v2_block_manager)
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 2
    cache_config.num_gpu_blocks = 2
    scheduler = Scheduler(scheduler_config, cache_config, None)

    def no_block_movement(out) -> bool:
        # True when nothing is copied, swapped in, or swapped out.
        return not (out.blocks_to_copy or out.blocks_to_swap_in
                    or out.blocks_to_swap_out)

    seq_group_a, seq_group_b = (
        create_dummy_prompt(request_id, block_size, block_size=block_size)[1]
        for request_id in ("1", "2"))
    scheduler.add_seq_group(seq_group_a)
    scheduler.add_seq_group(seq_group_b)

    # Step 1: both prompts are prefilled together.
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_a, seq_group_b]
    assert out.num_batched_tokens == block_size * 2  # seq_a and seq_b
    assert no_block_movement(out)
    assert len(seq_group_meta) == 2
    assert scheduler.get_num_unfinished_seq_groups() == 2

    # Append "generated" tokens so the prompt tokens count as processed.
    append_new_token(out, 1)

    # Step 2: only A can decode; B is preempted.
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_a]
    assert out.num_batched_tokens == 1
    assert no_block_movement(out)
    assert len(seq_group_meta) == 1
    assert scheduler.get_num_unfinished_seq_groups() == 2
    assert out.preempted == 1

    # Step 3: abort A; B's prompt is recomputed with its generated token.
    scheduler.abort_seq_group("1")
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_b]
    assert out.num_batched_tokens == 5  # 4 prompt + 1 generation.
    assert no_block_movement(out)
    assert len(seq_group_meta) == 1
    assert scheduler.get_num_unfinished_seq_groups() == 1


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_max_seqs(use_v2_block_manager: bool):
    """The max_num_seqs limit caps how many groups are scheduled."""
    block_size = 4
    num_seq_group = 4
    max_seq_group = 2
    max_model_len = 16
    scheduler_config = SchedulerConfig(
        64,
        max_seq_group,
        max_model_len,
        use_v2_block_manager=use_v2_block_manager)
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 8
    cache_config.num_gpu_blocks = 8
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # Build every candidate group up front; they are added incrementally.
    all_seq_groups: List[SequenceGroup] = [
        create_dummy_prompt(str(idx),
                            prompt_length=block_size,
                            block_size=block_size)[1]
        for idx in range(num_seq_group)
    ]

    def schedule_expecting(expected: SequenceGroup):
        # Run one step and check that exactly `expected` was scheduled.
        _, out = schedule_and_update_computed_tokens(scheduler)
        assert set(get_sequence_groups(out)) == {expected}
        return out

    scheduler.add_seq_group(all_seq_groups[0])
    # Prefill step, then decode step, for the lone group.
    for _ in range(2):
        append_new_token(schedule_expecting(all_seq_groups[0]), 1)

    scheduler.add_seq_group(all_seq_groups[1])
    scheduler.add_seq_group(all_seq_groups[2])
    # max_seq_group is 2 and group 0 is already running, so only one of
    # the two new prompts fits.
    schedule_expecting(all_seq_groups[1])


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_delay_factor(use_v2_block_manager: bool):
    """With delay_factor=0.5, a newly arrived prompt is held back briefly.

    NOTE: uses real wall-clock sleeps (~1.6 s per parametrization).
    """
    block_size = 4
    scheduler_config = SchedulerConfig(
        100,
        64,
        16,
        delay_factor=0.5,
        use_v2_block_manager=use_v2_block_manager)
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 8
    cache_config.num_gpu_blocks = 8
    scheduler = Scheduler(scheduler_config, cache_config, None)

    def add_prompt(request_id: str) -> None:
        _, group = create_dummy_prompt(request_id,
                                       prompt_length=block_size,
                                       block_size=block_size)
        scheduler.add_seq_group(group)

    def step(expect_prefill: bool, first_request_id: str) -> None:
        # One scheduler step; check whether a prefill ran and who is first.
        seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
        if expect_prefill:
            assert out.num_prefill_groups > 0
        else:
            assert out.num_prefill_groups == 0
        assert seq_group_meta[0].request_id == first_request_id
        append_new_token(out, 1)

    # The first prompt is scheduled right away.
    add_prompt("0")
    step(True, '0')

    # A second prompt arriving after a 1 s pause is *not* scheduled yet;
    # only request 0 keeps decoding.
    time.sleep(1)
    add_prompt("1")
    step(False, '0')

    # After waiting more than 0.5 s the delayed prompt is scheduled.
    time.sleep(0.6)
    step(True, '1')


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_swapped_out_prioritized(use_v2_block_manager: bool):
    """Swapped-out requests are swapped back in ahead of a new prefill.

    Three best_of=2 requests are prefilled, request "2" is then swapped out,
    and a fourth request (id "3") is added while "2" is still swapped out.
    """
    block_size = 4
    scheduler = initialize_scheduler(max_num_seqs=6,
                                     block_size=block_size,
                                     use_v2_block_manager=use_v2_block_manager,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    # best_of=2 * 3 == 6 sequences.
    for i in range(3):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           best_of=2,
                                           block_size=block_size)
        scheduler.add_seq_group(seq_group)
    _, out = schedule_and_update_computed_tokens(scheduler)
    # prefill scheduled now.
    assert len(out.scheduled_seq_groups) == 3
    append_new_token(out, 1)

    # The last request should be swapped out.
    scheduler.block_manager.can_append_slots = MagicMock()

    def cannot_append_last_group(seq_group, num_lookahead_slots):
        return seq_group.request_id != "2"

    scheduler.block_manager.can_append_slots.side_effect = (
        cannot_append_last_group)

    _, out = schedule_and_update_computed_tokens(scheduler)
    assert len(out.scheduled_seq_groups) == 2
    assert out.num_batched_tokens == 2
    assert out.blocks_to_swap_out != []
    assert out.blocks_to_swap_in == []
    append_new_token(out, 1)

    # Add 1 more task under a fresh request id. Reusing the leftover loop
    # variable would give it id "2", colliding with the swapped-out request.
    # Swap should be prioritized over prefill.
    _, seq_group = create_dummy_prompt("3",
                                       prompt_length=60,
                                       best_of=2,
                                       block_size=block_size)
    scheduler.add_seq_group(seq_group)
    _, out = schedule_and_update_computed_tokens(scheduler)
    append_new_token(out, 1)
    assert len(out.scheduled_seq_groups) == 3
    # 3 decodes. It is swapped in.
    assert out.num_batched_tokens == 3
    assert out.blocks_to_swap_in != []
    assert out.blocks_to_swap_out == []


def initialize_scheduler(
    *,
    max_num_seqs=1000,
    max_token_budget=1000,
    max_model_len=1000,
    lora_config=None,
    use_v2_block_manager=False,
    block_size=4,
    num_cpu_blocks=8,
    num_gpu_blocks=8,
):
    """Build a ``Scheduler`` with small, explicit cache sizes for tests.

    Args:
        max_num_seqs: Second positional argument of ``SchedulerConfig``.
        max_token_budget: First positional argument of ``SchedulerConfig``.
        max_model_len: Third positional argument of ``SchedulerConfig``.
        lora_config: Passed straight through to ``Scheduler``.
        use_v2_block_manager: Selects the block manager implementation.
        block_size: First argument of ``CacheConfig``.
        num_cpu_blocks: Assigned directly to ``cache_config.num_cpu_blocks``.
        num_gpu_blocks: Assigned directly to ``cache_config.num_gpu_blocks``.

    Returns:
        The newly constructed ``Scheduler``.
    """
    scheduler_config = SchedulerConfig(
        max_token_budget,
        max_num_seqs,
        max_model_len,
        use_v2_block_manager=use_v2_block_manager)
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    # Block counts are set by hand instead of being profiled.
    cache_config.num_cpu_blocks = num_cpu_blocks
    cache_config.num_gpu_blocks = num_gpu_blocks
    return Scheduler(scheduler_config, cache_config, lora_config)


def create_token_budget(token_budget: int = 10000,
                        max_num_seqs: int = 10000) -> SchedulingBudget:
    """Return an empty ``SchedulingBudget`` with the given limits."""
    budget = SchedulingBudget(token_budget=token_budget,
                              max_num_seqs=max_num_seqs)
    return budget


def add_token_budget(budget: SchedulingBudget,
                     num_batched_tokens: int = 0,
                     num_curr_seqs: int = 0):
    """Pre-charge ``budget`` with tokens and seqs under a placeholder id.

    A throwaway dummy prompt (request id ``'10'``) provides the request id
    under which both charges are recorded.
    """
    _, placeholder = create_dummy_prompt('10', prompt_length=60)
    request_id = placeholder.request_id
    budget.add_num_batched_tokens(request_id, num_batched_tokens)
    budget.add_num_seqs(request_id, num_curr_seqs)


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_prefill_schedule_max_prompt_len(use_v2_block_manager: bool):
    """A prompt longer than max_model_len is ignored rather than scheduled."""
    block_size = 4
    scheduler = initialize_scheduler(max_model_len=30,
                                     use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size)
    _, too_long = create_dummy_prompt("0",
                                      prompt_length=60,
                                      block_size=block_size)
    scheduler.add_seq_group(too_long)
    budget = create_token_budget()

    result = scheduler._schedule_prefills(budget, None)
    waiting = scheduler.waiting

    # The 60-token prompt exceeds max_model_len=30: it lands in the ignored
    # list, leaves the waiting queue, and charges nothing to the budget.
    assert len(result.ignored_seq_groups) == 1
    assert len(result.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(waiting) == 0


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_prefill_schedule_token_budget(use_v2_block_manager: bool):
    """Prefills are admitted only while they fit the token budget."""
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    budget = create_token_budget(token_budget=0)
    for request_idx in range(2):
        _, group = create_dummy_prompt(str(request_idx),
                                       prompt_length=60,
                                       block_size=block_size)
        scheduler.add_seq_group(group)

    # A zero token budget admits nothing.
    result = scheduler._schedule_prefills(budget, None)
    waiting = scheduler.waiting
    assert len(result.ignored_seq_groups) == 0
    assert len(result.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(waiting) == 2

    # A 60-token budget admits exactly one 60-token prompt.
    budget = create_token_budget(token_budget=60)
    result = scheduler._schedule_prefills(budget, None)
    waiting = scheduler.waiting
    assert len(result.ignored_seq_groups) == 0
    assert len(result.seq_groups) == 1
    assert budget.num_batched_tokens == 60
    assert budget.num_curr_seqs == 1
    assert len(waiting) == 1

    # Tokens already charged to the budget count against it.
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=16,
                                     num_gpu_blocks=16)
    budget = create_token_budget(token_budget=60)
    add_token_budget(budget, 30, 0)
    _, group = create_dummy_prompt("1",
                                   prompt_length=60,
                                   block_size=block_size)
    # 30 used + 60 requested > 60: the prompt does not fit.
    scheduler.add_seq_group(group)
    result = scheduler._schedule_prefills(budget, None)
    waiting = scheduler.waiting
    assert len(result.ignored_seq_groups) == 0
    assert len(result.seq_groups) == 0
    assert budget.num_batched_tokens == 30
    assert budget.num_curr_seqs == 0
    assert len(waiting) == 1

    # 30 used + 60 requested == 90: now it fits.
    budget = create_token_budget(token_budget=90)
    add_token_budget(budget, 30, 0)
    result = scheduler._schedule_prefills(budget, None)
    waiting = scheduler.waiting
    assert len(result.seq_groups) == 1
    assert budget.num_batched_tokens == 90
    assert budget.num_curr_seqs == 1
    assert len(waiting) == 0


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_prefill_schedule_max_seqs(use_v2_block_manager: bool):
    """Prefills stop once the budget's max_num_seqs is reached."""
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)

    # Three waiting prompts, but room for only two sequences.
    budget = create_token_budget(max_num_seqs=2)
    for request_idx in range(3):
        _, group = create_dummy_prompt(str(request_idx),
                                       prompt_length=60,
                                       block_size=block_size)
        scheduler.add_seq_group(group)
    result = scheduler._schedule_prefills(budget, None)
    waiting = scheduler.waiting
    assert len(result.ignored_seq_groups) == 0
    assert len(result.seq_groups) == 2
    assert budget.num_batched_tokens == 120
    assert budget.num_curr_seqs == 2
    assert len(waiting) == 1

    # A budget already charged with two seqs admits nothing more.
    scheduler.waiting = deque()
    budget = create_token_budget(max_num_seqs=2)
    add_token_budget(budget, 0, 2)
    _, group = create_dummy_prompt("2",
                                   prompt_length=60,
                                   block_size=block_size)
    scheduler.add_seq_group(group)
    result = scheduler._schedule_prefills(budget, None)
    waiting = scheduler.waiting
    assert len(result.ignored_seq_groups) == 0
    assert len(result.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 2
    assert len(waiting) == 1


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_prefill_schedule_max_lora(use_v2_block_manager: bool):
    """The max_loras limit is enforced, and skipped LoRA requests come next."""
    block_size = 4
    lora_config = LoRAConfig(max_lora_rank=8, max_loras=1)
    scheduler = initialize_scheduler(lora_config=lora_config,
                                     use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    budget = create_token_budget(token_budget=120)
    curr_loras: Set[int] = set()

    # Requests 0 and 1 each use their own LoRA adapter.
    for idx in range(2):
        adapter = LoRARequest(lora_name=str(idx),
                              lora_int_id=idx + 1,
                              lora_path="abc")
        _, group = create_dummy_prompt(str(idx),
                                       prompt_length=60,
                                       block_size=block_size,
                                       lora_request=adapter)
        scheduler.add_seq_group(group)

    # Requests 2 and 3 use no LoRA. With max_loras=1 the first pass should
    # take 0 and 2; request 1 is skipped for hitting the LoRA limit and must
    # keep its priority for the next pass.
    for idx in range(2, 4):
        _, group = create_dummy_prompt(str(idx),
                                       prompt_length=60,
                                       block_size=block_size)
        scheduler.add_seq_group(group)

    # First pass: requests 0 and 2.
    result = scheduler._schedule_prefills(budget, curr_loras)
    waiting = scheduler.waiting
    assert len(result.ignored_seq_groups) == 0
    assert len(result.seq_groups) == 2
    assert budget.num_batched_tokens == 120
    assert budget.num_curr_seqs == 2
    assert len(waiting) == 2
    assert len(curr_loras) == 1

    # Second pass with a cleared LoRA set: request 1 goes next (FCFS).
    curr_loras = set()
    budget = create_token_budget(token_budget=60)
    result = scheduler._schedule_prefills(budget, curr_loras)
    waiting = scheduler.waiting
    assert len(result.seq_groups) == 1
    assert result.seq_groups[0].seq_group.request_id == "1"
    assert len(waiting) == 1
    assert len(curr_loras) == 1
    assert budget.num_batched_tokens == 60


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_prefill_schedule_no_block_manager_capacity(
        use_v2_block_manager: bool):
    """Prefills are held back when the block manager cannot allocate.

    ``AllocStatus.LATER`` leaves every request waiting, while
    ``AllocStatus.NEVER`` moves every request to the ignored list. Both
    scenarios honour the ``use_v2_block_manager`` parametrization.
    """
    block_size = 4

    def new_scheduler_with_prompts(alloc_status):
        # Fresh scheduler holding three 60-token prompts, with
        # can_allocate mocked to always return `alloc_status`.
        scheduler = initialize_scheduler(
            use_v2_block_manager=use_v2_block_manager,
            block_size=block_size,
            num_gpu_blocks=128,
            num_cpu_blocks=128)
        for i in range(3):
            _, seq_group = create_dummy_prompt(str(i),
                                               prompt_length=60,
                                               block_size=block_size)
            scheduler.add_seq_group(seq_group)
        scheduler.block_manager.can_allocate = MagicMock()
        scheduler.block_manager.can_allocate.return_value = alloc_status
        return scheduler

    # LATER: nothing is scheduled and all requests stay waiting.
    scheduler = new_scheduler_with_prompts(AllocStatus.LATER)
    budget = create_token_budget()
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(remaining_waiting) == 3

    # NEVER: all requests are ignored and removed from the waiting queue.
    # Previously this half built its scheduler with defaults only, so it
    # never exercised the v2 block manager.
    scheduler = new_scheduler_with_prompts(AllocStatus.NEVER)
    budget = create_token_budget()
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.ignored_seq_groups) == 3
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(remaining_waiting) == 0


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_decode_schedule_preempted(use_v2_block_manager: bool):
    """Decodes that cannot get new slots are preempted, not swapped."""
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    curr_loras = None
    for idx in range(3):
        _, group = create_dummy_prompt(str(idx),
                                       prompt_length=60,
                                       block_size=block_size)
        scheduler._allocate_and_set_running(group)
        append_new_token_seq_group(60, group, 1)
        scheduler._add_seq_group_to_running(group)

    def only_request_1_blocked(seq_group, num_lookahead_slots):
        # Every request except "1" can get new slots.
        return seq_group.request_id != "1"

    scheduler.block_manager.can_append_slots = MagicMock(
        side_effect=only_request_1_blocked)

    # Request 1 cannot be scheduled, so the lowest-priority request (2) is
    # preempted to make room, and request 1 ends up preempted as well.
    budget = create_token_budget()
    output = scheduler._schedule_running(budget, curr_loras)
    remaining_running = scheduler.running
    assert len(remaining_running) == 0
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    assert output.decode_seq_groups[0].seq_group.request_id == "0"
    assert len(output.preempted) == 2
    # Only the surviving decode is charged to the budget.
    assert budget.num_batched_tokens == 1
    # NOTE: When enable_chunk is False, num_seqs budget is not updated.
    # assert budget.num_curr_seqs == 1
    # Preemption here is by recompute: no blocks are swapped out ...
    assert output.blocks_to_swap_out == []
    # ... and none are copied.
    assert output.blocks_to_copy == []


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_decode_swap_beam_search(use_v2_block_manager: bool):
    """A best_of > 1 request that cannot append slots is swapped out."""
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_gpu_blocks=64,
                                     num_cpu_blocks=64)
    curr_loras = None
    budget = create_token_budget()
    for i in range(3):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           best_of=2,
                                           block_size=block_size)
        scheduler._allocate_and_set_running(seq_group)
        scheduler._add_seq_group_to_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        # Pre-charge the budget as if each group were already scheduled.
        budget.add_num_seqs(seq_group.request_id,
                            seq_group.get_max_num_running_seqs())
        budget.add_num_batched_tokens(
            seq_group.request_id, seq_group.num_seqs(SequenceStatus.RUNNING))

    # The last request should be swapped out.
    scheduler.block_manager.can_append_slots = MagicMock()

    def cannot_append_last_group(seq_group, num_lookahead_slots):
        return seq_group.request_id != "2"

    scheduler.block_manager.can_append_slots.side_effect = (
        cannot_append_last_group)
    scheduler.block_manager.swap_out = MagicMock()
    expected_swap_mapping = [("5", "7")]
    scheduler.block_manager.swap_out.return_value = expected_swap_mapping

    output = scheduler._schedule_running(budget, curr_loras)
    remaining_running = scheduler.running
    assert len(remaining_running) == 0
    assert len(output.decode_seq_groups) == 2
    assert len(output.prefill_seq_groups) == 0
    assert output.decode_seq_groups[0].seq_group.request_id == "0"
    assert output.decode_seq_groups[1].seq_group.request_id == "1"
    assert len(output.preempted) == 0
    assert len(output.swapped_out) == 1
    # Budget should reflect the swapped-out request.
    assert budget.num_batched_tokens == 2
    # Request "2" has 2 sequences (best_of=2), so 2 is subtracted.
    assert budget.num_curr_seqs == 4
    # Request "2" is swapped out (not preempted), using the mocked mapping.
    assert output.blocks_to_swap_out == expected_swap_mapping
    # Nothing is copied.
    assert output.blocks_to_copy == []


719
720
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_schedule_decode_blocks_to_copy_update(use_v2_block_manager: bool):
    """
    Verify blocks_to_copy is updated.

    ``append_slots`` is mocked to return a (src, dst) block mapping; the
    running decode must surface exactly that mapping in
    ``output.blocks_to_copy`` without preempting or swapping anything.
    """
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=16,
                                     num_gpu_blocks=16)
    _, seq_group = create_dummy_prompt("1",
                                       prompt_length=60,
                                       best_of=2,
                                       block_size=block_size)
    curr_loras = None
    scheduler._allocate_and_set_running(seq_group)
    append_new_token_seq_group(60, seq_group, 1)
    scheduler._add_seq_group_to_running(seq_group)

    # Force append_slots to report a block copy for the decoding group.
    scheduler.block_manager.append_slots = MagicMock()
    scheduler.block_manager.append_slots.return_value = [(2, 3)]

    budget = create_token_budget()
    output = scheduler._schedule_running(budget, curr_loras)
    remaining_running = scheduler.running
    assert len(remaining_running) == 0
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    assert len(output.preempted) == 0
    assert len(output.swapped_out) == 0
    # Nothing is swapped out.
    assert output.blocks_to_swap_out == []
    # Since append_slots returns the source -> destination mapping, it should
    # be applied as-is.
    assert output.blocks_to_copy == [(2, 3)]
755
756


757
758
759
760
761
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_schedule_swapped_simple(use_v2_block_manager: bool):
    """A single swapped-out group is swapped back in as a decode.

    The swap-in mapping reported by the scheduler must be the mirror image of
    the mapping produced when the group was swapped out.
    """
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size)
    curr_loras = None
    blocks_to_swap_out: List[Tuple[int, int]] = []
    _, seq_group = create_dummy_prompt("1",
                                       prompt_length=4,
                                       best_of=2,
                                       block_size=block_size)
    scheduler._allocate_and_set_running(seq_group)
    append_new_token_seq_group(4, seq_group, 1)
    scheduler._swap_out(seq_group, blocks_to_swap_out)
    scheduler._add_seq_group_to_swapped(seq_group)

    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 0
    assert budget.num_batched_tokens == 1
    assert budget.num_curr_seqs == 2
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0

    # Swap in is the reverse of swap out: flipping every swap-in pair must
    # reproduce the recorded swap-out mapping.
    blocks_to_swap_in_reverse = [
        (swapout, swapin) for swapin, swapout in output.blocks_to_swap_in
    ]
    assert blocks_to_swap_out == blocks_to_swap_in_reverse


788
789
790
791
792
793
794
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_schedule_swapped_max_token_budget(use_v2_block_manager: bool):
    """Swapping in must stop once the token budget is exhausted.

    Two swapped-out groups compete for a token budget of 1: only one is
    swapped back in, and a budget that is already full admits none.
    """
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras = None
    blocks_to_swap_out: List[Tuple[int, int]] = []
    for i in range(2):
        # NOTE(review): unlike the sibling swap tests, block_size is not
        # forwarded here, so the prompt uses create_dummy_prompt's default
        # block size — confirm this is intended.
        _, seq_group = create_dummy_prompt(str(i), prompt_length=60, best_of=2)
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    # A budget of one token lets exactly one group swap back in.
    budget = create_token_budget(token_budget=1)
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 1
    assert budget.num_batched_tokens == 1
    assert budget.num_curr_seqs == 2
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0

    # Verify num_batched_tokens are respected: a budget whose single token is
    # already consumed admits nothing.
    budget = create_token_budget(token_budget=1)
    add_token_budget(budget, 1, 0)
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 1
    assert budget.num_batched_tokens == 1
    assert budget.num_curr_seqs == 0
    assert len(output.decode_seq_groups) == 0
    assert len(output.prefill_seq_groups) == 0
823
824


825
826
827
828
829
830
831
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_schedule_swapped_max_seqs(use_v2_block_manager: bool):
    """Swapping in must stop once max_num_seqs is reached.

    Four single-sequence groups are swapped out; a budget with
    max_num_seqs=2 swaps back exactly two, and re-using that now-full budget
    swaps back none.
    """
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    curr_loras = None
    blocks_to_swap_out: List[Tuple[int, int]] = []
    for i in range(4):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size)
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    budget = create_token_budget(max_num_seqs=2)
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 2
    assert budget.num_batched_tokens == 2
    assert budget.num_curr_seqs == 2
    assert len(output.decode_seq_groups) == 2
    assert len(output.prefill_seq_groups) == 0

    # Verify num_curr_seqs are respected: the same budget is already at
    # max_num_seqs, so nothing more is swapped in.
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 2
    assert budget.num_batched_tokens == 2
    assert budget.num_curr_seqs == 2
    assert len(output.decode_seq_groups) == 0
    assert len(output.prefill_seq_groups) == 0
860
861


862
863
864
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_schedule_swapped_max_loras(use_v2_block_manager: bool):
    """Swapping in must respect LoRAConfig.max_loras.

    Two swapped-out groups use distinct LoRA adapters while max_loras=1, so
    only one group is swapped back in and only one adapter is tracked.
    """
    block_size = 4
    lora_config = LoRAConfig(max_lora_rank=8, max_loras=1)
    scheduler = initialize_scheduler(lora_config=lora_config,
                                     use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras: Set[int] = set()
    blocks_to_swap_out: List[Tuple[int, int]] = []
    for i in range(2):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size,
                                           lora_request=LoRARequest(
                                               lora_name=str(i),
                                               lora_int_id=i + 1,
                                               lora_path="abc"))
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 1
    assert budget.num_batched_tokens == 1
    assert budget.num_curr_seqs == 1
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    # curr_loras is populated in place by the scheduler; with max_loras=1 it
    # must hold exactly one adapter.
    assert len(curr_loras) == 1


897
898
899
900
901
902
903
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_schedule_swapped_cannot_swap_in(use_v2_block_manager: bool):
    """Groups stay swapped out while the block manager answers LATER.

    With ``can_swap_in`` forced to ``AllocStatus.LATER`` nothing is swapped
    in, nothing is charged to the budget, and both groups remain queued.
    """
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras = None
    blocks_to_swap_out: List[Tuple[int, int]] = []
    for i in range(2):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           best_of=2,
                                           block_size=block_size)
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    # Make every swap-in attempt report LATER (cannot swap in right now).
    scheduler.block_manager.can_swap_in = MagicMock()
    scheduler.block_manager.can_swap_in.return_value = AllocStatus.LATER
    # Since we cannot swap in, none of the requests are swapped in.
    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 2
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(output.decode_seq_groups) == 0
    assert len(output.prefill_seq_groups) == 0


930
931
932
933
934
935
936
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_infeasible_swap(use_v2_block_manager: bool):
    """Groups that can NEVER be swapped in are reported as infeasible.

    With ``can_swap_in`` forced to ``AllocStatus.NEVER`` both groups leave
    the swapped queue and appear in ``output.infeasible_seq_groups``, and
    nothing is charged to the budget.
    """
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras = None
    blocks_to_swap_out: List[Tuple[int, int]] = []
    for i in range(2):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           best_of=2,
                                           block_size=block_size)
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    # Make every swap-in attempt report NEVER.
    scheduler.block_manager.can_swap_in = MagicMock()
    scheduler.block_manager.can_swap_in.return_value = AllocStatus.NEVER
    # Since we can never swap in, every request is dropped as infeasible.
    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 0
    assert len(output.infeasible_seq_groups) == 2
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(output.decode_seq_groups) == 0
    assert len(output.prefill_seq_groups) == 0
962
963


964
965
966
967
968
969
970
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_schedule_swapped_blocks_to_copy(use_v2_block_manager: bool):
    """A swapped-in decode propagates append_slots' copy mapping.

    ``append_slots`` is mocked to return a (src, dst) block mapping; after the
    group is swapped back in, that mapping must appear in
    ``output.blocks_to_copy``.
    """
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras = None
    _, seq_group = create_dummy_prompt("1",
                                       prompt_length=60,
                                       best_of=2,
                                       block_size=block_size)
    scheduler._allocate_and_set_running(seq_group)
    append_new_token_seq_group(60, seq_group, 1)
    blocks_to_swap_out: List[Tuple[int, int]] = []
    scheduler._swap_out(seq_group, blocks_to_swap_out)
    scheduler._add_seq_group_to_swapped(seq_group)

    # Force append_slots to report a block copy for the swapped-in group.
    scheduler.block_manager.append_slots = MagicMock()
    scheduler.block_manager.append_slots.return_value = [(2, 3)]

    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 0
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    assert output.blocks_to_copy == [(2, 3)]
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036


def test_scheduling_budget():
    """Exercise SchedulingBudget's limit checks and per-request accounting.

    Token and sequence counters are keyed by request id, so adding or
    subtracting twice for the same request must only take effect once.
    """
    TOKEN_BUDGET = 4
    MAX_SEQS = 4
    budget = SchedulingBudget(token_budget=TOKEN_BUDGET, max_num_seqs=MAX_SEQS)
    # Requests up to (and including) both limits fit; exceeding either fails.
    assert budget.can_schedule(num_new_tokens=1, num_new_seqs=1)
    assert budget.can_schedule(num_new_tokens=4, num_new_seqs=4)
    assert not budget.can_schedule(num_new_tokens=1, num_new_seqs=5)
    assert not budget.can_schedule(num_new_tokens=5, num_new_seqs=1)
    assert not budget.can_schedule(num_new_tokens=5, num_new_seqs=5)
    assert budget.remaining_token_budget() == TOKEN_BUDGET

    # Verify add/subtract num batched tokens.
    _, seq_group = create_dummy_prompt("1", 3)
    budget.add_num_batched_tokens(seq_group.request_id, 2)
    assert budget.remaining_token_budget() == 2
    assert budget.num_batched_tokens == 2
    assert budget.can_schedule(num_new_tokens=2, num_new_seqs=1)
    assert not budget.can_schedule(num_new_tokens=3, num_new_seqs=1)
    # Adding tokens again for the same request id is a no-op.
    budget.add_num_batched_tokens(seq_group.request_id, 2)
    assert budget.remaining_token_budget() == 2
    assert budget.num_batched_tokens == 2
    budget.subtract_num_batched_tokens(seq_group.request_id, 2)
    assert budget.remaining_token_budget() == 4
    assert budget.num_batched_tokens == 0
    # Subtracting again for an already-removed request id is a no-op.
    budget.subtract_num_batched_tokens(seq_group.request_id, 2)
    assert budget.remaining_token_budget() == 4
    assert budget.num_batched_tokens == 0

    # Verify add/subtract max seqs.
    _, seq_group = create_dummy_prompt("1", 3)
    budget.add_num_seqs(seq_group.request_id, 2)
    assert budget.can_schedule(num_new_tokens=1, num_new_seqs=2)
    assert not budget.can_schedule(num_new_tokens=1, num_new_seqs=3)
    assert budget.num_curr_seqs == 2
    # Adding seqs again for the same request id is a no-op.
    budget.add_num_seqs(seq_group.request_id, 2)
    assert budget.num_curr_seqs == 2
    budget.subtract_num_seqs(seq_group.request_id, 2)
    assert budget.num_curr_seqs == 0
    # Subtracting again for an already-removed request id is a no-op.
    budget.subtract_num_seqs(seq_group.request_id, 2)
    assert budget.num_curr_seqs == 0