scheduler.py 22.7 KB
Newer Older
1
2
import enum
import time
3
4
from collections import deque
from typing import Deque, Dict, Iterable, List, Optional, Set, Tuple, Union
Woosuk Kwon's avatar
Woosuk Kwon committed
5

6
from vllm.config import CacheConfig, LoRAConfig, SchedulerConfig
7
from vllm.core.interfaces import AllocStatus, BlockSpaceManager
Woosuk Kwon's avatar
Woosuk Kwon committed
8
9
from vllm.core.policy import PolicyFactory
from vllm.logger import init_logger
10
from vllm.lora.request import LoRARequest
Woosuk Kwon's avatar
Woosuk Kwon committed
11
from vllm.sequence import (Sequence, SequenceData, SequenceGroup,
12
                           SequenceGroupMetadata, SequenceStatus)
Woosuk Kwon's avatar
Woosuk Kwon committed
13

Woosuk Kwon's avatar
Woosuk Kwon committed
14
logger = init_logger(__name__)
15

Woosuk Kwon's avatar
Woosuk Kwon committed
16

17
18
19
20
21
22
23
24
25
26
27
28
29
class PreemptionMode(enum.Enum):
    """How a running sequence group is evicted when the KV cache is full.

    SWAP:
        The group's cache blocks are copied out to CPU memory and copied
        back in when the group is resumed.
    RECOMPUTE:
        The group's cache blocks are discarded; on resumption the sequences
        are re-processed from scratch as if they were fresh prompts.
    """
    # Move the KV-cache blocks to CPU and bring them back later.
    SWAP = enum.auto()
    # Drop the KV-cache blocks and rebuild them by re-running the prompt.
    RECOMPUTE = enum.auto()


class SchedulerOutputs:
    """The result of a single scheduling step.

    Bundles the sequence groups selected to run with the cache-block
    operations (swap in, swap out, copy) that must be carried out before the
    model executes them.
    """

    def __init__(
        self,
        scheduled_seq_groups: Iterable[SequenceGroup],
        prompt_run: bool,
        num_batched_tokens: int,
        blocks_to_swap_in: Dict[int, int],
        blocks_to_swap_out: Dict[int, int],
        blocks_to_copy: Dict[int, List[int]],
        ignored_seq_groups: List[SequenceGroup],
    ) -> None:
        """Create the outputs of one scheduling step.

        Args:
            scheduled_seq_groups: Sequence groups to execute in this step.
            prompt_run: True when this step processes prompts, False when it
                is a generation (decode) step.
            num_batched_tokens: Number of tokens batched in this step.
            blocks_to_swap_in: Block-number mapping collected from the block
                manager's swap-in results.
            blocks_to_swap_out: Block-number mapping collected from the block
                manager's swap-out results.
            blocks_to_copy: Maps a source block number to the list of block
                numbers it must be copied to.
            ignored_seq_groups: Sequence groups that were rejected (e.g. the
                prompt is too long) and will never be scheduled.
        """
        self.scheduled_seq_groups = scheduled_seq_groups
        self.prompt_run = prompt_run
        self.num_batched_tokens = num_batched_tokens
        self.blocks_to_swap_in = blocks_to_swap_in
        self.blocks_to_swap_out = blocks_to_swap_out
        self.blocks_to_copy = blocks_to_copy
        # Swap in and swap out should never happen at the same time.
        assert not (blocks_to_swap_in and blocks_to_swap_out)
        self.ignored_seq_groups = ignored_seq_groups

        # ``lora_requests`` collects every group's ``lora_request`` as-is
        # (including any ``None``), so this counts distinct values, not only
        # real adapters. Grouping by LoRA keeps same-adapter requests adjacent.
        self.num_loras = len(self.lora_requests)
        if self.num_loras > 0:
            self._sort_by_lora_ids()

    def is_empty(self) -> bool:
        """Return True if there is nothing to execute or move in the cache."""
        # NOTE: We do not consider the ignored sequence groups.
        return (not self.scheduled_seq_groups and not self.blocks_to_swap_in
                and not self.blocks_to_swap_out and not self.blocks_to_copy)

    def _sort_by_lora_ids(self) -> None:
        """Order the scheduled groups by (LoRA int id, request id).

        Rebinds ``scheduled_seq_groups`` to a new sorted list; the original
        iterable passed to the constructor is not modified.
        """
        self.scheduled_seq_groups = sorted(self.scheduled_seq_groups,
                                           key=lambda g:
                                           (g.lora_int_id, g.request_id))

    @property
    def lora_requests(self) -> Set[LoRARequest]:
        """The set of ``lora_request`` values of the scheduled groups."""
        return {g.lora_request for g in self.scheduled_seq_groups}


class Scheduler:
    """Decides which sequence groups are executed in each engine step.

    Sequence groups live in one of three deques: ``waiting`` (not yet
    allocated or preempted by recomputation), ``running`` and ``swapped``
    (cache blocks moved out by swapping). Each call to :meth:`schedule`
    either admits prompts from ``waiting`` (a prompt run) or advances the
    ``running`` groups by one token, preempting and swapping groups in and
    out as cache space allows (a decode run).
    """

    def __init__(
        self,
        scheduler_config: SchedulerConfig,
        cache_config: CacheConfig,
        lora_config: Optional[LoRAConfig],
    ) -> None:
        self.scheduler_config = scheduler_config
        self.cache_config = cache_config
        # Note for LoRA scheduling: the current policy is extremely
        # simple and NOT fair. It can lead to starvation of some
        # LoRAs. This should be improved in the future.
        self.lora_config = lora_config

        # A single prompt must fit both the model context length and one batch.
        self.prompt_limit = min(self.scheduler_config.max_model_len,
                                self.scheduler_config.max_num_batched_tokens)

        # Instantiate the scheduling policy.
        self.policy = PolicyFactory.get_policy(policy_name="fcfs")

        BlockSpaceManagerImpl = BlockSpaceManager.get_block_space_manager_class(
            version="v2" if self.scheduler_config.
            use_v2_block_manager else "v1")

        # Create the block space manager.
        self.block_manager = BlockSpaceManagerImpl(
            block_size=self.cache_config.block_size,
            num_gpu_blocks=self.cache_config.num_gpu_blocks,
            num_cpu_blocks=self.cache_config.num_cpu_blocks,
            sliding_window=self.cache_config.sliding_window,
            enable_caching=self.cache_config.enable_prefix_caching)

        # Sequence groups in the WAITING state.
        self.waiting: Deque[SequenceGroup] = deque()
        # Sequence groups in the RUNNING state.
        self.running: Deque[SequenceGroup] = deque()
        # Sequence groups in the SWAPPED state.
        self.swapped: Deque[SequenceGroup] = deque()

        # Time at previous scheduling step
        self.prev_time = 0.0
        # Did we schedule a prompt at previous step?
        self.prev_prompt = False
        # Latency of the last prompt step
        self.last_prompt_latency = 0.0

    @property
    def lora_enabled(self) -> bool:
        """True when a LoRA config was supplied (and is truthy)."""
        return bool(self.lora_config)

    def add_seq_group(self, seq_group: SequenceGroup) -> None:
        """Enqueue a new sequence group at the back of the waiting queue."""
        # Add sequence groups to the waiting queue.
        self.waiting.append(seq_group)

    def abort_seq_group(self, request_id: Union[str, Iterable[str]]) -> None:
        """Abort the sequence group(s) with the given request ID(s).

        The waiting, running and swapped queues are searched for groups whose
        ``request_id`` matches. Each matching group is removed from its
        queue, and every sequence in it that is not already finished is
        marked ``FINISHED_ABORTED`` and freed. IDs that are not found are
        silently ignored.

        Args:
            request_id: A single request ID or an iterable of request IDs.
        """
        if isinstance(request_id, str):
            request_id = (request_id, )
        request_ids = set(request_id)
        for state_queue in [self.waiting, self.running, self.swapped]:
            aborted_groups: List[SequenceGroup] = []
            for seq_group in state_queue:
                if not request_ids:
                    # Using 'break' here may add two extra iterations,
                    # but is acceptable to reduce complexity.
                    break
                if seq_group.request_id in request_ids:
                    # Appending aborted group into pending list.
                    aborted_groups.append(seq_group)
                    request_ids.remove(seq_group.request_id)
            # Removal is deferred so the queue is not mutated while iterating.
            for aborted_group in aborted_groups:
                # Remove the sequence group from the state queue.
                state_queue.remove(aborted_group)
                for seq in aborted_group.get_seqs():
                    if seq.is_finished():
                        continue
                    seq.status = SequenceStatus.FINISHED_ABORTED
                    self.free_seq(seq)

    def has_unfinished_seqs(self) -> bool:
        """Return True if any queue still holds a sequence group."""
        # bool() is required: a bare ``or`` chain returns one of the deques.
        return bool(self.waiting or self.running or self.swapped)

    def get_num_unfinished_seq_groups(self) -> int:
        """Return the total number of queued sequence groups."""
        return len(self.waiting) + len(self.running) + len(self.swapped)

    def _schedule(self) -> SchedulerOutputs:
        """Run one scheduling step and mutate the queues accordingly.

        If nothing is swapped out, first try to admit prompts from the
        waiting queue; if any prompt was scheduled or ignored, return a
        prompt run. Otherwise, reserve a new token slot for every running
        group (preempting the lowest-priority groups when slots run out),
        then swap groups back in if no preemption happened, and return a
        decode run.
        """
        # Blocks that need to be swapped or copied before model execution.
        blocks_to_swap_in: Dict[int, int] = {}
        blocks_to_swap_out: Dict[int, int] = {}
        blocks_to_copy: Dict[int, List[int]] = {}

        # Fix the current time.
        now = time.time()

        # Join waiting sequences if possible.
        if not self.swapped:
            ignored_seq_groups: List[SequenceGroup] = []
            scheduled: List[SequenceGroup] = []
            # The total number of sequences on the fly, including the
            # requests in the generation phase.
            num_curr_seqs = sum(seq_group.get_max_num_running_seqs()
                                for seq_group in self.running)
            curr_loras = set(
                seq_group.lora_int_id
                for seq_group in self.running) if self.lora_enabled else None

            # Optimization: We do not sort the waiting queue since the preempted
            # sequence groups are added to the front and the new sequence groups
            # are added to the back.
            leftover_waiting_sequences: Deque[SequenceGroup] = deque()
            num_batched_tokens = 0
            while self._passed_delay(now) and self.waiting:
                seq_group = self.waiting[0]
                waiting_seqs = seq_group.get_seqs(
                    status=SequenceStatus.WAITING)
                assert len(waiting_seqs) == 1, (
                    "Waiting sequence group should have only one prompt "
                    "sequence.")
                num_prompt_tokens = waiting_seqs[0].get_len()
                if num_prompt_tokens > self.prompt_limit:
                    logger.warning(
                        f"Input prompt ({num_prompt_tokens} tokens) is too long"
                        f" and exceeds limit of {self.prompt_limit}")
                    for seq in waiting_seqs:
                        seq.status = SequenceStatus.FINISHED_IGNORED
                    ignored_seq_groups.append(seq_group)
                    self.waiting.popleft()
                    continue

                # If the sequence group cannot be allocated, stop.
                can_allocate = self.block_manager.can_allocate(seq_group)
                if can_allocate == AllocStatus.LATER:
                    break
                elif can_allocate == AllocStatus.NEVER:
                    logger.warning(
                        f"Input prompt ({num_prompt_tokens} tokens) is too long"
                        " and exceeds the capacity of block_manager")
                    for seq in waiting_seqs:
                        seq.status = SequenceStatus.FINISHED_IGNORED
                    ignored_seq_groups.append(seq_group)
                    self.waiting.popleft()
                    continue

                lora_int_id = 0
                if self.lora_enabled:
                    lora_int_id = seq_group.lora_int_id
                    if (lora_int_id > 0 and lora_int_id not in curr_loras
                            and len(curr_loras) >= self.lora_config.max_loras):
                        # We don't have a space for another LoRA, so
                        # we ignore this request for now.
                        # appendleft + the later extendleft together restore
                        # the original relative order of skipped groups.
                        leftover_waiting_sequences.appendleft(seq_group)
                        self.waiting.popleft()
                        continue

                # If the number of batched tokens exceeds the limit, stop.
                # The running total is only committed once the group is
                # actually scheduled, so the reported count never includes
                # a prompt that stays in the waiting queue.
                new_num_batched_tokens = num_batched_tokens + num_prompt_tokens
                if (new_num_batched_tokens >
                        self.scheduler_config.max_num_batched_tokens):
                    break

                # The total number of sequences in the RUNNING state should not
                # exceed the maximum number of sequences.
                num_new_seqs = seq_group.get_max_num_running_seqs()
                if (num_curr_seqs + num_new_seqs >
                        self.scheduler_config.max_num_seqs):
                    break

                if lora_int_id > 0:
                    curr_loras.add(lora_int_id)
                self.waiting.popleft()
                self._allocate(seq_group)
                self.running.append(seq_group)
                num_curr_seqs += num_new_seqs
                num_batched_tokens = new_num_batched_tokens
                scheduled.append(seq_group)

            self.waiting.extendleft(leftover_waiting_sequences)

            if scheduled or ignored_seq_groups:
                self.prev_prompt = True
                scheduler_outputs = SchedulerOutputs(
                    scheduled_seq_groups=scheduled,
                    prompt_run=True,
                    num_batched_tokens=num_batched_tokens,
                    blocks_to_swap_in=blocks_to_swap_in,
                    blocks_to_swap_out=blocks_to_swap_out,
                    blocks_to_copy=blocks_to_copy,
                    ignored_seq_groups=ignored_seq_groups,
                )
                return scheduler_outputs

        # NOTE(woosuk): Preemption happens only when there is no available slot
        # to keep all the sequence groups in the RUNNING state.
        # In this case, the policy is responsible for deciding which sequence
        # groups to preempt.
        self.running = self.policy.sort_by_priority(now, self.running)

        # Reserve new token slots for the running sequence groups.
        running: Deque[SequenceGroup] = deque()
        preempted: List[SequenceGroup] = []
        while self.running:
            seq_group = self.running.popleft()
            while not self.block_manager.can_append_slot(seq_group):
                if self.running:
                    # Preempt the lowest-priority sequence groups.
                    victim_seq_group = self.running.pop()
                    self._preempt(victim_seq_group, blocks_to_swap_out)
                    preempted.append(victim_seq_group)
                else:
                    # No other sequence groups can be preempted.
                    # Preempt the current sequence group.
                    self._preempt(seq_group, blocks_to_swap_out)
                    preempted.append(seq_group)
                    break
            else:
                # Append new slots to the sequence group.
                self._append_slot(seq_group, blocks_to_copy)
                running.append(seq_group)
        self.running = running

        # Swap in the sequence groups in the SWAPPED state if possible.
        self.swapped = self.policy.sort_by_priority(now, self.swapped)
        # Swapping in right after preempting would just thrash the cache.
        if not preempted:
            num_curr_seqs = sum(seq_group.get_max_num_running_seqs()
                                for seq_group in self.running)
            curr_loras = set(
                seq_group.lora_int_id
                for seq_group in self.running) if self.lora_enabled else None

            leftover_swapped: Deque[SequenceGroup] = deque()

            while self.swapped:
                seq_group = self.swapped[0]
                lora_int_id = 0
                if self.lora_enabled:
                    lora_int_id = seq_group.lora_int_id
                    if (lora_int_id > 0 and lora_int_id not in curr_loras
                            and len(curr_loras) >= self.lora_config.max_loras):
                        # We don't have a space for another LoRA, so
                        # we ignore this request for now.
                        leftover_swapped.appendleft(seq_group)
                        self.swapped.popleft()
                        continue

                # If the sequence group cannot be swapped in, stop.
                if not self.block_manager.can_swap_in(seq_group):
                    break

                # The total number of sequences in the RUNNING state should not
                # exceed the maximum number of sequences.
                num_new_seqs = seq_group.get_max_num_running_seqs()
                if (num_curr_seqs + num_new_seqs >
                        self.scheduler_config.max_num_seqs):
                    break

                if lora_int_id > 0:
                    curr_loras.add(lora_int_id)
                self.swapped.popleft()
                self._swap_in(seq_group, blocks_to_swap_in)
                self._append_slot(seq_group, blocks_to_copy)
                num_curr_seqs += num_new_seqs
                self.running.append(seq_group)

            self.swapped.extendleft(leftover_swapped)

        # Each sequence in the generation phase only takes one token slot.
        # Therefore, the number of batched tokens is equal to the number of
        # sequences in the RUNNING state.
        num_batched_tokens = sum(
            seq_group.num_seqs(status=SequenceStatus.RUNNING)
            for seq_group in self.running)

        scheduler_outputs = SchedulerOutputs(
            scheduled_seq_groups=self.running,
            prompt_run=False,
            num_batched_tokens=num_batched_tokens,
            blocks_to_swap_in=blocks_to_swap_in,
            blocks_to_swap_out=blocks_to_swap_out,
            blocks_to_copy=blocks_to_copy,
            ignored_seq_groups=[],
        )
        return scheduler_outputs

    def schedule(self) -> Tuple[List[SequenceGroupMetadata], SchedulerOutputs]:
        """Run a scheduling step and build per-group metadata for the workers.

        Returns:
            A tuple of the metadata list (one entry per scheduled group, in
            the order of ``scheduler_outputs.scheduled_seq_groups``) and the
            :class:`SchedulerOutputs` itself.
        """
        # Schedule sequence groups.
        # This function call changes the internal states of the scheduler
        # such as self.running, self.swapped, and self.waiting.
        scheduler_outputs = self._schedule()
        now = time.time()

        # Create input data structures.
        seq_group_metadata_list: List[SequenceGroupMetadata] = []
        for seq_group in scheduler_outputs.scheduled_seq_groups:
            seq_group.maybe_set_first_scheduled_time(now)

            seq_data: Dict[int, SequenceData] = {}
            block_tables: Dict[int, List[int]] = {}

            for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
                seq_id = seq.seq_id
                seq_data[seq_id] = seq.data
                block_tables[seq_id] = self.block_manager.get_block_table(seq)
                self.block_manager.access_all_blocks_in_seq(seq, now)

            common_computed_block_nums = (
                self.block_manager.get_common_computed_block_ids(
                    seq_group.get_seqs(status=SequenceStatus.RUNNING)))

            seq_group_metadata = SequenceGroupMetadata(
                request_id=seq_group.request_id,
                is_prompt=scheduler_outputs.prompt_run,
                seq_data=seq_data,
                sampling_params=seq_group.sampling_params,
                block_tables=block_tables,
                lora_request=seq_group.lora_request,
                computed_block_nums=common_computed_block_nums,
                state=seq_group.state,
                # `multi_modal_data` will only be present for the 1st comm
                # between engine and worker.
                # the subsequent comms can still use delta, but
                # `multi_modal_data` will be None.
                multi_modal_data=seq_group.multi_modal_data
                if scheduler_outputs.prompt_run else None,
            )
            seq_group_metadata_list.append(seq_group_metadata)

        # Now that the batch has been created, we can assume all blocks in the
        # batch will have been computed before the next scheduling invocation.
        # This is because the engine assumes that a failure in model execution
        # will crash the vLLM instance / will not retry.
        for seq_group in scheduler_outputs.scheduled_seq_groups:
            self.block_manager.mark_blocks_as_computed(seq_group)

        return seq_group_metadata_list, scheduler_outputs

    def fork_seq(self, parent_seq: Sequence, child_seq: Sequence) -> None:
        """Delegate forking ``parent_seq`` into ``child_seq`` to the block manager."""
        self.block_manager.fork(parent_seq, child_seq)

    def free_seq(self, seq: Sequence) -> None:
        """Release the cache blocks held by ``seq``."""
        self.block_manager.free(seq)

    def free_finished_seq_groups(self) -> None:
        """Drop finished sequence groups from the running queue."""
        self.running = deque(seq_group for seq_group in self.running
                             if not seq_group.is_finished())

    def _allocate(self, seq_group: SequenceGroup) -> None:
        """Allocate blocks for a waiting group and mark its sequences RUNNING."""
        self.block_manager.allocate(seq_group)
        for seq in seq_group.get_seqs(status=SequenceStatus.WAITING):
            seq.status = SequenceStatus.RUNNING

    def _append_slot(
        self,
        seq_group: SequenceGroup,
        blocks_to_copy: Dict[int, List[int]],
    ) -> None:
        """Reserve one more token slot for every running sequence in the group.

        When the block manager reports that a block has to be copied (it
        returns a ``(src_block, dst_block)`` pair), the copy is recorded in
        ``blocks_to_copy`` under ``src_block``.
        """
        for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
            ret = self.block_manager.append_slot(seq)
            if ret is not None:
                src_block, dst_block = ret
                blocks_to_copy.setdefault(src_block, []).append(dst_block)

    def _preempt(
        self,
        seq_group: SequenceGroup,
        blocks_to_swap_out: Dict[int, int],
        preemption_mode: Optional[PreemptionMode] = None,
    ) -> None:
        """Evict ``seq_group`` by recomputation or swapping.

        Raises:
            AssertionError: If ``preemption_mode`` is not a known mode.
        """
        # If preemption mode is not specified, we determine the mode as follows:
        # We use recomputation by default since it incurs lower overhead than
        # swapping. However, when the sequence group has multiple sequences
        # (e.g., beam search), recomputation is not currently supported. In
        # such a case, we use swapping instead.
        # FIXME(woosuk): This makes our scheduling policy a bit bizarre.
        # As swapped sequences are prioritized over waiting sequences,
        # sequence groups with multiple sequences are implicitly prioritized
        # over sequence groups with a single sequence.
        # TODO(woosuk): Support recomputation for sequence groups with multiple
        # sequences. This may require a more sophisticated CUDA kernel.
        if preemption_mode is None:
            if seq_group.get_max_num_running_seqs() == 1:
                preemption_mode = PreemptionMode.RECOMPUTE
            else:
                preemption_mode = PreemptionMode.SWAP
        if preemption_mode == PreemptionMode.RECOMPUTE:
            self._preempt_by_recompute(seq_group)
        elif preemption_mode == PreemptionMode.SWAP:
            self._preempt_by_swap(seq_group, blocks_to_swap_out)
        else:
            raise AssertionError("Invalid preemption mode.")

    def _preempt_by_recompute(
        self,
        seq_group: SequenceGroup,
    ) -> None:
        """Free the group's blocks and put it back at the head of ``waiting``."""
        seqs = seq_group.get_seqs(status=SequenceStatus.RUNNING)
        assert len(seqs) == 1
        for seq in seqs:
            seq.status = SequenceStatus.WAITING
            self.block_manager.free(seq)
        # NOTE: For FCFS, we insert the preempted sequence group to the front
        # of the waiting queue.
        self.waiting.appendleft(seq_group)

    def _preempt_by_swap(
        self,
        seq_group: SequenceGroup,
        blocks_to_swap_out: Dict[int, int],
    ) -> None:
        """Swap the group out to CPU and append it to the swapped queue."""
        self._swap_out(seq_group, blocks_to_swap_out)
        self.swapped.append(seq_group)

    def _swap_in(
        self,
        seq_group: SequenceGroup,
        blocks_to_swap_in: Dict[int, int],
    ) -> None:
        """Swap the group's blocks back in and mark its swapped seqs RUNNING."""
        mapping = self.block_manager.swap_in(seq_group)
        blocks_to_swap_in.update(mapping)
        for seq in seq_group.get_seqs(status=SequenceStatus.SWAPPED):
            seq.status = SequenceStatus.RUNNING

    def _swap_out(
        self,
        seq_group: SequenceGroup,
        blocks_to_swap_out: Dict[int, int],
    ) -> None:
        """Swap the group's blocks out and mark its running seqs SWAPPED.

        Raises:
            RuntimeError: If the block manager reports there is not enough
                CPU swap space.
        """
        if not self.block_manager.can_swap_out(seq_group):
            # FIXME(woosuk): Abort the sequence group instead of aborting the
            # entire engine.
            raise RuntimeError(
                "Aborted due to the lack of CPU swap space. Please increase "
                "the swap space to avoid this error.")
        mapping = self.block_manager.swap_out(seq_group)
        blocks_to_swap_out.update(mapping)
        for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
            seq.status = SequenceStatus.SWAPPED

    def _passed_delay(self, now: float) -> bool:
        """Return whether waiting prompts may be scheduled at time ``now``.

        Also updates the timing bookkeeping: if the previous step scheduled
        prompts, ``last_prompt_latency`` becomes the time elapsed since that
        step. With a positive ``delay_factor``, prompts are held back until
        the oldest waiting request has waited longer than
        ``delay_factor * last_prompt_latency``, unless nothing is running.
        """
        if self.prev_prompt:
            self.last_prompt_latency = now - self.prev_time
        self.prev_time, self.prev_prompt = now, False
        # Delay scheduling prompts to let waiting queue fill up
        if self.scheduler_config.delay_factor > 0 and self.waiting:
            earliest_arrival_time = min(
                [e.metrics.arrival_time for e in self.waiting])
            passed_delay = (
                (now - earliest_arrival_time) >
                (self.scheduler_config.delay_factor * self.last_prompt_latency)
                or not self.running)
        else:
            passed_delay = True
        return passed_delay