# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

import enum
5
import time
6
from collections.abc import Mapping
7
from typing import Any, Optional, Union
8
9

import msgspec
10
import torch
11

12
from vllm.lora.request import LoRARequest
13
from vllm.multimodal.inputs import MultiModalFeatureSpec
14
from vllm.pooling_params import PoolingParams
15
from vllm.sampling_params import SamplingParams
16
from vllm.v1.metrics.stats import SchedulerStats
17
from vllm.v1.outputs import LogprobsLists, LogprobsTensors
18

19
20
21
# These are possible values of RequestOutput.finish_reason,
# so form part of the external API.
# FinishReason.__str__ indexes this tuple by the enum's integer value, so the
# order here must stay in step with the enum members below.
FINISH_REASON_STRINGS = ("stop", "length", "abort")


class FinishReason(enum.IntEnum):
    """
    Reason a request finished - stop, length, or abort.

    Int rather than Str for more compact serialization.

    stop - a stop string was emitted
    length - max_tokens was consumed, or max_model_len was reached
    abort - aborted for another reason

    """
    STOP = 0
    LENGTH = 1
    ABORT = 2

    def __str__(self) -> str:
        """Return the external API string ("stop", "length" or "abort")."""
        return FINISH_REASON_STRINGS[self.value]
41
42


43
44
45
46
47
class EngineCoreRequest(
        msgspec.Struct,
        array_like=True,  # type: ignore[call-arg]
        omit_defaults=True,  # type: ignore[call-arg]
        gc=False):  # type: ignore[call-arg]
    """Request message for the engine core, declared as a msgspec Struct.

    ``array_like=True`` asks msgspec to encode instances as positional
    arrays, so reordering the fields below changes the serialized layout.
    ``gc=False`` opts instances out of cyclic garbage-collector tracking
    (see the msgspec Struct documentation).
    """

    # Fields without defaults: callers must always supply these.
    request_id: str
    prompt_token_ids: Optional[list[int]]
    mm_features: Optional[list[MultiModalFeatureSpec]]
    sampling_params: Optional[SamplingParams]
    pooling_params: Optional[PoolingParams]
    eos_token_id: Optional[int]
    arrival_time: float
    lora_request: Optional[LoRARequest]
    cache_salt: Optional[str]
    data_parallel_rank: Optional[int]

    # Fields with defaults follow.
    prompt_embeds: Optional[torch.Tensor] = None

    # Index of the client, used to ensure outputs are sent back to the same
    # client for this request when scaling out the front-end.
    client_index: int = 0

    # Used in DP case to indicate which wave of requests this is expected to
    # belong to, to cover a race condition where the request is sent before
    # a wave finished notification is received.
    current_wave: int = 0
    priority: int = 0

    trace_headers: Optional[Mapping[str, str]] = None

73

74
75
76
77
class EngineCoreEventType(enum.IntEnum):
    """The type of engine core request event."""
    QUEUED = 1
    SCHEDULED = 2
    PREEMPTED = 3
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98


class EngineCoreEvent(msgspec.Struct):
    """An engine core event for a request, paired with when it happened.

    The timestamp is taken from the monotonic clock and is used by the
    engine frontend to calculate intervals between engine core events.
    These timestamps should not be compared with timestamps from other
    processes.
    """
    type: EngineCoreEventType
    timestamp: float

    @classmethod
    def new_event(cls,
                  event_type: EngineCoreEventType,
                  timestamp: Optional[float] = None) -> "EngineCoreEvent":
        """Create an event of ``event_type``.

        When ``timestamp`` is None the current ``time.monotonic()`` reading
        is used; otherwise the supplied value is stored unchanged.
        """
        if timestamp is None:
            timestamp = time.monotonic()
        return cls(event_type, timestamp)


99
100
101
102
103
class EngineCoreOutput(
        msgspec.Struct,
        array_like=True,  # type: ignore[call-arg]
        omit_defaults=True,  # type: ignore[call-arg]
        gc=False):  # type: ignore[call-arg]
    """Output record for a single request, declared as a msgspec Struct.

    ``array_like=True`` asks msgspec to encode instances as positional
    arrays, so reordering the fields below changes the serialized layout.
    """

    request_id: str
    new_token_ids: list[int]

    new_logprobs: Optional[LogprobsLists] = None
    new_prompt_logprobs_tensors: Optional[LogprobsTensors] = None

    pooling_output: Optional[torch.Tensor] = None

    # None while the request is still running; see the `finished` property.
    finish_reason: Optional[FinishReason] = None
    stop_reason: Union[int, str, None] = None
    events: Optional[list[EngineCoreEvent]] = None
    kv_transfer_params: Optional[dict[str, Any]] = None

    trace_headers: Optional[Mapping[str, str]] = None
    # The number of tokens with prefix cache hits.
    num_cached_tokens: int = 0

    @property
    def finished(self) -> bool:
        """True once a finish reason has been recorded for this request."""
        return self.finish_reason is not None

126

127
128
129
130
131
132
133
class UtilityResult:
    """Holder for a value that needs special handling when it is
    serialized/deserialized."""

    def __init__(self, r: Any = None):
        """Store ``r`` (None by default) on the ``result`` attribute."""
        self.result = r


134
135
136
137
138
139
140
141
142
class UtilityOutput(
        msgspec.Struct,
        array_like=True,  # type: ignore[call-arg]
        gc=False):  # type: ignore[call-arg]
    """Outcome of a utility call, identified by ``call_id``.

    Exactly how the two optional fields relate is described on
    ``failure_message`` below.
    """

    call_id: int

    # Non-None implies the call failed, result should be None.
    failure_message: Optional[str] = None
    result: Optional[UtilityResult] = None
144
145


146
147
148
149
150
class EngineCoreOutputs(
        msgspec.Struct,
        array_like=True,  # type: ignore[call-arg]
        omit_defaults=True,  # type: ignore[call-arg]
        gc=False):  # type: ignore[call-arg]
    """Message bundling per-request outputs with engine-level fields.

    Every field has a default, so an instance can be built with only the
    fields that are relevant. ``array_like=True`` asks msgspec to encode
    instances as positional arrays, so reordering the fields below changes
    the serialized layout.
    """

    # NOTE(Nick): We could consider ways to make this more compact,
    # e.g. columnwise layout

    engine_index: int = 0

    # [num_reqs]
    # An empty list literal is a supported msgspec default: per the msgspec
    # Struct documentation each instance gets its own list, so this is not
    # the shared-mutable-default pitfall of plain function arguments.
    outputs: list[EngineCoreOutput] = []
    scheduler_stats: Optional[SchedulerStats] = None
    # 0.0 means "not set"; __post_init__ replaces it with a monotonic reading.
    timestamp: float = 0.0

    utility_output: Optional[UtilityOutput] = None
    finished_requests: Optional[set[str]] = None

    # In DP case, used to signal that the current wave of requests
    # has finished and the engines are paused.
    wave_complete: Optional[int] = None
    # In DP case, used to signal that a request was received for an
    # "old" wave, so the next wave needs to be started in other engines.
    start_wave: Optional[int] = None

    def __post_init__(self):
        """Fill in ``timestamp`` from ``time.monotonic()`` when left at 0.0."""
        if self.timestamp == 0.0:
            self.timestamp = time.monotonic()
175
176
177
178
179
180
181
182
183


class EngineCoreRequestType(enum.Enum):
    """
    Request types defined as hex byte strings, so they can be sent over
    sockets without a separate encoding step.
    """
    ADD = b'\x00'
    ABORT = b'\x01'
    START_DP_WAVE = b'\x02'
    UTILITY = b'\x03'
    # Sentinel used within EngineCoreProc.
    EXECUTOR_FAILED = b'\x04'
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203


class ReconfigureDistributedRequest(msgspec.Struct):
    """Parameters for a data-parallel reconfiguration request."""
    # NOTE(review): going by the field names, these describe the new
    # data-parallel layout (size, this engine's rank and local rank, master
    # address) - confirm exact semantics against the caller.
    # Field declaration order is also the positional-argument order of the
    # generated constructor, so it should not be rearranged casually.
    new_data_parallel_size: int
    new_data_parallel_rank: int
    new_data_parallel_rank_local: int
    new_data_parallel_master_ip: str
    new_data_parallel_master_port: int


class ReconfigureRankType(enum.IntEnum):
    """Rank type for reconfiguring distributed request."""
    # NOTE(review): both values are negative, presumably so they can stand in
    # where a real (non-negative) rank would otherwise appear - confirm
    # against callers.
    KEEP_CURRENT_RANK = -1
    SHUTDOWN_CURRENT_RANK = -2