test_scheduler.py 31.4 KB
Newer Older
1
import time
2
from collections import deque
3
from typing import List, Set, Tuple
4
from unittest.mock import MagicMock
5

6
import pytest  # noqa
7
from torch import Use  # noqa
8

9
10
11
12
from vllm.config import CacheConfig, LoRAConfig, SchedulerConfig
from vllm.core.interfaces import AllocStatus
from vllm.core.scheduler import Scheduler, SchedulingBudget
from vllm.lora.request import LoRARequest
13
from vllm.sequence import SequenceGroup
14

15
16
17
from .utils import (append_new_token, append_new_token_seq_group,
                    create_dummy_prompt, get_sequence_groups,
                    schedule_and_update_computed_tokens)
18
19


20
def test_scheduler_add_seq_group():
    """Each added sequence group raises the unfinished-group count by one."""
    block_size = 4
    total_groups = 4

    scheduler_config = SchedulerConfig(
        "generate",
        max_num_batched_tokens=100,
        max_num_seqs=64,
        max_model_len=1,
    )
    cache_config = CacheConfig(block_size, 1.0, 1, cache_dtype="auto")
    cache_config.num_cpu_blocks = 4
    cache_config.num_gpu_blocks = 4
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # Request ids are "0".."3"; after the k-th insertion the scheduler must
    # report exactly k unfinished groups.
    for expected_count in range(1, total_groups + 1):
        request_id = str(expected_count - 1)
        _, group = create_dummy_prompt(request_id,
                                       block_size,
                                       block_size=block_size)
        scheduler.add_seq_group(group)
        assert scheduler.get_num_unfinished_seq_groups() == expected_count


43
def test_scheduler_abort_seq_group():
    """Aborting every added request id leaves no unfinished groups."""
    block_size = 4
    total_groups = 4

    scheduler_config = SchedulerConfig(
        "generate",
        max_num_batched_tokens=100,
        max_num_seqs=64,
        max_model_len=1,
    )
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 4
    cache_config.num_gpu_blocks = 4
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # Queue several groups and remember the ids we used.
    request_ids: Set[str] = set()
    for idx in range(total_groups):
        request_id = str(idx)
        _, group = create_dummy_prompt(request_id, block_size)
        scheduler.add_seq_group(group)
        request_ids.add(request_id)

    # All of them are pending until aborted in one call.
    assert scheduler.get_num_unfinished_seq_groups() == total_groups
    scheduler.abort_seq_group(request_ids)
    assert scheduler.get_num_unfinished_seq_groups() == 0


70
def test_scheduler_schedule_simple():
    """Schedule one prefill step and one decode step for four groups."""
    block_size = 4
    num_seq_group = 4
    max_model_len = 16

    scheduler_config = SchedulerConfig(
        "generate",
        max_num_batched_tokens=64,
        max_num_seqs=num_seq_group,
        max_model_len=max_model_len,
    )
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 8
    cache_config.num_gpu_blocks = 8
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # Queue the groups, keeping our own list to compare against.
    expected_groups: List[SequenceGroup] = []
    for idx in range(num_seq_group):
        _, group = create_dummy_prompt(str(idx),
                                       prompt_length=block_size,
                                       block_size=block_size)
        scheduler.add_seq_group(group)
        expected_groups.append(group)

    # Prefill step: every prompt token of every group is batched.
    prompt_tokens = block_size * num_seq_group
    metas, out = schedule_and_update_computed_tokens(scheduler)
    assert set(get_sequence_groups(out)) == set(expected_groups)
    assert out.num_batched_tokens == prompt_tokens
    assert not (out.blocks_to_copy or out.blocks_to_swap_in
                or out.blocks_to_swap_out)
    assert len(metas) == num_seq_group
    append_new_token(out, 1)

    # Decode step: one token per group.
    metas, out = schedule_and_update_computed_tokens(scheduler)
    assert set(get_sequence_groups(out)) == set(expected_groups)
    assert out.num_batched_tokens == num_seq_group
    assert not (out.blocks_to_copy or out.blocks_to_swap_in
                or out.blocks_to_swap_out)
    assert len(metas) == num_seq_group
    append_new_token(out, 1)


114
def test_scheduler_prefill_prioritized():
    """Verify running batched tokens are not applied to prefill requests."""
    block_size = 4
    max_model_len = 30
    max_batched_num_tokens = 30

    scheduler_config = SchedulerConfig(
        "generate",
        max_num_batched_tokens=max_batched_num_tokens,
        max_num_seqs=2,
        max_model_len=max_model_len,
    )
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 16
    cache_config.num_gpu_blocks = 16
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # Request A: a 1-token prompt, scheduled on its own first.
    _, seq_group_a = create_dummy_prompt("1", 1, block_size=block_size)
    scheduler.add_seq_group(seq_group_a)
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_a]

    # Request B: a new prefill whose 30-token prompt equals
    # max_batched_num_tokens (30).
    _, seq_group_b = create_dummy_prompt("2", 30, block_size=block_size)
    scheduler.add_seq_group(seq_group_b)

    # Verify prefill requests are prioritized: the next step is expected to
    # contain only B, i.e. A's already-running tokens must not be counted
    # against the budget available to the new prefill.
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_b]
146
147


148
def test_scheduler_schedule_preempt_abort():
    """Prefill two groups, preempt one on decode, abort the other, and
    check the preempted group is scheduled again."""
    block_size = 4
    max_model_len = 16

    scheduler_config = SchedulerConfig(
        "generate",
        max_num_batched_tokens=64,
        max_num_seqs=2,
        max_model_len=max_model_len,
    )
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 2
    cache_config.num_gpu_blocks = 2
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # Two groups, each with a prompt of exactly one block.
    _, seq_group_a = create_dummy_prompt("1",
                                         block_size,
                                         block_size=block_size)
    _, seq_group_b = create_dummy_prompt("2",
                                         block_size,
                                         block_size=block_size)
    for group in (seq_group_a, seq_group_b):
        scheduler.add_seq_group(group)

    # Prefill step: both prompts are batched together.
    metas, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_a, seq_group_b]
    assert out.num_batched_tokens == block_size * 2  # seq_a and seq_b
    assert not (out.blocks_to_copy or out.blocks_to_swap_in
                or out.blocks_to_swap_out)
    assert len(metas) == 2
    assert scheduler.get_num_unfinished_seq_groups() == 2

    # Append "generated" tokens, allowing the sequence to mark prompt tokens
    # as processed.
    append_new_token(out, 1)

    # Decode step: only group A is scheduled and one group is reported as
    # preempted; both groups remain unfinished.
    metas, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_a]
    assert out.num_batched_tokens == 1
    assert not (out.blocks_to_copy or out.blocks_to_swap_in
                or out.blocks_to_swap_out)
    assert len(metas) == 1
    assert scheduler.get_num_unfinished_seq_groups() == 2
    assert out.preempted == 1

    # Abort seq group a. Re-schedule seq group b prompt with recomputation.
    scheduler.abort_seq_group("1")
    metas, out = schedule_and_update_computed_tokens(scheduler)
    assert get_sequence_groups(out) == [seq_group_b]
    assert out.num_batched_tokens == 5  # 4 prompt + 1 generation.
    assert not (out.blocks_to_copy or out.blocks_to_swap_in
                or out.blocks_to_swap_out)
    assert len(metas) == 1
    assert scheduler.get_num_unfinished_seq_groups() == 1


206
def test_scheduler_max_seqs():
    """With max_num_seqs=2 and one group already scheduled, only one of two
    newly added groups is expected in the next step."""
    block_size = 4
    num_seq_group = 4
    max_seq_group = 2
    max_model_len = 16

    scheduler_config = SchedulerConfig(
        "generate",
        max_num_batched_tokens=64,
        max_num_seqs=max_seq_group,
        max_model_len=max_model_len,
    )
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 8
    cache_config.num_gpu_blocks = 8
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # Build all groups up front; they are handed to the scheduler in stages.
    all_seq_groups: List[SequenceGroup] = [
        create_dummy_prompt(str(idx),
                            prompt_length=block_size,
                            block_size=block_size)[1]
        for idx in range(num_seq_group)
    ]
    first, second, third = all_seq_groups[0:3]

    # Stage 1: a single group goes through prefill ...
    scheduler.add_seq_group(first)
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert set(get_sequence_groups(out)) == {first}
    append_new_token(out, 1)

    # ... and then one decode step.
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert set(get_sequence_groups(out)) == {first}
    append_new_token(out, 1)

    # Stage 2: two more groups arrive.
    scheduler.add_seq_group(second)
    scheduler.add_seq_group(third)

    # max_seq_group is 2 and the first group has already been scheduled, so
    # the test expects only one new prompt (the second group) this step.
    _, out = schedule_and_update_computed_tokens(scheduler)
    assert set(get_sequence_groups(out)) == {second}
252
253


254
def test_scheduler_delay_factor():
    """With delay_factor=0.5, a second prompt is held back on the first
    scheduling attempt and admitted on a later one."""
    block_size = 4

    scheduler_config = SchedulerConfig(
        "generate",
        max_num_batched_tokens=100,
        max_num_seqs=64,
        max_model_len=16,
        delay_factor=0.5,
    )
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    cache_config.num_cpu_blocks = 8
    cache_config.num_gpu_blocks = 8
    scheduler = Scheduler(scheduler_config, cache_config, None)

    # First prompt: scheduled as a prefill right away.
    _, first_group = create_dummy_prompt("0",
                                         prompt_length=block_size,
                                         block_size=block_size)
    scheduler.add_seq_group(first_group)
    metas, out = schedule_and_update_computed_tokens(scheduler)
    assert out.num_prefill_groups > 0
    assert metas[0].request_id == '0'
    append_new_token(out, 1)

    # Wait a second, then queue the next prompt.
    time.sleep(1)
    _, second_group = create_dummy_prompt("1",
                                          prompt_length=block_size,
                                          block_size=block_size)
    scheduler.add_seq_group(second_group)

    # The second prompt should *not* be scheduled yet; request "0" is
    # scheduled again instead.
    metas, out = schedule_and_update_computed_tokens(scheduler)
    assert out.num_prefill_groups == 0
    assert metas[0].request_id == '0'
    append_new_token(out, 1)

    # After waiting more than 0.5 second the second prompt gets its prefill.
    time.sleep(0.6)
    metas, out = schedule_and_update_computed_tokens(scheduler)
    assert out.num_prefill_groups > 0
    assert metas[0].request_id == '1'
    append_new_token(out, 1)
297
298


299
300
301
302
303
304
305
306
307
308
309
310
def initialize_scheduler(
    *,
    max_num_seqs: int = 1000,
    max_token_budget: int = 1000,
    max_model_len: int = 1000,
    lora_config=None,
    block_size: int = 4,
    num_cpu_blocks: int = 8,
    num_gpu_blocks: int = 8,
) -> Scheduler:
    """Build a Scheduler for tests from keyword-only settings.

    Args:
        max_num_seqs: Passed to SchedulerConfig as max_num_seqs.
        max_token_budget: Passed to SchedulerConfig as
            max_num_batched_tokens.
        max_model_len: Passed to SchedulerConfig as max_model_len.
        lora_config: Forwarded unchanged as the Scheduler's third argument
            (None by default).
        block_size: First positional argument of CacheConfig.
        num_cpu_blocks: Assigned to cache_config.num_cpu_blocks.
        num_gpu_blocks: Assigned to cache_config.num_gpu_blocks.

    Returns:
        The constructed Scheduler.
    """
    scheduler_config = SchedulerConfig(
        "generate",
        max_num_batched_tokens=max_token_budget,
        max_num_seqs=max_num_seqs,
        max_model_len=max_model_len,
    )
    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
    # Block counts are set on the config after construction.
    cache_config.num_cpu_blocks = num_cpu_blocks
    cache_config.num_gpu_blocks = num_gpu_blocks
    return Scheduler(scheduler_config, cache_config, lora_config)


323
def create_token_budget(token_budget: int = 10000,
                        max_num_seqs: int = 10000) -> SchedulingBudget:
    """Return a new SchedulingBudget with the given token and sequence
    limits (both default to 10000)."""
    limits = {"token_budget": token_budget, "max_num_seqs": max_num_seqs}
    return SchedulingBudget(**limits)


331
332
333
334
335
336
337
338
339
def add_token_budget(budget: SchedulingBudget,
                     num_batched_tokens: int = 0,
                     num_curr_seqs: int = 0):
    """Charge tokens and sequences to `budget` under a dummy request.

    A throwaway sequence group with request id '10' is created only to
    supply the request id that both budget calls are recorded against.
    """
    _, placeholder_group = create_dummy_prompt('10', prompt_length=60)
    request_id = placeholder_group.request_id
    budget.add_num_batched_tokens(request_id, num_batched_tokens)
    budget.add_num_seqs(request_id, num_curr_seqs)


340
def test_prefill_schedule_max_prompt_len():
    """
    A 60-token prompt with max_model_len=30 ends up in ignored_seq_groups
    and consumes no budget.
    """
    block_size = 4
    scheduler = initialize_scheduler(max_model_len=30, block_size=block_size)
    _, oversized_group = create_dummy_prompt("0",
                                             prompt_length=60,
                                             block_size=block_size)
    scheduler.add_seq_group(oversized_group)

    budget = create_token_budget()
    output = scheduler._schedule_prefills(budget, None)

    # The request is ignored rather than scheduled ...
    assert len(output.ignored_seq_groups) == 1
    assert len(output.seq_groups) == 0
    # ... nothing is charged to the budget ...
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    # ... and it no longer sits in the waiting queue.
    assert len(scheduler.waiting) == 0


360
def test_prefill_schedule_token_budget():
    """
    Test token budget respected.

    Covers an empty budget, a budget that fits exactly one prompt, and
    budgets that already have tokens charged against them.
    """
    block_size = 4
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    budget = create_token_budget(token_budget=0)
    for i in range(2):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size)
        scheduler.add_seq_group(seq_group)

    # 0 token budget == nothing is scheduled.
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(remaining_waiting) == 2

    # 60 token budget == 1 request scheduled.
    budget = create_token_budget(token_budget=60)
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 1
    assert budget.num_batched_tokens == 60
    assert budget.num_curr_seqs == 1
    assert len(remaining_waiting) == 1

    # Test when current_batched_tokens respected: start from a fresh
    # scheduler and a budget that already has 30 tokens charged.
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_cpu_blocks=16,
                                     num_gpu_blocks=16)
    budget = create_token_budget(token_budget=60)
    add_token_budget(budget, 30, 0)
    # Name the request explicitly instead of reusing the loop variable left
    # over from the `for` above (its final value was 1).
    _, seq_group = create_dummy_prompt("1",
                                       prompt_length=60,
                                       block_size=block_size)
    # Cannot schedule a prompt that doesn't fit the budget
    # (30 already used + 60 new > 60).
    scheduler.add_seq_group(seq_group)
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 30
    assert budget.num_curr_seqs == 0
    assert len(remaining_waiting) == 1

    # With a 90 token budget the same prompt fits (30 used + 60 new == 90).
    budget = create_token_budget(token_budget=90)
    add_token_budget(budget, 30, 0)
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.seq_groups) == 1
    assert budget.num_batched_tokens == 90
    assert budget.num_curr_seqs == 1
    assert len(remaining_waiting) == 0


422
def test_prefill_schedule_max_seqs():
    """
    Test max seq respected.

    Covers a budget limited to two sequences, and a budget whose sequence
    slots are already fully used.
    """
    block_size = 4
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    budget = create_token_budget(max_num_seqs=2)
    for i in range(3):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           block_size=block_size)
        scheduler.add_seq_group(seq_group)

    # Only two of the three 60-token prompts fit under max_num_seqs=2.
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 2
    assert budget.num_batched_tokens == 120
    assert budget.num_curr_seqs == 2
    assert len(remaining_waiting) == 1

    # Verify curr_num_seqs respected: empty the waiting queue and start from
    # a budget that already has both sequence slots taken.
    scheduler.waiting = deque()
    budget = create_token_budget(max_num_seqs=2)
    add_token_budget(budget, 0, 2)
    # Name the request explicitly instead of reusing the loop variable left
    # over from the `for` above (its final value was 2).
    _, seq_group = create_dummy_prompt("2",
                                       prompt_length=60,
                                       block_size=block_size)
    scheduler.add_seq_group(seq_group)
    output = scheduler._schedule_prefills(budget, None)
    remaining_waiting = scheduler.waiting
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 2
    assert len(remaining_waiting) == 1


461
def test_prefill_schedule_max_lora():
    """
    Test max lora is respected and prioritized.
    """
    block_size = 4
    lora_config = LoRAConfig(max_lora_rank=8, max_loras=1)
    scheduler = initialize_scheduler(lora_config=lora_config,
                                     block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    budget = create_token_budget(token_budget=120)
    curr_loras: Set[int] = set()

    # Requests "0" and "1" each carry their own LoRA (int ids 1 and 2).
    for idx in range(2):
        lora = LoRARequest(lora_name=str(idx),
                           lora_int_id=idx + 1,
                           lora_path="abc")
        _, group = create_dummy_prompt(str(idx),
                                       prompt_length=60,
                                       block_size=block_size,
                                       lora_request=lora)
        scheduler.add_seq_group(group)

    # Add two more requests to verify lora is prioritized.
    # 0: Lora, 1: Lora, 2: regular, 3: regular
    # In the first iteration, index 0, 2 is scheduled.
    # If a request is not scheduled because it hits max lora, it is
    # prioritized. Verify that.
    for idx in range(2, 4):
        _, group = create_dummy_prompt(str(idx),
                                       prompt_length=60,
                                       block_size=block_size)
        scheduler.add_seq_group(group)

    # Schedule 2 requests (0 and 2)
    output = scheduler._schedule_prefills(budget, curr_loras)
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 2
    assert budget.num_batched_tokens == 120
    assert budget.num_curr_seqs == 2
    assert len(scheduler.waiting) == 2
    assert len(curr_loras) == 1

    # The second lora request is scheduled next as FCFS policy.
    # Reset curr_loras so that it can be scheduled.
    curr_loras = set()
    budget = create_token_budget(token_budget=60)
    output = scheduler._schedule_prefills(budget, curr_loras)
    assert len(output.seq_groups) == 1
    assert output.seq_groups[0].seq_group.request_id == "1"
    assert len(scheduler.waiting) == 1
    assert len(curr_loras) == 1
    assert budget.num_batched_tokens == 60


514
def test_prefill_schedule_no_block_manager_capacity():
    """
    Test sequence cannot be scheduled due to block manager has no capacity.

    AllocStatus.LATER leaves the requests waiting; AllocStatus.NEVER moves
    them to ignored_seq_groups.
    """
    block_size = 4

    def run_prefills(scheduler, alloc_status):
        # Queue three 60-token prompts, force can_allocate to report
        # `alloc_status`, and run one prefill scheduling pass.
        budget = create_token_budget()
        for idx in range(3):
            _, group = create_dummy_prompt(str(idx),
                                           prompt_length=60,
                                           block_size=block_size)
            scheduler.add_seq_group(group)
        scheduler.block_manager.can_allocate = MagicMock(
            return_value=alloc_status)
        return scheduler._schedule_prefills(budget, None), budget

    # LATER: nothing scheduled, nothing ignored, all three still waiting.
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_gpu_blocks=128,
                                     num_cpu_blocks=128)
    output, budget = run_prefills(scheduler, AllocStatus.LATER)
    assert len(output.ignored_seq_groups) == 0
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(scheduler.waiting) == 3

    # NEVER: all three are ignored and the waiting queue is drained.
    scheduler = initialize_scheduler()
    output, budget = run_prefills(scheduler, AllocStatus.NEVER)
    assert len(output.ignored_seq_groups) == 3
    assert len(output.seq_groups) == 0
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(scheduler.waiting) == 0


556
def test_decode_schedule_preempted():
    """
    Test decodes cannot be scheduled and preempted.
    """
    block_size = 4
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_cpu_blocks=64,
                                     num_gpu_blocks=64)
    curr_loras = None

    # Put three groups directly into the running state, each with one
    # generated token appended.
    for idx in range(3):
        _, group = create_dummy_prompt(str(idx),
                                       prompt_length=60,
                                       block_size=block_size)
        scheduler._allocate_and_set_running(group)
        append_new_token_seq_group(60, group, 1)
        scheduler._add_seq_group_to_running(group)

    def cannot_append_second_group(seq_group, num_lookahead_slots):
        return seq_group.request_id != "1"

    # Slots can be appended for every request except "1".
    scheduler.block_manager.can_append_slots = MagicMock(
        side_effect=cannot_append_second_group)

    # 1 cannot be scheduled, and the lowest priority (request 2)
    # should be preempted. 1 will also be preempted.
    budget = create_token_budget()
    output = scheduler._schedule_running(budget, curr_loras)
    assert len(scheduler.running) == 0
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    assert output.decode_seq_groups[0].seq_group.request_id == "0"
    assert len(output.preempted) == 2
    # Verify budgets are updated.
    assert budget.num_batched_tokens == 1
    # NOTE: When enable_chunk is False, num_seqs budget is not updated.
    # assert budget.num_curr_seqs == 1
    # Both should be preempted, not swapped.
    assert output.blocks_to_swap_out == []
    # Nothing is copied.
    assert output.blocks_to_copy == []
598
599


600
def test_schedule_decode_blocks_to_copy_update():
    """
    Verify blocks_to_copy is updated.

    The block manager's append_slots is mocked to report one block copy,
    which must show up in the output of _schedule_running.
    """
    block_size = 4
    # Use the same block_size for the scheduler and the prompt.
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_cpu_blocks=16,
                                     num_gpu_blocks=16)
    _, seq_group = create_dummy_prompt("1",
                                       prompt_length=60,
                                       best_of=2,
                                       block_size=block_size)
    curr_loras = None
    scheduler._allocate_and_set_running(seq_group)
    append_new_token_seq_group(60, seq_group, 1)
    scheduler._add_seq_group_to_running(seq_group)

    # Make append_slots report a single (2, 3) block copy.
    scheduler.block_manager.append_slots = MagicMock()
    scheduler.block_manager.append_slots.return_value = [(2, 3)]

    budget = create_token_budget()
    output = scheduler._schedule_running(budget, curr_loras)
    remaining_running = scheduler.running
    assert len(remaining_running) == 0
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    # Nothing is preempted or swapped out.
    assert len(output.preempted) == 0
    assert len(output.swapped_out) == 0
    assert output.blocks_to_swap_out == []
    # Since append_slots returns the source -> dest mapping, it should be
    # applied.
    assert output.blocks_to_copy == [(2, 3)]
634
635


636
def test_schedule_swapped_max_loras():
    """With max_loras=1, only one of two swapped-out LoRA requests is
    scheduled by _schedule_swapped."""
    block_size = 4
    lora_config = LoRAConfig(max_lora_rank=8, max_loras=1)
    scheduler = initialize_scheduler(lora_config=lora_config,
                                     block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras: Set[int] = set()
    blocks_to_swap_out: List[Tuple[int, int]] = []

    # Two requests with distinct LoRAs (int ids 1 and 2), each allocated,
    # given one generated token, then swapped out.
    for idx in range(2):
        lora = LoRARequest(lora_name=str(idx),
                           lora_int_id=idx + 1,
                           lora_path="abc")
        _, group = create_dummy_prompt(str(idx),
                                       prompt_length=60,
                                       block_size=block_size,
                                       lora_request=lora)
        scheduler._allocate_and_set_running(group)
        append_new_token_seq_group(60, group, 1)
        scheduler._swap_out(group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(group)

    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)

    # One request comes back as a decode; the other stays swapped.
    assert len(scheduler.swapped) == 1
    assert budget.num_batched_tokens == 1
    assert budget.num_curr_seqs == 1
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    assert len(curr_loras) == 1


669
def test_schedule_swapped_cannot_swap_in():
    """When can_swap_in reports AllocStatus.LATER, swapped requests stay
    swapped and no budget is consumed."""
    block_size = 4
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras = None
    blocks_to_swap_out: List[Tuple[int, int]] = []

    # Two requests: allocated, given one generated token, then swapped out.
    for i in range(2):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           best_of=2,
                                           block_size=block_size)
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    # Force the block manager to answer LATER for every swap-in query.
    scheduler.block_manager.can_swap_in = MagicMock()
    scheduler.block_manager.can_swap_in.return_value = AllocStatus.LATER

    # Since we cannot swap in, none of the requests are swapped in.
    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 2
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(output.decode_seq_groups) == 0
    assert len(output.prefill_seq_groups) == 0


700
def test_infeasible_swap():
    """When can_swap_in reports AllocStatus.NEVER, swapped requests are
    removed from the swapped queue and returned as infeasible."""
    block_size = 4
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras = None
    blocks_to_swap_out: List[Tuple[int, int]] = []

    # Two requests: allocated, given one generated token, then swapped out.
    for i in range(2):
        _, seq_group = create_dummy_prompt(str(i),
                                           prompt_length=60,
                                           best_of=2,
                                           block_size=block_size)
        scheduler._allocate_and_set_running(seq_group)
        append_new_token_seq_group(60, seq_group, 1)
        scheduler._swap_out(seq_group, blocks_to_swap_out)
        scheduler._add_seq_group_to_swapped(seq_group)

    # Force the block manager to answer NEVER for every swap-in query.
    scheduler.block_manager.can_swap_in = MagicMock()
    scheduler.block_manager.can_swap_in.return_value = AllocStatus.NEVER

    # Since swap-in can never succeed, both requests leave the swapped queue
    # and are reported as infeasible; nothing is scheduled.
    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    assert len(remaining_swapped) == 0
    assert len(output.infeasible_seq_groups) == 2
    assert budget.num_batched_tokens == 0
    assert budget.num_curr_seqs == 0
    assert len(output.decode_seq_groups) == 0
    assert len(output.prefill_seq_groups) == 0
730
731


732
def test_schedule_swapped_blocks_to_copy():
    """Block copies reported by append_slots appear in the output of
    _schedule_swapped."""
    block_size = 4
    scheduler = initialize_scheduler(block_size=block_size,
                                     num_cpu_blocks=32,
                                     num_gpu_blocks=32)
    curr_loras = None

    # One request: allocated, given one generated token, then swapped out.
    _, seq_group = create_dummy_prompt("1",
                                       prompt_length=60,
                                       best_of=2,
                                       block_size=block_size)
    scheduler._allocate_and_set_running(seq_group)
    append_new_token_seq_group(60, seq_group, 1)
    blocks_to_swap_out: List[Tuple[int, int]] = []
    scheduler._swap_out(seq_group, blocks_to_swap_out)
    scheduler._add_seq_group_to_swapped(seq_group)

    # Make append_slots report a single (2, 3) block copy.
    scheduler.block_manager.append_slots = MagicMock()
    scheduler.block_manager.append_slots.return_value = [(2, 3)]

    budget = create_token_budget()
    output = scheduler._schedule_swapped(budget, curr_loras)
    remaining_swapped = scheduler.swapped
    # The request is swapped back in as a decode and the copy is propagated.
    assert len(remaining_swapped) == 0
    assert len(output.decode_seq_groups) == 1
    assert len(output.prefill_seq_groups) == 0
    assert output.blocks_to_copy == [(2, 3)]
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802


def test_scheduling_budget():
    """Exercise SchedulingBudget: can_schedule limits, and add/subtract of
    batched tokens and sequences keyed by request id."""
    TOKEN_BUDGET = 4
    MAX_SEQS = 4
    budget = SchedulingBudget(token_budget=TOKEN_BUDGET, max_num_seqs=MAX_SEQS)

    # An untouched budget accepts anything up to both limits.
    assert budget.can_schedule(num_new_tokens=1, num_new_seqs=1)
    assert budget.can_schedule(num_new_tokens=4, num_new_seqs=4)
    assert not budget.can_schedule(num_new_tokens=1, num_new_seqs=5)
    assert not budget.can_schedule(num_new_tokens=5, num_new_seqs=1)
    assert not budget.can_schedule(num_new_tokens=5, num_new_seqs=5)
    assert budget.remaining_token_budget() == TOKEN_BUDGET

    # Verify add/subtract num batched tokens.
    _, seq_group = create_dummy_prompt("1", 3)
    request_id = seq_group.request_id
    budget.add_num_batched_tokens(request_id, 2)
    assert budget.remaining_token_budget() == 2
    assert budget.num_batched_tokens == 2
    assert budget.can_schedule(num_new_tokens=2, num_new_seqs=1)
    assert not budget.can_schedule(num_new_tokens=3, num_new_seqs=1)
    # Adding again under the same request id is a no-op.
    budget.add_num_batched_tokens(request_id, 2)
    assert budget.remaining_token_budget() == 2
    assert budget.num_batched_tokens == 2
    budget.subtract_num_batched_tokens(request_id, 2)
    assert budget.remaining_token_budget() == 4
    assert budget.num_batched_tokens == 0
    # Subtracting a second time leaves the budget unchanged.
    budget.subtract_num_batched_tokens(request_id, 2)
    assert budget.remaining_token_budget() == 4
    assert budget.num_batched_tokens == 0

    # Verify add/subtract max seqs.
    _, seq_group = create_dummy_prompt("1", 3)
    request_id = seq_group.request_id
    budget.add_num_seqs(request_id, 2)
    assert budget.can_schedule(num_new_tokens=1, num_new_seqs=2)
    assert not budget.can_schedule(num_new_tokens=1, num_new_seqs=3)
    assert budget.num_curr_seqs == 2
    # Adding again under the same request id is a no-op.
    budget.add_num_seqs(request_id, 2)
    assert budget.num_curr_seqs == 2
    budget.subtract_num_seqs(request_id, 2)
    assert budget.num_curr_seqs == 0
    # Subtracting a second time leaves the count unchanged.
    budget.subtract_num_seqs(request_id, 2)
    assert budget.num_curr_seqs == 0