"vscode:/vscode.git/clone" did not exist on "f7967577f5563b45b1ad5b6e0fae5b639af17e28"
test_scheduler.py 39.1 KB
Newer Older
1
import time
2
from collections import deque
3
from typing import List, Set, Tuple
4
from unittest.mock import MagicMock
5

6
import pytest  # noqa
7
from torch import Use  # noqa
8

9
10
11
12
from vllm.config import CacheConfig, LoRAConfig, SchedulerConfig
from vllm.core.interfaces import AllocStatus
from vllm.core.scheduler import Scheduler, SchedulingBudget
from vllm.lora.request import LoRARequest
13
from vllm.sequence import SequenceGroup, SequenceStatus
14

15
16
17
from .utils import (append_new_token, append_new_token_seq_group,
                    create_dummy_prompt, get_sequence_groups,
                    schedule_and_update_computed_tokens)
18
19


20
def test_scheduler_add_seq_group():
    """Each added sequence group bumps the unfinished-group count by one."""
    block_size = 4
    total_groups = 4

    sched_cfg = SchedulerConfig(100, 64, 1)
    cache_cfg = CacheConfig(block_size, 1.0, 1, cache_dtype="auto")
    cache_cfg.num_cpu_blocks = 4
    cache_cfg.num_gpu_blocks = 4
    scheduler = Scheduler(sched_cfg, cache_cfg, None)

    # Queue the groups one at a time and check the running total after each.
    for idx in range(total_groups):
        _, group = create_dummy_prompt(str(idx),
                                       block_size,
                                       block_size=block_size)
        scheduler.add_seq_group(group)
        expected_unfinished = idx + 1
        assert scheduler.get_num_unfinished_seq_groups() == expected_unfinished


42
def test_scheduler_abort_seq_group():
    """Aborting every queued request leaves no unfinished sequence groups."""
    block_size = 4
    total_groups = 4

    sched_cfg = SchedulerConfig(100, 64, 1)
    cache_cfg = CacheConfig(block_size, 1.0, 1, "auto")
    cache_cfg.num_cpu_blocks = 4
    cache_cfg.num_gpu_blocks = 4
    scheduler = Scheduler(sched_cfg, cache_cfg, None)

    # Queue several groups, remembering the request id of each one.
    request_ids: Set[str] = set()
    for idx in range(total_groups):
        request_id = str(idx)
        _, group = create_dummy_prompt(request_id, block_size)
        scheduler.add_seq_group(group)
        request_ids.add(request_id)

    # Everything is pending before the abort and nothing is pending after.
    assert scheduler.get_num_unfinished_seq_groups() == total_groups
    scheduler.abort_seq_group(request_ids)
    assert scheduler.get_num_unfinished_seq_groups() == 0


68
def test_scheduler_schedule_simple():
    """Schedule all groups for their prompts, then for one generation step."""
    block_size = 4
    num_seq_group = 4
    max_model_len = 16

    sched_cfg = SchedulerConfig(64, num_seq_group, max_model_len)
    cache_cfg = CacheConfig(block_size, 1.0, 1, "auto")
    cache_cfg.num_cpu_blocks = 8
    cache_cfg.num_gpu_blocks = 8
    scheduler = Scheduler(sched_cfg, cache_cfg, None)

    # Queue the groups and keep them around to compare against the output.
    expected_groups: List[SequenceGroup] = []
    for idx in range(num_seq_group):
        _, group = create_dummy_prompt(str(idx),
                                       prompt_length=block_size,
                                       block_size=block_size)
        scheduler.add_seq_group(group)
        expected_groups.append(group)

    # Prompt step: every group is scheduled with its full prompt.
    metadata, out = schedule_and_update_computed_tokens(scheduler)
    assert set(get_sequence_groups(out)) == set(expected_groups)
    assert out.num_batched_tokens == block_size * num_seq_group
    assert not (out.blocks_to_copy or out.blocks_to_swap_in
                or out.blocks_to_swap_out)
    assert len(metadata) == num_seq_group
    append_new_token(out, 1)

    # Generation step: one token per group.
    metadata, out = schedule_and_update_computed_tokens(scheduler)
    assert set(get_sequence_groups(out)) == set(expected_groups)
    assert out.num_batched_tokens == num_seq_group
    assert not (out.blocks_to_copy or out.blocks_to_swap_in
                or out.blocks_to_swap_out)
    assert len(metadata) == num_seq_group
    append_new_token(out, 1)


111
def test_scheduler_prefill_prioritized():
    """Verify running batched tokens are not applied to prefill requests.

    Request A is scheduled first; once request B is queued, the next
    scheduling step is expected to contain only B.
    """
    block_size = 4
    max_model_len = 30
    max_batched_num_tokens = 30
    scheduler_config = SchedulerConfig(
        max_batched_num_tokens,
        2,
        max_model_len,
    )
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 16
    cache_config.num_gpu_blocks = 16
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # Add request A to the scheduler.
    _, seq_group_a = create_dummy_prompt("1", 1, block_size=block_size)
    scheduler.add_seq_group(seq_group_a)

    # Schedule A's prompt.
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_a]

    # Add a new prefill request B.
    _, seq_group_b = create_dummy_prompt("2", 30, block_size=block_size)
    scheduler.add_seq_group(seq_group_b)

    # Verify prefill requests are prioritized. B is created with 30
    # (presumably its prompt length - confirm against create_dummy_prompt),
    # which equals max_batched_num_tokens, so B alone uses up the budget and
    # the new prefill request has to be scheduled first, without A.
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_b]
142
143


144
def test_scheduler_schedule_preempt_abort():
    """Prefill two groups, preempt one on decode, abort the other.

    After group "1" is aborted, the preempted group "2" is expected to be
    scheduled again with its prompt recomputed.
    """
    block_size = 4
    max_model_len = 16

    sched_cfg = SchedulerConfig(64, 2, max_model_len)
    cache_cfg = CacheConfig(block_size, 1.0, 1, "auto")
    cache_cfg.num_cpu_blocks = 2
    cache_cfg.num_gpu_blocks = 2
    scheduler = Scheduler(sched_cfg, cache_cfg, None)

    # Queue both groups.
    _, seq_group_a = create_dummy_prompt("1",
                                         block_size,
                                         block_size=block_size)
    _, seq_group_b = create_dummy_prompt("2",
                                         block_size,
                                         block_size=block_size)
    for group in (seq_group_a, seq_group_b):
        scheduler.add_seq_group(group)

    # Prompt step: both groups are scheduled together.
    metadata, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_a, seq_group_b]
    assert out.num_batched_tokens == block_size * 2  # seq_a and seq_b
    assert not (out.blocks_to_copy or out.blocks_to_swap_in
                or out.blocks_to_swap_out)
    assert len(metadata) == 2
    assert scheduler.get_num_unfinished_seq_groups() == 2

    # Append "generated" tokens so the sequences mark their prompt tokens as
    # processed.
    append_new_token(out, 1)

    # Generation step: only group a runs; group b is preempted.
    metadata, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_a]
    assert out.num_batched_tokens == 1
    assert not (out.blocks_to_copy or out.blocks_to_swap_in
                or out.blocks_to_swap_out)
    assert len(metadata) == 1
    assert scheduler.get_num_unfinished_seq_groups() == 2
    assert out.preempted == 1

    # Abort group a; group b's prompt is rescheduled with recomputation.
    scheduler.abort_seq_group("1")
    metadata, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_b]
    assert out.num_batched_tokens == 5  # 4 prompt + 1 generation.
    assert not (out.blocks_to_copy or out.blocks_to_swap_in
                or out.blocks_to_swap_out)
    assert len(metadata) == 1
    assert scheduler.get_num_unfinished_seq_groups() == 1


201
def test_scheduler_max_seqs():
    """Only as many groups are scheduled as the max-seqs limit permits."""
    block_size = 4
    num_seq_group = 4
    max_seq_group = 2
    max_model_len = 16

    sched_cfg = SchedulerConfig(64, max_seq_group, max_model_len)
    cache_cfg = CacheConfig(block_size, 1.0, 1, "auto")
    cache_cfg.num_cpu_blocks = 8
    cache_cfg.num_gpu_blocks = 8
    scheduler = Scheduler(sched_cfg, cache_cfg, None)

    # Build all groups up front; they are queued selectively below.
    all_seq_groups: List[SequenceGroup] = []
    for idx in range(num_seq_group):
        _, group = create_dummy_prompt(str(idx),
                                       prompt_length=block_size,
                                       block_size=block_size)
        all_seq_groups.append(group)
    first, second, third = all_seq_groups[:3]

    # Queue a single group.
    scheduler.add_seq_group(first)

    # Prompt step for the first group.
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert set(get_sequence_groups(out)) == {first}
    append_new_token(out, 1)

    # Generation step for the first group.
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert set(get_sequence_groups(out)) == {first}
    append_new_token(out, 1)

    # Queue two more groups.
    scheduler.add_seq_group(second)
    scheduler.add_seq_group(third)

    # Prompt step: only one of the new groups is expected to be scheduled,
    # since max_seq_group is 2 and one group is already running.
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert set(get_sequence_groups(out)) == {second}
246
247


248
def test_scheduler_delay_factor():
    """A new prompt is held back until the configured delay has elapsed.

    The scheduler is built with ``delay_factor=0.5``; the second prompt is
    expected to be skipped on the first attempt and scheduled after a
    further 0.6 second wait.
    """
    block_size = 4

    sched_cfg = SchedulerConfig(100, 64, 16, delay_factor=0.5)
    cache_cfg = CacheConfig(block_size, 1.0, 1, "auto")
    cache_cfg.num_cpu_blocks = 8
    cache_cfg.num_gpu_blocks = 8
    scheduler = Scheduler(sched_cfg, cache_cfg, None)

    # Schedule the first prompt.
    _, first_group = create_dummy_prompt("0",
                                         prompt_length=block_size,
                                         block_size=block_size)
    scheduler.add_seq_group(first_group)
    metadata, out = schedule_and_update_computed_tokens(scheduler)
    assert out.num_prefill_groups > 0
    assert metadata[0].request_id == '0'
    append_new_token(out, 1)

    # Wait for a second before queueing the next prompt.
    time.sleep(1)
    _, second_group = create_dummy_prompt("1",
                                          prompt_length=block_size,
                                          block_size=block_size)
    scheduler.add_seq_group(second_group)

    # The second prompt should *not* be scheduled yet.
    metadata, out = schedule_and_update_computed_tokens(scheduler)
    assert out.num_prefill_groups == 0
    assert metadata[0].request_id == '0'
    append_new_token(out, 1)

    # Wait for more than 0.5 second and try again.
    time.sleep(0.6)
    metadata, out = schedule_and_update_computed_tokens(scheduler)
    assert out.num_prefill_groups > 0
    assert metadata[0].request_id == '1'
    append_new_token(out, 1)
290
291


292
def test_swapped_out_prioritized():
    """A swapped-out group is swapped back in ahead of a new prefill."""
    block_size = 4
    scheduler = initialize_scheduler(max_num_seqs=6,
                                     block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)

    # best_of=2 * 3 == 6 sequences.
    for idx in range(3):
        _, group = create_dummy_prompt(str(idx),
                                       prompt_length=60,
                                       best_of=2,
                                       block_size=block_size)
        scheduler.add_seq_group(group)

    # All three prefills are scheduled now.
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert len(out.scheduled_seq_groups) == 3
    append_new_token(out, 1)

    # Make the block manager refuse new slots for request "2" so that the
    # last request gets swapped out.
    def can_append_unless_last(seq_group, num_lookahead_slots):
        return seq_group.request_id != "2"

    scheduler.block_manager.can_append_slots = MagicMock(
        side_effect=can_append_unless_last)

    _, out = schedule_and_update_computed_tokens(scheduler)
    assert len(out.scheduled_seq_groups) == 2
    assert out.num_batched_tokens == 2
    assert out.blocks_to_swap_out != []
    assert out.blocks_to_swap_in == []
    append_new_token(out, 1)

    # Add 1 more task. Swap should be prioritized over prefill.
    # NOTE: idx still holds its last loop value (2), so this request reuses
    # the id "2".
    _, group = create_dummy_prompt(str(idx),
                                   prompt_length=60,
                                   best_of=2,
                                   block_size=block_size)
    scheduler.add_seq_group(group)

    _, out = schedule_and_update_computed_tokens(scheduler)
    append_new_token(out, 1)
    assert len(out.scheduled_seq_groups) == 3
    # 3 decodes. It is swapped in.
    assert out.num_batched_tokens == 3
    assert out.blocks_to_swap_in != []
    assert out.blocks_to_swap_out == []
339
340


341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
def initialize_scheduler(
    *,
    max_num_seqs=1000,
    max_token_budget=1000,
    max_model_len=1000,
    lora_config=None,
    block_size=4,
    num_cpu_blocks=8,
    num_gpu_blocks=8,
):
    """Build a ``Scheduler`` for tests from keyword-only settings.

    Args:
        max_num_seqs: Second positional argument of ``SchedulerConfig``.
        max_token_budget: First positional argument of ``SchedulerConfig``.
        max_model_len: Third positional argument of ``SchedulerConfig``.
        lora_config: Passed through to ``Scheduler`` as its third argument.
        block_size: First positional argument of ``CacheConfig``.
        num_cpu_blocks: Assigned to ``cache_config.num_cpu_blocks``.
        num_gpu_blocks: Assigned to ``cache_config.num_gpu_blocks``.

    Returns:
        The newly constructed ``Scheduler``.
    """
    scheduler_config = SchedulerConfig(
        max_token_budget,
        max_num_seqs,
        max_model_len,
    )
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    # The block counts are set as attributes after construction rather than
    # passed to the CacheConfig constructor.
    cache_config.num_cpu_blocks = num_cpu_blocks
    cache_config.num_gpu_blocks = num_gpu_blocks
    return Scheduler(scheduler_config, cache_config, lora_config)


364
def create_token_budget(token_budget: int = 10000,
                        max_num_seqs: int = 10000) -> SchedulingBudget:
    """Return a fresh ``SchedulingBudget`` with the given limits."""
    budget = SchedulingBudget(token_budget=token_budget,
                              max_num_seqs=max_num_seqs)
    return budget


372
373
374
375
376
377
378
379
380
def add_token_budget(budget: SchedulingBudget,
                     num_batched_tokens: int = 0,
                     num_curr_seqs: int = 0):
    """Charge tokens and sequences to ``budget`` under a dummy request id.

    A throwaway sequence group with request id '10' is created only to
    supply the request id the budget methods are called with.
    """
    _, placeholder_group = create_dummy_prompt('10', prompt_length=60)
    request_id = placeholder_group.request_id
    budget.add_num_batched_tokens(request_id, num_batched_tokens)
    budget.add_num_seqs(request_id, num_curr_seqs)


381
def test_prefill_schedule_max_prompt_len():
    """A prompt longer than the model length limit is ignored, not run."""
    block_size = 4
    scheduler = initialize_scheduler(max_model_len=30, block_size=block_size)

    # Queue one prompt of length 60, twice the configured max_model_len.
    _, oversized_group = create_dummy_prompt("0",
                                             prompt_length=60,
                                             block_size=block_size)
    scheduler.add_seq_group(oversized_group)

    budget = create_token_budget()
    output = scheduler._schedule_prefills(budget, None)

    # The group lands in the ignored list and consumes no budget.
    assert len(output.ignored_seq_groups) == 1
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(scheduler.waiting) == 0


401
def test_prefill_schedule_token_budget():
    """Check that ``_schedule_prefills`` respects the token budget."""
    block_size = 4
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    for idx in range(2):
        _, group = create_dummy_prompt(str(idx),
                                       prompt_length=60,
                                       block_size=block_size)
        scheduler.add_seq_group(group)

    # 0 token budget == nothing is scheduled.
    budget = create_token_budget(token_budget=0)
    output = scheduler._schedule_prefills(budget, None)
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(scheduler.waiting) == 2

    # 60 token budget == 1 request scheduled.
    budget = create_token_budget(token_budget=60)
    output = scheduler._schedule_prefills(budget, None)
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 1
    assert budget.num_batched_tokens == 60
    assert budget.num_curr_seqs == 1
    assert len(scheduler.waiting) == 1

    # Test that tokens already charged to the budget are respected.
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_cpu_blocks=16,
                                     num_gpu_blocks=16)
    budget = create_token_budget(token_budget=60)
    add_token_budget(budget, 30, 0)
    # NOTE: idx still holds its last loop value (1), so the request id is "1".
    _, group = create_dummy_prompt(str(idx),
                                   prompt_length=60,
                                   block_size=block_size)
    scheduler.add_seq_group(group)

    # Cannot schedule a prompt that doesn't fit the budget.
    output = scheduler._schedule_prefills(budget, None)
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 30
    assert budget.num_curr_seqs == 0
    assert len(scheduler.waiting) == 1

    # With a 90 token budget the same prompt fits next to the 30 tokens
    # already charged.
    budget = create_token_budget(token_budget=90)
    add_token_budget(budget, 30, 0)
    output = scheduler._schedule_prefills(budget, None)
    assert len(output.seq_groups) == 1
    assert budget.num_batched_tokens == 90
    assert budget.num_curr_seqs == 1
    assert len(scheduler.waiting) == 0


463
def test_prefill_schedule_max_seqs():
    """Check that ``_schedule_prefills`` respects the max-seqs budget."""
    block_size = 4
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    for idx in range(3):
        _, group = create_dummy_prompt(str(idx),
                                       prompt_length=60,
                                       block_size=block_size)
        scheduler.add_seq_group(group)

    # Three prompts are waiting but the budget allows two sequences.
    budget = create_token_budget(max_num_seqs=2)
    output = scheduler._schedule_prefills(budget, None)
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 2
    assert budget.num_batched_tokens == 120
    assert budget.num_curr_seqs == 2
    assert len(scheduler.waiting) == 1

    # Verify curr_num_seqs respected: the budget starts out full.
    scheduler.waiting = deque()
    budget = create_token_budget(max_num_seqs=2)
    add_token_budget(budget, 0, 2)
    # NOTE: idx still holds its last loop value (2), so the request id is "2".
    _, group = create_dummy_prompt(str(idx),
                                   prompt_length=60,
                                   block_size=block_size)
    scheduler.add_seq_group(group)
    output = scheduler._schedule_prefills(budget, None)
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 2
    assert len(scheduler.waiting) == 1


502
def test_prefill_schedule_max_lora():
    """Check that max_loras is respected and a skipped LoRA request is next."""
    block_size = 4
    lora_config = LoRAConfig(max_lora_rank=8, max_loras=1)
    scheduler = initialize_scheduler(lora_config=lora_config,
                                     block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)

    # Requests 0 and 1 each carry their own LoRA adapter.
    for idx in range(2):
        lora_request = LoRARequest(lora_name=str(idx),
                                   lora_int_id=idx + 1,
                                   lora_path="abc")
        _, group = create_dummy_prompt(str(idx),
                                       prompt_length=60,
                                       block_size=block_size,
                                       lora_request=lora_request)
        scheduler.add_seq_group(group)

    # Add two more requests to verify lora is prioritized.
    # 0: Lora, 1: Lora, 2: regular, 3: regular
    # In the first iteration, requests 0 and 2 are scheduled.
    # If a request is not scheduled because it hits max lora, it is
    # prioritized. Verify that.
    for idx in range(2, 4):
        _, group = create_dummy_prompt(str(idx),
                                       prompt_length=60,
                                       block_size=block_size)
        scheduler.add_seq_group(group)

    # Schedule 2 requests (0 and 2).
    budget = create_token_budget(token_budget=120)
    curr_loras: Set[int] = set()
    output = scheduler._schedule_prefills(budget, curr_loras)
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 2
    assert budget.num_batched_tokens == 120
    assert budget.num_curr_seqs == 2
    assert len(scheduler.waiting) == 2
    assert len(curr_loras) == 1

    # The second lora request is scheduled next as FCFS policy.
    # Reset curr_loras so that it can be scheduled.
    curr_loras = set()
    budget = create_token_budget(token_budget=60)
    output = scheduler._schedule_prefills(budget, curr_loras)
    assert len(output.seq_groups) == 1
    assert output.seq_groups[0].seq_group.request_id == "1"
    assert len(scheduler.waiting) == 1
    assert len(curr_loras) == 1
    assert budget.num_batched_tokens == 60


555
def test_prefill_schedule_no_block_manager_capacity():
    """Prefills are not scheduled when the block manager cannot allocate.

    ``AllocStatus.LATER`` leaves the requests waiting, while
    ``AllocStatus.NEVER`` moves them to the ignored list.
    """
    block_size = 4

    # Case 1: the block manager answers LATER for every request.
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_gpu_blocks=128,
                                     num_cpu_blocks=128)
    for idx in range(3):
        _, group = create_dummy_prompt(str(idx),
                                       prompt_length=60,
                                       block_size=block_size)
        scheduler.add_seq_group(group)
    scheduler.block_manager.can_allocate = MagicMock(
        return_value=AllocStatus.LATER)

    budget = create_token_budget()
    output = scheduler._schedule_prefills(budget, None)
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(scheduler.waiting) == 3

    # Case 2: the block manager answers NEVER for every request.
    scheduler = initialize_scheduler()
    for idx in range(3):
        _, group = create_dummy_prompt(str(idx),
                                       prompt_length=60,
                                       block_size=block_size)
        scheduler.add_seq_group(group)
    scheduler.block_manager.can_allocate = MagicMock(
        return_value=AllocStatus.NEVER)

    budget = create_token_budget()
    output = scheduler._schedule_prefills(budget, None)
    assert len(output.ignored_seq_groups) == 3
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(scheduler.waiting) == 0


597
def test_decode_schedule_preempted():
    """Decodes that cannot get new slots are preempted by recomputation."""
    block_size = 4
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    curr_loras = None

    # Put three groups straight into the running queue.
    for idx in range(3):
        _, group = create_dummy_prompt(str(idx),
                                       prompt_length=60,
                                       block_size=block_size)
        scheduler._allocate_and_set_running(group)
        append_new_token_seq_group(60, group, 1)
        scheduler._add_seq_group_to_running(group)

    # The block manager refuses new slots for request "1" only.
    def can_append_unless_second(seq_group, num_lookahead_slots):
        return seq_group.request_id != "1"

    scheduler.block_manager.can_append_slots = MagicMock(
        side_effect=can_append_unless_second)

    # 1 cannot be scheduled, and the lowest priority (request 2)
    # should be preempted. 1 will also be preempted.
    budget = create_token_budget()
    output = scheduler._schedule_running(budget, curr_loras)
    remaining_running = scheduler.running
    assert len(remaining_running) == 0
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    assert output.decode_seq_groups[0].seq_group.request_id == "0"
    assert len(output.preempted) == 2
    # Verify budgets are updated.
    assert budget.num_batched_tokens == 1
    # NOTE: When enable_chunk is False, num_seqs budget is not updated.
    # assert budget.num_curr_seqs == 1
    # Both should be preempted, not swapped.
    assert output.blocks_to_swap_out == []
    # Nothing is copied.
    assert output.blocks_to_copy == []
639
640


641
def test_decode_swap_beam_search():
    """
    Test that a best_of > 1 group which cannot get slots is swapped out.

    Three best_of=2 groups are running; the block manager is mocked to
    refuse new slots for request "2", and ``swap_out`` is mocked to return
    a fixed block mapping that must show up in the scheduler output.
    """
    block_size = 4
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_gpu_blocks=64,
                                     num_cpu_blocks=64)
    curr_loras = None
    budget = create_token_budget()
    for i in range(3):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           best_of=2,
                                           block_size=block_size)
        scheduler._allocate_and_set_running(seq_group)
        scheduler._add_seq_group_to_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        # Pre-charge the budget as if every group were already scheduled, so
        # the assertions below can observe the swapped-out group being
        # subtracted again.
        budget.add_num_seqs(seq_group.request_id,
                            seq_group.get_max_num_running_seqs())
        budget.add_num_batched_tokens(
            seq_group.request_id, seq_group.num_seqs(SequenceStatus.RUNNING))

    # The last request should be swapped out.
    scheduler.block_manager.can_append_slots = MagicMock()

    def cannot_append_second_group(seq_group, num_lookahead_slots):
        return seq_group.request_id != "2"

    scheduler.block_manager.can_append_slots.side_effect = (
        cannot_append_second_group)
    scheduler.block_manager.swap_out = MagicMock()
    expected_swap_mapping = [("5", "7")]
    scheduler.block_manager.swap_out.return_value = expected_swap_mapping

    output = scheduler._schedule_running(budget, curr_loras)
    remaining_running = scheduler.running
    assert len(remaining_running) == 0
    assert len(output.decode_seq_groups) == 2
    assert len(output.prefill_seq_groups) == 0
    assert output.decode_seq_groups[0].seq_group.request_id == "0"
    assert output.decode_seq_groups[1].seq_group.request_id == "1"
    assert len(output.preempted) == 0
    assert len(output.swapped_out) == 1
    # Budget should reflect the swapped-out request: only the two groups
    # that are still scheduled keep their tokens.
    assert budget.num_batched_tokens == 2
    # The swapped-out group's sequences are subtracted from the seq budget
    # (presumably 2 of the 6 charged above, given best_of=2).
    assert budget.num_curr_seqs == 4
    # The request is swapped out (not preempted by recomputation), so the
    # mocked swap mapping is reported.
    assert output.blocks_to_swap_out == expected_swap_mapping
    # Nothing is copied.
    assert output.blocks_to_copy == []
693
694


695
def test_schedule_decode_blocks_to_copy_update():
    """
    Verify blocks_to_copy is updated.

    ``append_slots`` is mocked to return one (source, destination) pair and
    the scheduler output must report exactly that pair.
    """
    block_size = 4
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_cpu_blocks=16,
                                     num_gpu_blocks=16)
    _, seq_group = create_dummy_prompt("1",
                                       prompt_length=60,
                                       best_of=2,
                                       block_size=block_size)
    curr_loras = None
    scheduler._allocate_and_set_running(seq_group)
    append_new_token_seq_group(60, seq_group, 1)
    scheduler._add_seq_group_to_running(seq_group)

    # Make appending a slot report that block 2 must be copied to block 3.
    scheduler.block_manager.append_slots = MagicMock()
    scheduler.block_manager.append_slots.return_value = [(2, 3)]

    budget = create_token_budget()
    output = scheduler._schedule_running(budget, curr_loras)
    remaining_running = scheduler.running
    assert len(remaining_running) == 0
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    # Nothing is preempted or swapped out.
    assert len(output.preempted) == 0
    assert len(output.swapped_out) == 0
    assert output.blocks_to_swap_out == []
    # Since append_slots returns the source -> dest mapping, it should be
    # applied to blocks_to_copy.
    assert output.blocks_to_copy == [(2, 3)]
729
730


731
def test_schedule_swapped_simple():
    """A single swapped-out group is swapped back in by _schedule_swapped."""
    block_size = 4
    scheduler = initialize_scheduler(block_size=block_size)
    curr_loras = None

    # Run one best_of=2 group, then swap it out by hand.
    blocks_to_swap_out: List[Tuple[int, int]] = []
    _, group = create_dummy_prompt("1",
                                   prompt_length=4,
                                   best_of=2,
                                   block_size=block_size)
    scheduler._allocate_and_set_running(group)
    append_new_token_seq_group(4, group, 1)
    scheduler._swap_out(group, blocks_to_swap_out)
    scheduler._add_seq_group_to_swapped(group)

    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    assert len(scheduler.swapped) == 0
    assert budget.num_batched_tokens == 1
    assert budget.num_curr_seqs == 2
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0

    # Swap in is the reverse of swap out: flipping each pair must give back
    # the recorded swap-out mapping.
    flipped_swap_in = [(second, first)
                       for first, second in output.blocks_to_swap_in]
    assert blocks_to_swap_out == flipped_swap_in


760
def test_schedule_swapped_max_token_budget():
    """Swapping in stops as soon as the token budget is used up.

    Two groups are swapped out. With a token budget of 1 only one of them can
    be brought back; with a budget that is already fully consumed, none can.
    """
    block_size = 4
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras = None
    blocks_to_swap_out: List[Tuple[int, int]] = []
    for i in range(2):
        # NOTE(review): unlike the sibling tests, block_size is not forwarded
        # to create_dummy_prompt here -- confirm this is intentional.
        _, seq_group = create_dummy_prompt(str(i), prompt_length=60, best_of=2)
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    # Budget for exactly one token: one group is scheduled, one stays swapped.
    budget = create_token_budget(token_budget=1)
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 1
    assert budget.num_batched_tokens == 1
    assert budget.num_curr_seqs == 2
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0

    # Verify num_batched_tokens are respected: the fresh budget is filled up
    # front, so the remaining group cannot be scheduled.
    budget = create_token_budget(token_budget=1)
    add_token_budget(budget, 1, 0)
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 1
    assert budget.num_batched_tokens == 1
    assert budget.num_curr_seqs == 0
    assert len(output.decode_seq_groups) == 0
    assert len(output.prefill_seq_groups) == 0
793
794


795
def test_schedule_swapped_max_seqs():
    """Swapping in stops once the budget's max_num_seqs is reached.

    Four groups are swapped out and the budget allows two sequences, so the
    first pass schedules two groups and a second pass with the same (now
    full) budget schedules none.
    """
    block_size = 4
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    curr_loras = None
    blocks_to_swap_out: List[Tuple[int, int]] = []
    for i in range(4):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size)
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    budget = create_token_budget(max_num_seqs=2)
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 2
    assert budget.num_batched_tokens == 2
    assert budget.num_curr_seqs == 2
    assert len(output.decode_seq_groups) == 2
    assert len(output.prefill_seq_groups) == 0

    # Verify num_curr_seqs are respected: the budget is deliberately reused,
    # so it is already at its sequence limit and nothing more is scheduled.
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 2
    assert budget.num_batched_tokens == 2
    assert budget.num_curr_seqs == 2
    assert len(output.decode_seq_groups) == 0
    assert len(output.prefill_seq_groups) == 0
828
829


830
def test_schedule_swapped_max_loras():
    """With max_loras=1 only one of two distinct-LoRA groups is swapped in.

    Each swapped-out group carries its own LoRA id; after scheduling, exactly
    one group has been brought back and exactly one LoRA id is tracked in
    ``curr_loras``.
    """
    block_size = 4
    lora_config = LoRAConfig(max_lora_rank=8, max_loras=1)
    scheduler = initialize_scheduler(lora_config=lora_config,
                                     block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras: Set[int] = set()
    blocks_to_swap_out: List[Tuple[int, int]] = []
    for i in range(2):
        # lora_int_id differs per group (i + 1) so the two groups compete
        # for the single LoRA slot.
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size,
                                           lora_request=LoRARequest(
                                               lora_name=str(i),
                                               lora_int_id=i + 1,
                                               lora_path="abc"))
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 1
    assert budget.num_batched_tokens == 1
    assert budget.num_curr_seqs == 1
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    assert len(curr_loras) == 1


863
def test_schedule_swapped_cannot_swap_in():
    """Nothing is swapped in while can_swap_in reports AllocStatus.LATER.

    Both swapped-out groups must stay in the swapped queue and the budget
    must remain untouched.
    """
    block_size = 4
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras = None
    blocks_to_swap_out: List[Tuple[int, int]] = []
    for i in range(2):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           best_of=2,
                                           block_size=block_size)
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    # Stub the block manager so every swap-in query answers LATER.
    scheduler.block_manager.can_swap_in = MagicMock(
        return_value=AllocStatus.LATER)
    # Since we cannot swap in, none of the requests are swapped in.
    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 2
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(output.decode_seq_groups) == 0
    assert len(output.prefill_seq_groups) == 0


894
def test_infeasible_swap():
    """Groups whose swap-in is reported as AllocStatus.NEVER are dropped.

    Both swapped-out groups must leave the swapped queue, show up in
    ``output.infeasible_seq_groups``, and consume no budget.
    """
    block_size = 4
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras = None
    blocks_to_swap_out: List[Tuple[int, int]] = []
    for i in range(2):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           best_of=2,
                                           block_size=block_size)
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    # Stub the block manager so every swap-in query answers NEVER.
    scheduler.block_manager.can_swap_in = MagicMock(
        return_value=AllocStatus.NEVER)
    # Since swap-in can never succeed, none of the requests are swapped in;
    # they are reported as infeasible instead.
    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 0
    assert len(output.infeasible_seq_groups) == 2
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(output.decode_seq_groups) == 0
    assert len(output.prefill_seq_groups) == 0
924
925


926
def test_schedule_swapped_blocks_to_copy():
    """The mapping returned by append_slots is surfaced as blocks_to_copy.

    One group is swapped out and then swapped back in while
    ``block_manager.append_slots`` is stubbed to return a fixed block pair;
    that pair must appear unchanged in the scheduling output.
    """
    block_size = 4
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras = None
    _, seq_group = create_dummy_prompt("1",
                                       prompt_length=60,
                                       best_of=2,
                                       block_size=block_size)
    scheduler._allocate_and_set_running(seq_group)
    append_new_token_seq_group(60, seq_group, 1)
    blocks_to_swap_out: List[Tuple[int, int]] = []
    scheduler._swap_out(seq_group, blocks_to_swap_out)
    scheduler._add_seq_group_to_swapped(seq_group)

    # Stub append_slots so it reports a single (2, 3) block pair to copy.
    scheduler.block_manager.append_slots = MagicMock(return_value=[(2, 3)])

    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 0
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    assert output.blocks_to_copy == [(2, 3)]
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996


def test_scheduling_budget():
    """Check SchedulingBudget capacity queries and per-request bookkeeping.

    Covers ``can_schedule`` on a fresh budget, then the add/subtract pairs
    for batched tokens and for sequences, including that repeating an add or
    a subtract for the same request id changes nothing.
    """
    token_limit = 4
    seq_limit = 4
    budget = SchedulingBudget(token_budget=token_limit, max_num_seqs=seq_limit)

    # (num_new_tokens, num_new_seqs, whether it should fit) on a fresh budget.
    fresh_budget_cases = (
        (1, 1, True),
        (4, 4, True),
        (1, 5, False),
        (5, 1, False),
        (5, 5, False),
    )
    for new_tokens, new_seqs, should_fit in fresh_budget_cases:
        verdict = budget.can_schedule(num_new_tokens=new_tokens,
                                      num_new_seqs=new_seqs)
        assert bool(verdict) == should_fit
    assert budget.remaining_token_budget() == token_limit

    # Verify add/subtract num batched tokens.
    _, seq_group = create_dummy_prompt("1", 3)
    request_id = seq_group.request_id
    budget.add_num_batched_tokens(request_id, 2)
    assert budget.remaining_token_budget() == 2
    assert budget.num_batched_tokens == 2
    assert budget.can_schedule(num_new_tokens=2, num_new_seqs=1)
    assert not budget.can_schedule(num_new_tokens=3, num_new_seqs=1)
    # Adding again under the same request id is a no-op.
    budget.add_num_batched_tokens(request_id, 2)
    assert budget.remaining_token_budget() == 2
    assert budget.num_batched_tokens == 2
    # The first subtract releases the tokens; the second changes nothing.
    for _ in range(2):
        budget.subtract_num_batched_tokens(request_id, 2)
        assert budget.remaining_token_budget() == 4
        assert budget.num_batched_tokens == 0

    # Verify add/subtract max seqs.
    _, seq_group = create_dummy_prompt("1", 3)
    request_id = seq_group.request_id
    budget.add_num_seqs(request_id, 2)
    assert budget.can_schedule(num_new_tokens=1, num_new_seqs=2)
    assert not budget.can_schedule(num_new_tokens=1, num_new_seqs=3)
    assert budget.num_curr_seqs == 2
    # Adding again under the same request id is a no-op.
    budget.add_num_seqs(request_id, 2)
    assert budget.num_curr_seqs == 2
    # The first subtract releases the seqs; the second changes nothing.
    for _ in range(2):
        budget.subtract_num_seqs(request_id, 2)
        assert budget.num_curr_seqs == 0