scheduler.py 20.8 KB
Newer Older
1
from collections import deque
2
3
import enum
import time
4
from typing import Deque, Dict, Iterable, List, Optional, Tuple, Union, Set
Woosuk Kwon's avatar
Woosuk Kwon committed
5

6
from vllm.config import CacheConfig, LoRAConfig, SchedulerConfig
7
from vllm.core.block_manager import AllocStatus, BlockSpaceManager
Woosuk Kwon's avatar
Woosuk Kwon committed
8
from vllm.core.policy import PolicyFactory
9
from vllm.lora.request import LoRARequest
Woosuk Kwon's avatar
Woosuk Kwon committed
10
11
from vllm.logger import init_logger
from vllm.sequence import (Sequence, SequenceData, SequenceGroup,
12
                           SequenceGroupMetadata, SequenceStatus)
13
from vllm.prefix import PrefixPool
Woosuk Kwon's avatar
Woosuk Kwon committed
14

Woosuk Kwon's avatar
Woosuk Kwon committed
15
logger = init_logger(__name__)
16

Woosuk Kwon's avatar
Woosuk Kwon committed
17

18
19
20
21
22
23
24
25
26
27
28
29
30
class PreemptionMode(enum.Enum):
    """Preemption modes.

    1. Swapping: Swap out the blocks of the preempted sequences to CPU memory
    and swap them back in when the sequences are resumed.
    2. Recomputation: Discard the blocks of the preempted sequences and
    recompute them when the sequences are resumed, treating the sequences as
    new prompts.
    """
    # Move the sequence's cache blocks to CPU memory; restore them later.
    SWAP = enum.auto()
    # Drop the sequence's cache blocks; re-run the prompt phase later.
    RECOMPUTE = enum.auto()


31
32
33
34
class SchedulerOutputs:
    """The result of one scheduler iteration.

    Bundles the sequence groups chosen to run in the next model step with
    the block-level cache operations (swap in/out, copy) that must be
    performed before execution.
    """

    def __init__(
        self,
        scheduled_seq_groups: Iterable[SequenceGroup],
        prompt_run: bool,
        num_batched_tokens: int,
        blocks_to_swap_in: Dict[int, int],
        blocks_to_swap_out: Dict[int, int],
        blocks_to_copy: Dict[int, List[int]],
        ignored_seq_groups: List[SequenceGroup],
    ) -> None:
        self.scheduled_seq_groups = scheduled_seq_groups
        # True for a prompt (prefill) step, False for a generation step.
        self.prompt_run = prompt_run
        self.num_batched_tokens = num_batched_tokens
        # Block-number mappings / copy lists for the cache engine.
        self.blocks_to_swap_in = blocks_to_swap_in
        self.blocks_to_swap_out = blocks_to_swap_out
        self.blocks_to_copy = blocks_to_copy
        # Swap in and swap out should never happen at the same time.
        assert not (blocks_to_swap_in and blocks_to_swap_out)
        # Groups dropped from scheduling (e.g. prompt too long).
        self.ignored_seq_groups = ignored_seq_groups

        self.num_loras = len(self.lora_requests)
        if self.num_loras > 0:
            self._sort_by_lora_ids()

    def is_empty(self) -> bool:
        """Return True if there is nothing to run and no cache operations."""
        # NOTE: We do not consider the ignored sequence groups.
        return (not self.scheduled_seq_groups and not self.blocks_to_swap_in
                and not self.blocks_to_swap_out and not self.blocks_to_copy)

    def _sort_by_lora_ids(self) -> None:
        # BUGFIX: was annotated `-> bool` but returns nothing.
        # Groups without a LoRA request sort first (key 0); ties are broken
        # by request_id for determinism.
        self.scheduled_seq_groups = sorted(
            self.scheduled_seq_groups,
            key=lambda g: (g.lora_request.lora_int_id
                           if g.lora_request else 0, g.request_id))

    @property
    def lora_requests(self) -> Set[LoRARequest]:
        """The distinct LoRA requests of the scheduled groups.

        Groups without a LoRA request are excluded; previously they
        contributed a spurious `None` element, which inflated `num_loras`.
        """
        return {
            g.lora_request
            for g in self.scheduled_seq_groups if g.lora_request is not None
        }

72

Woosuk Kwon's avatar
Woosuk Kwon committed
73
74
class Scheduler:
    """Decides which sequence groups run in each engine step.

    Tracks sequence groups in three queues (WAITING, RUNNING, SWAPPED) and
    coordinates cache-block accounting through a BlockSpaceManager.
    """

    def __init__(
        self,
        scheduler_config: SchedulerConfig,
        cache_config: CacheConfig,
        lora_config: Optional[LoRAConfig],
    ) -> None:
        self.scheduler_config = scheduler_config
        self.cache_config = cache_config
        # Note for LoRA scheduling: the current policy is extremely
        # simple and NOT fair. It can lead to starvation of some
        # LoRAs. This should be improved in the future.
        self.lora_config = lora_config

        # A prompt longer than this can never be scheduled; such requests
        # are ignored (see _schedule).
        self.prompt_limit = min(self.scheduler_config.max_model_len,
                                self.scheduler_config.max_num_batched_tokens)

        # Instantiate the scheduling policy (first-come-first-served).
        self.policy = PolicyFactory.get_policy(policy_name="fcfs")
        # Create the block space manager.
        self.block_manager = BlockSpaceManager(
            block_size=self.cache_config.block_size,
            num_gpu_blocks=self.cache_config.num_gpu_blocks,
            num_cpu_blocks=self.cache_config.num_cpu_blocks,
            sliding_window=self.cache_config.sliding_window)

        # Create the prefix pool to cache the prefixes.
        self.prefix_pool = PrefixPool(self.cache_config.block_size)

        # Sequence groups in the WAITING state.
        self.waiting: Deque[SequenceGroup] = deque()
        # Sequence groups in the RUNNING state.
        self.running: Deque[SequenceGroup] = deque()
        # Sequence groups in the SWAPPED state.
        self.swapped: Deque[SequenceGroup] = deque()
Woosuk Kwon's avatar
Woosuk Kwon committed
109

110
111
112
113
    @property
    def lora_enabled(self) -> bool:
        return bool(self.lora_config)

114
    def add_seq_group(self, seq_group: SequenceGroup) -> None:
        """Enqueue a new sequence group for scheduling."""
        # Add sequence groups to the waiting queue.
        self.waiting.append(seq_group)
Woosuk Kwon's avatar
Woosuk Kwon committed
117

Antoni Baum's avatar
Antoni Baum committed
118
    def abort_seq_group(self, request_id: Union[str, Iterable[str]]) -> None:
119
120
121
122
123
124
125
126
127
128
129
130
        """Aborts a sequence group with the given ID.

        Check if the sequence group with the given ID
            is present in any of the state queue.
        If present, remove the sequence group from the state queue.
            Also, if any of the sequences in the sequence group is not finished,
                free the sequence with status `FINISHED_ABORTED`.
        Otherwise, do nothing.

        Args:
            request_id: The ID(s) of the sequence group to abort.
        """
Antoni Baum's avatar
Antoni Baum committed
131
132
133
        if isinstance(request_id, str):
            request_id = (request_id, )
        request_ids = set(request_id)
134
        for state_queue in [self.waiting, self.running, self.swapped]:
ljss's avatar
ljss committed
135
            aborted_groups: List[SequenceGroup] = []
136
137
138
139
140
            for seq_group in state_queue:
                if not request_ids:
                    # Using 'break' here may add two extra iterations,
                    # but is acceptable to reduce complexity .
                    break
Antoni Baum's avatar
Antoni Baum committed
141
                if seq_group.request_id in request_ids:
142
143
                    # Appending aborted group into pending list.
                    aborted_groups.append(seq_group)
Antoni Baum's avatar
Antoni Baum committed
144
                    request_ids.remove(seq_group.request_id)
145
146
147
            for aborted_group in aborted_groups:
                # Remove the sequence group from the state queue.
                state_queue.remove(aborted_group)
ljss's avatar
ljss committed
148
                for seq in aborted_group.get_seqs():
149
150
151
152
                    if seq.is_finished():
                        continue
                    seq.status = SequenceStatus.FINISHED_ABORTED
                    self.free_seq(seq)
153

154
155
156
    def has_unfinished_seqs(self) -> bool:
        return self.waiting or self.running or self.swapped

157
158
159
    def get_num_unfinished_seq_groups(self) -> int:
        return len(self.waiting) + len(self.running) + len(self.swapped)

Woosuk Kwon's avatar
Woosuk Kwon committed
160
    def _schedule(self) -> SchedulerOutputs:
        """Select the sequence groups to run in the next model step.

        Prompt (prefill) runs take precedence: if nothing is swapped out,
        as many WAITING groups as the token/sequence/padding budgets allow
        are admitted and a prompt run is returned. Otherwise a generation
        step is scheduled for the RUNNING groups, preempting low-priority
        groups when block space runs out, and SWAPPED groups are swapped
        back in when capacity permits.
        """
        # Blocks that need to be swapped or copied before model execution.
        blocks_to_swap_in: Dict[int, int] = {}
        blocks_to_swap_out: Dict[int, int] = {}
        blocks_to_copy: Dict[int, List[int]] = {}

        # Fix the current time.
        now = time.monotonic()

        # Join waiting sequences if possible.
        if not self.swapped:
            ignored_seq_groups: List[SequenceGroup] = []
            scheduled: List[SequenceGroup] = []
            # The total number of sequences on the fly, including the
            # requests in the generation phase.
            num_curr_seqs = sum(seq_group.get_max_num_running_seqs()
                                for seq_group in self.running)
            # LoRA ids currently in use by RUNNING groups; None when LoRA
            # scheduling is disabled.
            curr_loras = set(
                seq_group.lora_int_id
                for seq_group in self.running) if self.lora_enabled else None
            seq_lens: List[int] = []

            # Optimization: We do not sort the waiting queue since the preempted
            # sequence groups are added to the front and the new sequence groups
            # are added to the back.
            leftover_waiting_sequences = deque()
            while self.waiting:
                seq_group = self.waiting[0]

                waiting_seqs = seq_group.get_seqs(
                    status=SequenceStatus.WAITING)
                assert len(waiting_seqs) == 1, (
                    "Waiting sequence group should have only one prompt "
                    "sequence.")
                num_prompt_tokens = waiting_seqs[0].get_len()
                # Prompts that can never fit are marked ignored, not errors.
                if num_prompt_tokens > self.prompt_limit:
                    logger.warning(
                        f"Input prompt ({num_prompt_tokens} tokens) is too long"
                        f" and exceeds limit of {self.prompt_limit}")
                    for seq in waiting_seqs:
                        seq.status = SequenceStatus.FINISHED_IGNORED
                    ignored_seq_groups.append(seq_group)
                    self.waiting.popleft()
                    continue

                # If the sequence group cannot be allocated, stop.
                can_allocate = self.block_manager.can_allocate(seq_group)
                if can_allocate == AllocStatus.LATER:
                    break
                elif can_allocate == AllocStatus.NEVER:
                    logger.warning(
                        f"Input prompt ({num_prompt_tokens} tokens) is too long"
                        f" and exceeds the capacity of block_manager")
                    for seq in waiting_seqs:
                        seq.status = SequenceStatus.FINISHED_IGNORED
                    ignored_seq_groups.append(seq_group)
                    self.waiting.popleft()
                    continue

                lora_int_id = 0
                if self.lora_enabled:
                    lora_int_id = seq_group.lora_int_id
                    if lora_int_id > 0 and lora_int_id not in curr_loras and len(
                            curr_loras) >= self.lora_config.max_loras:
                        # We don't have a space for another LoRA, so
                        # we ignore this request for now.
                        leftover_waiting_sequences.appendleft(seq_group)
                        self.waiting.popleft()
                        continue

                # If the number of batched tokens exceeds the limit, stop.
                # NOTE: prompts are padded to the longest one in the batch,
                # hence len(batch) * max(len) rather than sum(len) — see the
                # num_paddings check below.
                new_seq_lens = seq_lens + [num_prompt_tokens]
                num_batched_tokens = len(new_seq_lens) * max(new_seq_lens)
                if (num_batched_tokens >
                        self.scheduler_config.max_num_batched_tokens):
                    break

                # The total number of sequences in the RUNNING state should not
                # exceed the maximum number of sequences.
                num_new_seqs = seq_group.get_max_num_running_seqs()
                if (num_curr_seqs + num_new_seqs >
                        self.scheduler_config.max_num_seqs):
                    break

                num_paddings = num_batched_tokens - sum(new_seq_lens)
                if num_paddings > self.scheduler_config.max_paddings:
                    break
                seq_lens = new_seq_lens

                # All budgets check out: admit this group for the step.
                if lora_int_id > 0:
                    curr_loras.add(lora_int_id)
                self.waiting.popleft()
                self._allocate(seq_group)
                self.running.append(seq_group)
                num_curr_seqs += num_new_seqs
                scheduled.append(seq_group)

            # Groups skipped only because of LoRA capacity go back to the
            # front of the waiting queue.
            self.waiting.extendleft(leftover_waiting_sequences)

            if scheduled or ignored_seq_groups:
                scheduler_outputs = SchedulerOutputs(
                    scheduled_seq_groups=scheduled,
                    prompt_run=True,
                    num_batched_tokens=len(seq_lens) *
                    max(seq_lens) if seq_lens else 0,
                    blocks_to_swap_in=blocks_to_swap_in,
                    blocks_to_swap_out=blocks_to_swap_out,
                    blocks_to_copy=blocks_to_copy,
                    ignored_seq_groups=ignored_seq_groups,
                )
                return scheduler_outputs

        # NOTE(woosuk): Preemption happens only when there is no available slot
        # to keep all the sequence groups in the RUNNING state.
        # In this case, the policy is responsible for deciding which sequence
        # groups to preempt.
        self.running = self.policy.sort_by_priority(now, self.running)

        # Reserve new token slots for the running sequence groups.
        running: Deque[SequenceGroup] = deque()
        preempted: List[SequenceGroup] = []
        while self.running:
            seq_group = self.running.popleft()
            while not self.block_manager.can_append_slot(seq_group):
                if self.running:
                    # Preempt the lowest-priority sequence groups.
                    victim_seq_group = self.running.pop()
                    self._preempt(victim_seq_group, blocks_to_swap_out)
                    preempted.append(victim_seq_group)
                else:
                    # No other sequence groups can be preempted.
                    # Preempt the current sequence group.
                    self._preempt(seq_group, blocks_to_swap_out)
                    preempted.append(seq_group)
                    break
            else:
                # (while-else: reached only when a slot became available.)
                # Append new slots to the sequence group.
                self._append_slot(seq_group, blocks_to_copy)
                running.append(seq_group)
        self.running = running

        # Swap in the sequence groups in the SWAPPED state if possible.
        self.swapped = self.policy.sort_by_priority(now, self.swapped)
        # Only attempt swap-in when nothing was preempted in this step.
        if not preempted:
            num_curr_seqs = sum(seq_group.get_max_num_running_seqs()
                                for seq_group in self.running)
            curr_loras = set(
                seq_group.lora_int_id
                for seq_group in self.running) if self.lora_enabled else None

            leftover_swapped = deque()

            while self.swapped:
                seq_group = self.swapped[0]
                lora_int_id = 0
                if self.lora_enabled:
                    lora_int_id = seq_group.lora_int_id
                    if lora_int_id > 0 and lora_int_id not in curr_loras and len(
                            curr_loras) >= self.lora_config.max_loras:
                        # We don't have a space for another LoRA, so
                        # we ignore this request for now.
                        leftover_swapped.appendleft(seq_group)
                        self.swapped.popleft()
                        continue

                # If the sequence group cannot be swapped in, stop.
                if not self.block_manager.can_swap_in(seq_group):
                    break

                # The total number of sequences in the RUNNING state should not
                # exceed the maximum number of sequences.
                num_new_seqs = seq_group.get_max_num_running_seqs()
                if (num_curr_seqs + num_new_seqs >
                        self.scheduler_config.max_num_seqs):
                    break

                if lora_int_id > 0:
                    curr_loras.add(lora_int_id)
                self.swapped.popleft()
                self._swap_in(seq_group, blocks_to_swap_in)
                self._append_slot(seq_group, blocks_to_copy)
                num_curr_seqs += num_new_seqs
                self.running.append(seq_group)

            self.swapped.extendleft(leftover_swapped)

        # Each sequence in the generation phase only takes one token slot.
        # Therefore, the number of batched tokens is equal to the number of
        # sequences in the RUNNING state.
        num_batched_tokens = sum(
            seq_group.num_seqs(status=SequenceStatus.RUNNING)
            for seq_group in self.running)

        scheduler_outputs = SchedulerOutputs(
            scheduled_seq_groups=self.running,
            prompt_run=False,
            num_batched_tokens=num_batched_tokens,
            blocks_to_swap_in=blocks_to_swap_in,
            blocks_to_swap_out=blocks_to_swap_out,
            blocks_to_copy=blocks_to_copy,
            ignored_seq_groups=[],
        )
        return scheduler_outputs
Woosuk Kwon's avatar
Woosuk Kwon committed
362

Woosuk Kwon's avatar
Woosuk Kwon committed
363
    def schedule(self) -> Tuple[List[SequenceGroupMetadata], SchedulerOutputs]:
364
365
366
        # Schedule sequence groups.
        # This function call changes the internal states of the scheduler
        # such as self.running, self.swapped, and self.waiting.
Woosuk Kwon's avatar
Woosuk Kwon committed
367
        scheduler_outputs = self._schedule()
368
369

        # Create input data structures.
370
        seq_group_metadata_list: List[SequenceGroupMetadata] = []
Woosuk Kwon's avatar
Woosuk Kwon committed
371
        for seq_group in scheduler_outputs.scheduled_seq_groups:
Light Lin's avatar
Light Lin committed
372
            seq_data: Dict[int, SequenceData] = {}
373
374
            block_tables: Dict[int, List[int]] = {}
            for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
375
                seq_id = seq.seq_id
376
                seq_data[seq_id] = seq.data
377
                block_tables[seq_id] = self.block_manager.get_block_table(seq)
378

379
            seq_group_metadata = SequenceGroupMetadata(
380
                request_id=seq_group.request_id,
Woosuk Kwon's avatar
Woosuk Kwon committed
381
                is_prompt=scheduler_outputs.prompt_run,
382
                seq_data=seq_data,
383
                sampling_params=seq_group.sampling_params,
384
                block_tables=block_tables,
385
                lora_request=seq_group.lora_request,
386
                prefix=seq_group.prefix,
387
            )
388
            seq_group_metadata_list.append(seq_group_metadata)
Woosuk Kwon's avatar
Woosuk Kwon committed
389
        return seq_group_metadata_list, scheduler_outputs
390

391
392
    def fork_seq(self, parent_seq: Sequence, child_seq: Sequence) -> None:
        """Propagate a sequence fork to the block manager."""
        self.block_manager.fork(parent_seq, child_seq)
Woosuk Kwon's avatar
Woosuk Kwon committed
393

394
    def free_seq(self, seq: Sequence) -> None:
        """Release the block-manager resources held by `seq`."""
        self.block_manager.free(seq)
Woosuk Kwon's avatar
Woosuk Kwon committed
396

397
    def free_finished_seq_groups(self) -> None:
398
399
        self.running = deque(seq_group for seq_group in self.running
                             if not seq_group.is_finished())
Woosuk Kwon's avatar
Woosuk Kwon committed
400

401
402
    def _allocate(self, seq_group: SequenceGroup) -> None:
        """Allocate blocks for a new group and move its seqs to RUNNING."""
        self.block_manager.allocate(seq_group)
        for seq in seq_group.get_seqs(status=SequenceStatus.WAITING):
            seq.status = SequenceStatus.RUNNING

406
    def _append_slot(
        self,
        seq_group: SequenceGroup,
        blocks_to_copy: Dict[int, List[int]],
    ) -> None:
        """Reserve one new token slot for every RUNNING sequence in the group.

        When `append_slot` returns a (src_block, dst_block) pair, that block
        must be copied before the next model step (copy-on-write); the pair
        is recorded in `blocks_to_copy` for the cache engine.

        Args:
            seq_group: The group whose RUNNING sequences need a new slot.
            blocks_to_copy: Mutated in place; maps a source block number to
                the list of destination block numbers to copy it to.
        """
        for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
            ret = self.block_manager.append_slot(seq)
            if ret is not None:
                src_block, dst_block = ret
                # setdefault replaces the manual "check membership, then
                # create list" branching of the original code.
                blocks_to_copy.setdefault(src_block, []).append(dst_block)

    def _preempt(
        self,
        seq_group: SequenceGroup,
        blocks_to_swap_out: Dict[int, int],
        preemption_mode: Optional[PreemptionMode] = None,
    ) -> None:
        """Evict a running sequence group to make room for others.

        When no mode is given, single-sequence groups are preempted by
        recomputation (lower overhead), while multi-sequence groups
        (e.g. beam search) fall back to swapping, the only mode currently
        supported for them.

        FIXME(woosuk): This makes our scheduling policy a bit bizarre.
        As swapped sequences are prioritized over waiting sequences,
        sequence groups with multiple sequences are implicitly prioritized
        over sequence groups with a single sequence.
        TODO(woosuk): Support recomputation for sequence groups with multiple
        sequences. This may require a more sophisticated CUDA kernel.
        """
        if preemption_mode is None:
            single_seq = seq_group.get_max_num_running_seqs() == 1
            preemption_mode = (PreemptionMode.RECOMPUTE
                               if single_seq else PreemptionMode.SWAP)
        if preemption_mode == PreemptionMode.RECOMPUTE:
            self._preempt_by_recompute(seq_group)
        elif preemption_mode == PreemptionMode.SWAP:
            self._preempt_by_swap(seq_group, blocks_to_swap_out)
        else:
            raise AssertionError("Invalid preemption mode.")
448
449
450
451
452
453
454
455
456
457

    def _preempt_by_recompute(
        self,
        seq_group: SequenceGroup,
    ) -> None:
        """Preempt by discarding blocks; the group re-runs as a new prompt."""
        running_seqs = seq_group.get_seqs(status=SequenceStatus.RUNNING)
        # Recomputation only supports single-sequence groups.
        assert len(running_seqs) == 1
        seq = running_seqs[0]
        seq.status = SequenceStatus.WAITING
        self.block_manager.free(seq)
        # NOTE: For FCFS, we insert the preempted sequence group to the front
        # of the waiting queue.
        self.waiting.appendleft(seq_group)
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484

    def _preempt_by_swap(
        self,
        seq_group: SequenceGroup,
        blocks_to_swap_out: Dict[int, int],
    ) -> None:
        """Preempt by swapping the group out and queueing it in SWAPPED."""
        self._swap_out(seq_group, blocks_to_swap_out)
        self.swapped.append(seq_group)

    def _swap_in(
        self,
        seq_group: SequenceGroup,
        blocks_to_swap_in: Dict[int, int],
    ) -> None:
        """Swap the group's blocks back in and mark its seqs RUNNING.

        The block-number mapping produced by the block manager is merged
        into `blocks_to_swap_in` for the cache engine to execute.
        """
        mapping = self.block_manager.swap_in(seq_group)
        blocks_to_swap_in.update(mapping)
        for seq in seq_group.get_seqs(status=SequenceStatus.SWAPPED):
            seq.status = SequenceStatus.RUNNING

    def _swap_out(
        self,
        seq_group: SequenceGroup,
        blocks_to_swap_out: Dict[int, int],
    ) -> None:
        """Swap the group's blocks out to CPU and mark its seqs SWAPPED.

        Raises:
            RuntimeError: If the CPU swap space cannot hold the group's
                blocks.
        """
        if not self.block_manager.can_swap_out(seq_group):
            # FIXME(woosuk): Abort the sequence group instead of aborting the
            # entire engine.
            raise RuntimeError(
                "Aborted due to the lack of CPU swap space. Please increase "
                "the swap space to avoid this error.")
        mapping = self.block_manager.swap_out(seq_group)
        blocks_to_swap_out.update(mapping)
        for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
            seq.status = SequenceStatus.SWAPPED