# test_scheduler.py 42.4 KB
# Newer Older
1
import time
2
from collections import deque
3
from typing import List, Set, Tuple
4
from unittest.mock import MagicMock
5

6
7
import pytest
from torch import Use  # noqa
8

9
10
11
12
from vllm.config import CacheConfig, LoRAConfig, SchedulerConfig
from vllm.core.interfaces import AllocStatus
from vllm.core.scheduler import Scheduler, SchedulingBudget
from vllm.lora.request import LoRARequest
13
from vllm.sequence import SequenceGroup, SequenceStatus
14

15
16
17
from .utils import (append_new_token, append_new_token_seq_group,
                    create_dummy_prompt, get_sequence_groups,
                    schedule_and_update_computed_tokens)
18
19


20
21
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_add_seq_group(use_v2_block_manager: bool):
    """Each added sequence group raises the unfinished-group count by one."""
    block_size = 4
    scheduler_config = SchedulerConfig(
        100, 64, 1, use_v2_block_manager=use_v2_block_manager)
    cache_config = CacheConfig(block_size, 1.0, 1, cache_dtype="auto")
    cache_config.num_cpu_blocks = 4
    cache_config.num_gpu_blocks = 4
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # Register the groups one at a time, checking the count after each add.
    num_groups = 4
    for expected_count, request_id in enumerate(map(str, range(num_groups)),
                                                start=1):
        _, seq_group = create_dummy_prompt(request_id,
                                           block_size,
                                           block_size=block_size)
        scheduler.add_seq_group(seq_group)
        assert scheduler.get_num_unfinished_seq_groups() == expected_count


40
41
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_abort_seq_group(use_v2_block_manager: bool):
    """Aborting a set of request ids removes every matching group."""
    block_size = 4
    scheduler_config = SchedulerConfig(
        100, 64, 1, use_v2_block_manager=use_v2_block_manager)
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 4
    cache_config.num_gpu_blocks = 4
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # Enqueue several groups, remembering their ids for the abort below.
    num_groups = 4
    request_ids: Set[str] = set()
    for idx in range(num_groups):
        request_id = str(idx)
        _, seq_group = create_dummy_prompt(request_id, block_size)
        scheduler.add_seq_group(seq_group)
        request_ids.add(request_id)
    assert scheduler.get_num_unfinished_seq_groups() == num_groups

    # A single abort call with all ids must leave nothing unfinished.
    scheduler.abort_seq_group(request_ids)
    assert scheduler.get_num_unfinished_seq_groups() == 0


64
65
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_schedule_simple(use_v2_block_manager: bool):
    """All groups are prefilled together, then decoded together.

    Neither step may copy or swap any blocks.
    """
    block_size = 4
    num_seq_group = 4
    max_model_len = 16
    scheduler_config = SchedulerConfig(
        64,
        num_seq_group,
        max_model_len,
        use_v2_block_manager=use_v2_block_manager)
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 8
    cache_config.num_gpu_blocks = 8
    scheduler = Scheduler(scheduler_config, cache_config, None)

    running: List[SequenceGroup] = []
    for idx in range(num_seq_group):
        _, seq_group = create_dummy_prompt(str(idx),
                                           prompt_length=block_size,
                                           block_size=block_size)
        scheduler.add_seq_group(seq_group)
        running.append(seq_group)
    expected_groups = set(running)

    def check_step(expected_tokens: int) -> None:
        # Run one scheduling step, validate it, then feed back one token.
        seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
        assert set(get_sequence_groups(out)) == expected_groups
        assert out.num_batched_tokens == expected_tokens
        assert (not out.blocks_to_copy and not out.blocks_to_swap_in
                and not out.blocks_to_swap_out)
        assert len(seq_group_meta) == num_seq_group
        append_new_token(out, 1)

    # Prefill step: every prompt token of every group is batched.
    check_step(block_size * num_seq_group)
    # Decode step: one new token per group.
    check_step(num_seq_group)


108
109
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_prefill_prioritized(use_v2_block_manager: bool):
    """Verify running batched tokens are not applied to prefill requests.

    Request A (1-token prompt) is prefilled first. Request B then arrives
    with a prompt exactly as long as the token budget, and the next step
    must schedule B alone.
    """
    block_size = 4
    max_model_len = 30
    max_batched_num_tokens = 30
    scheduler_config = SchedulerConfig(
        max_batched_num_tokens,
        2,
        max_model_len,
        use_v2_block_manager=use_v2_block_manager)
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 16
    cache_config.num_gpu_blocks = 16
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # Add seq group A (1-token prompt) and schedule its prompt.
    _, seq_group_a = create_dummy_prompt("1", 1, block_size=block_size)
    scheduler.add_seq_group(seq_group_a)
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_a]

    # Add a new prefill request B whose prompt alone fills the token budget.
    _, seq_group_b = create_dummy_prompt("2",
                                         max_batched_num_tokens,
                                         block_size=block_size)
    scheduler.add_seq_group(seq_group_b)

    # Verify prefill requests are prioritized. max_batched_num_tokens is 30
    # and B's prompt is 30 tokens, so B takes the whole budget and is the
    # only group scheduled in this step.
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_b]
140
141


142
143
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_schedule_preempt_abort(use_v2_block_manager: bool):
    """Preempt a group for lack of GPU blocks, then recompute it.

    Two groups share a two-block GPU cache. Group B is preempted while A
    decodes. After A is aborted, B is recomputed (prompt + generated token).
    """
    block_size = 4
    max_model_len = 16
    scheduler_config = SchedulerConfig(
        64, 2, max_model_len, use_v2_block_manager=use_v2_block_manager)
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 2
    cache_config.num_gpu_blocks = 2
    scheduler = Scheduler(scheduler_config, cache_config, None)

    def assert_no_block_ops(out) -> None:
        # None of these steps should copy or swap any block.
        assert not out.blocks_to_copy
        assert not out.blocks_to_swap_in
        assert not out.blocks_to_swap_out

    _, seq_group_a = create_dummy_prompt("1",
                                         block_size,
                                         block_size=block_size)
    _, seq_group_b = create_dummy_prompt("2",
                                         block_size,
                                         block_size=block_size)
    scheduler.add_seq_group(seq_group_a)
    scheduler.add_seq_group(seq_group_b)

    # Step 1: both prompts are prefilled together.
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_a, seq_group_b]
    assert out.num_batched_tokens == block_size * 2  # seq_a and seq_b
    assert_no_block_ops(out)
    assert len(seq_group_meta) == 2
    assert scheduler.get_num_unfinished_seq_groups() == 2
    # Append "generated" tokens so the prompt tokens count as processed.
    append_new_token(out, 1)

    # Step 2: only A decodes; B is preempted but remains unfinished.
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_a]
    assert out.num_batched_tokens == 1
    assert_no_block_ops(out)
    assert len(seq_group_meta) == 1
    assert scheduler.get_num_unfinished_seq_groups() == 2
    assert out.preempted == 1

    # Step 3: abort A; B's prompt is re-scheduled with recomputation.
    scheduler.abort_seq_group("1")
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_b]
    assert out.num_batched_tokens == 5  # 4 prompt + 1 generation.
    assert_no_block_ops(out)
    assert len(seq_group_meta) == 1
    assert scheduler.get_num_unfinished_seq_groups() == 1


197
198
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_max_seqs(use_v2_block_manager: bool):
    """max_num_seqs caps concurrent groups, including ones already decoding.

    With max_num_seqs=2 and group 0 already running, only one of the two
    newly added groups may be scheduled.
    """
    block_size = 4
    num_seq_group = 4
    max_seq_group = 2
    max_model_len = 16
    scheduler_config = SchedulerConfig(
        64,
        max_seq_group,
        max_model_len,
        use_v2_block_manager=use_v2_block_manager)
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 8
    cache_config.num_gpu_blocks = 8
    scheduler = Scheduler(scheduler_config, cache_config, None)

    all_seq_groups: List[SequenceGroup] = []
    # Create the seq groups up front; they are added to the scheduler below.
    for i in range(num_seq_group):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=block_size,
                                           block_size=block_size)
        all_seq_groups.append(seq_group)

    # Append 1 seq group and schedule its prompt.
    scheduler.add_seq_group(all_seq_groups[0])
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert set(get_sequence_groups(out)) == {all_seq_groups[0]}
    append_new_token(out, 1)

    # Schedule seq groups generation.
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert set(get_sequence_groups(out)) == {all_seq_groups[0]}
    append_new_token(out, 1)

    # Append 2 more seq groups.
    scheduler.add_seq_group(all_seq_groups[1])
    scheduler.add_seq_group(all_seq_groups[2])

    # Only 1 new seq group should be scheduled: max_seq_group is 2 and
    # group 0 is already running (decoding), leaving room for just one more.
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert set(get_sequence_groups(out)) == {all_seq_groups[1]}
243
244


245
246
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_delay_factor(use_v2_block_manager: bool):
    """Verify delay_factor defers a newly arrived prompt.

    With delay_factor=0.5, the second prompt is not prefilled on the first
    step after it arrives. It is prefilled once more than 0.5s has passed.
    """
    block_size = 4
    scheduler_config = SchedulerConfig(
        100,
        64,
        16,
        delay_factor=0.5,
        use_v2_block_manager=use_v2_block_manager)
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 8
    cache_config.num_gpu_blocks = 8
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # schedule first prompt
    # (create_dummy_prompt's first return value is unused here.)
    _, seq_group = create_dummy_prompt("0",
                                       prompt_length=block_size,
                                       block_size=block_size)
    scheduler.add_seq_group(seq_group)
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert out.num_prefill_groups > 0
    assert seq_group_meta[0].request_id == '0'
    append_new_token(out, 1)

    # wait for a second before scheduling next prompt
    time.sleep(1)
    _, seq_group = create_dummy_prompt("1",
                                       prompt_length=block_size,
                                       block_size=block_size)
    scheduler.add_seq_group(seq_group)

    # second prompt should *not* be scheduled; only request 0 decodes.
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert out.num_prefill_groups == 0
    assert seq_group_meta[0].request_id == '0'
    append_new_token(out, 1)

    # wait for more than 0.5 second and try again
    time.sleep(0.6)
    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
    assert out.num_prefill_groups > 0
    assert seq_group_meta[0].request_id == '1'
    append_new_token(out, 1)
288
289


290
291
292
293
294
295
296
297
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_swapped_out_prioritized(use_v2_block_manager: bool):
    """Verify swapped-out requests are swapped back in ahead of new prefills.

    Three best_of=2 requests are prefilled. The last one ("2") is refused new
    slots and swapped out. A fourth request is then added. The next step
    must swap "2" back in and run only decodes (3 batched tokens), not the
    60-token prefill.
    """
    block_size = 4
    num_initial_groups = 3
    scheduler = initialize_scheduler(max_num_seqs=6,
                                     block_size=block_size,
                                     use_v2_block_manager=use_v2_block_manager,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    # best_of=2 * 3 == 6 sequences, exactly max_num_seqs.
    for i in range(num_initial_groups):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           best_of=2,
                                           block_size=block_size)
        scheduler.add_seq_group(seq_group)
    _, out = schedule_and_update_computed_tokens(scheduler)
    # prefill scheduled now.
    assert len(out.scheduled_seq_groups) == num_initial_groups
    append_new_token(out, 1)

    # The last request ("2") should be swapped out: refuse it new slots.
    scheduler.block_manager.can_append_slots = MagicMock()

    def cannot_append_last_group(seq_group, num_lookahead_slots):
        return seq_group.request_id != "2"

    scheduler.block_manager.can_append_slots.side_effect = (
        cannot_append_last_group)

    _, out = schedule_and_update_computed_tokens(scheduler)
    assert len(out.scheduled_seq_groups) == 2
    assert out.num_batched_tokens == 2
    assert out.blocks_to_swap_out != []
    assert out.blocks_to_swap_in == []
    append_new_token(out, 1)

    # Add 1 more task. Swap should be prioritized over prefill.
    # Use a fresh request id: the leaked loop variable would be 2 here and
    # give the new request the same id as the swapped-out request "2".
    _, seq_group = create_dummy_prompt(str(num_initial_groups),
                                       prompt_length=60,
                                       best_of=2,
                                       block_size=block_size)
    scheduler.add_seq_group(seq_group)
    _, out = schedule_and_update_computed_tokens(scheduler)
    append_new_token(out, 1)
    assert len(out.scheduled_seq_groups) == 3
    # 3 decodes. It is swapped in.
    assert out.num_batched_tokens == 3
    assert out.blocks_to_swap_in != []
    assert out.blocks_to_swap_out == []
339
340


341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
def initialize_scheduler(
    *,
    max_num_seqs=1000,
    max_token_budget=1000,
    max_model_len=1000,
    lora_config=None,
    use_v2_block_manager=False,
    block_size=4,
    num_cpu_blocks=8,
    num_gpu_blocks=8,
):
    """Build a Scheduler with small, explicitly sized caches for tests.

    Args:
        max_num_seqs: Maximum number of sequences (SchedulerConfig arg 2).
        max_token_budget: Batched-token limit (SchedulerConfig arg 1).
        max_model_len: Maximum model length (SchedulerConfig arg 3).
        lora_config: Optional LoRA config forwarded to the Scheduler.
        use_v2_block_manager: Selects the v2 block manager.
        block_size: Tokens per cache block.
        num_cpu_blocks: Number of CPU cache blocks.
        num_gpu_blocks: Number of GPU cache blocks.

    Returns:
        A newly constructed Scheduler.
    """
    scheduler_config = SchedulerConfig(
        max_token_budget,
        max_num_seqs,
        max_model_len,
        use_v2_block_manager=use_v2_block_manager)
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    # Pin the block counts so each test controls cache capacity exactly.
    cache_config.num_cpu_blocks = num_cpu_blocks
    cache_config.num_gpu_blocks = num_gpu_blocks
    return Scheduler(scheduler_config, cache_config, lora_config)


365
def create_token_budget(token_budget: int = 10000,
                        max_num_seqs: int = 10000) -> SchedulingBudget:
    """Return a fresh SchedulingBudget with the given token and seq limits."""
    limits = dict(token_budget=token_budget, max_num_seqs=max_num_seqs)
    return SchedulingBudget(**limits)


373
374
375
376
377
378
379
380
381
def add_token_budget(budget: SchedulingBudget,
                     num_batched_tokens: int = 0,
                     num_curr_seqs: int = 0):
    """Pre-charge budget with tokens and seqs under a placeholder request.

    The budget's add_* methods only take a request id, so a fixed id is used
    directly. No throwaway SequenceGroup is built just to read its id back.

    Args:
        budget: The budget to charge (mutated in place).
        num_batched_tokens: Tokens to add to the budget.
        num_curr_seqs: Sequences to add to the budget.
    """
    request_id = '10'
    budget.add_num_batched_tokens(request_id, num_batched_tokens)
    budget.add_num_seqs(request_id, num_curr_seqs)


382
383
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_prefill_schedule_max_prompt_len(use_v2_block_manager: bool):
    """
    Test a prompt longer than max_model_len is ignored instead of scheduled.
    """
    block_size = 4
    scheduler = initialize_scheduler(max_model_len=30,
                                     use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size)
    # 60 prompt tokens exceed the 30-token model length.
    _, too_long = create_dummy_prompt("0",
                                      prompt_length=60,
                                      block_size=block_size)
    scheduler.add_seq_group(too_long)
    budget = create_token_budget()

    output = scheduler._schedule_prefills(budget, None)

    # Reported as ignored, consumes no budget, and leaves the waiting queue.
    assert len(output.ignored_seq_groups) == 1
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(scheduler.waiting) == 0


405
406
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_prefill_schedule_token_budget(use_v2_block_manager: bool):
    """
    Test token budget respected.

    Covers a zero budget, a budget that fits exactly one 60-token prompt, and
    budgets that are already charged with 30 batched tokens.
    """
    block_size = 4
    prompt_len = 60

    def new_scheduler(num_blocks):
        return initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                    block_size=block_size,
                                    num_cpu_blocks=num_blocks,
                                    num_gpu_blocks=num_blocks)

    def queue_prompt(sched, request_id):
        _, seq_group = create_dummy_prompt(request_id,
                                           prompt_length=prompt_len,
                                           block_size=block_size)
        sched.add_seq_group(seq_group)

    def expect(output, budget, remaining_waiting, *, scheduled, batched,
               seqs, num_waiting, ignored=None):
        # ``ignored=None`` skips the ignored-groups check.
        if ignored is not None:
            assert len(output.ignored_seq_groups) == ignored
        assert len(output.seq_groups) == scheduled
        assert budget.num_batched_tokens == batched
        assert budget.num_curr_seqs == seqs
        assert len(remaining_waiting) == num_waiting

    scheduler = new_scheduler(64)
    budget = create_token_budget(token_budget=0)
    for request_id in ("0", "1"):
        queue_prompt(scheduler, request_id)

    # 0 token budget == nothing is scheduled.
    output = scheduler._schedule_prefills(budget, None)
    expect(output, budget, scheduler.waiting, ignored=0, scheduled=0,
           batched=0, seqs=0, num_waiting=2)

    # 60 token budget == 1 request scheduled.
    budget = create_token_budget(token_budget=60)
    output = scheduler._schedule_prefills(budget, None)
    expect(output, budget, scheduler.waiting, ignored=0, scheduled=1,
           batched=60, seqs=1, num_waiting=1)

    # Test when current_batched_tokens respected.
    scheduler = new_scheduler(16)
    budget = create_token_budget(token_budget=60)
    add_token_budget(budget, 30, 0)
    # Cannot schedule a prompt that doesn't fit the budget.
    queue_prompt(scheduler, "1")
    output = scheduler._schedule_prefills(budget, None)
    expect(output, budget, scheduler.waiting, ignored=0, scheduled=0,
           batched=30, seqs=0, num_waiting=1)

    # With 90 tokens, 30 pre-charged leaves exactly room for the prompt.
    budget = create_token_budget(token_budget=90)
    add_token_budget(budget, 30, 0)
    output = scheduler._schedule_prefills(budget, None)
    expect(output, budget, scheduler.waiting, scheduled=1, batched=90,
           seqs=1, num_waiting=0)


470
471
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_prefill_schedule_max_seqs(use_v2_block_manager: bool):
    """
    Test max seq respected.

    A budget with max_num_seqs=2 lets two of three prompts through. A budget
    already charged to its seq limit then admits no new prompt.
    """
    block_size = 4
    num_prompts = 3
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    budget = create_token_budget(max_num_seqs=2)
    for i in range(num_prompts):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size)
        scheduler.add_seq_group(seq_group)
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 2
    assert budget.num_batched_tokens == 120
    assert budget.num_curr_seqs == 2
    assert len(remaining_waiting) == 1

    # Verify curr_num_seqs respected.
    scheduler.waiting = deque()
    budget = create_token_budget(max_num_seqs=2)
    add_token_budget(budget, 0, 2)
    # Use a request id this scheduler has not seen. The leaked loop variable
    # would reuse "2", the request just dropped from the waiting queue.
    _, seq_group = create_dummy_prompt(str(num_prompts),
                                       prompt_length=60,
                                       block_size=block_size)
    scheduler.add_seq_group(seq_group)
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 2
    assert len(remaining_waiting) == 1


511
512
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_prefill_schedule_max_lora(use_v2_block_manager: bool):
    """
    Test max lora is respected and prioritized.

    Queue order is 0 (LoRA), 1 (LoRA), 2 (plain), 3 (plain). With max_loras=1
    the first pass schedules 0 and 2. Request 1 was held back by the LoRA
    limit and keeps priority, so it is scheduled next.
    """
    block_size = 4
    lora_config = LoRAConfig(max_lora_rank=8, max_loras=1)
    scheduler = initialize_scheduler(lora_config=lora_config,
                                     use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    budget = create_token_budget(token_budget=120)
    curr_loras: Set[int] = set()

    # Requests 0 and 1 each use their own LoRA adapter.
    for lora_idx in range(2):
        request_id = str(lora_idx)
        adapter = LoRARequest(lora_name=request_id,
                              lora_int_id=lora_idx + 1,
                              lora_path="abc")
        _, seq_group = create_dummy_prompt(request_id,
                                           prompt_length=60,
                                           block_size=block_size,
                                           lora_request=adapter)
        scheduler.add_seq_group(seq_group)

    # Requests 2 and 3 are regular (no LoRA).
    for request_id in ("2", "3"):
        _, seq_group = create_dummy_prompt(request_id,
                                           prompt_length=60,
                                           block_size=block_size)
        scheduler.add_seq_group(seq_group)

    # First pass: requests 0 and 2 fit.
    output = scheduler._schedule_prefills(budget, curr_loras)
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 2
    assert budget.num_batched_tokens == 120
    assert budget.num_curr_seqs == 2
    assert len(scheduler.waiting) == 2
    assert len(curr_loras) == 1

    # With the LoRA slot cleared, request 1 is scheduled next (FCFS).
    curr_loras = set()
    budget = create_token_budget(token_budget=60)
    output = scheduler._schedule_prefills(budget, curr_loras)
    assert len(output.seq_groups) == 1
    assert output.seq_groups[0].seq_group.request_id == "1"
    assert len(scheduler.waiting) == 1
    assert len(curr_loras) == 1
    assert budget.num_batched_tokens == 60


566
567
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_prefill_schedule_no_block_manager_capacity(
        use_v2_block_manager: bool):
    """
    Test sequence cannot be scheduled due to block manager has no capacity.

    AllocStatus.LATER keeps every request waiting. AllocStatus.NEVER marks
    every request ignored and removes it from the queue. Both cases run
    against the parametrized block manager version.
    """
    block_size = 4

    def schedule_with_alloc_status(status):
        # Queue three 60-token prompts, force can_allocate to ``status`` and
        # run one prefill pass on a fresh scheduler.
        scheduler = initialize_scheduler(
            use_v2_block_manager=use_v2_block_manager,
            block_size=block_size,
            num_gpu_blocks=128,
            num_cpu_blocks=128)
        budget = create_token_budget()
        for i in range(3):
            _, seq_group = create_dummy_prompt(str(i),
                                               prompt_length=60,
                                               block_size=block_size)
            scheduler.add_seq_group(seq_group)
        scheduler.block_manager.can_allocate = MagicMock()
        scheduler.block_manager.can_allocate.return_value = status
        output = scheduler._schedule_prefills(budget, None)
        return output, budget, scheduler.waiting

    # LATER: nothing is scheduled and every request stays waiting.
    output, budget, remaining_waiting = schedule_with_alloc_status(
        AllocStatus.LATER)
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(remaining_waiting) == 3

    # NEVER: every request is ignored and dropped from the waiting queue.
    # (Previously this half always used the default block manager.)
    output, budget, remaining_waiting = schedule_with_alloc_status(
        AllocStatus.NEVER)
    assert len(output.ignored_seq_groups) == 3
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(remaining_waiting) == 0


610
611
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_decode_schedule_preempted(use_v2_block_manager: bool):
    """
    Test decodes that cannot be scheduled are preempted.

    Request "1" is refused new slots. It is preempted, and so is the
    lowest-priority request "2". Only request "0" decodes.
    """
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    curr_loras = None
    # Put three decoding groups straight into the running queue.
    for request_id in ("0", "1", "2"):
        _, seq_group = create_dummy_prompt(request_id,
                                           prompt_length=60,
                                           block_size=block_size)
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._add_seq_group_to_running(seq_group)

    # Refuse new slots to request "1" only.
    scheduler.block_manager.can_append_slots = MagicMock()
    scheduler.block_manager.can_append_slots.side_effect = (
        lambda seq_group, num_lookahead_slots: seq_group.request_id != "1")

    budget = create_token_budget()
    output = scheduler._schedule_running(budget, curr_loras)
    remaining_running = scheduler.running
    assert len(remaining_running) == 0

    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    assert output.decode_seq_groups[0].seq_group.request_id == "0"
    assert len(output.preempted) == 2
    # Verify budgets are updated.
    assert budget.num_batched_tokens == 1
    # NOTE: When enable_chunk is False, num_seqs budget is not updated.
    # assert budget.num_curr_seqs == 1
    # Both should be preempted, not swapped.
    assert output.blocks_to_swap_out == []
    # Nothing is copied.
    assert output.blocks_to_copy == []
654
655


656
657
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_decode_swap_beam_search(use_v2_block_manager: bool):
    """
    Test best_of > 1 swap out blocks.

    Three best_of=2 groups are decoding. Request "2" is refused new slots and
    must be swapped out rather than preempted, with the budget reduced
    accordingly.
    """
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_gpu_blocks=64,
                                     num_cpu_blocks=64)
    curr_loras = None
    budget = create_token_budget()
    for i in range(3):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           best_of=2,
                                           block_size=block_size)
        scheduler._allocate_and_set_running(seq_group)
        scheduler._add_seq_group_to_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        # Charge the budget as if these groups were already scheduled.
        budget.add_num_seqs(seq_group.request_id,
                            seq_group.get_max_num_running_seqs())
        budget.add_num_batched_tokens(
            seq_group.request_id, seq_group.num_seqs(SequenceStatus.RUNNING))

    # The last request should be swapped out.
    scheduler.block_manager.can_append_slots = MagicMock()

    def cannot_append_last_group(seq_group, num_lookahead_slots):
        return seq_group.request_id != "2"

    scheduler.block_manager.can_append_slots.side_effect = (
        cannot_append_last_group)
    scheduler.block_manager.swap_out = MagicMock()
    expected_swap_mapping = [("5", "7")]
    scheduler.block_manager.swap_out.return_value = expected_swap_mapping

    output = scheduler._schedule_running(budget, curr_loras)
    remaining_running = scheduler.running
    assert len(remaining_running) == 0

    assert len(output.decode_seq_groups) == 2
    assert len(output.prefill_seq_groups) == 0
    assert output.decode_seq_groups[0].seq_group.request_id == "0"
    assert output.decode_seq_groups[1].seq_group.request_id == "1"
    assert len(output.preempted) == 0
    assert len(output.swapped_out) == 1
    # Budget should reflect the swapped-out request.
    assert budget.num_batched_tokens == 2
    # Since the swapped-out group has 2 sequences, 2 should be subtracted.
    assert budget.num_curr_seqs == 4
    # The group is swapped, not preempted: blocks_to_swap_out carries the
    # (mocked) swap_out mapping.
    assert output.blocks_to_swap_out == expected_swap_mapping
    # Nothing is copied.
    assert output.blocks_to_copy == []
710
711


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_schedule_decode_blocks_to_copy_update(use_v2_block_manager: bool):
    """
    Verify blocks_to_copy is updated.

    A single running decode group (best_of=2) is scheduled while
    ``block_manager.append_slots`` is mocked to report a (src, dst) block
    pair; ``_schedule_running`` must surface that pair in ``blocks_to_copy``.
    """
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=16,
                                     num_gpu_blocks=16)
    _, seq_group = create_dummy_prompt("1",
                                       prompt_length=60,
                                       best_of=2,
                                       block_size=block_size)
    curr_loras = None
    scheduler._allocate_and_set_running(seq_group)
    append_new_token_seq_group(60, seq_group, 1)
    scheduler._add_seq_group_to_running(seq_group)

    # Make append_slots report a copy of block 2 into block 3.
    scheduler.block_manager.append_slots = MagicMock()
    scheduler.block_manager.append_slots.return_value = [(2, 3)]

    budget = create_token_budget()
    output = scheduler._schedule_running(budget, curr_loras)
    remaining_running = scheduler.running
    assert len(remaining_running) == 0
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    # Nothing is preempted or swapped out.
    assert len(output.preempted) == 0
    assert len(output.swapped_out) == 0
    assert output.blocks_to_swap_out == []
    # append_slots returns a source -> destination mapping, which should be
    # forwarded as blocks_to_copy.
    assert output.blocks_to_copy == [(2, 3)]


def test_schedule_swapped_simple():
    """Swap one group out, then verify _schedule_swapped brings it back.

    The swap-in block mapping must be the swap-out mapping with each pair's
    elements exchanged.
    """
    scheduler = initialize_scheduler()
    curr_loras = None
    blocks_to_swap_out: List[Tuple[int, int]] = []
    _, seq_group = create_dummy_prompt("1", prompt_length=60, best_of=2)
    scheduler._allocate_and_set_running(seq_group)
    append_new_token_seq_group(60, seq_group, 1)
    scheduler._swap_out(seq_group, blocks_to_swap_out)
    scheduler._add_seq_group_to_swapped(seq_group)

    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 0
    assert budget.num_batched_tokens == 1
    assert budget.num_curr_seqs == 2
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    # Swap in is the reverse of swap out: same pairs, same order, with the
    # two block ids of each pair exchanged.
    blocks_to_swap_in_reverse = [
        (second, first) for first, second in output.blocks_to_swap_in
    ]
    assert blocks_to_swap_out == blocks_to_swap_in_reverse


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_schedule_swapped_max_token_budget(use_v2_block_manager: bool):
    """Verify _schedule_swapped stops once the token budget is exhausted."""
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras = None
    blocks_to_swap_out: List[Tuple[int, int]] = []
    for i in range(2):
        # NOTE(review): unlike the sibling swap tests, block_size is not
        # forwarded to create_dummy_prompt here — confirm this is intended.
        _, seq_group = create_dummy_prompt(str(i), prompt_length=60, best_of=2)
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    # A 1-token budget admits only one of the two swapped groups.
    budget = create_token_budget(token_budget=1)
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 1
    assert budget.num_batched_tokens == 1
    assert budget.num_curr_seqs == 2
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0

    # Verify num_batched_tokens are respected: the budget's single token is
    # pre-consumed (add_token_budget presumably charges 1 token, 0 seqs), so
    # the remaining swapped group cannot be scheduled.
    budget = create_token_budget(token_budget=1)
    add_token_budget(budget, 1, 0)
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 1
    assert budget.num_batched_tokens == 1
    assert budget.num_curr_seqs == 0
    assert len(output.decode_seq_groups) == 0
    assert len(output.prefill_seq_groups) == 0


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_schedule_swapped_max_seqs(use_v2_block_manager: bool):
    """Verify _schedule_swapped respects the budget's max_num_seqs limit."""
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    curr_loras = None
    blocks_to_swap_out: List[Tuple[int, int]] = []
    for i in range(4):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size)
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    # With max_num_seqs=2, only two of the four swapped groups come back.
    budget = create_token_budget(max_num_seqs=2)
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 2
    assert budget.num_batched_tokens == 2
    assert budget.num_curr_seqs == 2
    assert len(output.decode_seq_groups) == 2
    assert len(output.prefill_seq_groups) == 0

    # Verify num_curr_seqs are respected: reusing the now-full budget
    # schedules nothing and leaves its counters unchanged.
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 2
    assert budget.num_batched_tokens == 2
    assert budget.num_curr_seqs == 2
    assert len(output.decode_seq_groups) == 0
    assert len(output.prefill_seq_groups) == 0


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_schedule_swapped_max_loras(use_v2_block_manager: bool):
    """Verify _schedule_swapped respects LoRAConfig.max_loras.

    Two swapped groups each carry a distinct LoRA request; with max_loras=1
    only one of them may be swapped back in.
    """
    block_size = 4
    lora_config = LoRAConfig(max_lora_rank=8, max_loras=1)
    scheduler = initialize_scheduler(lora_config=lora_config,
                                     use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras: Set[int] = set()
    blocks_to_swap_out: List[Tuple[int, int]] = []
    for i in range(2):
        # Each group gets its own LoRA id (1 and 2).
        lora_request = LoRARequest(lora_name=str(i),
                                   lora_int_id=i + 1,
                                   lora_path="abc")
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size,
                                           lora_request=lora_request)
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    # Only one LoRA may be active, so one group stays swapped.
    assert len(remaining_swapped) == 1
    assert budget.num_batched_tokens == 1
    assert budget.num_curr_seqs == 1
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    # curr_loras is updated in place with the admitted LoRA.
    assert len(curr_loras) == 1


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_schedule_swapped_cannot_swap_in(use_v2_block_manager: bool):
    """Verify swapped groups stay swapped when can_swap_in returns LATER."""
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras = None
    blocks_to_swap_out: List[Tuple[int, int]] = []
    for i in range(2):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           best_of=2,
                                           block_size=block_size)
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    # Make every swap-in attempt answer AllocStatus.LATER.
    scheduler.block_manager.can_swap_in = MagicMock()
    scheduler.block_manager.can_swap_in.return_value = AllocStatus.LATER

    # Since we cannot swap in, none of the requests are swapped in and the
    # budget is left untouched.
    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 2
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(output.decode_seq_groups) == 0
    assert len(output.prefill_seq_groups) == 0


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_infeasible_swap(use_v2_block_manager: bool):
    """Verify groups that can NEVER be swapped in are reported as infeasible.

    Unlike the LATER case, such groups are removed from the swapped queue.
    """
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras = None
    blocks_to_swap_out: List[Tuple[int, int]] = []
    for i in range(2):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           best_of=2,
                                           block_size=block_size)
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    # Make every swap-in attempt answer AllocStatus.NEVER.
    scheduler.block_manager.can_swap_in = MagicMock()
    scheduler.block_manager.can_swap_in.return_value = AllocStatus.NEVER

    # Since we can never swap in, both requests are dropped from the swapped
    # queue and reported as infeasible; nothing is scheduled.
    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 0
    assert len(output.infeasible_seq_groups) == 2
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(output.decode_seq_groups) == 0
    assert len(output.prefill_seq_groups) == 0


@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_schedule_swapped_blocks_to_copy(use_v2_block_manager: bool):
    """Verify _schedule_swapped forwards append_slots' copy pairs.

    ``block_manager.append_slots`` is mocked to report a (src, dst) pair;
    after the group is swapped back in, that pair must appear in
    ``blocks_to_copy``.
    """
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras = None
    _, seq_group = create_dummy_prompt("1",
                                       prompt_length=60,
                                       best_of=2,
                                       block_size=block_size)
    scheduler._allocate_and_set_running(seq_group)
    append_new_token_seq_group(60, seq_group, 1)
    blocks_to_swap_out: List[Tuple[int, int]] = []
    scheduler._swap_out(seq_group, blocks_to_swap_out)
    scheduler._add_seq_group_to_swapped(seq_group)

    # Make append_slots report a copy of block 2 into block 3.
    scheduler.block_manager.append_slots = MagicMock()
    scheduler.block_manager.append_slots.return_value = [(2, 3)]

    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 0
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    assert output.blocks_to_copy == [(2, 3)]


def test_scheduling_budget():
    TOKEN_BUDGET = 4
    MAX_SEQS = 4
    budget = SchedulingBudget(token_budget=TOKEN_BUDGET, max_num_seqs=MAX_SEQS)
    assert budget.can_schedule(num_new_tokens=1, num_new_seqs=1)
    assert budget.can_schedule(num_new_tokens=4, num_new_seqs=4)
    assert not budget.can_schedule(num_new_tokens=1, num_new_seqs=5)
    assert not budget.can_schedule(num_new_tokens=5, num_new_seqs=1)
    assert not budget.can_schedule(num_new_tokens=5, num_new_seqs=5)
    assert budget.remaining_token_budget() == TOKEN_BUDGET

    # Verify add/subtract num batched tokens.
    _, seq_group = create_dummy_prompt("1", 3)
    budget.add_num_batched_tokens(seq_group.request_id, 2)
    assert budget.remaining_token_budget() == 2
    assert budget.num_batched_tokens == 2
    assert budget.can_schedule(num_new_tokens=2, num_new_seqs=1)
    assert not budget.can_schedule(num_new_tokens=3, num_new_seqs=1)
    # Verify adding another seq group is no-op.
    budget.add_num_batched_tokens(seq_group.request_id, 2)
    assert budget.remaining_token_budget() == 2
    assert budget.num_batched_tokens == 2
    budget.subtract_num_batched_tokens(seq_group.request_id, 2)
    assert budget.remaining_token_budget() == 4
    assert budget.num_batched_tokens == 0
    budget.subtract_num_batched_tokens(seq_group.request_id, 2)
    assert budget.remaining_token_budget() == 4
    assert budget.num_batched_tokens == 0

    # Verify add/subtract max seqs.
    _, seq_group = create_dummy_prompt("1", 3)
    budget.add_num_seqs(seq_group.request_id, 2)
    assert budget.can_schedule(num_new_tokens=1, num_new_seqs=2)
    assert not budget.can_schedule(num_new_tokens=1, num_new_seqs=3)
    assert budget.num_curr_seqs == 2
    # Verify adding another seq group is no-op.
    budget.add_num_seqs(seq_group.request_id, 2)
    assert budget.num_curr_seqs == 2
    budget.subtract_num_seqs(seq_group.request_id, 2)
    assert budget.num_curr_seqs == 0
    budget.subtract_num_seqs(seq_group.request_id, 2)
    assert budget.num_curr_seqs == 0