scheduler.py 17.4 KB
Newer Older
1
from collections import deque
2
3
import enum
import time
4
from typing import Deque, Dict, Iterable, List, Optional, Tuple, Union
Woosuk Kwon's avatar
Woosuk Kwon committed
5

Woosuk Kwon's avatar
Woosuk Kwon committed
6
from vllm.config import CacheConfig, SchedulerConfig
7
from vllm.core.block_manager import AllocStatus, BlockSpaceManager
Woosuk Kwon's avatar
Woosuk Kwon committed
8
9
10
from vllm.core.policy import PolicyFactory
from vllm.logger import init_logger
from vllm.sequence import (Sequence, SequenceData, SequenceGroup,
11
                           SequenceGroupMetadata, SequenceStatus)
Woosuk Kwon's avatar
Woosuk Kwon committed
12

Woosuk Kwon's avatar
Woosuk Kwon committed
13
logger = init_logger(__name__)
14

Woosuk Kwon's avatar
Woosuk Kwon committed
15

16
17
18
19
20
21
22
23
24
25
26
27
28
class PreemptionMode(enum.Enum):
    """How a preempted sequence group's KV-cache blocks are handled.

    SWAP: move the group's blocks out to CPU memory, and move them back
        when the group is resumed.
    RECOMPUTE: discard the blocks entirely and re-run the prompt phase when
        the group is resumed, treating it like a brand-new request.
    """
    SWAP = enum.auto()
    RECOMPUTE = enum.auto()


29
30
31
32
class SchedulerOutputs:
    """Result of one scheduling step.

    Bundles the sequence groups chosen to run next with the block-level
    operations (swap in, swap out, copy) that must be performed on the
    KV cache before the model executes.
    """

    def __init__(
        self,
        scheduled_seq_groups: Iterable[SequenceGroup],
        prompt_run: bool,
        num_batched_tokens: int,
        blocks_to_swap_in: Dict[int, int],
        blocks_to_swap_out: Dict[int, int],
        blocks_to_copy: Dict[int, List[int]],
        ignored_seq_groups: List[SequenceGroup],
    ) -> None:
        self.scheduled_seq_groups = scheduled_seq_groups
        self.prompt_run = prompt_run
        self.num_batched_tokens = num_batched_tokens
        self.blocks_to_swap_in = blocks_to_swap_in
        self.blocks_to_swap_out = blocks_to_swap_out
        self.blocks_to_copy = blocks_to_copy
        # A single step never swaps in and swaps out at the same time.
        assert not (blocks_to_swap_in and blocks_to_swap_out)
        self.ignored_seq_groups = ignored_seq_groups

    def is_empty(self) -> bool:
        """Return True when there is nothing to run and no block op pending.

        Ignored sequence groups are deliberately not considered.
        """
        has_work = (self.scheduled_seq_groups or self.blocks_to_swap_in
                    or self.blocks_to_swap_out or self.blocks_to_copy)
        return not has_work
55
56


Woosuk Kwon's avatar
Woosuk Kwon committed
57
58
class Scheduler:
    """Decides which sequence groups run in the next engine step.

    Maintains three state queues (WAITING, RUNNING, SWAPPED) plus a block
    space manager that tracks the KV-cache blocks backing each sequence.
    """

    def __init__(
        self,
        scheduler_config: SchedulerConfig,
        cache_config: CacheConfig,
    ) -> None:
        self.scheduler_config = scheduler_config
        self.cache_config = cache_config

        # A prompt may exceed neither the model context length nor the
        # per-step batched-token budget.
        self.prompt_limit = min(self.scheduler_config.max_model_len,
                                self.scheduler_config.max_num_batched_tokens)

        # Scheduling policy: first-come, first-served.
        self.policy = PolicyFactory.get_policy(policy_name="fcfs")
        # Tracks allocation / swap / copy of KV-cache blocks.
        self.block_manager = BlockSpaceManager(
            block_size=self.cache_config.block_size,
            num_gpu_blocks=self.cache_config.num_gpu_blocks,
            num_cpu_blocks=self.cache_config.num_cpu_blocks,
            sliding_window=self.cache_config.sliding_window)

        # Sequence groups in the WAITING state.
        self.waiting: Deque[SequenceGroup] = deque()
        # Sequence groups in the RUNNING state.
        self.running: Deque[SequenceGroup] = deque()
        # Sequence groups in the SWAPPED state.
        self.swapped: Deque[SequenceGroup] = deque()
Woosuk Kwon's avatar
Woosuk Kwon committed
85

86
    def add_seq_group(self, seq_group: SequenceGroup) -> None:
        """Place a newly arrived sequence group at the back of WAITING."""
        self.waiting.append(seq_group)
Woosuk Kwon's avatar
Woosuk Kwon committed
89

Antoni Baum's avatar
Antoni Baum committed
90
91
92
93
    def abort_seq_group(self, request_id: Union[str, Iterable[str]]) -> None:
        """Abort every sequence group whose request id matches.

        Accepts a single id or an iterable of ids. Each unfinished sequence
        of an aborted group is marked FINISHED_ABORTED and its blocks freed.
        """
        pending = ({request_id} if isinstance(request_id, str) else
                   set(request_id))
        for state_queue in (self.waiting, self.running, self.swapped):
            # Walk the queue in reverse so removing an element does not
            # shift the positions of elements we have yet to visit.
            for seq_group in reversed(state_queue):
                if seq_group.request_id not in pending:
                    continue
                # Detach the group from its queue, then finalize its seqs.
                state_queue.remove(seq_group)
                for seq in seq_group.get_seqs():
                    if not seq.is_finished():
                        seq.status = SequenceStatus.FINISHED_ABORTED
                        self.free_seq(seq)
                pending.remove(seq_group.request_id)
                if not pending:
                    # Every requested id has been handled.
                    return
110

111
112
113
    def has_unfinished_seqs(self) -> bool:
        """Return True if any sequence group is waiting, running, or swapped."""
        # bool() is required: a bare `or` chain yields the first truthy
        # operand (a Deque), which violates the annotated return type.
        return bool(self.waiting or self.running or self.swapped)

114
115
116
    def get_num_unfinished_seq_groups(self) -> int:
        """Total number of sequence groups across all three state queues."""
        return sum(len(q) for q in (self.waiting, self.running, self.swapped))

Woosuk Kwon's avatar
Woosuk Kwon committed
117
    def _schedule(self) -> SchedulerOutputs:
        """Build the next batch.

        Either a prompt run over newly admitted WAITING groups, or a
        generation run over the RUNNING (plus any swapped-in) groups.
        """
        # Block operations to hand to the cache engine for this step.
        blocks_to_swap_in: Dict[int, int] = {}
        blocks_to_swap_out: Dict[int, int] = {}
        blocks_to_copy: Dict[int, List[int]] = {}

        # Snapshot the clock once so all priority decisions use one time.
        now = time.monotonic()

        # Phase 1: admit WAITING groups, but only while nothing is swapped
        # out (swapped groups take priority over new prompts).
        if not self.swapped:
            ignored_seq_groups: List[SequenceGroup] = []
            scheduled: List[SequenceGroup] = []
            # Sequences already on the fly, including generation-phase ones.
            num_curr_seqs = sum(seq_group.get_max_num_running_seqs()
                                for seq_group in self.running)
            seq_lens: List[int] = []

            # The waiting queue is intentionally left unsorted: preempted
            # groups re-enter at the front and new arrivals at the back,
            # which already matches FCFS priority.
            while self.waiting:
                seq_group = self.waiting[0]

                waiting_seqs = seq_group.get_seqs(
                    status=SequenceStatus.WAITING)
                assert len(waiting_seqs) == 1, (
                    "Waiting sequence group should have only one prompt "
                    "sequence.")
                num_prompt_tokens = waiting_seqs[0].get_len()

                # Over-long prompts are dropped (FINISHED_IGNORED), not run.
                if num_prompt_tokens > self.prompt_limit:
                    logger.warning(
                        f"Input prompt ({num_prompt_tokens} tokens) is too long"
                        f" and exceeds limit of {self.prompt_limit}")
                    for seq in waiting_seqs:
                        seq.status = SequenceStatus.FINISHED_IGNORED
                    ignored_seq_groups.append(seq_group)
                    self.waiting.popleft()
                    continue

                # Ask the block manager whether the prompt fits right now.
                can_allocate = self.block_manager.can_allocate(seq_group)
                if can_allocate == AllocStatus.LATER:
                    # Might fit once blocks free up -- stop admitting.
                    break
                elif can_allocate == AllocStatus.NEVER:
                    # Will never fit -- drop it like an over-long prompt.
                    logger.warning(
                        f"Input prompt ({num_prompt_tokens} tokens) is too long"
                        f" and exceeds the capacity of block_manager")
                    for seq in waiting_seqs:
                        seq.status = SequenceStatus.FINISHED_IGNORED
                    ignored_seq_groups.append(seq_group)
                    self.waiting.popleft()
                    continue

                # Batched token count assumes padding up to the longest
                # prompt in the batch.
                candidate_lens = seq_lens + [num_prompt_tokens]
                num_batched_tokens = len(candidate_lens) * max(candidate_lens)
                if (num_batched_tokens >
                        self.scheduler_config.max_num_batched_tokens):
                    break

                # Respect the cap on concurrently running sequences.
                num_new_seqs = seq_group.get_max_num_running_seqs()
                if (num_curr_seqs + num_new_seqs >
                        self.scheduler_config.max_num_seqs):
                    break

                # Respect the cap on wasted padding tokens.
                num_paddings = num_batched_tokens - sum(candidate_lens)
                if num_paddings > self.scheduler_config.max_paddings:
                    break
                seq_lens = candidate_lens

                # Admit: move from WAITING to RUNNING and allocate blocks.
                seq_group = self.waiting.popleft()
                self._allocate(seq_group)
                self.running.append(seq_group)
                num_curr_seqs += num_new_seqs
                scheduled.append(seq_group)

            if scheduled or ignored_seq_groups:
                return SchedulerOutputs(
                    scheduled_seq_groups=scheduled,
                    prompt_run=True,
                    num_batched_tokens=len(seq_lens) *
                    max(seq_lens) if seq_lens else 0,
                    blocks_to_swap_in=blocks_to_swap_in,
                    blocks_to_swap_out=blocks_to_swap_out,
                    blocks_to_copy=blocks_to_copy,
                    ignored_seq_groups=ignored_seq_groups,
                )

        # Phase 2: generation step.
        # NOTE(woosuk): Preemption happens only when there is no available
        # slot to keep all the sequence groups in the RUNNING state; the
        # policy decides the ordering and hence who is preempted first.
        self.running = self.policy.sort_by_priority(now, self.running)

        # Reserve one new token slot per running group, preempting from the
        # back of the priority order whenever slots run out.
        still_running: Deque[SequenceGroup] = deque()
        preempted: List[SequenceGroup] = []
        while self.running:
            seq_group = self.running.popleft()
            while not self.block_manager.can_append_slot(seq_group):
                if self.running:
                    # Evict the lowest-priority group to make room.
                    victim = self.running.pop()
                    self._preempt(victim, blocks_to_swap_out)
                    preempted.append(victim)
                else:
                    # Nobody left to evict: preempt this group itself.
                    self._preempt(seq_group, blocks_to_swap_out)
                    preempted.append(seq_group)
                    break
            else:
                # Slot secured: the group stays in the running set.
                self._append_slot(seq_group, blocks_to_copy)
                still_running.append(seq_group)
        self.running = still_running

        # Phase 3: swap SWAPPED groups back in, highest priority first, but
        # only when nothing was preempted this step (preemption signals
        # memory pressure).
        self.swapped = self.policy.sort_by_priority(now, self.swapped)
        if not preempted:
            num_curr_seqs = sum(seq_group.get_max_num_running_seqs()
                                for seq_group in self.running)

            while self.swapped:
                seq_group = self.swapped[0]
                # If the sequence group cannot be swapped in, stop.
                if not self.block_manager.can_swap_in(seq_group):
                    break

                # Respect the cap on concurrently running sequences.
                num_new_seqs = seq_group.get_max_num_running_seqs()
                if (num_curr_seqs + num_new_seqs >
                        self.scheduler_config.max_num_seqs):
                    break

                seq_group = self.swapped.popleft()
                self._swap_in(seq_group, blocks_to_swap_in)
                self._append_slot(seq_group, blocks_to_copy)
                num_curr_seqs += num_new_seqs
                self.running.append(seq_group)

        # In the generation phase each sequence takes exactly one token
        # slot, so the batch size equals the number of RUNNING sequences.
        num_batched_tokens = sum(
            seq_group.num_seqs(status=SequenceStatus.RUNNING)
            for seq_group in self.running)

        return SchedulerOutputs(
            scheduled_seq_groups=self.running,
            prompt_run=False,
            num_batched_tokens=num_batched_tokens,
            blocks_to_swap_in=blocks_to_swap_in,
            blocks_to_swap_out=blocks_to_swap_out,
            blocks_to_copy=blocks_to_copy,
            ignored_seq_groups=[],
        )
Woosuk Kwon's avatar
Woosuk Kwon committed
281

Woosuk Kwon's avatar
Woosuk Kwon committed
282
    def schedule(self) -> Tuple[List[SequenceGroupMetadata], SchedulerOutputs]:
        """Run one scheduling step and build per-group model inputs.

        Note: _schedule() mutates the scheduler's internal queues
        (self.waiting, self.running, self.swapped).
        """
        scheduler_outputs = self._schedule()

        # Package each scheduled group's sequence data and block tables
        # into the metadata structure consumed by the model workers.
        seq_group_metadata_list: List[SequenceGroupMetadata] = []
        for seq_group in scheduler_outputs.scheduled_seq_groups:
            seq_data: Dict[int, SequenceData] = {}
            block_tables: Dict[int, List[int]] = {}
            for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
                seq_id = seq.seq_id
                seq_data[seq_id] = seq.data
                block_tables[seq_id] = self.block_manager.get_block_table(seq)

            seq_group_metadata_list.append(
                SequenceGroupMetadata(
                    request_id=seq_group.request_id,
                    is_prompt=scheduler_outputs.prompt_run,
                    seq_data=seq_data,
                    sampling_params=seq_group.sampling_params,
                    block_tables=block_tables,
                ))
        return seq_group_metadata_list, scheduler_outputs
307

308
309
    def fork_seq(self, parent_seq: Sequence, child_seq: Sequence) -> None:
        """Register child_seq as a fork of parent_seq with the block manager."""
        self.block_manager.fork(parent_seq, child_seq)
Woosuk Kwon's avatar
Woosuk Kwon committed
310

311
    def free_seq(self, seq: Sequence) -> None:
        """Release all cache blocks held by the given sequence."""
        self.block_manager.free(seq)
Woosuk Kwon's avatar
Woosuk Kwon committed
313

314
315
316
317
318
    def free_finished_seq_groups(self) -> None:
        """Drop finished sequence groups from the RUNNING queue.

        self.running is declared as a Deque in __init__ and _schedule calls
        popleft() on it, so the filtered result must be rebuilt as a deque;
        the previous list comprehension silently turned it into a plain
        list, which would break the next popleft() call.
        """
        self.running = deque(seq_group for seq_group in self.running
                             if not seq_group.is_finished())
Woosuk Kwon's avatar
Woosuk Kwon committed
319

320
321
    def _allocate(self, seq_group: SequenceGroup) -> None:
        """Allocate blocks for a prompt group and move its seqs to RUNNING."""
        self.block_manager.allocate(seq_group)
        for seq in seq_group.get_seqs(status=SequenceStatus.WAITING):
            seq.status = SequenceStatus.RUNNING

325
    def _append_slot(
        self,
        seq_group: SequenceGroup,
        blocks_to_copy: Dict[int, List[int]],
    ) -> None:
        """Reserve one more token slot for each RUNNING sequence in the group.

        If the block manager returns a (src, dst) block pair, record it in
        blocks_to_copy so the cache engine copies src to dst before the
        model executes.
        """
        for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
            ret = self.block_manager.append_slot(seq)
            if ret is not None:
                src_block, dst_block = ret
                # setdefault replaces the manual key-presence check.
                blocks_to_copy.setdefault(src_block, []).append(dst_block)

    def _preempt(
        self,
        seq_group: SequenceGroup,
        blocks_to_swap_out: Dict[int, int],
        preemption_mode: Optional[PreemptionMode] = None,
    ) -> None:
        """Evict a running group, either by recomputation or by swapping out.

        When no mode is given, single-sequence groups are preempted by
        recomputation (lower overhead), while multi-sequence groups (e.g.
        beam search) fall back to swapping, since recomputation is not
        currently supported for them.
        """
        # FIXME(woosuk): This makes our scheduling policy a bit bizarre.
        # As swapped sequences are prioritized over waiting sequences,
        # sequence groups with multiple sequences are implicitly prioritized
        # over sequence groups with a single sequence.
        # TODO(woosuk): Support recomputation for sequence groups with multiple
        # sequences. This may require a more sophisticated CUDA kernel.
        if preemption_mode is None:
            preemption_mode = (PreemptionMode.RECOMPUTE
                               if seq_group.get_max_num_running_seqs() == 1
                               else PreemptionMode.SWAP)
        if preemption_mode == PreemptionMode.RECOMPUTE:
            self._preempt_by_recompute(seq_group)
        elif preemption_mode == PreemptionMode.SWAP:
            self._preempt_by_swap(seq_group, blocks_to_swap_out)
        else:
            raise AssertionError("Invalid preemption mode.")
367
368
369
370
371
372
373
374
375
376

    def _preempt_by_recompute(
        self,
        seq_group: SequenceGroup,
    ) -> None:
        """Discard the group's blocks and re-queue it as a fresh prompt."""
        seqs = seq_group.get_seqs(status=SequenceStatus.RUNNING)
        # Recomputation only supports single-sequence groups (see _preempt).
        assert len(seqs) == 1
        for seq in seqs:
            seq.status = SequenceStatus.WAITING
            self.block_manager.free(seq)
        # Under FCFS the preempted group must run before newer arrivals, so
        # it is inserted at the front of the waiting queue.
        self.waiting.appendleft(seq_group)
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403

    def _preempt_by_swap(
        self,
        seq_group: SequenceGroup,
        blocks_to_swap_out: Dict[int, int],
    ) -> None:
        """Swap the group's blocks out to CPU and park it in SWAPPED."""
        self._swap_out(seq_group, blocks_to_swap_out)
        self.swapped.append(seq_group)

    def _swap_in(
        self,
        seq_group: SequenceGroup,
        blocks_to_swap_in: Dict[int, int],
    ) -> None:
        """Swap the group's blocks back in and mark its seqs RUNNING."""
        blocks_to_swap_in.update(self.block_manager.swap_in(seq_group))
        for seq in seq_group.get_seqs(status=SequenceStatus.SWAPPED):
            seq.status = SequenceStatus.RUNNING

    def _swap_out(
        self,
        seq_group: SequenceGroup,
        blocks_to_swap_out: Dict[int, int],
    ) -> None:
        """Swap the group's blocks out to CPU and mark its seqs SWAPPED.

        Raises:
            RuntimeError: if the CPU swap space cannot hold the group.
        """
        if not self.block_manager.can_swap_out(seq_group):
            # FIXME(woosuk): Abort the sequence group instead of aborting the
            # entire engine.
            raise RuntimeError(
                "Aborted due to the lack of CPU swap space. Please increase "
                "the swap space to avoid this error.")
        blocks_to_swap_out.update(self.block_manager.swap_out(seq_group))
        for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
            seq.status = SequenceStatus.SWAPPED