scheduler.py 16.8 KB
Newer Older
1
2
import enum
import time
3
from typing import Dict, List, Optional, Tuple
Woosuk Kwon's avatar
Woosuk Kwon committed
4

Woosuk Kwon's avatar
Woosuk Kwon committed
5
6
7
8
9
10
11
from vllm.config import CacheConfig, SchedulerConfig
from vllm.core.block_manager import BlockSpaceManager
from vllm.core.policy import PolicyFactory
from vllm.logger import init_logger
from vllm.sequence import (Sequence, SequenceData, SequenceGroup,
                           SequenceGroupMetadata, SequenceOutputs,
                           SequenceStatus)
Woosuk Kwon's avatar
Woosuk Kwon committed
12

Woosuk Kwon's avatar
Woosuk Kwon committed
13
logger = init_logger(__name__)
14

Woosuk Kwon's avatar
Woosuk Kwon committed
15

16
17
18
19
20
21
22
23
24
25
26
27
28
class PreemptionMode(enum.Enum):
    """How a preempted sequence group gives up its KV-cache blocks.

    SWAP moves the group's blocks out to CPU memory and swaps them back in
    when the group resumes.  RECOMPUTE discards the blocks entirely and later
    replays the group as if it were a brand-new prompt.
    """
    SWAP = enum.auto()
    RECOMPUTE = enum.auto()


29
30
31
32
class SchedulerOutputs:
    """Result of one scheduling pass: the groups to run next plus the cache
    block movements that must happen before the model executes."""

    def __init__(
        self,
        scheduled_seq_groups: List[SequenceGroup],
        prompt_run: bool,
        num_batched_tokens: int,
        blocks_to_swap_in: Dict[int, int],
        blocks_to_swap_out: Dict[int, int],
        blocks_to_copy: Dict[int, List[int]],
        ignored_seq_groups: List[SequenceGroup],
    ) -> None:
        # Swap in and swap out should never happen at the same time.
        assert not (blocks_to_swap_in and blocks_to_swap_out)
        self.scheduled_seq_groups = scheduled_seq_groups
        self.prompt_run = prompt_run
        self.num_batched_tokens = num_batched_tokens
        self.blocks_to_swap_in = blocks_to_swap_in
        self.blocks_to_swap_out = blocks_to_swap_out
        self.blocks_to_copy = blocks_to_copy
        self.ignored_seq_groups = ignored_seq_groups

    def is_empty(self) -> bool:
        """Return True when nothing was scheduled and no block movement is
        needed.  Ignored sequence groups are deliberately not considered."""
        any_block_motion = (self.blocks_to_swap_in or self.blocks_to_swap_out
                            or self.blocks_to_copy)
        return not self.scheduled_seq_groups and not any_block_motion
55
56


Woosuk Kwon's avatar
Woosuk Kwon committed
57
58
class Scheduler:

Woosuk Kwon's avatar
Woosuk Kwon committed
59
    def __init__(
        self,
        scheduler_config: SchedulerConfig,
        cache_config: CacheConfig,
    ) -> None:
        """Create a scheduler backed by a block space manager.

        Args:
            scheduler_config: Batch/sequence-count/model-length limits used
                by the scheduling loop.
            cache_config: KV-cache block size and GPU/CPU block counts.
        """
        self.scheduler_config = scheduler_config
        self.cache_config = cache_config

        # Prompts longer than this are ignored outright by _schedule().
        self.prompt_limit = min(self.scheduler_config.max_model_len,
                                self.scheduler_config.max_num_batched_tokens)

        # Instantiate the scheduling policy (first-come-first-served).
        self.policy = PolicyFactory.get_policy(policy_name="fcfs")
        # Create the block space manager, which tracks KV-cache block
        # ownership for every sequence.
        self.block_manager = BlockSpaceManager(
            block_size=self.cache_config.block_size,
            num_gpu_blocks=self.cache_config.num_gpu_blocks,
            num_cpu_blocks=self.cache_config.num_cpu_blocks,
        )

        # Sequence groups in the WAITING state.
        self.waiting: List[SequenceGroup] = []
        # Sequence groups in the RUNNING state.
        self.running: List[SequenceGroup] = []
        # Sequence groups in the SWAPPED state.
        self.swapped: List[SequenceGroup] = []
Woosuk Kwon's avatar
Woosuk Kwon committed
85

86
    def add_seq_group(self, seq_group: SequenceGroup) -> None:
87
        # Add sequence groups to the waiting queue.
88
        self.waiting.append(seq_group)
Woosuk Kwon's avatar
Woosuk Kwon committed
89

90
91
92
93
94
95
96
97
98
99
100
101
    def abort_seq_group(self, request_id: str) -> None:
        for state_queue in [self.waiting, self.running, self.swapped]:
            for seq_group in state_queue:
                if seq_group.request_id == request_id:
                    # Remove the sequence group from the state queue.
                    state_queue.remove(seq_group)
                    for seq in seq_group.seqs:
                        if seq.is_finished():
                            continue
                        self.free_seq(seq, SequenceStatus.FINISHED_ABORTED)
                    return

102
103
104
    def has_unfinished_seqs(self) -> bool:
        return self.waiting or self.running or self.swapped

105
106
107
    def get_num_unfinished_seq_groups(self) -> int:
        return len(self.waiting) + len(self.running) + len(self.swapped)

Woosuk Kwon's avatar
Woosuk Kwon committed
108
    def _schedule(self) -> SchedulerOutputs:
        """Run one scheduling iteration and return the resulting batch plan.

        Priority order: (1) if nothing is swapped out, admit waiting prompts;
        (2) otherwise keep running groups alive, preempting low-priority ones
        when GPU blocks run out; (3) swap groups back in when space allows.
        Mutates self.waiting / self.running / self.swapped.
        """
        # Blocks that need to be swapped or copied before model execution.
        blocks_to_swap_in: Dict[int, int] = {}
        blocks_to_swap_out: Dict[int, int] = {}
        blocks_to_copy: Dict[int, List[int]] = {}

        # Fix the current time so every priority comparison in this step
        # uses the same clock value.
        now = time.time()

        # Join waiting sequences if possible.
        if not self.swapped:
            ignored_seq_groups: List[SequenceGroup] = []
            scheduled: List[SequenceGroup] = []
            num_batched_tokens = 0
            # Optimization: We do not sort the waiting queue since the preempted
            # sequence groups are added to the front and the new sequence groups
            # are added to the back.
            while self.waiting:
                seq_group = self.waiting[0]

                num_prompt_tokens = seq_group.get_seqs()[0].get_len()
                if num_prompt_tokens > self.prompt_limit:
                    # Over-long prompts are marked ignored and dropped, not
                    # left to clog the front of the queue.
                    logger.warning(
                        f"Input prompt ({num_prompt_tokens} tokens) is too long"
                        f" and exceeds limit of {self.prompt_limit}")
                    for seq in seq_group.get_seqs():
                        seq.status = SequenceStatus.FINISHED_IGNORED
                    ignored_seq_groups.append(seq_group)
                    self.waiting.pop(0)
                    continue

                # If the sequence group cannot be allocated, stop.
                if not self.block_manager.can_allocate(seq_group):
                    break

                # If the number of batched tokens exceeds the limit, stop.
                if (num_batched_tokens + num_prompt_tokens >
                        self.scheduler_config.max_num_batched_tokens):
                    break

                # The total number of sequences in the RUNNING state should not
                # exceed the maximum number of sequences.
                num_new_seqs = seq_group.num_seqs(
                    status=SequenceStatus.WAITING)
                num_curr_seqs = sum(
                    seq_group.num_seqs(status=SequenceStatus.RUNNING)
                    for seq_group in self.running)
                if (num_curr_seqs + num_new_seqs >
                        self.scheduler_config.max_num_seqs):
                    break

                seq_group = self.waiting.pop(0)
                self._allocate(seq_group)
                self.running.append(seq_group)
                num_batched_tokens += num_prompt_tokens
                scheduled.append(seq_group)

            # If any prompts were admitted, this step is a prompt run and the
            # decode/swap phases below are skipped entirely.
            if scheduled:
                scheduler_outputs = SchedulerOutputs(
                    scheduled_seq_groups=scheduled,
                    prompt_run=True,
                    num_batched_tokens=num_batched_tokens,
                    blocks_to_swap_in=blocks_to_swap_in,
                    blocks_to_swap_out=blocks_to_swap_out,
                    blocks_to_copy=blocks_to_copy,
                    ignored_seq_groups=ignored_seq_groups,
                )
                return scheduler_outputs

        # NOTE(woosuk): Preemption happens only when there is no available slot
        # to keep all the sequence groups in the RUNNING state.
        # In this case, the policy is responsible for deciding which sequence
        # groups to preempt.
        self.running = self.policy.sort_by_priority(now, self.running)

        # Reserve new token slots for the running sequence groups.
        running: List[SequenceGroup] = []
        preempted: List[SequenceGroup] = []
        while self.running:
            seq_group = self.running.pop(0)
            while not self.block_manager.can_append_slot(seq_group):
                if self.running:
                    # Preempt the lowest-priority sequence groups.
                    victim_seq_group = self.running.pop(-1)
                    self._preempt(victim_seq_group, blocks_to_swap_out)
                    preempted.append(victim_seq_group)
                else:
                    # No other sequence groups can be preempted.
                    # Preempt the current sequence group.
                    self._preempt(seq_group, blocks_to_swap_out)
                    preempted.append(seq_group)
                    break
            else:
                # while/else: only reached when can_append_slot succeeded
                # (i.e. the inner loop did not break).
                # Append new slots to the sequence group.
                self._append_slot(seq_group, blocks_to_copy)
                running.append(seq_group)
        self.running = running

        # Swap in the sequence groups in the SWAPPED state if possible.
        # Swapping in is skipped whenever this step already swapped out.
        self.swapped = self.policy.sort_by_priority(now, self.swapped)
        while self.swapped and not blocks_to_swap_out:
            seq_group = self.swapped[0]
            # If the sequence group has been preempted in this step, stop.
            if seq_group in preempted:
                break
            # If the sequence group cannot be swapped in, stop.
            if not self.block_manager.can_swap_in(seq_group):
                break

            # The total number of sequences in the RUNNING state should not
            # exceed the maximum number of sequences.
            num_new_seqs = seq_group.num_seqs(status=SequenceStatus.SWAPPED)
            num_curr_seqs = sum(
                seq_group.num_seqs(status=SequenceStatus.RUNNING)
                for seq_group in self.running)
            if (num_curr_seqs + num_new_seqs >
                    self.scheduler_config.max_num_seqs):
                break

            seq_group = self.swapped.pop(0)
            self._swap_in(seq_group, blocks_to_swap_in)
            self._append_slot(seq_group, blocks_to_copy)
            self.running.append(seq_group)

        # Decode step: each RUNNING sequence contributes exactly one token.
        num_batched_tokens = sum(
            seq_group.num_seqs(status=SequenceStatus.RUNNING)
            for seq_group in self.running)

        scheduler_outputs = SchedulerOutputs(
            scheduled_seq_groups=self.running,
            prompt_run=False,
            num_batched_tokens=num_batched_tokens,
            blocks_to_swap_in=blocks_to_swap_in,
            blocks_to_swap_out=blocks_to_swap_out,
            blocks_to_copy=blocks_to_copy,
            ignored_seq_groups=[],
        )
        return scheduler_outputs
Woosuk Kwon's avatar
Woosuk Kwon committed
246

Woosuk Kwon's avatar
Woosuk Kwon committed
247
    def schedule(self) -> Tuple[List[SequenceGroupMetadata], SchedulerOutputs]:
        """Run one scheduling step and build per-group model inputs.

        Calling _schedule() mutates the internal queues (self.waiting,
        self.running, self.swapped).  For every scheduled group, the data
        and block tables of its RUNNING sequences are then collected into
        a SequenceGroupMetadata.
        """
        outputs = self._schedule()

        metadata_list: List[SequenceGroupMetadata] = []
        for group in outputs.scheduled_seq_groups:
            group_seq_data: Dict[int, List[SequenceData]] = {}
            group_block_tables: Dict[int, List[int]] = {}
            for running_seq in group.get_seqs(status=SequenceStatus.RUNNING):
                sid = running_seq.seq_id
                group_seq_data[sid] = running_seq.data
                group_block_tables[sid] = (
                    self.block_manager.get_block_table(running_seq))

            metadata_list.append(
                SequenceGroupMetadata(
                    request_id=group.request_id,
                    is_prompt=outputs.prompt_run,
                    seq_data=group_seq_data,
                    sampling_params=group.sampling_params,
                    block_tables=group_block_tables,
                ))
        return metadata_list, outputs
272

273
    def update(
        self,
        seq_outputs: Dict[int, SequenceOutputs],
    ) -> List[SequenceGroup]:
        """Apply one step of model outputs to the running sequence groups.

        Args:
            seq_outputs: Mapping from sequence id to that sequence's output
                (parent id for beam-search forks, new token, logprobs).

        Returns:
            The running sequence groups that received at least one output.
        """
        # Collect the groups that have at least one RUNNING sequence with an
        # output this step.
        scheduled: List[SequenceGroup] = []
        for seq_group in self.running:
            for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
                if seq.seq_id in seq_outputs:
                    scheduled.append(seq_group)
                    break

        # Update the scheduled sequences and free blocks.
        for seq_group in scheduled:
            # Process beam search results before processing the new tokens.
            # Ordering matters: forks must be resolved before any token is
            # appended, so a fork copies the parent's pre-append state.
            for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
                output = seq_outputs[seq.seq_id]
                if seq.seq_id != output.parent_seq_id:
                    # The sequence is a fork of the parent sequence (beam
                    # search). Free the current sequence.
                    self.block_manager.free(seq)
                    # Fork the parent sequence.
                    parent_seq = seq_group.find(output.parent_seq_id)
                    parent_seq.fork(seq)
                    self.block_manager.fork(parent_seq, seq)

            # Process the new tokens.
            for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
                # Append a new token to the sequence.
                output = seq_outputs[seq.seq_id]
                seq.append_token_id(output.output_token, output.logprobs)
        return scheduled
Woosuk Kwon's avatar
Woosuk Kwon committed
304

Zhuohan Li's avatar
Zhuohan Li committed
305
306
    def free_seq(self, seq: Sequence, finish_status: SequenceStatus) -> None:
        """Mark `seq` as finished with `finish_status` and return its
        KV-cache blocks to the block manager."""
        seq.status = finish_status
        self.block_manager.free(seq)
Woosuk Kwon's avatar
Woosuk Kwon committed
308

309
310
311
312
313
    def free_finished_seq_groups(self) -> None:
        self.running = [
            seq_group for seq_group in self.running
            if not seq_group.is_finished()
        ]
Woosuk Kwon's avatar
Woosuk Kwon committed
314

315
316
    def _allocate(self, seq_group: SequenceGroup) -> None:
        """Give the group its GPU blocks and move every sequence to RUNNING."""
        self.block_manager.allocate(seq_group)
        for allocated_seq in seq_group.get_seqs():
            allocated_seq.status = SequenceStatus.RUNNING

320
    def _append_slot(
        self,
        seq_group: SequenceGroup,
        blocks_to_copy: Dict[int, List[int]],
    ) -> None:
        """Reserve one more token slot for each RUNNING sequence.

        When the block manager reports a copy-on-write — a (src, dst) block
        pair — the pair is recorded in `blocks_to_copy`.
        """
        for running_seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
            cow_pair = self.block_manager.append_slot(running_seq)
            if cow_pair is None:
                continue
            src_block, dst_block = cow_pair
            blocks_to_copy.setdefault(src_block, []).append(dst_block)

    def _preempt(
        self,
        seq_group: SequenceGroup,
        blocks_to_swap_out: Dict[int, int],
        preemption_mode: Optional[PreemptionMode] = None,
    ) -> None:
        """Preempt a running sequence group, by recomputation or by swapping.

        Args:
            seq_group: The group to evict from the RUNNING state.
            blocks_to_swap_out: Out-param; GPU→CPU block mapping is added
                here when swapping is chosen.
            preemption_mode: Force a mode; when None, one is picked below.
        """
        # If preemption mode is not specified, we determine the mode as follows:
        # We use recomputation by default since it incurs lower overhead than
        # swapping. However, when the sequence group has multiple sequences
        # (e.g., beam search), recomputation is not supported. In such a case,
        # we use swapping instead.
        # FIXME(woosuk): This makes our scheduling policy a bit bizarre.
        # As swapped sequences are prioritized over waiting sequences,
        # sequence groups with multiple sequences are implicitly prioritized
        # over sequence groups with a single sequence.
        # TODO(woosuk): Support recomputation for sequence groups with multiple
        # sequences. This may require a more sophisticated CUDA kernel.
        if preemption_mode is None:
            seqs = seq_group.get_seqs(status=SequenceStatus.RUNNING)
            if len(seqs) == 1:
                preemption_mode = PreemptionMode.RECOMPUTE
            else:
                preemption_mode = PreemptionMode.SWAP
        if preemption_mode == PreemptionMode.RECOMPUTE:
            self._preempt_by_recompute(seq_group)
        elif preemption_mode == PreemptionMode.SWAP:
            self._preempt_by_swap(seq_group, blocks_to_swap_out)
        else:
            # `assert False` is stripped under `python -O`, which would let an
            # invalid mode fall through silently; raise the same exception
            # type explicitly so the guard always holds.
            raise AssertionError("Invalid preemption mode.")
363
364
365
366
367
368
369
370
371
372

    def _preempt_by_recompute(
        self,
        seq_group: SequenceGroup,
    ) -> None:
        """Discard the group's blocks and re-queue it as a fresh prompt."""
        victims = seq_group.get_seqs(status=SequenceStatus.RUNNING)
        # Recomputation only supports single-sequence groups.
        assert len(victims) == 1
        for victim in victims:
            victim.status = SequenceStatus.WAITING
            self.block_manager.free(victim)
        # NOTE: For FCFS, we insert the preempted sequence group to the front
        # of the waiting queue.
        self.waiting.insert(0, seq_group)
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399

    def _preempt_by_swap(
        self,
        seq_group: SequenceGroup,
        blocks_to_swap_out: Dict[int, int],
    ) -> None:
        """Swap the group's blocks out to CPU and move it to the swapped
        queue, recording the GPU→CPU mapping in `blocks_to_swap_out`."""
        self._swap_out(seq_group, blocks_to_swap_out)
        self.swapped.append(seq_group)

    def _swap_in(
        self,
        seq_group: SequenceGroup,
        blocks_to_swap_in: Dict[int, int],
    ) -> None:
        """Bring the group's blocks back to GPU and resume its sequences.

        The CPU→GPU block mapping is merged into `blocks_to_swap_in`.
        """
        cpu_to_gpu = self.block_manager.swap_in(seq_group)
        blocks_to_swap_in.update(cpu_to_gpu)
        for swapped_seq in seq_group.get_seqs(status=SequenceStatus.SWAPPED):
            swapped_seq.status = SequenceStatus.RUNNING

    def _swap_out(
        self,
        seq_group: SequenceGroup,
        blocks_to_swap_out: Dict[int, int],
    ) -> None:
        """Move the group's blocks to CPU and mark its sequences SWAPPED.

        The GPU→CPU block mapping is merged into `blocks_to_swap_out`.

        Raises:
            RuntimeError: when the CPU swap space cannot hold the blocks.
        """
        if not self.block_manager.can_swap_out(seq_group):
            # FIXME(woosuk): Abort the sequence group instead of aborting the
            # entire engine.
            raise RuntimeError(
                "Aborted due to the lack of CPU swap space. Please increase "
                "the swap space to avoid this error.")
        gpu_to_cpu = self.block_manager.swap_out(seq_group)
        blocks_to_swap_out.update(gpu_to_cpu)
        for running_seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
            running_seq.status = SequenceStatus.SWAPPED