"vscode:/vscode.git/clone" did not exist on "e24e0a43a4f6138c8cf611f880c3609e3edf7624"
request.py 9.13 KB
Newer Older
1
# SPDX-License-Identifier: Apache-2.0
2
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
3

4
import enum
5
import time
6
from collections.abc import Callable, Mapping
7
from functools import partial
8
from typing import TYPE_CHECKING, Any, Optional
9

10
11
import torch

12
from vllm.multimodal.inputs import MultiModalFeatureSpec
13
from vllm.pooling_params import PoolingParams
14
from vllm.sampling_params import SamplingParams
15
from vllm.utils import length_from_prompt_token_ids_or_embeds
16
17
18
19
20
21
from vllm.v1.engine import (
    EngineCoreEvent,
    EngineCoreEventType,
    EngineCoreRequest,
    FinishReason,
)
22
from vllm.v1.structured_output.request import StructuredOutputRequest
23
from vllm.v1.utils import ConstantList
24

25
if TYPE_CHECKING:
26
    from vllm.lora.request import LoRARequest
27
    from vllm.v1.core.kv_cache_utils import BlockHash
28

29
30
31
32
33

class Request:
    """Mutable state record for a single request.

    Holds the prompt inputs, the sampling or pooling parameters, the token
    ids produced so far, the current ``RequestStatus``, and the list of
    recorded ``EngineCoreEvent`` objects.
    """

    def __init__(
        self,
        request_id: str,
        prompt_token_ids: list[int] | None,
        sampling_params: SamplingParams | None,
        pooling_params: PoolingParams | None,
        eos_token_id: int | None,
        client_index: int = 0,
        arrival_time: float | None = None,
        prompt_embeds: torch.Tensor | None = None,
        mm_features: list[MultiModalFeatureSpec] | None = None,
        lora_request: Optional["LoRARequest"] = None,
        cache_salt: str | None = None,
        priority: int = 0,
        trace_headers: Mapping[str, str] | None = None,
        block_hasher: Callable[["Request"], list["BlockHash"]] | None = None,
    ) -> None:
        """Initialise the request state.

        Exactly which of ``sampling_params`` / ``pooling_params`` is set
        decides ``max_tokens``; ``pooling_params`` wins when both are given.

        Raises:
            ValueError: if both ``sampling_params`` and ``pooling_params``
                are ``None``.
        """
        # Identity, routing and scheduling inputs.
        self.request_id = request_id
        self.client_index = client_index
        self.priority = priority

        self.sampling_params = sampling_params
        self.pooling_params = pooling_params

        # The EOS token id is stored per request because LoRA can make it
        # differ from one request to the next.
        self.eos_token_id = eos_token_id
        self.lora_request = lora_request

        self.structured_output_request = StructuredOutputRequest.from_sampling_params(
            sampling_params
        )

        # Fall back to "now" when the caller did not supply an arrival time.
        if arrival_time is None:
            arrival_time = time.time()
        self.arrival_time = arrival_time

        self.status = RequestStatus.WAITING
        self.events: list[EngineCoreEvent] = []
        self.stop_reason: int | str | None = None

        # P/D: Connector-specific KV transfer parameters.
        self.kv_transfer_params: dict[str, Any] | None = None

        if pooling_params is not None:
            # Pooling models.
            self.max_tokens = 1
        elif sampling_params is None:
            raise ValueError("sampling_params and pooling_params can't both be unset")
        else:
            # Generative models.
            max_tokens = sampling_params.max_tokens
            assert max_tokens is not None
            self.max_tokens = max_tokens

            if self.structured_output_request is not None:
                self.status = RequestStatus.WAITING_FOR_FSM

            extra_args = sampling_params.extra_args
            if extra_args is not None:
                self.kv_transfer_params = extra_args.get("kv_transfer_params")

        # Prompt inputs; the prompt length comes from whichever of the
        # token ids / embeddings the helper can measure.
        self.prompt_token_ids = prompt_token_ids
        self.prompt_embeds = prompt_embeds
        self.num_prompt_tokens = length_from_prompt_token_ids_or_embeds(
            prompt_token_ids, prompt_embeds
        )

        # Backing lists for the token history. When there are no prompt
        # token ids, the prompt part is filled with zeros of matching length.
        self._output_token_ids: list[int] = []
        if self.prompt_token_ids is not None:
            initial_token_ids = self.prompt_token_ids.copy()
        else:
            initial_token_ids = [0] * self.num_prompt_tokens
        self._all_token_ids: list[int] = initial_token_ids

        self.num_output_placeholders = 0  # Used in async scheduling.
        self.spec_token_ids: list[int] = []
        self.num_computed_tokens = 0
        self.cache_salt: str | None = cache_salt

        # Multi-modal related
        self.mm_features = mm_features or []
        self.num_encoder_inputs = len(self.mm_features)
        self.has_encoder_inputs = self.num_encoder_inputs > 0

        # Read-only views over the backing lists. Appending must go through
        # append_output_token_ids() so that both lists stay in sync.
        self.output_token_ids = ConstantList(self._output_token_ids)
        self.all_token_ids = ConstantList(self._all_token_ids)

        # trace_headers
        self.trace_headers = trace_headers

        # State
        # The number of tokens with prefix cache hits.
        self.num_cached_tokens = -1

        # The number of NaNs in logits. A value greater than 0
        # indicates that the output is corrupted
        self.num_nans_in_logits = 0

        # Preemption counter for this request; starts at zero.
        self.num_preemptions = 0

        # block_hashes must exist before the hasher is first invoked, since
        # the hasher receives this (partially initialised) request object.
        self.block_hashes: list[BlockHash] = []
        self.get_hash_new_full_blocks: Callable[[], list[BlockHash]] | None = None
        if block_hasher is not None:
            self.get_hash_new_full_blocks = partial(block_hasher, self)
            self.block_hashes = self.get_hash_new_full_blocks()

        self.skip_reading_prefix_cache = self.get_skip_reading_prefix_cache()

    @classmethod
    def from_engine_core_request(
        cls,
        request: EngineCoreRequest,
        block_hasher: Callable[["Request"], list["BlockHash"]] | None,
    ) -> "Request":
        """Build a ``Request`` by copying the fields of an ``EngineCoreRequest``.

        Every constructor argument is taken from the attribute of the same
        name on ``request``; ``block_hasher`` is passed through unchanged.
        """
        return cls(
            request_id=request.request_id,
            client_index=request.client_index,
            prompt_token_ids=request.prompt_token_ids,
            prompt_embeds=request.prompt_embeds,
            mm_features=request.mm_features,
            sampling_params=request.sampling_params,
            pooling_params=request.pooling_params,
            eos_token_id=request.eos_token_id,
            arrival_time=request.arrival_time,
            lora_request=request.lora_request,
            cache_salt=request.cache_salt,
            priority=request.priority,
            trace_headers=request.trace_headers,
            block_hasher=block_hasher,
        )

    def append_output_token_ids(
        self,
        token_ids: int | list[int],
    ) -> None:
        """Append one token id or a list of token ids to the request.

        Both the output-only list and the full token list are extended, and
        any block hashes returned by the configured hasher are appended to
        ``block_hashes``.
        """
        # Normalise a single id to a one-element list so both backing lists
        # can be updated the same way.
        new_ids = [token_ids] if isinstance(token_ids, int) else token_ids
        self._output_token_ids.extend(new_ids)
        self._all_token_ids.extend(new_ids)

        if self.get_hash_new_full_blocks is not None:
            self.block_hashes.extend(self.get_hash_new_full_blocks())

    @property
    def use_structured_output(self) -> bool:
        """Whether a structured-output request is attached."""
        return self.structured_output_request is not None

    @property
    def num_tokens(self) -> int:
        """Length of the full token list (prompt plus appended outputs)."""
        return len(self._all_token_ids)

    @property
    def num_tokens_with_spec(self) -> int:
        """``num_tokens`` plus the number of entries in ``spec_token_ids``."""
        return len(self._all_token_ids) + len(self.spec_token_ids)

    @property
    def num_output_tokens(self) -> int:
        """Number of output token ids appended so far."""
        return len(self._output_token_ids)

    def get_skip_reading_prefix_cache(self) -> bool:
        """Resolve the ``skip_reading_prefix_cache`` setting.

        The sampling params are consulted first, then the pooling params;
        the first one that is present and carries a non-``None`` value wins.
        Returns ``False`` when neither provides a value.
        """
        for params in (self.sampling_params, self.pooling_params):
            if params is None:
                continue
            skip = params.skip_reading_prefix_cache
            if skip is not None:
                return skip
        return False

    def is_finished(self) -> bool:
        """Whether the current status counts as finished."""
        return RequestStatus.is_finished(self.status)

    def get_finished_reason(self) -> FinishReason | None:
        """Finish reason mapped from the current status, or ``None``."""
        return RequestStatus.get_finished_reason(self.status)

    def get_num_encoder_tokens(self, input_id: int) -> int:
        """Return ``mm_position.length`` of the multi-modal feature at ``input_id``."""
        assert input_id < len(self.mm_features)
        return self.mm_features[input_id].mm_position.length

    def record_event(
        self,
        event_type: EngineCoreEventType,
        timestamp: float | None = None,
    ) -> None:
        """Create an ``EngineCoreEvent`` and add it to the pending events."""
        event = EngineCoreEvent.new_event(event_type, timestamp)
        self.events.append(event)

    def take_events(self) -> list[EngineCoreEvent] | None:
        """Hand over the pending events and reset the list.

        Returns ``None`` (leaving the list untouched) when nothing is pending.
        """
        pending = self.events
        if not pending:
            return None
        self.events = []
        return pending

222
223

class RequestStatus(enum.IntEnum):
    """Status of a request.

    Member order is significant: ``is_finished`` treats every member that
    compares greater than ``PREEMPTED`` as a finished status.
    """

    # Not-finished statuses.
    WAITING = enum.auto()
    WAITING_FOR_FSM = enum.auto()
    WAITING_FOR_REMOTE_KVS = enum.auto()
    RUNNING = enum.auto()
    PREEMPTED = enum.auto()
    # Note: anything after PREEMPTED will be considered
    # as a finished status.
    FINISHED_STOPPED = enum.auto()
    FINISHED_LENGTH_CAPPED = enum.auto()
    FINISHED_ABORTED = enum.auto()
    FINISHED_IGNORED = enum.auto()

    def __str__(self) -> str:
        """Render the status as its bare member name."""
        return self.name

    @staticmethod
    def is_finished(status: "RequestStatus") -> bool:
        """Return True when ``status`` sorts after ``PREEMPTED``."""
        return RequestStatus.PREEMPTED < status

    @staticmethod
    def get_finished_reason(status: "RequestStatus") -> FinishReason | None:
        """Look up the finish reason for ``status``.

        Returns ``None`` when ``status`` has no entry in the module-level
        ``_FINISHED_REASON_MAP``.
        """
        reason = _FINISHED_REASON_MAP.get(status)
        return reason


# Finished status -> finish reason lookup used by
# RequestStatus.get_finished_reason().
_FINISHED_REASON_MAP: dict[RequestStatus, FinishReason] = {
    RequestStatus.FINISHED_STOPPED: FinishReason.STOP,
    RequestStatus.FINISHED_LENGTH_CAPPED: FinishReason.LENGTH,
    RequestStatus.FINISHED_ABORTED: FinishReason.ABORT,
    # NOTE: Ignored requests are those whose prompt is longer than the
    # model's length cap, so their stop reason is also "length", as in the
    # OpenAI API.
    RequestStatus.FINISHED_IGNORED: FinishReason.LENGTH,
}