gpu_input_batch.py 22.6 KB
Newer Older
1
2
# SPDX-License-Identifier: Apache-2.0

3
4
5
# Data structures defining an input batch

from dataclasses import dataclass
6
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple
7
8
9
10

import numpy as np
import torch

11
from vllm.lora.request import LoRARequest
12
13
14
from vllm.multimodal import MultiModalKwargs
from vllm.sampling_params import SamplingParams, SamplingType
from vllm.v1.sample.metadata import SamplingMetadata
15
from vllm.v1.worker.block_table import BlockTable
16

17
18
# Threshold used when deciding whether a sampling parameter is active:
# InputBatch.add_request only records a request as using min_p when its
# min_p value is strictly greater than this.
_SAMPLING_EPS = 1e-5

19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
if TYPE_CHECKING:
    from vllm.multimodal.inputs import PlaceholderRange


@dataclass
class CachedRequestState:
    """Per-request state from which a row of ``InputBatch`` is populated.

    ``InputBatch.add_request`` reads the request id, the prompt and output
    token ids, the block ids, the number of computed tokens, the sampling
    parameters, the optional generator and the optional LoRA request from
    an instance of this class.
    """

    req_id: str
    prompt_token_ids: List[int]
    prompt: Optional[str]
    mm_inputs: List[MultiModalKwargs]
    mm_positions: List["PlaceholderRange"]
    sampling_params: SamplingParams
    # Per-request random generator; None when the request has none.
    generator: Optional[torch.Generator]

    block_ids: List[int]
    num_computed_tokens: int
    output_token_ids: List[int]

    mrope_positions: Optional[torch.Tensor] = None
    mrope_position_delta: Optional[int] = None

    lora_request: Optional[LoRARequest] = None

    @property
    def num_tokens(self) -> int:
        """Return the number of prompt tokens plus output tokens."""
        return len(self.prompt_token_ids) + len(self.output_token_ids)


class InputBatch:

    def __init__(
        self,
        max_num_reqs: int,
        max_model_len: int,
        max_num_blocks_per_req: int,
        device: torch.device,
        pin_memory: bool,
        vocab_size: int,
    ):
        """Pre-allocate the fixed-size buffers that back a batch of requests.

        Args:
            max_num_reqs: Number of request slots (rows) to allocate.
            max_model_len: Width of the per-request token id buffer.
            max_num_blocks_per_req: Forwarded to ``BlockTable``.
            device: Device on which the sampling-parameter tensors live.
            pin_memory: Whether the CPU staging tensors are allocated pinned.
            vocab_size: Stored on the instance; used as the padding value
                when the prompt token id tensor is built.
        """
        self.max_num_reqs = max_num_reqs
        self.max_model_len = max_model_len
        self.max_num_blocks_per_req = max_num_blocks_per_req
        self.device = device
        self.pin_memory = pin_memory
        self.vocab_size = vocab_size

        self.req_ids: List[Optional[str]] = [None] * max_num_reqs
        self.req_id_to_index: Dict[str, int] = {}

        # TODO(woosuk): This buffer could be too large if max_model_len is big.
        # Find a way to reduce the CPU memory usage.
        # This buffer is not directly transferred to the GPU, so it does not
        # need to be pinned.
        self.token_ids_cpu_tensor = torch.zeros(
            (max_num_reqs, max_model_len),
            device="cpu",
            dtype=torch.int32,
            pin_memory=False,
        )
        self.token_ids_cpu = self.token_ids_cpu_tensor.numpy()
        self.num_tokens = np.zeros(max_num_reqs, dtype=np.int32)
        self.num_prompt_tokens = np.zeros(max_num_reqs, dtype=np.int32)
        self.num_computed_tokens_cpu = np.empty(max_num_reqs, dtype=np.int32)

        # Block table.
        self.block_table = BlockTable(
            max_num_reqs=max_num_reqs,
            max_model_len=max_model_len,
            max_num_blocks_per_req=max_num_blocks_per_req,
            pin_memory=pin_memory,
            device=device,
        )

        def _alloc_param_buffers(dtype: torch.dtype):
            """Return (device tensor, CPU staging tensor, its NumPy view)."""
            on_device = torch.empty((max_num_reqs, ),
                                    dtype=dtype,
                                    device=device)
            on_cpu = torch.empty((max_num_reqs, ),
                                 dtype=dtype,
                                 device="cpu",
                                 pin_memory=pin_memory)
            # The NumPy array aliases the CPU tensor: values are written
            # through the array and later copied to the device from the
            # tensor.
            return on_device, on_cpu, on_cpu.numpy()

        # Sampling-related.
        (self.temperature, self.temperature_cpu_tensor,
         self.temperature_cpu) = _alloc_param_buffers(torch.float32)
        self.greedy_reqs: Set[str] = set()
        self.random_reqs: Set[str] = set()

        (self.top_p, self.top_p_cpu_tensor,
         self.top_p_cpu) = _alloc_param_buffers(torch.float32)
        self.top_p_reqs: Set[str] = set()

        (self.top_k, self.top_k_cpu_tensor,
         self.top_k_cpu) = _alloc_param_buffers(torch.int32)
        self.top_k_reqs: Set[str] = set()

        (self.min_p, self.min_p_cpu_tensor,
         self.min_p_cpu) = _alloc_param_buffers(torch.float32)
        self.min_p_reqs: Set[str] = set()

        # Frequency penalty related data structures
        (self.frequency_penalties, self.frequency_penalties_cpu_tensor,
         self.frequency_penalties_cpu) = _alloc_param_buffers(torch.float)
        self.frequency_penalties_reqs: Set[str] = set()

        # Presence penalty related data structures
        (self.presence_penalties, self.presence_penalties_cpu_tensor,
         self.presence_penalties_cpu) = _alloc_param_buffers(torch.float)
        self.presence_penalties_reqs: Set[str] = set()

        # Repetition penalty related data structures
        (self.repetition_penalties, self.repetition_penalties_cpu_tensor,
         self.repetition_penalties_cpu) = _alloc_param_buffers(torch.float)
        self.repetition_penalties_reqs: Set[str] = set()

        self.min_tokens: List[int] = [0] * max_num_reqs
        self.stop_token_ids: List[Set[int]] = [
            set() for _ in range(max_num_reqs)
        ]
        self.prompt_token_ids: Optional[torch.Tensor] = None

        # lora related
        self.request_lora_mapping = np.zeros((self.max_num_reqs, ),
                                             dtype=np.int32)
        self.lora_id_to_request_ids: Dict[int, Set[str]] = {}
        self.lora_id_to_lora_request: Dict[int, LoRARequest] = {}

        # req_index -> generator
        # NOTE(woosuk): The indices of the requests that do not have their own
        # generator should not be included in the dictionary.
        self.generators: Dict[int, torch.Generator] = {}

        self.num_logprobs: Dict[str, int] = {}
        # NOTE(rob): num_prompt_logprobs only includes reqs
        # that are currently in the prefill phase.
        self.num_prompt_logprobs: Dict[str, int] = {}

        self.logit_bias: List[Optional[Dict[int,
                                            float]]] = [None] * max_num_reqs

198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
    def add_request(
        self,
        request: "CachedRequestState",
        req_index: Optional[int] = None,
    ) -> None:
        """Copy one request's state into row ``req_index`` of the batch.

        Args:
            request: The cached state to copy from.
            req_index: Row to populate. When None, the row equal to the
                current number of requests is used.

        The row index must be smaller than ``max_num_reqs`` (asserted).
        """
        if req_index is None:
            req_index = self.num_reqs
        assert req_index < self.max_num_reqs

        req_id = request.req_id
        self.req_ids[req_index] = req_id
        self.req_id_to_index[req_id] = req_index

        # Copy the prompt token ids and output token ids.
        num_prompt_tokens = len(request.prompt_token_ids)
        self.num_prompt_tokens[req_index] = num_prompt_tokens
        self.token_ids_cpu[
            req_index, :num_prompt_tokens] = request.prompt_token_ids
        start_idx = num_prompt_tokens
        end_idx = start_idx + len(request.output_token_ids)
        self.token_ids_cpu[req_index,
                           start_idx:end_idx] = request.output_token_ids
        self.num_tokens[req_index] = request.num_tokens

        self.num_computed_tokens_cpu[req_index] = request.num_computed_tokens
        self.block_table.add_row(req_index, request.block_ids)

        sampling_params = request.sampling_params
        self.temperature_cpu[req_index] = sampling_params.temperature
        if sampling_params.sampling_type == SamplingType.GREEDY:
            self.greedy_reqs.add(req_id)
        else:
            self.random_reqs.add(req_id)

        self.top_p_cpu[req_index] = sampling_params.top_p
        if sampling_params.top_p < 1:
            self.top_p_reqs.add(req_id)
        self.top_k_cpu[req_index] = sampling_params.top_k
        if sampling_params.top_k > 0:
            self.top_k_reqs.add(req_id)
        self.min_p_cpu[req_index] = sampling_params.min_p
        self.frequency_penalties_cpu[
            req_index] = sampling_params.frequency_penalty
        if sampling_params.min_p > _SAMPLING_EPS:
            self.min_p_reqs.add(req_id)
        if sampling_params.frequency_penalty != 0.0:
            self.frequency_penalties_reqs.add(req_id)
        self.presence_penalties_cpu[
            req_index] = sampling_params.presence_penalty
        if sampling_params.presence_penalty != 0.0:
            self.presence_penalties_reqs.add(req_id)
        self.repetition_penalties_cpu[
            req_index] = sampling_params.repetition_penalty
        if sampling_params.repetition_penalty != 1.0:
            self.repetition_penalties_reqs.add(req_id)
        self.min_tokens[req_index] = sampling_params.min_tokens
        self.stop_token_ids[req_index] = sampling_params.all_stop_token_ids

        # NOTE(woosuk): self.generators should not include the requests that
        # do not have their own generator.
        if request.generator is not None:
            self.generators[req_index] = request.generator

        if sampling_params.logprobs is not None:
            self.num_logprobs[req_id] = sampling_params.logprobs
        if sampling_params.prompt_logprobs is not None:
            self.num_prompt_logprobs[req_id] = sampling_params.prompt_logprobs

        # Always overwrite the slot, even with None: the row may still hold
        # the bias of a request that previously occupied it, and that bias
        # must not be applied to this request.
        self.logit_bias[req_index] = sampling_params.logit_bias

        # Add request lora ID
        if request.lora_request:
            lora_id = request.lora_request.lora_int_id
            self.request_lora_mapping[req_index] = lora_id
            self.lora_id_to_request_ids.setdefault(lora_id, set()).add(req_id)
            self.lora_id_to_lora_request[lora_id] = request.lora_request
        else:
            # No LoRA
            self.request_lora_mapping[req_index] = 0

281
282
283
284
285
286
287
288
289
290
    def remove_request(self, req_id: str) -> Optional[int]:
        req_index = self.req_id_to_index.pop(req_id, None)
        if req_index is None:
            return None
        self.req_ids[req_index] = None

        self.greedy_reqs.discard(req_id)
        self.random_reqs.discard(req_id)
        self.top_p_reqs.discard(req_id)
        self.top_k_reqs.discard(req_id)
291
        self.min_p_reqs.discard(req_id)
292
293
294
        self.frequency_penalties_reqs.discard(req_id)
        self.presence_penalties_reqs.discard(req_id)
        self.repetition_penalties_reqs.discard(req_id)
295
296
        self.generators.pop(req_index, None)
        self.num_logprobs.pop(req_id, None)
297
        self.num_prompt_logprobs.pop(req_id, None)
298
299
300
301
302
303
304
305
306
307

        # LoRA
        lora_id = self.request_lora_mapping[req_index]
        if lora_id != 0:
            self.lora_id_to_request_ids[lora_id].discard(req_id)
            if len(self.lora_id_to_request_ids[lora_id]) == 0:
                self.lora_id_to_request_ids.pop(lora_id)
                self.lora_id_to_lora_request.pop(lora_id)
            self.request_lora_mapping[req_index] = 0

308
        self.logit_bias[req_index] = None
309
310
311
312
313
314
315
316
317
        return req_index

    def clear(self) -> None:
        self.req_ids = [None] * self.max_num_reqs
        self.req_id_to_index.clear()
        self.greedy_reqs.clear()
        self.random_reqs.clear()
        self.top_p_reqs.clear()
        self.top_k_reqs.clear()
318
        self.min_p_reqs.clear()
319
320
321
        self.frequency_penalties_reqs.clear()
        self.presence_penalties_reqs.clear()
        self.repetition_penalties_reqs.clear()
322
323
        self.generators.clear()
        self.num_logprobs.clear()
324
        self.num_prompt_logprobs.clear()
325
326
327
        self.request_lora_mapping.fill(0)
        self.lora_id_to_lora_request.clear()
        self.lora_id_to_request_ids.clear()
328
        self.logit_bias = [None] * self.max_num_reqs
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349

    def condense(self, empty_req_indices: List[int]) -> None:
        if self.num_reqs == 0:
            # The batched states are empty.
            return

        # NOTE(woosuk): This function assumes that the empty_req_indices
        # is sorted in descending order.
        last_req_index = self.num_reqs + len(empty_req_indices) - 1
        while empty_req_indices:
            # Find the largest non-empty index.
            while last_req_index in empty_req_indices:
                last_req_index -= 1

            # Find the smallest empty index.
            empty_index = empty_req_indices.pop()
            if empty_index >= last_req_index:
                break

            # Swap the states.
            req_id = self.req_ids[last_req_index]
350
            assert req_id is not None
351
352
353
354
            self.req_ids[empty_index] = req_id
            self.req_ids[last_req_index] = None
            self.req_id_to_index[req_id] = empty_index

355
356
357
358
            num_tokens = self.num_tokens[last_req_index]
            self.token_ids_cpu[empty_index, :num_tokens] = self.token_ids_cpu[
                last_req_index, :num_tokens]
            self.num_tokens[empty_index] = num_tokens
359
360
            self.num_prompt_tokens[empty_index] = self.num_prompt_tokens[
                last_req_index]
361
362
            self.num_computed_tokens_cpu[
                empty_index] = self.num_computed_tokens_cpu[last_req_index]
363
            self.block_table.move_row(last_req_index, empty_index)
364
365
366
367
            self.temperature_cpu[empty_index] = self.temperature_cpu[
                last_req_index]
            self.top_p_cpu[empty_index] = self.top_p_cpu[last_req_index]
            self.top_k_cpu[empty_index] = self.top_k_cpu[last_req_index]
368
369
370
371
372
373
            self.frequency_penalties_cpu[
                empty_index] = self.frequency_penalties_cpu[last_req_index]
            self.presence_penalties_cpu[
                empty_index] = self.presence_penalties_cpu[last_req_index]
            self.repetition_penalties_cpu[
                empty_index] = self.repetition_penalties_cpu[last_req_index]
374
            self.min_p_cpu[empty_index] = self.min_p_cpu[last_req_index]
375
            self.min_tokens[empty_index] = self.min_tokens[last_req_index]
376
377
            self.stop_token_ids[empty_index] = self.stop_token_ids[
                last_req_index]
378
379
380
381
            generator = self.generators.pop(last_req_index, None)
            if generator is not None:
                self.generators[empty_index] = generator

382
383
384
            self.request_lora_mapping[empty_index] = self.request_lora_mapping[
                last_req_index]

385
386
            self.logit_bias[empty_index] = self.logit_bias[last_req_index]

387
388
389
390
391
            # Decrement last_req_index since it is now empty.
            last_req_index -= 1

    def make_sampling_metadata(
        self,
        req_id_output_token_ids: Dict[str, List[int]],
        req_id_to_spec_token_ids: Dict[str, List[int]],
        skip_copy: bool = False,
    ) -> SamplingMetadata:
        """Build the ``SamplingMetadata`` for the requests in the batch.

        Args:
            req_id_output_token_ids: Output token ids per request id; must
                contain every request id currently in the batch.
            req_id_to_spec_token_ids: Speculative token ids per request id;
                requests missing from the mapping get an empty list.
            skip_copy: When True, the CPU -> device copies of the sampling
                parameters (and the rebuild of the prompt token id tensor)
                are skipped and the existing device tensors are used as is.

        Returns:
            A ``SamplingMetadata`` whose per-request tensors and lists are
            sliced to the current number of requests.
        """
        num_reqs = self.num_reqs
        if not skip_copy:
            self.temperature[:num_reqs].copy_(
                self.temperature_cpu_tensor[:num_reqs], non_blocking=True)
            self.top_p[:num_reqs].copy_(self.top_p_cpu_tensor[:num_reqs],
                                        non_blocking=True)
            self.top_k[:num_reqs].copy_(self.top_k_cpu_tensor[:num_reqs],
                                        non_blocking=True)
            self.min_p[:num_reqs].copy_(self.min_p_cpu_tensor[:num_reqs],
                                        non_blocking=True)
            if not self.no_penalties:
                # Since syncing these tensors is expensive only copy them
                # if necessary i.e. if there are requests which require
                # penalties to be applied during sampling.
                self.frequency_penalties[:num_reqs].copy_(
                    self.frequency_penalties_cpu_tensor[:num_reqs],
                    non_blocking=True,
                )
                self.presence_penalties[:num_reqs].copy_(
                    self.presence_penalties_cpu_tensor[:num_reqs],
                    non_blocking=True,
                )
                self.repetition_penalties[:num_reqs].copy_(
                    self.repetition_penalties_cpu_tensor[:num_reqs],
                    non_blocking=True,
                )
                # The prompt tokens are used only for applying penalties during
                # the sampling process. Hence copy these tensors only when
                # there are requests which need penalties to be applied.
                # Otherwise self.prompt_token_ids keeps its previous value.
                self.prompt_token_ids = self._make_prompt_token_ids_tensor()

        output_token_ids: List[List[int]] = []
        spec_token_ids: List[List[int]] = []
        rejection_sampling = False
        for req_id in self.req_ids[:num_reqs]:
            assert req_id is not None
            # Currently we create a tensor for output_token_ids from scratch
            # at each step. However, for the penalties computation what we
            # need is stats about the token ids present in the output. This
            # stats can be maintained incrementally instead of computing it
            # from scratch at each step.
            # TODO - Replace this with incremental update to output token
            # statistics.
            output_token_ids.append(req_id_output_token_ids[req_id])
            req_spec_token_ids = req_id_to_spec_token_ids.get(req_id, [])
            spec_token_ids.append(req_spec_token_ids)
            if req_spec_token_ids:
                # If any of the requests require speculative decoding, set the
                # flag to True.
                rejection_sampling = True

        return SamplingMetadata(
            temperature=self.temperature[:num_reqs],
            all_greedy=self.all_greedy,
            all_random=self.all_random,
            rejection_sampling=rejection_sampling,
            top_p=self.top_p[:num_reqs],
            top_k=self.top_k[:num_reqs],
            min_p=self.min_p[:num_reqs],
            no_min_p=self.no_min_p,
            no_top_p=self.no_top_p,
            no_top_k=self.no_top_k,
            generators=self.generators,
            max_num_logprobs=self.max_num_logprobs,
            prompt_token_ids=self.prompt_token_ids,
            frequency_penalties=self.frequency_penalties[:num_reqs],
            presence_penalties=self.presence_penalties[:num_reqs],
            repetition_penalties=self.repetition_penalties[:num_reqs],
            output_token_ids=output_token_ids,
            spec_token_ids=spec_token_ids,
            min_tokens=self.min_tokens[:num_reqs],
            stop_token_ids=self.stop_token_ids[:num_reqs],
            no_penalties=self.no_penalties,
            logit_bias=self.logit_bias[:num_reqs],
        )

471
472
473
474
475
476
    def _make_prompt_token_ids_tensor(self) -> torch.Tensor:
        max_prompt_len = self.num_prompt_tokens[:self.num_reqs].max()
        prompt_token_ids_cpu_tensor = torch.empty(
            (self.num_reqs, max_prompt_len),
            device="cpu",
            dtype=torch.int64,
477
478
            pin_memory=self.pin_memory,
        )
479
        prompt_token_ids = prompt_token_ids_cpu_tensor.numpy()
480
481
        prompt_token_ids[:] = self.token_ids_cpu[:self.
                                                 num_reqs, :max_prompt_len]
482
483
484
485
486
487
488
        # Use the value of vocab_size as a pad since we don't have a
        # token_id of this value.
        for i in range(self.num_reqs):
            prompt_token_ids[i, self.num_prompt_tokens[i]:] = self.vocab_size
        return prompt_token_ids_cpu_tensor.to(device=self.device,
                                              non_blocking=True)

489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
    def make_lora_inputs(
        self, num_scheduled_tokens: np.ndarray
    ) -> Tuple[Tuple[int, ...], Tuple[int, ...], Set[LoRARequest]]:
        """
        Given the num_scheduled_tokens for each request in the batch, return
        datastructures used to activate the current LoRAs.
        Returns:
            1. prompt_lora_mapping: A tuple of size self.num_reqs where,
               prompt_lora_mapping[i] is the LoRA id to use for the ith prompt.
            2. token_lora_mapping: A tuple of size np.sum(num_scheduled_tokens)
               where, token_lora_mapping[i] is the LoRA id to use for ith token.
            3. lora_requests: Set of relevant LoRA requests.
        """

        req_lora_mapping = self.request_lora_mapping[:self.num_reqs]
        prompt_lora_mapping = tuple(req_lora_mapping)
        token_lora_mapping = tuple(
            req_lora_mapping.repeat(num_scheduled_tokens))
        active_lora_requests: Set[LoRARequest] = set(
            self.lora_id_to_lora_request.values())

        return prompt_lora_mapping, token_lora_mapping, active_lora_requests

512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
    @property
    def num_reqs(self) -> int:
        return len(self.req_id_to_index)

    @property
    def all_greedy(self) -> bool:
        return len(self.random_reqs) == 0

    @property
    def all_random(self) -> bool:
        return len(self.greedy_reqs) == 0

    @property
    def no_top_p(self) -> bool:
        return len(self.top_p_reqs) == 0

    @property
    def no_top_k(self) -> bool:
        return len(self.top_k_reqs) == 0

532
533
534
535
    @property
    def no_min_p(self) -> bool:
        return len(self.min_p_reqs) == 0

536
537
538
539
540
541
    @property
    def no_penalties(self) -> bool:
        return (len(self.presence_penalties_reqs) == 0
                and len(self.frequency_penalties_reqs) == 0
                and len(self.repetition_penalties_reqs) == 0)

542
    @property
543
544
    def max_num_logprobs(self) -> Optional[int]:
        return max(self.num_logprobs.values()) if self.num_logprobs else None
545
546
547

    @property
    def no_prompt_logprob(self) -> bool:
548
        return not self.num_prompt_logprobs