scheduler.py 18.1 KB
Newer Older
1
from collections import deque
2
3
import enum
import time
4
from typing import Deque, Dict, Iterable, List, Optional, Tuple, Union
Woosuk Kwon's avatar
Woosuk Kwon committed
5

Woosuk Kwon's avatar
Woosuk Kwon committed
6
from vllm.config import CacheConfig, SchedulerConfig
7
from vllm.core.block_manager import AllocStatus, BlockSpaceManager
Woosuk Kwon's avatar
Woosuk Kwon committed
8
9
10
from vllm.core.policy import PolicyFactory
from vllm.logger import init_logger
from vllm.sequence import (Sequence, SequenceData, SequenceGroup,
11
                           SequenceGroupMetadata, SequenceStatus)
12
from vllm.prefix import PrefixPool
Woosuk Kwon's avatar
Woosuk Kwon committed
13

Woosuk Kwon's avatar
Woosuk Kwon committed
14
logger = init_logger(__name__)
15

Woosuk Kwon's avatar
Woosuk Kwon committed
16

17
18
19
20
21
22
23
24
25
26
27
28
29
class PreemptionMode(enum.Enum):
    """How the scheduler evicts a running sequence group under memory pressure.

    SWAP: move the group's KV-cache blocks out to CPU memory, and bring them
        back when the group is rescheduled.
    RECOMPUTE: discard the group's blocks entirely and re-run the prompt phase
        when the group is rescheduled (it is treated as a brand-new prompt).
    """
    SWAP = enum.auto()
    RECOMPUTE = enum.auto()


30
31
32
33
class SchedulerOutputs:
    """Result of a single scheduling step.

    Bundles the sequence groups chosen to run next with the block-level cache
    operations (swap in, swap out, copy) that must be performed before model
    execution.
    """

    def __init__(
        self,
        scheduled_seq_groups: Iterable[SequenceGroup],
        prompt_run: bool,
        num_batched_tokens: int,
        blocks_to_swap_in: Dict[int, int],
        blocks_to_swap_out: Dict[int, int],
        blocks_to_copy: Dict[int, List[int]],
        ignored_seq_groups: List[SequenceGroup],
    ) -> None:
        self.scheduled_seq_groups = scheduled_seq_groups
        self.prompt_run = prompt_run
        self.num_batched_tokens = num_batched_tokens
        self.blocks_to_swap_in = blocks_to_swap_in
        self.blocks_to_swap_out = blocks_to_swap_out
        self.blocks_to_copy = blocks_to_copy
        # A single step either swaps blocks in or swaps blocks out, never both.
        assert not (blocks_to_swap_in and blocks_to_swap_out)
        self.ignored_seq_groups = ignored_seq_groups

    def is_empty(self) -> bool:
        """Return True if this step schedules nothing and moves no blocks.

        Ignored sequence groups are deliberately not taken into account.
        """
        has_work = (self.scheduled_seq_groups or self.blocks_to_swap_in
                    or self.blocks_to_swap_out or self.blocks_to_copy)
        return not has_work
56
57


Woosuk Kwon's avatar
Woosuk Kwon committed
58
59
class Scheduler:
    """Decides, on each engine step, which sequence groups run next.

    Tracks sequence groups across three queues (waiting, running, swapped)
    and coordinates with the block space manager to allocate, swap, and copy
    KV-cache blocks.
    """

    def __init__(
        self,
        scheduler_config: SchedulerConfig,
        cache_config: CacheConfig,
    ) -> None:
        self.scheduler_config = scheduler_config
        self.cache_config = cache_config

        # A prompt longer than this can never be scheduled: it is bounded by
        # both the model's context length and the per-batch token budget.
        self.prompt_limit = min(self.scheduler_config.max_model_len,
                                self.scheduler_config.max_num_batched_tokens)

        # Instantiate the scheduling policy (first-come, first-served).
        self.policy = PolicyFactory.get_policy(policy_name="fcfs")
        # Create the block space manager.
        self.block_manager = BlockSpaceManager(
            block_size=self.cache_config.block_size,
            num_gpu_blocks=self.cache_config.num_gpu_blocks,
            num_cpu_blocks=self.cache_config.num_cpu_blocks,
            sliding_window=self.cache_config.sliding_window)

        # Create the prefix pool to cache the prefixes.
        self.prefix_pool = PrefixPool(self.cache_config.block_size)

        # Sequence groups in the WAITING state.
        self.waiting: Deque[SequenceGroup] = deque()
        # Sequence groups in the RUNNING state.
        self.running: Deque[SequenceGroup] = deque()
        # Sequence groups in the SWAPPED state.
        self.swapped: Deque[SequenceGroup] = deque()
Woosuk Kwon's avatar
Woosuk Kwon committed
89

90
    def add_seq_group(self, seq_group: SequenceGroup) -> None:
91
        # Add sequence groups to the waiting queue.
92
        self.waiting.append(seq_group)
Woosuk Kwon's avatar
Woosuk Kwon committed
93

Antoni Baum's avatar
Antoni Baum committed
94
    def abort_seq_group(self, request_id: Union[str, Iterable[str]]) -> None:
95
96
97
98
99
100
101
102
103
104
105
106
        """Aborts a sequence group with the given ID.

        Check if the sequence group with the given ID
            is present in any of the state queue.
        If present, remove the sequence group from the state queue.
            Also, if any of the sequences in the sequence group is not finished,
                free the sequence with status `FINISHED_ABORTED`.
        Otherwise, do nothing.

        Args:
            request_id: The ID(s) of the sequence group to abort.
        """
Antoni Baum's avatar
Antoni Baum committed
107
108
109
        if isinstance(request_id, str):
            request_id = (request_id, )
        request_ids = set(request_id)
110
        for state_queue in [self.waiting, self.running, self.swapped]:
111
112
113
114
115
116
            aborted_groups = []
            for seq_group in state_queue:
                if not request_ids:
                    # Using 'break' here may add two extra iterations,
                    # but is acceptable to reduce complexity .
                    break
Antoni Baum's avatar
Antoni Baum committed
117
                if seq_group.request_id in request_ids:
118
119
                    # Appending aborted group into pending list.
                    aborted_groups.append(seq_group)
Antoni Baum's avatar
Antoni Baum committed
120
                    request_ids.remove(seq_group.request_id)
121
122
123
124
125
126
127
128
            for aborted_group in aborted_groups:
                # Remove the sequence group from the state queue.
                state_queue.remove(aborted_group)
                for seq in seq_group.get_seqs():
                    if seq.is_finished():
                        continue
                    seq.status = SequenceStatus.FINISHED_ABORTED
                    self.free_seq(seq)
129

130
131
132
    def has_unfinished_seqs(self) -> bool:
        return self.waiting or self.running or self.swapped

133
134
135
    def get_num_unfinished_seq_groups(self) -> int:
        return len(self.waiting) + len(self.running) + len(self.swapped)

Woosuk Kwon's avatar
Woosuk Kwon committed
136
    def _schedule(self) -> SchedulerOutputs:
        """Produce the next batch to run.

        Either admits new prompts from the WAITING queue (a prompt run), or
        reserves slots for the RUNNING groups — preempting and swapping as
        needed — and pulls groups back from the SWAPPED queue (a generation
        run). Mutates self.waiting / self.running / self.swapped and collects
        the block swap/copy operations the cache engine must perform first.
        """
        # Blocks that need to be swapped or copied before model execution.
        blocks_to_swap_in: Dict[int, int] = {}
        blocks_to_swap_out: Dict[int, int] = {}
        blocks_to_copy: Dict[int, List[int]] = {}

        # Fix the current time.
        now = time.monotonic()

        # Join waiting sequences if possible. Prompts are only admitted when
        # nothing is waiting in the SWAPPED queue.
        if not self.swapped:
            ignored_seq_groups: List[SequenceGroup] = []
            scheduled: List[SequenceGroup] = []
            # The total number of sequences on the fly, including the
            # requests in the generation phase.
            num_curr_seqs = sum(seq_group.get_max_num_running_seqs()
                                for seq_group in self.running)
            seq_lens: List[int] = []

            # Optimization: We do not sort the waiting queue since the preempted
            # sequence groups are added to the front and the new sequence groups
            # are added to the back.
            while self.waiting:
                seq_group = self.waiting[0]

                waiting_seqs = seq_group.get_seqs(
                    status=SequenceStatus.WAITING)
                assert len(waiting_seqs) == 1, (
                    "Waiting sequence group should have only one prompt "
                    "sequence.")
                num_prompt_tokens = waiting_seqs[0].get_len()
                # Prompts over the limit are ignored permanently, not retried.
                if num_prompt_tokens > self.prompt_limit:
                    logger.warning(
                        f"Input prompt ({num_prompt_tokens} tokens) is too long"
                        f" and exceeds limit of {self.prompt_limit}")
                    for seq in waiting_seqs:
                        seq.status = SequenceStatus.FINISHED_IGNORED
                    ignored_seq_groups.append(seq_group)
                    self.waiting.popleft()
                    continue

                # If the sequence group cannot be allocated, stop.
                can_allocate = self.block_manager.can_allocate(seq_group)
                if can_allocate == AllocStatus.LATER:
                    break
                elif can_allocate == AllocStatus.NEVER:
                    logger.warning(
                        f"Input prompt ({num_prompt_tokens} tokens) is too long"
                        f" and exceeds the capacity of block_manager")
                    for seq in waiting_seqs:
                        seq.status = SequenceStatus.FINISHED_IGNORED
                    ignored_seq_groups.append(seq_group)
                    self.waiting.popleft()
                    continue

                # If the number of batched tokens exceeds the limit, stop.
                # NOTE: the batch is padded to the longest prompt, hence
                # count * max rather than a plain sum.
                new_seq_lens = seq_lens + [num_prompt_tokens]
                num_batched_tokens = len(new_seq_lens) * max(new_seq_lens)
                if (num_batched_tokens >
                        self.scheduler_config.max_num_batched_tokens):
                    break

                # The total number of sequences in the RUNNING state should not
                # exceed the maximum number of sequences.
                num_new_seqs = seq_group.get_max_num_running_seqs()
                if (num_curr_seqs + num_new_seqs >
                        self.scheduler_config.max_num_seqs):
                    break

                # Cap wasted padding in the batch.
                num_paddings = num_batched_tokens - sum(new_seq_lens)
                if num_paddings > self.scheduler_config.max_paddings:
                    break
                seq_lens = new_seq_lens

                # All admission checks passed: move the group to RUNNING.
                seq_group = self.waiting.popleft()
                self._allocate(seq_group)
                self.running.append(seq_group)
                num_curr_seqs += num_new_seqs
                scheduled.append(seq_group)

            if scheduled or ignored_seq_groups:
                scheduler_outputs = SchedulerOutputs(
                    scheduled_seq_groups=scheduled,
                    prompt_run=True,
                    num_batched_tokens=len(seq_lens) *
                    max(seq_lens) if seq_lens else 0,
                    blocks_to_swap_in=blocks_to_swap_in,
                    blocks_to_swap_out=blocks_to_swap_out,
                    blocks_to_copy=blocks_to_copy,
                    ignored_seq_groups=ignored_seq_groups,
                )
                return scheduler_outputs

        # NOTE(woosuk): Preemption happens only when there is no available slot
        # to keep all the sequence groups in the RUNNING state.
        # In this case, the policy is responsible for deciding which sequence
        # groups to preempt.
        self.running = self.policy.sort_by_priority(now, self.running)

        # Reserve new token slots for the running sequence groups.
        running: Deque[SequenceGroup] = deque()
        preempted: List[SequenceGroup] = []
        while self.running:
            seq_group = self.running.popleft()
            while not self.block_manager.can_append_slot(seq_group):
                if self.running:
                    # Preempt the lowest-priority sequence groups.
                    victim_seq_group = self.running.pop()
                    self._preempt(victim_seq_group, blocks_to_swap_out)
                    preempted.append(victim_seq_group)
                else:
                    # No other sequence groups can be preempted.
                    # Preempt the current sequence group.
                    self._preempt(seq_group, blocks_to_swap_out)
                    preempted.append(seq_group)
                    break
            else:
                # Append new slots to the sequence group.
                self._append_slot(seq_group, blocks_to_copy)
                running.append(seq_group)
        self.running = running

        # Swap in the sequence groups in the SWAPPED state if possible.
        self.swapped = self.policy.sort_by_priority(now, self.swapped)
        # Skip swap-in entirely if anything was just preempted: memory is
        # already under pressure.
        if not preempted:
            num_curr_seqs = sum(seq_group.get_max_num_running_seqs()
                                for seq_group in self.running)

            while self.swapped:
                seq_group = self.swapped[0]
                # If the sequence group cannot be swapped in, stop.
                if not self.block_manager.can_swap_in(seq_group):
                    break

                # The total number of sequences in the RUNNING state should not
                # exceed the maximum number of sequences.
                num_new_seqs = seq_group.get_max_num_running_seqs()
                if (num_curr_seqs + num_new_seqs >
                        self.scheduler_config.max_num_seqs):
                    break

                seq_group = self.swapped.popleft()
                self._swap_in(seq_group, blocks_to_swap_in)
                self._append_slot(seq_group, blocks_to_copy)
                num_curr_seqs += num_new_seqs
                self.running.append(seq_group)

        # Each sequence in the generation phase only takes one token slot.
        # Therefore, the number of batched tokens is equal to the number of
        # sequences in the RUNNING state.
        num_batched_tokens = sum(
            seq_group.num_seqs(status=SequenceStatus.RUNNING)
            for seq_group in self.running)

        scheduler_outputs = SchedulerOutputs(
            scheduled_seq_groups=self.running,
            prompt_run=False,
            num_batched_tokens=num_batched_tokens,
            blocks_to_swap_in=blocks_to_swap_in,
            blocks_to_swap_out=blocks_to_swap_out,
            blocks_to_copy=blocks_to_copy,
            ignored_seq_groups=[],
        )
        return scheduler_outputs
Woosuk Kwon's avatar
Woosuk Kwon committed
300

Woosuk Kwon's avatar
Woosuk Kwon committed
301
    def schedule(self) -> Tuple[List[SequenceGroupMetadata], SchedulerOutputs]:
        """Run one scheduling step and build the per-group model inputs.

        Calling ``_schedule`` mutates the scheduler's internal queues
        (self.waiting, self.running, self.swapped); each scheduled group is
        then packaged into a ``SequenceGroupMetadata`` for the workers.
        """
        outputs = self._schedule()

        metadata_list: List[SequenceGroupMetadata] = []
        for group in outputs.scheduled_seq_groups:
            data_by_seq: Dict[int, SequenceData] = {}
            tables_by_seq: Dict[int, List[int]] = {}
            for seq in group.get_seqs(status=SequenceStatus.RUNNING):
                data_by_seq[seq.seq_id] = seq.data
                tables_by_seq[seq.seq_id] = (
                    self.block_manager.get_block_table(seq))

            metadata_list.append(
                SequenceGroupMetadata(
                    request_id=group.request_id,
                    is_prompt=outputs.prompt_run,
                    seq_data=data_by_seq,
                    sampling_params=group.sampling_params,
                    block_tables=tables_by_seq,
                    prefix=group.prefix,
                ))
        return metadata_list, outputs
327

328
329
    def fork_seq(self, parent_seq: Sequence, child_seq: Sequence) -> None:
        # Register the fork with the block manager, which owns block-table
        # bookkeeping for parent/child sequences.
        self.block_manager.fork(parent_seq, child_seq)
Woosuk Kwon's avatar
Woosuk Kwon committed
330

331
    def free_seq(self, seq: Sequence) -> None:
        # Release the cache blocks held by `seq` back to the block manager.
        self.block_manager.free(seq)
Woosuk Kwon's avatar
Woosuk Kwon committed
333

334
335
336
337
338
    def free_finished_seq_groups(self) -> None:
        self.running = [
            seq_group for seq_group in self.running
            if not seq_group.is_finished()
        ]
Woosuk Kwon's avatar
Woosuk Kwon committed
339

340
341
    def _allocate(self, seq_group: SequenceGroup) -> None:
        """Allocate cache blocks for the group and move its waiting
        sequences to the RUNNING state."""
        self.block_manager.allocate(seq_group)
        for waiting_seq in seq_group.get_seqs(status=SequenceStatus.WAITING):
            waiting_seq.status = SequenceStatus.RUNNING

345
    def _append_slot(
        self,
        seq_group: SequenceGroup,
        blocks_to_copy: Dict[int, List[int]],
    ) -> None:
        """Reserve one more token slot for every RUNNING sequence in the group.

        When the block manager returns a (source, destination) block pair,
        record it in ``blocks_to_copy`` so the blocks get copied before the
        next model execution.
        """
        for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
            block_pair = self.block_manager.append_slot(seq)
            if block_pair is None:
                continue
            src_block, dst_block = block_pair
            blocks_to_copy.setdefault(src_block, []).append(dst_block)

    def _preempt(
        self,
        seq_group: SequenceGroup,
        blocks_to_swap_out: Dict[int, int],
        preemption_mode: Optional[PreemptionMode] = None,
    ) -> None:
        """Evict ``seq_group`` from the RUNNING state.

        When no mode is given, single-sequence groups are recomputed (lower
        overhead than swapping), while groups with multiple sequences (e.g.
        beam search) are swapped out, since recomputation is not currently
        supported for them.

        FIXME(woosuk): This makes our scheduling policy a bit bizarre.
        As swapped sequences are prioritized over waiting sequences,
        sequence groups with multiple sequences are implicitly prioritized
        over sequence groups with a single sequence.
        TODO(woosuk): Support recomputation for sequence groups with multiple
        sequences. This may require a more sophisticated CUDA kernel.
        """
        if preemption_mode is None:
            preemption_mode = (PreemptionMode.RECOMPUTE
                               if seq_group.get_max_num_running_seqs() == 1
                               else PreemptionMode.SWAP)
        if preemption_mode == PreemptionMode.RECOMPUTE:
            self._preempt_by_recompute(seq_group)
        elif preemption_mode == PreemptionMode.SWAP:
            self._preempt_by_swap(seq_group, blocks_to_swap_out)
        else:
            raise AssertionError("Invalid preemption mode.")
387
388
389
390
391
392
393
394
395
396

    def _preempt_by_recompute(
        self,
        seq_group: SequenceGroup,
    ) -> None:
        """Preempt by discarding the group's blocks; it will re-enter the
        scheduler as a fresh prompt."""
        running_seqs = seq_group.get_seqs(status=SequenceStatus.RUNNING)
        # Recomputation is only supported for single-sequence groups.
        assert len(running_seqs) == 1
        for running_seq in running_seqs:
            running_seq.status = SequenceStatus.WAITING
            self.block_manager.free(running_seq)
        # NOTE: For FCFS, we insert the preempted sequence group to the front
        # of the waiting queue.
        self.waiting.appendleft(seq_group)
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423

    def _preempt_by_swap(
        self,
        seq_group: SequenceGroup,
        blocks_to_swap_out: Dict[int, int],
    ) -> None:
        # Preempt by moving the group's blocks to CPU memory; the group then
        # waits in the SWAPPED queue until it can be swapped back in.
        self._swap_out(seq_group, blocks_to_swap_out)
        self.swapped.append(seq_group)

    def _swap_in(
        self,
        seq_group: SequenceGroup,
        blocks_to_swap_in: Dict[int, int],
    ) -> None:
        """Bring the group's blocks back to GPU memory and mark its swapped
        sequences as RUNNING."""
        block_mapping = self.block_manager.swap_in(seq_group)
        blocks_to_swap_in.update(block_mapping)
        for swapped_seq in seq_group.get_seqs(status=SequenceStatus.SWAPPED):
            swapped_seq.status = SequenceStatus.RUNNING

    def _swap_out(
        self,
        seq_group: SequenceGroup,
        blocks_to_swap_out: Dict[int, int],
    ) -> None:
        """Move the group's blocks to CPU swap space and mark its running
        sequences as SWAPPED.

        Raises:
            RuntimeError: if the CPU swap space cannot hold the group's blocks.
        """
        if not self.block_manager.can_swap_out(seq_group):
            # FIXME(woosuk): Abort the sequence group instead of aborting the
            # entire engine.
            raise RuntimeError(
                "Aborted due to the lack of CPU swap space. Please increase "
                "the swap space to avoid this error.")
        block_mapping = self.block_manager.swap_out(seq_group)
        blocks_to_swap_out.update(block_mapping)
        for running_seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
            running_seq.status = SequenceStatus.SWAPPED