test_scheduler.py 42.7 KB
Newer Older
1
import time
2
from collections import deque
3
from typing import List, Set, Tuple
4
from unittest.mock import MagicMock
5

6
7
import pytest
from torch import Use  # noqa
8

9
10
11
12
from vllm.config import CacheConfig, LoRAConfig, SchedulerConfig
from vllm.core.interfaces import AllocStatus
from vllm.core.scheduler import Scheduler, SchedulingBudget
from vllm.lora.request import LoRARequest
13
from vllm.sequence import SequenceGroup, SequenceStatus
14

15
16
17
from .utils import (append_new_token, append_new_token_seq_group,
                    create_dummy_prompt, get_sequence_groups,
                    schedule_and_update_computed_tokens)
18
19


20
21
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_add_seq_group(use_v2_block_manager: bool):
    """Each added sequence group bumps the unfinished-group count by one."""
    block_size = 4
    num_blocks = 4
    total_groups = 4

    scheduler_config = SchedulerConfig(
        100, 64, 1, use_v2_block_manager=use_v2_block_manager)
    cache_config = CacheConfig(block_size, 1.0, 1, cache_dtype="auto")
    cache_config.num_cpu_blocks = num_blocks
    cache_config.num_gpu_blocks = num_blocks
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # Register the groups one at a time and check the running total after
    # every insertion.
    for expected_count in range(1, total_groups + 1):
        request_id = str(expected_count - 1)
        _, group = create_dummy_prompt(request_id,
                                       prompt_length=block_size,
                                       block_size=block_size)
        scheduler.add_seq_group(group)
        assert scheduler.get_num_unfinished_seq_groups() == expected_count


40
41
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_abort_seq_group(use_v2_block_manager: bool):
    """Aborting every added request leaves no unfinished sequence groups."""
    block_size = 4
    total_groups = 4

    scheduler_config = SchedulerConfig(
        100, 64, 1, use_v2_block_manager=use_v2_block_manager)
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 4
    cache_config.num_gpu_blocks = 4
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # Queue several groups, remembering the id of each one.
    ordered_ids = [str(index) for index in range(total_groups)]
    for request_id in ordered_ids:
        _, group = create_dummy_prompt(request_id, block_size)
        scheduler.add_seq_group(group)
    request_ids: Set[str] = set(ordered_ids)

    # All groups are pending until they are aborted in a single call.
    assert scheduler.get_num_unfinished_seq_groups() == total_groups
    scheduler.abort_seq_group(request_ids)
    assert scheduler.get_num_unfinished_seq_groups() == 0


64
65
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_schedule_simple(use_v2_block_manager: bool):
    """Schedule one prefill step and one decode step for four small prompts."""
    block_size = 4
    num_seq_group = 4
    max_model_len = 16

    scheduler_config = SchedulerConfig(
        64,
        num_seq_group,
        max_model_len,
        use_v2_block_manager=use_v2_block_manager)
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 8
    cache_config.num_gpu_blocks = 8
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # Queue the groups and keep them around for later comparison.
    expected_groups: List[SequenceGroup] = []
    for index in range(num_seq_group):
        _, group = create_dummy_prompt(str(index),
                                       prompt_length=block_size,
                                       block_size=block_size)
        scheduler.add_seq_group(group)
        expected_groups.append(group)

    # Prefill step: every prompt token of every group is batched.
    metadata, out = schedule_and_update_computed_tokens(scheduler)
    assert set(get_sequence_groups(out)) == set(expected_groups)
    assert out.num_batched_tokens == block_size * num_seq_group
    assert not (out.blocks_to_copy or out.blocks_to_swap_in
                or out.blocks_to_swap_out)
    assert len(metadata) == num_seq_group
    append_new_token(out, 1)

    # Decode step: one token per group.
    metadata, out = schedule_and_update_computed_tokens(scheduler)
    assert set(get_sequence_groups(out)) == set(expected_groups)
    assert out.num_batched_tokens == num_seq_group
    assert not (out.blocks_to_copy or out.blocks_to_swap_in
                or out.blocks_to_swap_out)
    assert len(metadata) == num_seq_group
    append_new_token(out, 1)


108
109
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_prefill_prioritized(use_v2_block_manager: bool):
    """Verify running batched tokens are not applied to prefill requests."""
    block_size = 4
    max_model_len = 30
    max_batched_num_tokens = 30

    scheduler_config = SchedulerConfig(
        max_batched_num_tokens,
        2,
        max_model_len,
        use_v2_block_manager=use_v2_block_manager)
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 16
    cache_config.num_gpu_blocks = 16
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # Request A: a single-token prompt, scheduled on its own first.
    _, seq_group_a = create_dummy_prompt("1", 1, block_size=block_size)
    scheduler.add_seq_group(seq_group_a)
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_a]

    # Request B: a new prefill whose 30-token prompt equals the whole
    # max_batched_num_tokens budget.
    _, seq_group_b = create_dummy_prompt("2", 30, block_size=block_size)
    scheduler.add_seq_group(seq_group_b)

    # Prefill requests are prioritized: the next step is expected to
    # contain only B's prefill, not A's decode.
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_b]
140
141


142
143
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_schedule_preempt_abort(use_v2_block_manager: bool):
    """Prefill two groups, see b preempted on decode, abort a, re-prefill b."""
    block_size = 4
    max_model_len = 16

    scheduler_config = SchedulerConfig(
        64, 2, max_model_len, use_v2_block_manager=use_v2_block_manager)
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 2
    cache_config.num_gpu_blocks = 2
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # Two prompts of exactly one block each; only two GPU blocks exist.
    _, seq_group_a = create_dummy_prompt("1",
                                         prompt_length=block_size,
                                         block_size=block_size)
    _, seq_group_b = create_dummy_prompt("2",
                                         prompt_length=block_size,
                                         block_size=block_size)
    for group in (seq_group_a, seq_group_b):
        scheduler.add_seq_group(group)

    # Prefill step: both prompts are scheduled together.
    metadata, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_a, seq_group_b]
    assert out.num_batched_tokens == 2 * block_size  # seq_a and seq_b
    assert not (out.blocks_to_copy or out.blocks_to_swap_in
                or out.blocks_to_swap_out)
    assert len(metadata) == 2
    assert scheduler.get_num_unfinished_seq_groups() == 2

    # Appending a "generated" token lets each sequence mark its prompt
    # tokens as processed.
    append_new_token(out, 1)

    # Decode step: only group a runs; group b is preempted.
    metadata, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_a]
    assert out.num_batched_tokens == 1
    assert not (out.blocks_to_copy or out.blocks_to_swap_in
                or out.blocks_to_swap_out)
    assert len(metadata) == 1
    assert scheduler.get_num_unfinished_seq_groups() == 2
    assert out.preempted == 1

    # With group a aborted, group b's prompt is scheduled again
    # (recomputation).
    scheduler.abort_seq_group("1")
    metadata, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_b]
    assert out.num_batched_tokens == 5  # 4 prompt + 1 generation.
    assert not (out.blocks_to_copy or out.blocks_to_swap_in
                or out.blocks_to_swap_out)
    assert len(metadata) == 1
    assert scheduler.get_num_unfinished_seq_groups() == 1


197
198
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_max_seqs(use_v2_block_manager: bool):
    """With a limit of two sequences, only one of two new prompts is added."""
    block_size = 4
    num_seq_group = 4
    max_seq_group = 2
    max_model_len = 16

    scheduler_config = SchedulerConfig(
        64,
        max_seq_group,
        max_model_len,
        use_v2_block_manager=use_v2_block_manager)
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 8
    cache_config.num_gpu_blocks = 8
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # Build the groups up front; they are handed to the scheduler in stages.
    all_seq_groups: List[SequenceGroup] = [
        create_dummy_prompt(str(index),
                            prompt_length=block_size,
                            block_size=block_size)[1]
        for index in range(num_seq_group)
    ]
    first, second, third = all_seq_groups[:3]

    # Stage 1: a single group goes through prefill and then one decode step.
    scheduler.add_seq_group(first)
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert set(get_sequence_groups(out)) == {first}
    append_new_token(out, 1)

    _, out = schedule_and_update_computed_tokens(scheduler)
    assert set(get_sequence_groups(out)) == {first}
    append_new_token(out, 1)

    # Stage 2: two more groups arrive.
    scheduler.add_seq_group(second)
    scheduler.add_seq_group(third)

    # Only 1 seq group should be scheduled since max_seq_group is 2
    # and one is prompting.
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert set(get_sequence_groups(out)) == {second}
243
244


245
246
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_scheduler_delay_factor(use_v2_block_manager: bool):
    """With delay_factor=0.5, a fresh prompt is held back before prefill."""
    block_size = 4

    scheduler_config = SchedulerConfig(
        100,
        64,
        16,
        delay_factor=0.5,
        use_v2_block_manager=use_v2_block_manager)
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 8
    cache_config.num_gpu_blocks = 8
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # The first prompt is prefilled immediately.
    _, first_group = create_dummy_prompt("0",
                                         prompt_length=block_size,
                                         block_size=block_size)
    scheduler.add_seq_group(first_group)
    metadata, out = schedule_and_update_computed_tokens(scheduler)
    assert out.num_prefill_groups > 0
    assert metadata[0].request_id == '0'
    append_new_token(out, 1)

    # Wait for a second before queueing the next prompt.
    time.sleep(1)
    _, second_group = create_dummy_prompt("1",
                                          prompt_length=block_size,
                                          block_size=block_size)
    scheduler.add_seq_group(second_group)

    # The second prompt should *not* be scheduled yet; request 0 decodes.
    metadata, out = schedule_and_update_computed_tokens(scheduler)
    assert out.num_prefill_groups == 0
    assert metadata[0].request_id == '0'
    append_new_token(out, 1)

    # After waiting more than 0.5 second the second prompt is prefilled.
    time.sleep(0.6)
    metadata, out = schedule_and_update_computed_tokens(scheduler)
    assert out.num_prefill_groups > 0
    assert metadata[0].request_id == '1'
    append_new_token(out, 1)
288
289


290
291
292
293
294
295
296
297
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_swapped_out_prioritized(use_v2_block_manager: bool):
    """Check that a swapped-out group is swapped back in before a new prefill.

    Three best_of=2 groups are prefilled, the last one is forced out by a
    mocked ``can_append_slots``, and a fourth request is then queued. The
    next step must swap the third group back in rather than prefill the
    newcomer.
    """
    block_size = 4
    scheduler = initialize_scheduler(max_num_seqs=6,
                                     block_size=block_size,
                                     use_v2_block_manager=use_v2_block_manager,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    # best_of=2 * 3 == 6 sequences.
    for i in range(3):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           best_of=2,
                                           block_size=block_size)
        scheduler.add_seq_group(seq_group)
    _, out = schedule_and_update_computed_tokens(scheduler)
    # prefill scheduled now.
    assert len(out.scheduled_seq_groups) == 3
    append_new_token(out, 1)

    # The last request (id "2") should be swapped out: the mocked block
    # manager refuses to append slots for it and accepts everyone else.
    def can_append_unless_last_group(seq_group, num_lookahead_slots):
        return seq_group.request_id != "2"

    scheduler.block_manager.can_append_slots = MagicMock(
        side_effect=can_append_unless_last_group)

    _, out = schedule_and_update_computed_tokens(scheduler)
    assert len(out.scheduled_seq_groups) == 2
    assert out.num_batched_tokens == 2
    assert out.blocks_to_swap_out != []
    assert out.blocks_to_swap_in == []
    append_new_token(out, 1)

    # Add 1 more task. Swap should be prioritized over prefill.
    # The new request gets its own id ("3") instead of reusing the leaked
    # loop variable, which would duplicate the id of the swapped-out
    # request "2".
    _, seq_group = create_dummy_prompt("3",
                                       prompt_length=60,
                                       best_of=2,
                                       block_size=block_size)
    scheduler.add_seq_group(seq_group)
    _, out = schedule_and_update_computed_tokens(scheduler)
    append_new_token(out, 1)
    assert len(out.scheduled_seq_groups) == 3
    # 3 decodes. It is swapped in.
    assert out.num_batched_tokens == 3
    assert out.blocks_to_swap_in != []
    assert out.blocks_to_swap_out == []
339
340


341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
def initialize_scheduler(
    *,
    max_num_seqs=1000,
    max_token_budget=1000,
    max_model_len=1000,
    lora_config=None,
    use_v2_block_manager=False,
    block_size=4,
    num_cpu_blocks=8,
    num_gpu_blocks=8,
) -> Scheduler:
    """Build a Scheduler from keyword-only settings for use in the tests.

    Args:
        max_num_seqs: Second positional argument of SchedulerConfig.
        max_token_budget: First positional argument of SchedulerConfig.
        max_model_len: Third positional argument of SchedulerConfig.
        lora_config: Passed straight through to Scheduler (may be None).
        use_v2_block_manager: Forwarded to SchedulerConfig.
        block_size: First positional argument of CacheConfig.
        num_cpu_blocks: Assigned to ``cache_config.num_cpu_blocks``.
        num_gpu_blocks: Assigned to ``cache_config.num_gpu_blocks``.

    Returns:
        The newly constructed Scheduler.
    """
    scheduler_config = SchedulerConfig(
        max_token_budget,
        max_num_seqs,
        max_model_len,
        use_v2_block_manager=use_v2_block_manager)
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    # The block counts are set as attributes after construction, exactly as
    # the tests that build their configs by hand do.
    cache_config.num_cpu_blocks = num_cpu_blocks
    cache_config.num_gpu_blocks = num_gpu_blocks
    return Scheduler(scheduler_config, cache_config, lora_config)


365
def create_token_budget(token_budget: int = 10000,
                        max_num_seqs: int = 10000) -> SchedulingBudget:
    """Return a SchedulingBudget with the given token and sequence limits."""
    limits = dict(token_budget=token_budget, max_num_seqs=max_num_seqs)
    return SchedulingBudget(**limits)


373
374
375
376
377
378
379
380
381
def add_token_budget(budget: SchedulingBudget,
                     num_batched_tokens: int = 0,
                     num_curr_seqs: int = 0):
    """Charge tokens and sequences to *budget* under a dummy request id."""
    # A throwaway sequence group supplies the request id used for both
    # budget entries.
    _, mock_seq_group = create_dummy_prompt('10', prompt_length=60)
    request_id = mock_seq_group.request_id
    budget.add_num_batched_tokens(request_id, num_batched_tokens)
    budget.add_num_seqs(request_id, num_curr_seqs)


382
383
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_prefill_schedule_max_prompt_len(use_v2_block_manager: bool):
    """
    Test that a prompt longer than max_model_len is ignored, not scheduled.
    """
    block_size = 4
    scheduler = initialize_scheduler(max_model_len=30,
                                     use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size)

    # A 60-token prompt against a 30-token model length limit.
    _, too_long_group = create_dummy_prompt("0",
                                            prompt_length=60,
                                            block_size=block_size)
    scheduler.add_seq_group(too_long_group)

    budget = create_token_budget()
    output = scheduler._schedule_prefills(budget, None)

    # The request is reported as ignored, consumes no budget and no longer
    # sits in the waiting queue.
    assert len(output.ignored_seq_groups) == 1
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(scheduler.waiting) == 0


405
406
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_prefill_schedule_token_budget(use_v2_block_manager: bool):
    """
    Test that prefill scheduling respects the token budget.
    """
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    budget = create_token_budget(token_budget=0)
    for request_id in ("0", "1"):
        _, group = create_dummy_prompt(request_id,
                                       prompt_length=60,
                                       block_size=block_size)
        scheduler.add_seq_group(group)

    # 0 token budget == nothing is scheduled.
    output = scheduler._schedule_prefills(budget, None)
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(scheduler.waiting) == 2

    # 60 token budget == 1 request scheduled.
    budget = create_token_budget(token_budget=60)
    output = scheduler._schedule_prefills(budget, None)
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 1
    assert budget.num_batched_tokens == 60
    assert budget.num_curr_seqs == 1
    assert len(scheduler.waiting) == 1

    # Tokens already charged to the budget must be respected as well.
    # Start from a fresh scheduler with 30 of 60 tokens already used.
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=16,
                                     num_gpu_blocks=16)
    budget = create_token_budget(token_budget=60)
    add_token_budget(budget, 30, 0)
    _, group = create_dummy_prompt("1",
                                   prompt_length=60,
                                   block_size=block_size)
    scheduler.add_seq_group(group)

    # Cannot schedule a prompt that doesn't fit the budget.
    output = scheduler._schedule_prefills(budget, None)
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 30
    assert budget.num_curr_seqs == 0
    assert len(scheduler.waiting) == 1

    # With 90 tokens in total, the 60-token prompt fits next to the 30
    # tokens already charged.
    budget = create_token_budget(token_budget=90)
    add_token_budget(budget, 30, 0)
    output = scheduler._schedule_prefills(budget, None)
    assert len(output.seq_groups) == 1
    assert budget.num_batched_tokens == 90
    assert budget.num_curr_seqs == 1
    assert len(scheduler.waiting) == 0


470
471
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_prefill_schedule_max_seqs(use_v2_block_manager: bool):
    """
    Test that prefill scheduling respects the max number of sequences.
    """
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    budget = create_token_budget(max_num_seqs=2)
    for request_id in ("0", "1", "2"):
        _, group = create_dummy_prompt(request_id,
                                       prompt_length=60,
                                       block_size=block_size)
        scheduler.add_seq_group(group)

    # Only two of the three waiting requests fit under max_num_seqs=2.
    output = scheduler._schedule_prefills(budget, None)
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 2
    assert budget.num_batched_tokens == 120
    assert budget.num_curr_seqs == 2
    assert len(scheduler.waiting) == 1

    # Verify curr_num_seqs respected: start from an empty waiting queue and
    # a budget whose two sequence slots are already taken.
    scheduler.waiting = deque()
    budget = create_token_budget(max_num_seqs=2)
    add_token_budget(budget, 0, 2)
    _, group = create_dummy_prompt("2",
                                   prompt_length=60,
                                   block_size=block_size)
    scheduler.add_seq_group(group)

    output = scheduler._schedule_prefills(budget, None)
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 2
    assert len(scheduler.waiting) == 1


511
512
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_prefill_schedule_max_lora(use_v2_block_manager: bool):
    """
    Test that max_loras is respected and that a skipped LoRA request is
    scheduled first on the next pass.
    """
    block_size = 4
    lora_config = LoRAConfig(max_lora_rank=8, max_loras=1)
    scheduler = initialize_scheduler(lora_config=lora_config,
                                     use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    budget = create_token_budget(token_budget=120)
    curr_loras: Set[int] = set()

    # Requests 0 and 1 each carry their own LoRA adapter.
    for index in (0, 1):
        adapter = LoRARequest(lora_name=str(index),
                              lora_int_id=index + 1,
                              lora_path="abc")
        _, group = create_dummy_prompt(str(index),
                                       prompt_length=60,
                                       block_size=block_size,
                                       lora_request=adapter)
        scheduler.add_seq_group(group)

    # Add two more requests to verify lora is prioritized.
    # 0: Lora, 1: Lora, 2: regular, 3: regular
    # In the first iteration, index 0, 2 is scheduled.
    # If a request is not scheduled because it hits max lora, it is
    # prioritized. Verify that.
    for index in (2, 3):
        _, group = create_dummy_prompt(str(index),
                                       prompt_length=60,
                                       block_size=block_size)
        scheduler.add_seq_group(group)

    # Schedule 2 requests (0 and 2)
    output = scheduler._schedule_prefills(budget, curr_loras)
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 2
    assert budget.num_batched_tokens == 120
    assert budget.num_curr_seqs == 2
    assert len(scheduler.waiting) == 2
    assert len(curr_loras) == 1

    # The second lora request is scheduled next as FCFS policy.
    # Reset curr_loras so that it can be scheduled.
    curr_loras = set()
    budget = create_token_budget(token_budget=60)
    output = scheduler._schedule_prefills(budget, curr_loras)
    assert len(output.seq_groups) == 1
    assert output.seq_groups[0].seq_group.request_id == "1"
    assert len(scheduler.waiting) == 1
    assert len(curr_loras) == 1
    assert budget.num_batched_tokens == 60


566
567
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_prefill_schedule_no_block_manager_capacity(
        use_v2_block_manager: bool):
    """
    Test sequence cannot be scheduled due to block manager has no capacity.

    ``can_allocate`` is mocked twice: AllocStatus.LATER must leave every
    request in the waiting queue, AllocStatus.NEVER must move every request
    to the ignored list.
    """
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_gpu_blocks=128,
                                     num_cpu_blocks=128)
    budget = create_token_budget()
    for i in range(3):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size)
        scheduler.add_seq_group(seq_group)
    scheduler.block_manager.can_allocate = MagicMock(
        return_value=AllocStatus.LATER)

    # LATER: nothing is scheduled or ignored; all requests keep waiting.
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(remaining_waiting) == 3

    # Build the second scheduler with the same parametrized block manager
    # and block size; a bare initialize_scheduler() would silently fall back
    # to the defaults and never exercise the v2 block manager here.
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size)
    budget = create_token_budget()
    for i in range(3):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size)
        scheduler.add_seq_group(seq_group)
    scheduler.block_manager.can_allocate = MagicMock(
        return_value=AllocStatus.NEVER)

    # NEVER: every request is ignored and the waiting queue is drained.
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.ignored_seq_groups) == 3
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(remaining_waiting) == 0


610
611
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_decode_schedule_preempted(use_v2_block_manager: bool):
    """
    Test that decodes which cannot be scheduled are preempted.
    """
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    curr_loras = None

    # Put three groups directly into the running state, one token past
    # their 60-token prompt.
    for index in range(3):
        _, group = create_dummy_prompt(str(index),
                                       prompt_length=60,
                                       block_size=block_size)
        scheduler._allocate_and_set_running(group)
        append_new_token_seq_group(60, group, 1)
        scheduler._add_seq_group_to_running(group)

    # The mocked block manager refuses to append slots for request "1".
    def can_append_unless_request_1(seq_group, num_lookahead_slots):
        return seq_group.request_id != "1"

    scheduler.block_manager.can_append_slots = MagicMock(
        side_effect=can_append_unless_request_1)

    # 1 cannot be scheduled, and the lowest priority (request 2)
    # should be preempted. 1 will also be preempted.
    budget = create_token_budget()
    output = scheduler._schedule_running(budget, curr_loras)
    assert len(scheduler.running) == 0

    # Only request 0 is left decoding.
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    assert output.decode_seq_groups[0].seq_group.request_id == "0"
    assert len(output.preempted) == 2

    # Verify budgets are updated.
    assert budget.num_batched_tokens == 1
    # NOTE: When enable_chunk is False, num_seqs budget is not updated.
    # assert budget.num_curr_seqs == 1

    # Both should be preempted, not swapped.
    assert output.blocks_to_swap_out == []
    # Nothing is copied.
    assert output.blocks_to_copy == []
654
655


656
657
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_decode_swap_beam_search(use_v2_block_manager: bool):
    """
    Test that a best_of > 1 group which cannot decode is swapped out.
    """
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_gpu_blocks=64,
                                     num_cpu_blocks=64)
    curr_loras = None
    budget = create_token_budget()

    # Put three best_of=2 groups into the running state and charge each of
    # them to the budget.
    for index in range(3):
        _, group = create_dummy_prompt(str(index),
                                       prompt_length=60,
                                       best_of=2,
                                       block_size=block_size)
        scheduler._allocate_and_set_running(group)
        scheduler._add_seq_group_to_running(group)
        append_new_token_seq_group(60, group, 1)
        request_id = group.request_id
        budget.add_num_seqs(request_id, group.get_max_num_running_seqs())
        budget.add_num_batched_tokens(
            request_id, group.num_seqs(SequenceStatus.RUNNING))

    # The last request should be swapped out: slots cannot be appended for
    # request "2", and swap_out is mocked to return a fixed mapping.
    def can_append_unless_request_2(seq_group, num_lookahead_slots):
        return seq_group.request_id != "2"

    expected_swap_mapping = [("5", "7")]
    scheduler.block_manager.can_append_slots = MagicMock(
        side_effect=can_append_unless_request_2)
    scheduler.block_manager.swap_out = MagicMock(
        return_value=expected_swap_mapping)

    output = scheduler._schedule_running(budget, curr_loras)
    assert len(scheduler.running) == 0

    # Requests 0 and 1 keep decoding, in that order.
    assert len(output.decode_seq_groups) == 2
    assert len(output.prefill_seq_groups) == 0
    decoded_ids = [
        scheduled.seq_group.request_id
        for scheduled in output.decode_seq_groups[:2]
    ]
    assert decoded_ids == ["0", "1"]
    assert len(output.preempted) == 0
    assert len(output.swapped_out) == 1

    # Budget should reflect the swapped-out request.
    assert budget.num_batched_tokens == 2
    # since there are 2 sequences, 2 should be subtracted.
    assert budget.num_curr_seqs == 4

    # The mapping returned by the mocked swap_out is reported as-is.
    assert output.blocks_to_swap_out == expected_swap_mapping
    # Nothing is copied.
    assert output.blocks_to_copy == []
710
711


712
713
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_schedule_decode_blocks_to_copy_update(use_v2_block_manager: bool):
    """
    Verify blocks_to_copy is updated.

    One running group is scheduled through ``_schedule_running`` while
    ``block_manager.append_slots`` is mocked to return ``[(2, 3)]``. The test
    expects that pair in ``output.blocks_to_copy`` and expects nothing to be
    preempted or swapped out.
    """
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=16,
                                     num_gpu_blocks=16)
    _, seq_group = create_dummy_prompt("1",
                                       prompt_length=60,
                                       best_of=2,
                                       block_size=block_size)
    curr_loras = None
    scheduler._allocate_and_set_running(seq_group)
    append_new_token_seq_group(60, seq_group, 1)
    scheduler._add_seq_group_to_running(seq_group)

    # Mock append_slots so that it reports exactly one block pair to copy.
    scheduler.block_manager.append_slots = MagicMock()
    scheduler.block_manager.append_slots.return_value = [(2, 3)]

    budget = create_token_budget()
    output = scheduler._schedule_running(budget, curr_loras)
    remaining_running = scheduler.running
    assert len(remaining_running) == 0
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    assert len(output.preempted) == 0
    assert len(output.swapped_out) == 0
    # Nothing is preempted.
    assert output.blocks_to_swap_out == []
    # Since append_slots returns the source -> destination mapping, it
    # should be applied.
    assert output.blocks_to_copy == [(2, 3)]


750
751
752
753
754
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_schedule_swapped_simple(use_v2_block_manager: bool):
    """
    Verify a single swapped-out group is picked up by ``_schedule_swapped``.

    After scheduling, the swapped queue must be empty, the group must appear
    as a decode group, and ``blocks_to_swap_in`` must equal the recorded
    swap-out mapping with each pair reversed.
    """
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size)
    curr_loras = None
    blocks_to_swap_out: List[Tuple[int, int]] = []
    _, seq_group = create_dummy_prompt("1",
                                       prompt_length=4,
                                       best_of=2,
                                       block_size=block_size)
    scheduler._allocate_and_set_running(seq_group)
    append_new_token_seq_group(4, seq_group, 1)
    # _swap_out is handed blocks_to_swap_out; the final assert compares it
    # against the swap-in mapping.
    scheduler._swap_out(seq_group, blocks_to_swap_out)
    scheduler._add_seq_group_to_swapped(seq_group)

    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 0
    assert budget.num_batched_tokens == 1
    assert budget.num_curr_seqs == 2
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    # swap in is the reverse of swap out
    blocks_to_swap_in_reverse = [
        (swapout, swapin) for swapin, swapout in output.blocks_to_swap_in
    ]
    assert blocks_to_swap_out == blocks_to_swap_in_reverse


781
782
783
784
785
786
787
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_schedule_swapped_max_token_budget(use_v2_block_manager: bool):
    """
    Verify ``_schedule_swapped`` respects the token budget.

    Two groups are swapped out. With ``token_budget=1`` only one of them is
    expected to be scheduled. A second call, using a fresh budget that
    already holds one batched token, is expected to schedule nothing.
    """
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras = None
    blocks_to_swap_out: List[Tuple[int, int]] = []
    for i in range(2):
        _, seq_group = create_dummy_prompt(str(i), prompt_length=60, best_of=2)
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    budget = create_token_budget(token_budget=1)
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 1
    assert budget.num_batched_tokens == 1
    assert budget.num_curr_seqs == 2
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0

    # Verify num_batched_tokens are respected.
    budget = create_token_budget(token_budget=1)
    # Pre-fill the new budget (the asserts below expect 1 batched token and
    # 0 sequences recorded, with nothing newly scheduled).
    add_token_budget(budget, 1, 0)
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 1
    assert budget.num_batched_tokens == 1
    assert budget.num_curr_seqs == 0
    assert len(output.decode_seq_groups) == 0
    assert len(output.prefill_seq_groups) == 0


818
819
820
821
822
823
824
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_schedule_swapped_max_seqs(use_v2_block_manager: bool):
    """
    Verify ``_schedule_swapped`` respects the ``max_num_seqs`` budget.

    Four groups are swapped out and the budget allows two sequences. The
    first call is expected to schedule two groups; a second call with the
    same budget is expected to schedule nothing more.
    """
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    curr_loras = None
    blocks_to_swap_out: List[Tuple[int, int]] = []
    for i in range(4):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size)
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    budget = create_token_budget(max_num_seqs=2)
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 2
    assert budget.num_batched_tokens == 2
    assert budget.num_curr_seqs == 2
    assert len(output.decode_seq_groups) == 2
    assert len(output.prefill_seq_groups) == 0

    # Verify num_curr_seqs are respected.
    # The same budget object is reused, so it still holds the two sequences
    # recorded by the first call.
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 2
    assert budget.num_batched_tokens == 2
    assert budget.num_curr_seqs == 2
    assert len(output.decode_seq_groups) == 0
    assert len(output.prefill_seq_groups) == 0


855
856
857
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_schedule_swapped_max_loras(use_v2_block_manager: bool):
    """
    Verify ``_schedule_swapped`` respects ``max_loras``.

    The scheduler is configured with ``max_loras=1`` and two swapped-out
    groups carry different LoRA ids (1 and 2). Only one group is expected to
    be scheduled, and ``curr_loras`` is expected to hold one entry afterwards.
    """
    block_size = 4
    lora_config = LoRAConfig(max_lora_rank=8, max_loras=1)
    scheduler = initialize_scheduler(lora_config=lora_config,
                                     use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras: Set[int] = set()
    blocks_to_swap_out: List[Tuple[int, int]] = []
    for i in range(2):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size,
                                           lora_request=LoRARequest(
                                               lora_name=str(i),
                                               lora_int_id=i + 1,
                                               lora_path="abc"))
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 1
    assert budget.num_batched_tokens == 1
    assert budget.num_curr_seqs == 1
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    assert len(curr_loras) == 1


890
891
892
893
894
895
896
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_schedule_swapped_cannot_swap_in(use_v2_block_manager: bool):
    """
    Verify nothing is swapped in when ``can_swap_in`` returns LATER.

    Two groups are swapped out and ``block_manager.can_swap_in`` is mocked to
    return ``AllocStatus.LATER``. Both groups are expected to stay in the
    swapped queue and the budget is expected to remain untouched.
    """
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras = None
    blocks_to_swap_out: List[Tuple[int, int]] = []
    for i in range(2):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           best_of=2,
                                           block_size=block_size)
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    # Make the block manager answer LATER to every swap-in query.
    scheduler.block_manager.can_swap_in = MagicMock()
    scheduler.block_manager.can_swap_in.return_value = AllocStatus.LATER
    # Since we cannot swap in, none of the requests are swapped in.
    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 2
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(output.decode_seq_groups) == 0
    assert len(output.prefill_seq_groups) == 0


923
924
925
926
927
928
929
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_infeasible_swap(use_v2_block_manager: bool):
    """
    Verify groups are reported infeasible when ``can_swap_in`` returns NEVER.

    Two groups are swapped out and ``block_manager.can_swap_in`` is mocked to
    return ``AllocStatus.NEVER``. Both groups are expected to leave the
    swapped queue and appear in ``output.infeasible_seq_groups``, with the
    budget left untouched.
    """
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras = None
    blocks_to_swap_out: List[Tuple[int, int]] = []
    for i in range(2):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           best_of=2,
                                           block_size=block_size)
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    # Make the block manager answer NEVER to every swap-in query.
    scheduler.block_manager.can_swap_in = MagicMock()
    scheduler.block_manager.can_swap_in.return_value = AllocStatus.NEVER
    # Since swap-in can never succeed, no request is scheduled; both are
    # expected to be reported as infeasible instead.
    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 0
    assert len(output.infeasible_seq_groups) == 2
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(output.decode_seq_groups) == 0
    assert len(output.prefill_seq_groups) == 0


957
958
959
960
961
962
963
@pytest.mark.parametrize('use_v2_block_manager', [True, False])
def test_schedule_swapped_blocks_to_copy(use_v2_block_manager: bool):
    """
    Verify ``_schedule_swapped`` propagates ``append_slots`` results.

    One group is swapped out, then scheduled again while
    ``block_manager.append_slots`` is mocked to return ``[(2, 3)]``. That
    pair is expected in ``output.blocks_to_copy``.
    """
    block_size = 4
    scheduler = initialize_scheduler(use_v2_block_manager=use_v2_block_manager,
                                     block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras = None
    _, seq_group = create_dummy_prompt("1",
                                       prompt_length=60,
                                       best_of=2,
                                       block_size=block_size)
    scheduler._allocate_and_set_running(seq_group)
    append_new_token_seq_group(60, seq_group, 1)
    blocks_to_swap_out: List[Tuple[int, int]] = []
    scheduler._swap_out(seq_group, blocks_to_swap_out)
    scheduler._add_seq_group_to_swapped(seq_group)

    # Mock append_slots so that it reports exactly one block pair to copy.
    scheduler.block_manager.append_slots = MagicMock()
    scheduler.block_manager.append_slots.return_value = [(2, 3)]

    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 0
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    assert output.blocks_to_copy == [(2, 3)]


def test_scheduling_budget():
    """Check SchedulingBudget limit checks and per-request add/subtract."""
    token_limit = 4
    seq_limit = 4
    budget = SchedulingBudget(token_budget=token_limit, max_num_seqs=seq_limit)

    # With nothing recorded, requests within both limits fit ...
    for n_tok, n_seq in ((1, 1), (4, 4)):
        assert budget.can_schedule(num_new_tokens=n_tok, num_new_seqs=n_seq)
    # ... and exceeding either limit (or both) is rejected.
    for n_tok, n_seq in ((1, 5), (5, 1), (5, 5)):
        assert not budget.can_schedule(num_new_tokens=n_tok,
                                       num_new_seqs=n_seq)
    assert budget.remaining_token_budget() == token_limit

    # Batched-token accounting, keyed by request id.
    _, seq_group = create_dummy_prompt("1", 3)
    request_id = seq_group.request_id
    budget.add_num_batched_tokens(request_id, 2)
    assert budget.remaining_token_budget() == 2
    assert budget.num_batched_tokens == 2
    assert budget.can_schedule(num_new_tokens=2, num_new_seqs=1)
    assert not budget.can_schedule(num_new_tokens=3, num_new_seqs=1)
    # Adding again under the same request id must leave the totals unchanged.
    budget.add_num_batched_tokens(request_id, 2)
    assert budget.remaining_token_budget() == 2
    assert budget.num_batched_tokens == 2
    # The first subtract restores the full budget ...
    budget.subtract_num_batched_tokens(request_id, 2)
    assert budget.remaining_token_budget() == 4
    assert budget.num_batched_tokens == 0
    # ... and repeating it must not change anything further.
    budget.subtract_num_batched_tokens(request_id, 2)
    assert budget.remaining_token_budget() == 4
    assert budget.num_batched_tokens == 0

    # Sequence-count accounting, keyed by request id.
    _, seq_group = create_dummy_prompt("1", 3)
    request_id = seq_group.request_id
    budget.add_num_seqs(request_id, 2)
    assert budget.can_schedule(num_new_tokens=1, num_new_seqs=2)
    assert not budget.can_schedule(num_new_tokens=1, num_new_seqs=3)
    assert budget.num_curr_seqs == 2
    # Adding again under the same request id must leave the count unchanged.
    budget.add_num_seqs(request_id, 2)
    assert budget.num_curr_seqs == 2
    # Subtracting once clears the count; repeating it keeps it at zero.
    budget.subtract_num_seqs(request_id, 2)
    assert budget.num_curr_seqs == 0
    budget.subtract_num_seqs(request_id, 2)
    assert budget.num_curr_seqs == 0