# request.py
1
# SPDX-License-Identifier: Apache-2.0
2
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
3

4
import enum
5
import time
6
from collections.abc import Mapping
7
8
from functools import partial
from typing import TYPE_CHECKING, Any, Callable, Optional, Union
9

10
from vllm.multimodal.inputs import MultiModalFeatureSpec
11
from vllm.pooling_params import PoolingParams
12
from vllm.sampling_params import SamplingParams
13
14
from vllm.v1.engine import (EngineCoreEvent, EngineCoreEventType,
                            EngineCoreRequest, FinishReason)
15
from vllm.v1.structured_output.request import StructuredOutputRequest
16
from vllm.v1.utils import ConstantList
17

18
if TYPE_CHECKING:
19
    from vllm.lora.request import LoRARequest
20
    from vllm.v1.core.kv_cache_utils import BlockHash
21

22
23
24
25
26
27

class Request:
    """Per-request state holder.

    Stores the identifiers, sampling/pooling parameters, token id lists,
    multi-modal features, status, recorded events and block hashes that
    belong to a single request.
    """

    def __init__(
        self,
        request_id: str,
        prompt_token_ids: list[int],
        sampling_params: Optional[SamplingParams],
        pooling_params: Optional[PoolingParams],
        eos_token_id: Optional[int],
        client_index: int = 0,
        arrival_time: Optional[float] = None,
        mm_features: Optional[list[MultiModalFeatureSpec]] = None,
        lora_request: Optional["LoRARequest"] = None,
        structured_output_request: Optional["StructuredOutputRequest"] = None,
        cache_salt: Optional[str] = None,
        priority: int = 0,
        trace_headers: Optional[Mapping[str, str]] = None,
        block_hasher: Optional[Callable[["Request"],
                                        list["BlockHash"]]] = None,
    ) -> None:
        """Initialize the request state.

        Args:
            request_id: Identifier of the request.
            prompt_token_ids: Token ids of the prompt. The list is stored
                as-is and also copied into the internal "all tokens" list.
            sampling_params: Sampling parameters; consulted only when
                `pooling_params` is None.
            pooling_params: Pooling parameters; when not None the request
                takes the pooling branch and `max_tokens` is set to 1.
            eos_token_id: EOS token id for this request.
            client_index: Stored unchanged on the instance.
            arrival_time: Arrival timestamp; defaults to `time.time()` when
                None.
            mm_features: Multi-modal feature specs; None is treated as an
                empty list.
            lora_request: Stored unchanged on the instance.
            structured_output_request: Stored unchanged on the instance.
            cache_salt: Stored unchanged on the instance.
            priority: Stored unchanged on the instance.
            trace_headers: Stored unchanged on the instance.
            block_hasher: Optional callable taking this request and returning
                a list of block hashes. When given, it is bound to this
                request and invoked once at the end of construction.

        Raises:
            ValueError: If both `sampling_params` and `pooling_params` are
                None.
            AssertionError: If the sampling branch is taken and
                `sampling_params.max_tokens` is None.
        """
        self.request_id = request_id
        self.client_index = client_index
        self.priority = priority
        self.sampling_params = sampling_params
        self.pooling_params = pooling_params
        # Because of LoRA, the eos token id can be different for each request.
        self.eos_token_id = eos_token_id
        self.lora_request = lora_request
        self.structured_output_request = structured_output_request
        self.arrival_time = (arrival_time
                             if arrival_time is not None else time.time())

        self.status = RequestStatus.WAITING
        self.use_structured_output = False
        self.events: list[EngineCoreEvent] = []
        self.stop_reason: Union[int, str, None] = None

        # P/D: Connector-specific KV transfer parameters.
        self.kv_transfer_params: Optional[dict[str, Any]] = None

        # NOTE: pooling_params is checked first, so it takes precedence when
        # both parameter objects are provided.
        if pooling_params is not None:
            # Pooling models.
            self.max_tokens = 1
        elif sampling_params is not None:
            # Generative models.
            assert sampling_params.max_tokens is not None
            self.max_tokens = sampling_params.max_tokens
            if sampling_params.guided_decoding is not None:
                self.status = RequestStatus.WAITING_FOR_FSM
                self.use_structured_output = True

            if sampling_params.extra_args is not None:
                self.kv_transfer_params = \
                    sampling_params.extra_args.get("kv_transfer_params")
        else:
            raise ValueError(
                "sampling_params and pooling_params can't both be unset")

        self.prompt_token_ids = prompt_token_ids
        self.num_prompt_tokens = len(self.prompt_token_ids)
        self._output_token_ids: list[int] = []
        self._all_token_ids: list[int] = self.prompt_token_ids.copy()
        self.num_output_placeholders = 0  # Used in async scheduling.
        self.spec_token_ids: list[int] = []
        self.num_computed_tokens = 0
        self.cache_salt: Optional[str] = cache_salt

        # Multi-modal related
        self.mm_features = mm_features or []
        self.num_encoder_inputs = len(self.mm_features)
        self.has_encoder_inputs = self.num_encoder_inputs > 0
        # TODO(sfeng33): Remove these legacy fields after clearing out all
        # references in scheduler and model runner
        self.mm_positions = [f.mm_position for f in self.mm_features]
        self.mm_kwargs = [f.data for f in self.mm_features]
        self.mm_hashes = [f.identifier for f in self.mm_features]

        # Read-only views
        # Prevent directly appending to these lists since
        # they should also be updated simultaneously.
        self.output_token_ids = ConstantList(self._output_token_ids)
        self.all_token_ids = ConstantList(self._all_token_ids)

        # trace_headers
        self.trace_headers = trace_headers

        # State
        # The number of tokens with prefix cache hits.
        self.num_cached_tokens = -1

        # The number of NaNs in logits. A value greater than 0
        # indicates that the output is corrupted
        self.num_nans_in_logits = 0

        # Block hashes start empty. When a hasher is supplied it is bound to
        # this request, so later calls need no arguments.
        self.block_hashes: list[BlockHash] = []
        self.get_hash_new_full_blocks: Optional[Callable[
            [], list[BlockHash]]] = None
        if block_hasher is not None:
            self.get_hash_new_full_blocks = partial(block_hasher, self)
            self.block_hashes = self.get_hash_new_full_blocks()

    @classmethod
    def from_engine_core_request(
        cls, request: EngineCoreRequest,
        block_hasher: Optional[Callable[["Request"], list["BlockHash"]]]
    ) -> "Request":
        """Build a `Request` from an `EngineCoreRequest`.

        Fields are copied one-to-one from `request`. A
        `StructuredOutputRequest` is created from `request.sampling_params`
        when that attribute is truthy; otherwise None is passed.

        Args:
            request: The engine-core request to copy fields from.
            block_hasher: Forwarded unchanged to the constructor.

        Returns:
            The newly constructed `Request`.
        """
        return cls(
            request_id=request.request_id,
            client_index=request.client_index,
            prompt_token_ids=request.prompt_token_ids,
            mm_features=request.mm_features,
            sampling_params=request.sampling_params,
            pooling_params=request.pooling_params,
            eos_token_id=request.eos_token_id,
            arrival_time=request.arrival_time,
            lora_request=request.lora_request,
            structured_output_request=StructuredOutputRequest(
                sampling_params=request.sampling_params) \
                    if request.sampling_params else None,
            cache_salt=request.cache_salt,
            priority=request.priority,
            trace_headers=request.trace_headers,
            block_hasher=block_hasher,
        )

    def append_output_token_ids(
        self,
        token_ids: Union[int, list[int]],
    ) -> None:
        """Append one token id or a list of token ids to the output.

        Both the output-token list and the all-token list are updated. If a
        block hasher was configured, the hashes it returns afterwards are
        appended to `block_hashes`.

        Args:
            token_ids: A single token id, or a list of token ids.
        """
        if isinstance(token_ids, int):
            self._output_token_ids.append(token_ids)
            self._all_token_ids.append(token_ids)
        else:
            self._output_token_ids.extend(token_ids)
            self._all_token_ids.extend(token_ids)

        if self.get_hash_new_full_blocks is not None:
            self.block_hashes.extend(self.get_hash_new_full_blocks())

    @property
    def is_output_corrupted(self) -> bool:
        """Whether any NaNs were counted in the logits."""
        return self.num_nans_in_logits > 0

    @property
    def num_tokens(self) -> int:
        """Length of the all-token list (prompt plus appended outputs)."""
        return len(self._all_token_ids)

    @property
    def num_tokens_with_spec(self) -> int:
        """`num_tokens` plus the number of entries in `spec_token_ids`."""
        return len(self._all_token_ids) + len(self.spec_token_ids)

    @property
    def num_output_tokens(self) -> int:
        """Number of output token ids appended so far."""
        return len(self._output_token_ids)

    def is_finished(self) -> bool:
        """Return whether the current status counts as finished."""
        return RequestStatus.is_finished(self.status)

    def get_finished_reason(self) -> Union[FinishReason, None]:
        """Return the finish reason mapped from the current status."""
        return RequestStatus.get_finished_reason(self.status)

    def get_num_encoder_tokens(self, input_id: int) -> int:
        """Return the `length` of the multi-modal position at `input_id`.

        Args:
            input_id: Index into `mm_positions`; asserted to be less than
                the number of positions.
        """
        assert input_id < len(self.mm_positions)
        num_tokens = self.mm_positions[input_id].length
        return num_tokens

    def record_event(
        self,
        event_type: EngineCoreEventType,
        timestamp: Optional[float] = None,
    ) -> None:
        """Append a new `EngineCoreEvent` of `event_type` to `events`.

        Args:
            event_type: Type of the event to record.
            timestamp: Passed through to `EngineCoreEvent.new_event`.
        """
        self.events.append(EngineCoreEvent.new_event(event_type, timestamp))

    def take_events(self) -> Optional[list[EngineCoreEvent]]:
        """Return the recorded events and reset the internal list.

        Returns:
            None when no events are recorded; otherwise the list of events,
            after which `events` is a fresh empty list.
        """
        if not self.events:
            return None
        events, self.events = self.events, []
        return events

200
201

class RequestStatus(enum.IntEnum):
    """Status of a request.

    Member order matters: `is_finished` treats every member declared after
    `PREEMPTED` as a finished status.
    """

    WAITING = enum.auto()
    WAITING_FOR_FSM = enum.auto()
    WAITING_FOR_REMOTE_KVS = enum.auto()
    RUNNING = enum.auto()
    PREEMPTED = enum.auto()
    # Note: anything after PREEMPTED will be considered
    # as a finished status.
    FINISHED_STOPPED = enum.auto()
    FINISHED_LENGTH_CAPPED = enum.auto()
    FINISHED_ABORTED = enum.auto()
    FINISHED_IGNORED = enum.auto()

    def __str__(self) -> str:
        """Return the member name, e.g. "WAITING"."""
        return self.name

    @staticmethod
    def is_finished(status: "RequestStatus") -> bool:
        """Return True when `status` is ordered after `PREEMPTED`."""
        return status > RequestStatus.PREEMPTED

    @staticmethod
    def get_finished_reason(
            status: "RequestStatus") -> Union[FinishReason, None]:
        """Look up the finish reason for `status`.

        Returns:
            The entry of `_FINISHED_REASON_MAP` for `status`, or None when
            the status has no entry.
        """
        return _FINISHED_REASON_MAP.get(status)


# Mapping of finished statuses to their finish reasons.
# NOTE: The ignored requests are the requests whose prompt lengths
# are longer than the model's length cap. Therefore, the stop
# reason should also be "length" as in OpenAI API.
_FINISHED_REASON_MAP = {
    RequestStatus.FINISHED_STOPPED: FinishReason.STOP,
    RequestStatus.FINISHED_LENGTH_CAPPED: FinishReason.LENGTH,
    RequestStatus.FINISHED_ABORTED: FinishReason.ABORT,
    RequestStatus.FINISHED_IGNORED: FinishReason.LENGTH,
}