scheduler.py 21.8 KB
Newer Older
1
from collections import deque
2
3
import enum
import time
4
from typing import Deque, Dict, Iterable, List, Optional, Tuple, Union, Set
Woosuk Kwon's avatar
Woosuk Kwon committed
5

6
from vllm.config import CacheConfig, LoRAConfig, SchedulerConfig
7
from vllm.core.block_manager import AllocStatus, BlockSpaceManager
Woosuk Kwon's avatar
Woosuk Kwon committed
8
from vllm.core.policy import PolicyFactory
9
from vllm.lora.request import LoRARequest
Woosuk Kwon's avatar
Woosuk Kwon committed
10
11
from vllm.logger import init_logger
from vllm.sequence import (Sequence, SequenceData, SequenceGroup,
12
                           SequenceGroupMetadata, SequenceStatus)
Woosuk Kwon's avatar
Woosuk Kwon committed
13

Woosuk Kwon's avatar
Woosuk Kwon committed
14
logger = init_logger(__name__)
15

Woosuk Kwon's avatar
Woosuk Kwon committed
16

17
18
19
20
21
22
23
24
25
26
27
28
29
class PreemptionMode(enum.Enum):
    """Strategies for evicting a sequence group that is currently running.

    SWAP: Move the blocks of the preempted sequences out to CPU memory and
    move them back in once the sequences are resumed.
    RECOMPUTE: Drop the blocks of the preempted sequences and rebuild them
    when the sequences are resumed, handling the sequences as new prompts.
    """
    # Explicit values; these are the same numbers enum.auto() would assign.
    SWAP = 1
    RECOMPUTE = 2


30
31
32
33
class SchedulerOutputs:
    """Result of a single scheduling step.

    Attributes:
        scheduled_seq_groups: Sequence groups selected for this step. When at
            least one scheduled group carries a LoRA request, this is
            replaced by a list sorted by (lora_int_id, request_id).
        prompt_run: Flag supplied by the scheduler; True for a prompt batch.
        num_batched_tokens: Number of tokens in the batch, as computed by the
            scheduler.
        blocks_to_swap_in: Block mapping for swap-in.
        blocks_to_swap_out: Block mapping for swap-out.
        blocks_to_copy: Source block -> list of destination blocks.
        ignored_seq_groups: Sequence groups the scheduler decided to ignore.
        num_loras: Number of distinct LoRA requests among the scheduled
            groups (groups without a LoRA request are not counted).
    """

    def __init__(
        self,
        scheduled_seq_groups: Iterable[SequenceGroup],
        prompt_run: bool,
        num_batched_tokens: int,
        blocks_to_swap_in: Dict[int, int],
        blocks_to_swap_out: Dict[int, int],
        blocks_to_copy: Dict[int, List[int]],
        ignored_seq_groups: List[SequenceGroup],
    ) -> None:
        self.scheduled_seq_groups = scheduled_seq_groups
        self.prompt_run = prompt_run
        self.num_batched_tokens = num_batched_tokens
        self.blocks_to_swap_in = blocks_to_swap_in
        self.blocks_to_swap_out = blocks_to_swap_out
        self.blocks_to_copy = blocks_to_copy
        # Swap in and swap out should never happen at the same time.
        assert not (blocks_to_swap_in and blocks_to_swap_out)
        self.ignored_seq_groups = ignored_seq_groups

        # Only real LoRA requests are counted, so a batch without any LoRA
        # keeps the order chosen by the scheduler instead of being re-sorted.
        self.num_loras = len(self.lora_requests)
        if self.num_loras > 0:
            self._sort_by_lora_ids()

    def is_empty(self) -> bool:
        """Return True when there is nothing to run, swap, or copy."""
        # NOTE: We do not consider the ignored sequence groups.
        return (not self.scheduled_seq_groups and not self.blocks_to_swap_in
                and not self.blocks_to_swap_out and not self.blocks_to_copy)

    def _sort_by_lora_ids(self) -> None:
        """Reorder the scheduled groups by (lora_int_id, request_id)."""
        self.scheduled_seq_groups = sorted(self.scheduled_seq_groups,
                                           key=lambda g:
                                           (g.lora_int_id, g.request_id))

    @property
    def lora_requests(self) -> Set[LoRARequest]:
        """Distinct LoRA requests of the scheduled groups (None excluded)."""
        return {
            g.lora_request
            for g in self.scheduled_seq_groups
            if g.lora_request is not None
        }

70

Woosuk Kwon's avatar
Woosuk Kwon committed
71
72
class Scheduler:

Woosuk Kwon's avatar
Woosuk Kwon committed
73
    def __init__(
        self,
        scheduler_config: SchedulerConfig,
        cache_config: CacheConfig,
        lora_config: Optional[LoRAConfig],
    ) -> None:
        """Set up the policy, the block manager and the three state queues.

        Args:
            scheduler_config: Provides max_model_len and
                max_num_batched_tokens, which bound the prompt length.
            cache_config: Provides the block size, GPU/CPU block counts,
                sliding window and prefix-caching flag for the block manager.
            lora_config: LoRA settings, or None.
        """
        self.scheduler_config = scheduler_config
        self.cache_config = cache_config
        # Note for LoRA scheduling: the current policy is extremely
        # simple and NOT fair. It can lead to starvation of some
        # LoRAs. This should be improved in the future.
        self.lora_config = lora_config

        # Longest prompt that will be accepted: the smaller of the model
        # length and the per-step token budget.
        max_model_len = scheduler_config.max_model_len
        max_batched_tokens = scheduler_config.max_num_batched_tokens
        self.prompt_limit = min(max_model_len, max_batched_tokens)

        # Scheduling policy used to order the running / swapped queues.
        self.policy = PolicyFactory.get_policy(policy_name="fcfs")

        # Block space manager, configured from the cache configuration.
        self.block_manager = BlockSpaceManager(
            block_size=cache_config.block_size,
            num_gpu_blocks=cache_config.num_gpu_blocks,
            num_cpu_blocks=cache_config.num_cpu_blocks,
            sliding_window=cache_config.sliding_window,
            enable_caching=cache_config.enable_prefix_caching)

        # State queues: WAITING, RUNNING and SWAPPED sequence groups.
        self.waiting: Deque[SequenceGroup] = deque()
        self.running: Deque[SequenceGroup] = deque()
        self.swapped: Deque[SequenceGroup] = deque()

        # Bookkeeping read and updated by _passed_delay():
        # time of the previous scheduling step,
        self.prev_time = 0.0
        # whether a prompt was scheduled at the previous step,
        self.prev_prompt = False
        # and the latency of the last prompt step.
        self.last_prompt_latency = 0.0

113
114
115
116
    @property
    def lora_enabled(self) -> bool:
        return bool(self.lora_config)

117
    def add_seq_group(self, seq_group: SequenceGroup) -> None:
118
        # Add sequence groups to the waiting queue.
119
        self.waiting.append(seq_group)
Woosuk Kwon's avatar
Woosuk Kwon committed
120

Antoni Baum's avatar
Antoni Baum committed
121
    def abort_seq_group(self, request_id: Union[str, Iterable[str]]) -> None:
122
123
124
125
126
127
128
129
130
131
132
133
        """Aborts a sequence group with the given ID.

        Check if the sequence group with the given ID
            is present in any of the state queue.
        If present, remove the sequence group from the state queue.
            Also, if any of the sequences in the sequence group is not finished,
                free the sequence with status `FINISHED_ABORTED`.
        Otherwise, do nothing.

        Args:
            request_id: The ID(s) of the sequence group to abort.
        """
Antoni Baum's avatar
Antoni Baum committed
134
135
136
        if isinstance(request_id, str):
            request_id = (request_id, )
        request_ids = set(request_id)
137
        for state_queue in [self.waiting, self.running, self.swapped]:
ljss's avatar
ljss committed
138
            aborted_groups: List[SequenceGroup] = []
139
140
141
142
143
            for seq_group in state_queue:
                if not request_ids:
                    # Using 'break' here may add two extra iterations,
                    # but is acceptable to reduce complexity .
                    break
Antoni Baum's avatar
Antoni Baum committed
144
                if seq_group.request_id in request_ids:
145
146
                    # Appending aborted group into pending list.
                    aborted_groups.append(seq_group)
Antoni Baum's avatar
Antoni Baum committed
147
                    request_ids.remove(seq_group.request_id)
148
149
150
            for aborted_group in aborted_groups:
                # Remove the sequence group from the state queue.
                state_queue.remove(aborted_group)
ljss's avatar
ljss committed
151
                for seq in aborted_group.get_seqs():
152
153
154
155
                    if seq.is_finished():
                        continue
                    seq.status = SequenceStatus.FINISHED_ABORTED
                    self.free_seq(seq)
156

157
158
159
    def has_unfinished_seqs(self) -> bool:
        return self.waiting or self.running or self.swapped

160
161
162
    def get_num_unfinished_seq_groups(self) -> int:
        return len(self.waiting) + len(self.running) + len(self.swapped)

Woosuk Kwon's avatar
Woosuk Kwon committed
163
    def _schedule(self) -> SchedulerOutputs:
        """Pick the sequence groups to run in the next step.

        Two kinds of batches can be produced:

        1. Prompt batch. Only attempted when the swapped queue is empty.
           Waiting groups are admitted from the front of the waiting queue
           until a limit is hit (block allocation, token budget, maximum
           number of sequences). Prompts that are too long are marked
           `FINISHED_IGNORED`. If anything was scheduled or ignored, the
           prompt batch is returned immediately.
        2. Generation batch. Otherwise, a token slot is reserved for every
           running group, preempting from the back of the priority-sorted
           running queue when slots run out. If nothing was preempted,
           swapped groups are brought back in while resources allow.

        This method mutates self.waiting, self.running and self.swapped.

        Returns:
            The SchedulerOutputs describing the chosen batch together with
            the block swap/copy operations to perform before execution.
        """
        # Blocks that need to be swapped or copied before model execution.
        blocks_to_swap_in: Dict[int, int] = {}
        blocks_to_swap_out: Dict[int, int] = {}
        blocks_to_copy: Dict[int, List[int]] = {}

        # Fix the current time.
        now = time.time()

        # Join waiting sequences if possible.
        if not self.swapped:
            ignored_seq_groups: List[SequenceGroup] = []
            scheduled: List[SequenceGroup] = []
            # The total number of sequences on the fly, including the
            # requests in the generation phase.
            num_curr_seqs = sum(seq_group.get_max_num_running_seqs()
                                for seq_group in self.running)
            curr_loras = set(
                seq_group.lora_int_id
                for seq_group in self.running) if self.lora_enabled else None

            # Optimization: We do not sort the waiting queue since the preempted
            # sequence groups are added to the front and the new sequence groups
            # are added to the back.
            leftover_waiting_sequences = deque()
            num_batched_tokens = 0
            while self._passed_delay(now) and self.waiting:
                seq_group = self.waiting[0]
                waiting_seqs = seq_group.get_seqs(
                    status=SequenceStatus.WAITING)
                assert len(waiting_seqs) == 1, (
                    "Waiting sequence group should have only one prompt "
                    "sequence.")
                num_prompt_tokens = waiting_seqs[0].get_len()
                if num_prompt_tokens > self.prompt_limit:
                    logger.warning(
                        f"Input prompt ({num_prompt_tokens} tokens) is too long"
                        f" and exceeds limit of {self.prompt_limit}")
                    for seq in waiting_seqs:
                        seq.status = SequenceStatus.FINISHED_IGNORED
                    ignored_seq_groups.append(seq_group)
                    self.waiting.popleft()
                    continue

                # If the sequence group cannot be allocated, stop.
                can_allocate = self.block_manager.can_allocate(seq_group)
                if can_allocate == AllocStatus.LATER:
                    break
                elif can_allocate == AllocStatus.NEVER:
                    logger.warning(
                        f"Input prompt ({num_prompt_tokens} tokens) is too long"
                        f" and exceeds the capacity of block_manager")
                    for seq in waiting_seqs:
                        seq.status = SequenceStatus.FINISHED_IGNORED
                    ignored_seq_groups.append(seq_group)
                    self.waiting.popleft()
                    continue

                lora_int_id = 0
                if self.lora_enabled:
                    lora_int_id = seq_group.lora_int_id
                    if (lora_int_id > 0 and lora_int_id not in curr_loras
                            and len(curr_loras) >= self.lora_config.max_loras):
                        # We don't have a space for another LoRA, so
                        # we ignore this request for now.
                        # appendleft here + extendleft after the loop keeps
                        # the skipped groups in their original order.
                        leftover_waiting_sequences.appendleft(seq_group)
                        self.waiting.popleft()
                        continue

                # If the number of batched tokens exceeds the limit, stop.
                # The tentative total is committed only once the group is
                # actually scheduled, so a group rejected here (or by the
                # max_num_seqs check below) is not counted in the batch.
                new_num_batched_tokens = (num_batched_tokens +
                                          num_prompt_tokens)
                if (new_num_batched_tokens >
                        self.scheduler_config.max_num_batched_tokens):
                    break

                # The total number of sequences in the RUNNING state should not
                # exceed the maximum number of sequences.
                num_new_seqs = seq_group.get_max_num_running_seqs()
                if (num_curr_seqs + num_new_seqs >
                        self.scheduler_config.max_num_seqs):
                    break

                if lora_int_id > 0:
                    curr_loras.add(lora_int_id)
                self.waiting.popleft()
                self._allocate(seq_group)
                self.running.append(seq_group)
                num_batched_tokens = new_num_batched_tokens
                num_curr_seqs += num_new_seqs
                scheduled.append(seq_group)

            # Put the groups skipped for LoRA capacity back at the front.
            self.waiting.extendleft(leftover_waiting_sequences)

            if scheduled or ignored_seq_groups:
                self.prev_prompt = True
                scheduler_outputs = SchedulerOutputs(
                    scheduled_seq_groups=scheduled,
                    prompt_run=True,
                    num_batched_tokens=num_batched_tokens,
                    blocks_to_swap_in=blocks_to_swap_in,
                    blocks_to_swap_out=blocks_to_swap_out,
                    blocks_to_copy=blocks_to_copy,
                    ignored_seq_groups=ignored_seq_groups,
                )
                return scheduler_outputs

        # NOTE(woosuk): Preemption happens only when there is no available slot
        # to keep all the sequence groups in the RUNNING state.
        # In this case, the policy is responsible for deciding which sequence
        # groups to preempt.
        self.running = self.policy.sort_by_priority(now, self.running)

        # Reserve new token slots for the running sequence groups.
        running: Deque[SequenceGroup] = deque()
        preempted: List[SequenceGroup] = []
        while self.running:
            seq_group = self.running.popleft()
            while not self.block_manager.can_append_slot(seq_group):
                if self.running:
                    # Preempt the lowest-priority sequence groups.
                    victim_seq_group = self.running.pop()
                    self._preempt(victim_seq_group, blocks_to_swap_out)
                    preempted.append(victim_seq_group)
                else:
                    # No other sequence groups can be preempted.
                    # Preempt the current sequence group.
                    self._preempt(seq_group, blocks_to_swap_out)
                    preempted.append(seq_group)
                    break
            else:
                # Reached only when the inner loop ended without `break`,
                # i.e. the current group was not preempted itself.
                # Append new slots to the sequence group.
                self._append_slot(seq_group, blocks_to_copy)
                running.append(seq_group)
        self.running = running

        # Swap in the sequence groups in the SWAPPED state if possible.
        self.swapped = self.policy.sort_by_priority(now, self.swapped)
        if not preempted:
            num_curr_seqs = sum(seq_group.get_max_num_running_seqs()
                                for seq_group in self.running)
            curr_loras = set(
                seq_group.lora_int_id
                for seq_group in self.running) if self.lora_enabled else None

            leftover_swapped = deque()

            while self.swapped:
                seq_group = self.swapped[0]
                lora_int_id = 0
                if self.lora_enabled:
                    lora_int_id = seq_group.lora_int_id
                    if (lora_int_id > 0 and lora_int_id not in curr_loras
                            and len(curr_loras) >= self.lora_config.max_loras):
                        # We don't have a space for another LoRA, so
                        # we ignore this request for now.
                        leftover_swapped.appendleft(seq_group)
                        self.swapped.popleft()
                        continue

                # If the sequence group cannot be swapped in, stop.
                if not self.block_manager.can_swap_in(seq_group):
                    break

                # The total number of sequences in the RUNNING state should not
                # exceed the maximum number of sequences.
                num_new_seqs = seq_group.get_max_num_running_seqs()
                if (num_curr_seqs + num_new_seqs >
                        self.scheduler_config.max_num_seqs):
                    break

                if lora_int_id > 0:
                    curr_loras.add(lora_int_id)
                self.swapped.popleft()
                self._swap_in(seq_group, blocks_to_swap_in)
                self._append_slot(seq_group, blocks_to_copy)
                num_curr_seqs += num_new_seqs
                self.running.append(seq_group)

            # Put the groups skipped for LoRA capacity back at the front.
            self.swapped.extendleft(leftover_swapped)

        # Each sequence in the generation phase only takes one token slot.
        # Therefore, the number of batched tokens is equal to the number of
        # sequences in the RUNNING state.
        num_batched_tokens = sum(
            seq_group.num_seqs(status=SequenceStatus.RUNNING)
            for seq_group in self.running)

        scheduler_outputs = SchedulerOutputs(
            scheduled_seq_groups=self.running,
            prompt_run=False,
            num_batched_tokens=num_batched_tokens,
            blocks_to_swap_in=blocks_to_swap_in,
            blocks_to_swap_out=blocks_to_swap_out,
            blocks_to_copy=blocks_to_copy,
            ignored_seq_groups=[],
        )
        return scheduler_outputs
Woosuk Kwon's avatar
Woosuk Kwon committed
359

Woosuk Kwon's avatar
Woosuk Kwon committed
360
    def schedule(self) -> Tuple[List[SequenceGroupMetadata], SchedulerOutputs]:
        """Run one scheduling step and build the per-group metadata.

        Returns:
            A pair of (metadata for every scheduled sequence group, the
            SchedulerOutputs produced by `_schedule`).
        """
        # Schedule sequence groups.
        # _schedule() changes the internal states of the scheduler
        # such as self.running, self.swapped, and self.waiting.
        outputs = self._schedule()
        now = time.time()

        # Create input data structures.
        metadata_list: List[SequenceGroupMetadata] = []
        for group in outputs.scheduled_seq_groups:
            group.maybe_set_first_scheduled_time(now)

            data_by_seq: Dict[int, SequenceData] = {}
            tables_by_seq: Dict[int, List[int]] = {}
            for seq in group.get_seqs(status=SequenceStatus.RUNNING):
                sid = seq.seq_id
                data_by_seq[sid] = seq.data
                tables_by_seq[sid] = self.block_manager.get_block_table(seq)
                self.block_manager.access_all_blocks_in_seq(seq, now)

            metadata = SequenceGroupMetadata(
                request_id=group.request_id,
                is_prompt=outputs.prompt_run,
                seq_data=data_by_seq,
                sampling_params=group.sampling_params,
                block_tables=tables_by_seq,
                lora_request=group.lora_request,
                computed_block_nums=self.block_manager.
                get_common_computed_block_ids(group),
                state=group.state,
            )
            metadata_list.append(metadata)
        return metadata_list, outputs
394

395
396
    def fork_seq(self, parent_seq: Sequence, child_seq: Sequence) -> None:
        self.block_manager.fork(parent_seq, child_seq)
Woosuk Kwon's avatar
Woosuk Kwon committed
397

398
    def free_seq(self, seq: Sequence) -> None:
399
        self.block_manager.free(seq)
Woosuk Kwon's avatar
Woosuk Kwon committed
400

401
    def free_finished_seq_groups(self) -> None:
402
403
        self.running = deque(seq_group for seq_group in self.running
                             if not seq_group.is_finished())
Woosuk Kwon's avatar
Woosuk Kwon committed
404

405
406
    def _allocate(self, seq_group: SequenceGroup) -> None:
        """Allocate blocks for ``seq_group`` and mark its waiting sequences
        as RUNNING."""
        self.block_manager.allocate(seq_group)
        waiting_seqs = seq_group.get_seqs(status=SequenceStatus.WAITING)
        for waiting_seq in waiting_seqs:
            waiting_seq.status = SequenceStatus.RUNNING

410
    def _append_slot(
        self,
        seq_group: SequenceGroup,
        blocks_to_copy: Dict[int, List[int]],
    ) -> None:
        """Append a slot for every RUNNING sequence of ``seq_group``.

        Whenever the block manager returns a (source, destination) block
        pair for a sequence, the destination is recorded under the source
        in ``blocks_to_copy`` (mutated in place).
        """
        for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
            copy_pair = self.block_manager.append_slot(seq)
            if copy_pair is None:
                continue
            src_block, dst_block = copy_pair
            blocks_to_copy.setdefault(src_block, []).append(dst_block)

    def _preempt(
        self,
        seq_group: SequenceGroup,
        blocks_to_swap_out: Dict[int, int],
        preemption_mode: Optional[PreemptionMode] = None,
    ) -> None:
        """Preempt ``seq_group`` by recomputation or by swapping.

        Args:
            seq_group: The sequence group to preempt.
            blocks_to_swap_out: Mapping passed on to the swap path.
            preemption_mode: Mode to use. When None, the mode is chosen
                from the group's maximum number of running sequences.

        Raises:
            AssertionError: If the mode is neither RECOMPUTE nor SWAP.
        """
        # If preemption mode is not specified, we determine the mode as follows:
        # We use recomputation by default since it incurs lower overhead than
        # swapping. However, when the sequence group has multiple sequences
        # (e.g., beam search), recomputation is not currently supported. In
        # such a case, we use swapping instead.
        # FIXME(woosuk): This makes our scheduling policy a bit bizarre.
        # As swapped sequences are prioritized over waiting sequences,
        # sequence groups with multiple sequences are implicitly prioritized
        # over sequence groups with a single sequence.
        # TODO(woosuk): Support recomputation for sequence groups with multiple
        # sequences. This may require a more sophisticated CUDA kernel.
        mode = preemption_mode
        if mode is None:
            is_single_seq = seq_group.get_max_num_running_seqs() == 1
            mode = (PreemptionMode.RECOMPUTE
                    if is_single_seq else PreemptionMode.SWAP)

        if mode == PreemptionMode.RECOMPUTE:
            self._preempt_by_recompute(seq_group)
            return
        if mode == PreemptionMode.SWAP:
            self._preempt_by_swap(seq_group, blocks_to_swap_out)
            return
        raise AssertionError("Invalid preemption mode.")
452
453
454
455
456
457
458
459
460
461

    def _preempt_by_recompute(
        self,
        seq_group: SequenceGroup,
    ) -> None:
        """Free the running sequence of ``seq_group`` and requeue the group.

        The sequence is set back to WAITING, its blocks are freed, and the
        group is placed at the front of the waiting queue.
        """
        running_seqs = seq_group.get_seqs(status=SequenceStatus.RUNNING)
        assert len(running_seqs) == 1
        for running_seq in running_seqs:
            running_seq.status = SequenceStatus.WAITING
            self.block_manager.free(running_seq)
        # NOTE: For FCFS, we insert the preempted sequence group to the front
        # of the waiting queue.
        self.waiting.appendleft(seq_group)
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488

    def _preempt_by_swap(
        self,
        seq_group: SequenceGroup,
        blocks_to_swap_out: Dict[int, int],
    ) -> None:
        self._swap_out(seq_group, blocks_to_swap_out)
        self.swapped.append(seq_group)

    def _swap_in(
        self,
        seq_group: SequenceGroup,
        blocks_to_swap_in: Dict[int, int],
    ) -> None:
        """Swap ``seq_group`` in and mark its SWAPPED sequences as RUNNING.

        The block mapping returned by the block manager is merged into
        ``blocks_to_swap_in`` (mutated in place).
        """
        swap_mapping = self.block_manager.swap_in(seq_group)
        blocks_to_swap_in.update(swap_mapping)
        swapped_seqs = seq_group.get_seqs(status=SequenceStatus.SWAPPED)
        for swapped_seq in swapped_seqs:
            swapped_seq.status = SequenceStatus.RUNNING

    def _swap_out(
        self,
        seq_group: SequenceGroup,
        blocks_to_swap_out: Dict[int, int],
    ) -> None:
489
490
491
492
493
494
        if not self.block_manager.can_swap_out(seq_group):
            # FIXME(woosuk): Abort the sequence group instead of aborting the
            # entire engine.
            raise RuntimeError(
                "Aborted due to the lack of CPU swap space. Please increase "
                "the swap space to avoid this error.")
495
496
497
498
        mapping = self.block_manager.swap_out(seq_group)
        blocks_to_swap_out.update(mapping)
        for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):
            seq.status = SequenceStatus.SWAPPED
499
500
501

    def mark_blocks_as_computed(self, seq_group: SequenceGroup):
        self.block_manager.mark_blocks_as_computed(seq_group)
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517

    def _passed_delay(self, now: float) -> bool:
        if self.prev_prompt:
            self.last_prompt_latency = now - self.prev_time
        self.prev_time, self.prev_prompt = now, False
        # Delay scheduling prompts to let waiting queue fill up
        if self.scheduler_config.delay_factor > 0 and self.waiting:
            earliest_arrival_time = min(
                [e.metrics.arrival_time for e in self.waiting])
            passed_delay = (
                (now - earliest_arrival_time) >
                (self.scheduler_config.delay_factor * self.last_prompt_latency)
                or not self.running)
        else:
            passed_delay = True
        return passed_delay