gpu_input_batch.py 36.7 KB
Newer Older
1
# SPDX-License-Identifier: Apache-2.0
2
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
3
# Datastructures defining a GPU input batch
4
5

from dataclasses import dataclass
6
from typing import Optional, cast
7
8
9
10

import numpy as np
import torch

11
from vllm.lora.request import LoRARequest
12
from vllm.multimodal.inputs import MultiModalFeatureSpec
13
from vllm.pooling_params import PoolingParams
14
from vllm.sampling_params import SamplingParams, SamplingType
15
from vllm.utils import length_from_prompt_token_ids_or_embeds, swap_dict_values
16
from vllm.v1.outputs import LogprobsTensors
17
from vllm.v1.pool.metadata import PoolingMetadata
18
19
20
21
22
from vllm.v1.sample.logits_processor import (
    BatchUpdateBuilder,
    LogitsProcessors,
    MoveDirectionality,
)
23
from vllm.v1.sample.metadata import SamplingMetadata
24
from vllm.v1.spec_decode.utils import is_spec_decode_unsupported
25
from vllm.v1.utils import copy_slice
26
from vllm.v1.worker.block_table import MultiGroupBlockTable
27
28
29
30
31


@dataclass
class CachedRequestState:
    """Per-request state cached on the worker between scheduling steps.

    Exactly how each field is consumed is decided by the callers; the
    comments below only describe what this file does with them.
    """

    req_id: str
    # Prompt token ids, or None when the prompt was supplied as embeddings
    # (see get_token_id).
    prompt_token_ids: Optional[list[int]]
    mm_features: list[MultiModalFeatureSpec]
    sampling_params: Optional[SamplingParams]
    pooling_params: Optional[PoolingParams]
    generator: Optional[torch.Generator]

    # NOTE(review): presumably one list of block ids per KV cache group --
    # confirm against MultiGroupBlockTable.add_row.
    block_ids: tuple[list[int], ...]
    num_computed_tokens: int
    output_token_ids: list[int]

    mrope_positions: Optional[torch.Tensor] = None
    mrope_position_delta: Optional[int] = None

    lora_request: Optional[LoRARequest] = None
    prompt_embeds: Optional[torch.Tensor] = None

    def __post_init__(self):
        # Derived once at construction; the prompt length comes from either
        # the token ids or the embeddings, whichever was provided.
        self.num_prompt_tokens = length_from_prompt_token_ids_or_embeds(
            self.prompt_token_ids, self.prompt_embeds
        )

    @property
    def num_tokens(self) -> int:
        """Total number of tokens: prompt tokens plus generated tokens."""
        return self.num_prompt_tokens + len(self.output_token_ids)

    def get_token_id(self, idx: int) -> int:
        """Return the token id at position ``idx`` of prompt + output.

        Args:
            idx: Position in the concatenated prompt/output sequence.

        Returns:
            The token id at ``idx``, or -1 when ``idx`` lies past the last
            generated token.

        Raises:
            ValueError: If ``idx`` falls inside the prompt but the prompt was
                provided via ``prompt_embeds`` and therefore has no ids.
        """
        if idx < self.num_prompt_tokens:
            if self.prompt_token_ids is None:
                raise ValueError(
                    f"Tried to access token index {idx}, but that token was "
                    "provided via prompt_embeds, and its ID is unknown."
                )
            return self.prompt_token_ids[idx]
        elif idx - self.num_prompt_tokens < len(self.output_token_ids):
            return self.output_token_ids[idx - self.num_prompt_tokens]
        else:
            return -1
69
70
71
72


class InputBatch:
    def __init__(
        self,
        max_num_reqs: int,
        max_model_len: int,
        max_num_batched_tokens: int,
        device: torch.device,
        pin_memory: bool,
        vocab_size: int,
        block_sizes: list[int],  # The block_size of each kv cache group
        kernel_block_sizes: list[int],
        logitsprocs: Optional[LogitsProcessors] = None,
        is_spec_decode: bool = False,
        is_pooling_model: bool = False,
        num_speculative_tokens: int = 0,
    ):
        """Allocate the persistent per-request buffers of the batch.

        Args:
            max_num_reqs: Number of request slots; first dimension of every
                per-request buffer allocated here.
            max_model_len: Second dimension of the token-id buffers; also
                forwarded to the block table.
            max_num_batched_tokens: Stored and forwarded to the block table.
            device: Device on which the sampling-parameter tensors live.
            pin_memory: Whether the CPU staging tensors for sampling
                parameters are allocated in pinned memory.
            vocab_size: Vocabulary size; used as the "disabled" top-k value
                and as the width of the allowed-token mask.
            block_sizes: Forwarded to ``MultiGroupBlockTable``.
            kernel_block_sizes: Forwarded to ``MultiGroupBlockTable``.
            logitsprocs: Logits processors to use; an empty
                ``LogitsProcessors()`` is created when omitted.
            is_spec_decode: Stored flag; consulted when adding requests.
            is_pooling_model: Stored flag; pooling batches skip the
                sampling-state bookkeeping.
            num_speculative_tokens: Forwarded to ``MultiGroupBlockTable``.
        """
        self.is_pooling_model = is_pooling_model
        self.is_spec_decode = is_spec_decode
        self.max_num_reqs = max_num_reqs
        self.max_model_len = max_model_len
        self.max_num_batched_tokens = max_num_batched_tokens
        self.device = device
        self.pin_memory = pin_memory
        self.vocab_size = vocab_size

        self._req_ids: list[Optional[str]] = []
        self.req_id_to_index: dict[str, int] = {}

        # TODO(woosuk): This buffer could be too large if max_model_len is big.
        # Find a way to reduce the CPU memory usage.
        # This buffer is not directly transferred to the GPU, so it does not
        # need to be pinned.
        self.token_ids_cpu_tensor = torch.zeros(
            (max_num_reqs, max_model_len),
            device="cpu",
            dtype=torch.int32,
            pin_memory=False,
        )
        # NumPy view sharing memory with token_ids_cpu_tensor.
        self.token_ids_cpu = self.token_ids_cpu_tensor.numpy()
        # True where the corresponding token_ids_cpu entry holds a real token
        # id (as opposed to a position covered by prompt embeddings).
        self.is_token_ids = torch.zeros(
            (max_num_reqs, max_model_len), device="cpu", dtype=bool, pin_memory=False
        )
        # Store prompt embeddings per request to avoid OOM from large upfront
        # allocation if max_model_len is big.
        # Maps req_index -> tensor of shape (num_prompt_tokens, hidden_size)
        self.req_prompt_embeds: dict[int, torch.Tensor] = {}
        self.num_tokens = np.zeros(max_num_reqs, dtype=np.int32)
        self.num_tokens_no_spec = np.zeros(max_num_reqs, dtype=np.int32)
        self.num_prompt_tokens = np.zeros(max_num_reqs, dtype=np.int32)
        self.num_computed_tokens_cpu_tensor = torch.zeros(
            (max_num_reqs,),
            device="cpu",
            dtype=torch.int32,
            pin_memory=pin_memory,
        )
        self.num_computed_tokens_cpu = self.num_computed_tokens_cpu_tensor.numpy()

        # Block table.
        self.block_table = MultiGroupBlockTable(
            max_num_reqs=max_num_reqs,
            max_model_len=max_model_len,
            max_num_batched_tokens=max_num_batched_tokens,
            pin_memory=pin_memory,
            device=device,
            block_sizes=block_sizes,
            kernel_block_sizes=kernel_block_sizes,
            num_speculative_tokens=num_speculative_tokens,
        )

        # Sampling-related.
        # Each sampling parameter is kept three ways: a device tensor, a CPU
        # staging tensor, and a NumPy view of the CPU tensor that the
        # bookkeeping methods write to.
        self.temperature = torch.empty(
            (max_num_reqs,), dtype=torch.float32, device=device
        )
        self.temperature_cpu_tensor = torch.empty(
            (max_num_reqs,), dtype=torch.float32, device="cpu", pin_memory=pin_memory
        )
        self.temperature_cpu = self.temperature_cpu_tensor.numpy()
        self.greedy_reqs: set[str] = set()
        self.random_reqs: set[str] = set()

        self.top_p = torch.empty((max_num_reqs,), dtype=torch.float32, device=device)
        self.top_p_cpu_tensor = torch.empty(
            (max_num_reqs,), dtype=torch.float32, device="cpu", pin_memory=pin_memory
        )
        self.top_p_cpu = self.top_p_cpu_tensor.numpy()
        self.top_p_reqs: set[str] = set()

        self.top_k = torch.empty((max_num_reqs,), dtype=torch.int32, device=device)
        self.top_k_cpu_tensor = torch.empty(
            (max_num_reqs,), dtype=torch.int32, device="cpu", pin_memory=pin_memory
        )
        self.top_k_cpu = self.top_k_cpu_tensor.numpy()
        self.top_k_reqs: set[str] = set()

        # IDs of requests which do not support spec decoding
        self.spec_decode_unsupported_reqs: set[str] = set()

        # Frequency penalty related data structures
        self.frequency_penalties = torch.empty(
            (max_num_reqs,), dtype=torch.float, device=device
        )
        self.frequency_penalties_cpu_tensor = torch.empty(
            (max_num_reqs,), dtype=torch.float, device="cpu", pin_memory=pin_memory
        )
        self.frequency_penalties_cpu = self.frequency_penalties_cpu_tensor.numpy()
        self.frequency_penalties_reqs: set[str] = set()

        # Presence penalty related data structures
        self.presence_penalties = torch.empty(
            (max_num_reqs,), dtype=torch.float, device=device
        )
        self.presence_penalties_cpu_tensor = torch.empty(
            (max_num_reqs,), dtype=torch.float, device="cpu", pin_memory=pin_memory
        )
        self.presence_penalties_cpu = self.presence_penalties_cpu_tensor.numpy()
        self.presence_penalties_reqs: set[str] = set()

        # Repetition penalty related data structures
        self.repetition_penalties = torch.empty(
            (max_num_reqs,), dtype=torch.float, device=device
        )
        self.repetition_penalties_cpu_tensor = torch.empty(
            (max_num_reqs,), dtype=torch.float, device="cpu", pin_memory=pin_memory
        )
        self.repetition_penalties_cpu = self.repetition_penalties_cpu_tensor.numpy()
        self.repetition_penalties_reqs: set[str] = set()

        # Speculative decoding
        # Initialised to ones: by default one token is accepted per request.
        self.num_accepted_tokens_cpu_tensor = torch.ones(
            (max_num_reqs,), dtype=torch.int64, device="cpu", pin_memory=pin_memory
        )
        self.num_accepted_tokens_cpu = self.num_accepted_tokens_cpu_tensor.numpy()

        # lora related
        # A mapping value of 0 means "no LoRA" for that slot.
        self.request_lora_mapping = np.zeros((self.max_num_reqs,), dtype=np.int32)
        self.lora_id_to_request_ids: dict[int, set[str]] = {}
        self.lora_id_to_lora_request: dict[int, LoRARequest] = {}

        # req_index -> generator
        # NOTE(woosuk): The indices of the requests that do not have their own
        # generator should not be included in the dictionary.
        self.generators: dict[int, torch.Generator] = {}

        self.num_logprobs: dict[str, int] = {}
        # NOTE(rob): num_prompt_logprobs only includes reqs
        # that are currently in the prefill phase.
        self.num_prompt_logprobs: dict[str, int] = {}

        # To accumulate prompt logprobs tensor chunks across prefill steps.
        self.in_progress_prompt_logprobs_cpu: dict[str, LogprobsTensors] = {}

        # Internal representation of per-step batch state changes, used for
        # reordering persistent batch and generating logitsprocs batch state
        # updates. Should reset each step.
        self.batch_update_builder = BatchUpdateBuilder()

        # TODO convert this to LogitsProcessor
        self.has_allowed_token_ids: set[str] = set()
        # NOTE(lufang): In the mask tensor, if the corresponding token allowed,
        # the value is False. Since we use masked_fill_ to set -inf.
        # Both mask tensors are allocated lazily by add_request().
        self.allowed_token_ids_mask: Optional[torch.Tensor] = None
        self.allowed_token_ids_mask_cpu_tensor: Optional[torch.Tensor] = None

        # req_index -> bad_words_token_ids
        self.bad_words_token_ids: dict[int, list[list[int]]] = {}

        self.logits_processing_needs_token_ids = np.zeros(max_num_reqs, dtype=bool)

        self.req_output_token_ids: list[Optional[list[int]]] = []

        # Store provided logitsprocs. If none are provided, initialize empty
        # data structure
        self.logitsprocs = logitsprocs or LogitsProcessors()

        # Store last speculative tokens for sampler.
        self.spec_token_ids: list[Optional[list[int]]] = []

        # This is updated each time the batch constituents change.
        self.sampling_metadata = self._make_sampling_metadata()

        self.pooling_params: dict[str, PoolingParams] = {}

        # Cached reference to the GPU tensor of previously sampled tokens
        self.prev_sampled_token_ids: Optional[torch.Tensor] = None
        self.prev_sampled_token_ids_invalid_indices: Optional[set[int]] = None
        self.prev_req_id_to_index: Optional[dict[str, int]] = None

258
    @property
259
    def req_ids(self) -> list[str]:
260
261
        # None elements should only be present transiently
        # while performing state updates to the batch.
262
        return cast(list[str], self._req_ids)
263

264
    def _register_add_request(self, request: "CachedRequestState") -> int:
        """Pick the slot for a new request and record the addition.

        The detailed "added" record is only produced for requests that carry
        sampling params; it is not applicable to pooling models.

        Args:
            request: The request about to be inserted into the batch.

        Returns:
            The slot index the request must be stored at.
        """

        # Fill the next empty index if there is one.
        if (new_req_index := self.batch_update_builder.pop_removed()) is None:
            # Append to end otherwise.
            new_req_index = self.num_reqs

        assert new_req_index < self.max_num_reqs
        self.batch_update_builder.batch_changed = True
        if request.sampling_params:
            # Detailed added request metadata is only required for non-pooling
            # models, to support logitsprocs.
            self.batch_update_builder.added.append(
                (
                    new_req_index,
                    request.sampling_params,
                    request.prompt_token_ids,
                    request.output_token_ids,
                )
            )

        return new_req_index
289

290
291
292
    def add_request(
        self,
        request: "CachedRequestState",
    ) -> int:
        """Insert ``request`` into the batch and populate its slot.

        The slot is either a previously vacated index or the next index at
        the end of the batch. All per-request CPU buffers (token ids, token
        counts, block table row, sampling or pooling parameters, LoRA
        mapping) are written for that slot.

        Args:
            request: Cached state of the request to add.

        Returns:
            The slot index assigned to the request.

        Raises:
            NotImplementedError: If the request carries neither sampling nor
                pooling params.
        """
        req_index = self._register_add_request(request)

        req_id = request.req_id
        if req_index == len(self._req_ids):
            self._req_ids.append(req_id)
            self.req_output_token_ids.append(request.output_token_ids)
            self.spec_token_ids.append([])
        else:
            self._req_ids[req_index] = req_id
            self.req_output_token_ids[req_index] = request.output_token_ids
            self.spec_token_ids[req_index] = []

        self.req_id_to_index[req_id] = req_index

        # Copy the prompt token ids and output token ids.
        num_prompt_tokens = length_from_prompt_token_ids_or_embeds(
            request.prompt_token_ids, request.prompt_embeds
        )
        self.num_prompt_tokens[req_index] = num_prompt_tokens
        start_idx = num_prompt_tokens
        end_idx = start_idx + len(request.output_token_ids)
        if request.prompt_token_ids is not None:
            self.token_ids_cpu[req_index, :num_prompt_tokens] = request.prompt_token_ids
            self.is_token_ids[req_index, :num_prompt_tokens] = True
        else:
            self.is_token_ids[req_index, :num_prompt_tokens] = False
        if request.prompt_embeds is not None:
            self.req_prompt_embeds[req_index] = request.prompt_embeds
        else:
            # The slot may be reused: make sure embeddings left behind by a
            # previous occupant are not associated with this request.
            self.req_prompt_embeds.pop(req_index, None)
        self.token_ids_cpu[req_index, start_idx:end_idx] = request.output_token_ids
        self.is_token_ids[req_index, start_idx:end_idx] = True
        # Total number of tokens of the request: prompt tokens (given as
        # token ids or as prompt_embeds) plus output tokens.
        # NOTE(woosuk): This may include spec decode tokens.
        self.num_tokens[req_index] = request.num_tokens
        # Number of tokens without spec decode tokens.
        self.num_tokens_no_spec[req_index] = request.num_tokens

        self.num_computed_tokens_cpu[req_index] = request.num_computed_tokens
        self.block_table.add_row(request.block_ids, req_index)

        if sampling_params := request.sampling_params:
            if self.is_spec_decode and is_spec_decode_unsupported(sampling_params):
                self.spec_decode_unsupported_reqs.add(req_id)
            if sampling_params.sampling_type == SamplingType.GREEDY:
                # Should avoid division by zero later when apply_temperature.
                self.temperature_cpu[req_index] = 0.0
                self.greedy_reqs.add(req_id)
            else:
                self.temperature_cpu[req_index] = sampling_params.temperature
                self.random_reqs.add(req_id)

            self.top_p_cpu[req_index] = sampling_params.top_p
            if sampling_params.top_p < 1:
                self.top_p_reqs.add(req_id)
            top_k = sampling_params.top_k
            if 0 < top_k < self.vocab_size:
                self.top_k_reqs.add(req_id)
            else:
                # Out-of-range top_k is stored as vocab_size, i.e. no
                # truncation for this request.
                top_k = self.vocab_size
            self.top_k_cpu[req_index] = top_k
            self.frequency_penalties_cpu[req_index] = sampling_params.frequency_penalty
            if sampling_params.frequency_penalty != 0.0:
                self.frequency_penalties_reqs.add(req_id)
            self.presence_penalties_cpu[req_index] = sampling_params.presence_penalty
            if sampling_params.presence_penalty != 0.0:
                self.presence_penalties_reqs.add(req_id)
            self.repetition_penalties_cpu[req_index] = (
                sampling_params.repetition_penalty
            )
            if sampling_params.repetition_penalty != 1.0:
                self.repetition_penalties_reqs.add(req_id)

            # NOTE(woosuk): self.generators should not include the requests that
            # do not have their own generator.
            if request.generator is not None:
                self.generators[req_index] = request.generator

            # A requested count of -1 is stored as the full vocabulary size.
            if sampling_params.logprobs is not None:
                self.num_logprobs[req_id] = (
                    self.vocab_size
                    if sampling_params.logprobs == -1
                    else sampling_params.logprobs
                )
            if sampling_params.prompt_logprobs is not None:
                self.num_prompt_logprobs[req_id] = (
                    self.vocab_size
                    if sampling_params.prompt_logprobs == -1
                    else sampling_params.prompt_logprobs
                )

            if sampling_params.allowed_token_ids:
                self.has_allowed_token_ids.add(req_id)
                if self.allowed_token_ids_mask_cpu_tensor is None:
                    # Lazy allocation for this tensor, which can be large.
                    # False means we don't fill with -inf.
                    self.allowed_token_ids_mask = torch.zeros(
                        self.max_num_reqs,
                        self.vocab_size,
                        dtype=torch.bool,
                        device=self.device,
                    )
                    self.allowed_token_ids_mask_cpu_tensor = torch.zeros(
                        self.max_num_reqs,
                        self.vocab_size,
                        dtype=torch.bool,
                        device="cpu",
                    )
                # Mask everything, then unmask the allowed ids.
                self.allowed_token_ids_mask_cpu_tensor[req_index] = True
                # False means we don't fill with -inf.
                self.allowed_token_ids_mask_cpu_tensor[req_index][
                    sampling_params.allowed_token_ids
                ] = False

            if sampling_params.bad_words_token_ids:
                self.bad_words_token_ids[req_index] = (
                    sampling_params.bad_words_token_ids
                )
        elif pooling_params := request.pooling_params:
            self.pooling_params[req_id] = pooling_params
            self.logits_processing_needs_token_ids[req_index] = (
                pooling_params.requires_token_ids
            )
        else:
            raise NotImplementedError("Unrecognized request type")

        # Speculative decoding: by default 1 token is generated.
        self.num_accepted_tokens_cpu[req_index] = 1

        # Add request lora ID
        if request.lora_request:
            lora_id = request.lora_request.lora_int_id
            if lora_id not in self.lora_id_to_request_ids:
                self.lora_id_to_request_ids[lora_id] = set()

            self.request_lora_mapping[req_index] = lora_id
            self.lora_id_to_request_ids[lora_id].add(request.req_id)
            self.lora_id_to_lora_request[lora_id] = request.lora_request
        else:
            # No LoRA
            self.request_lora_mapping[req_index] = 0

        return req_index

436
    def remove_request(self, req_id: str) -> Optional[int]:
437
        """This method must always be followed by a call to condense().
438

439
440
441
442
443
444
        Args:
          req_id: request to remove

        Returns:
          Removed request index, or `None` if `req_id` not recognized
        """
445

446
447
448
        req_index = self.req_id_to_index.pop(req_id, None)
        if req_index is None:
            return None
449
450

        self.batch_update_builder.removed_append(req_index)
451
452
        self._req_ids[req_index] = None
        self.req_output_token_ids[req_index] = None
453
        self.spec_token_ids[req_index] = None
454

455
456
457
458
459
460
461
462
463
464
465
466
467
468
        # LoRA
        lora_id = self.request_lora_mapping[req_index]
        if lora_id != 0:
            lora_req_ids = self.lora_id_to_request_ids[lora_id]
            lora_req_ids.discard(req_id)
            if not lora_req_ids:
                del self.lora_id_to_request_ids[lora_id]
                del self.lora_id_to_lora_request[lora_id]
            self.request_lora_mapping[req_index] = 0

        if self.is_pooling_model:
            self.pooling_params.pop(req_id, None)
            return req_index

469
470
471
472
        self.greedy_reqs.discard(req_id)
        self.random_reqs.discard(req_id)
        self.top_p_reqs.discard(req_id)
        self.top_k_reqs.discard(req_id)
473
        self.spec_decode_unsupported_reqs.discard(req_id)
474
475
476
        self.frequency_penalties_reqs.discard(req_id)
        self.presence_penalties_reqs.discard(req_id)
        self.repetition_penalties_reqs.discard(req_id)
477
478
        self.generators.pop(req_index, None)
        self.num_logprobs.pop(req_id, None)
479
        self.num_prompt_logprobs.pop(req_id, None)
480
        self.in_progress_prompt_logprobs_cpu.pop(req_id, None)
481

482
483
        self.has_allowed_token_ids.discard(req_id)
        if self.allowed_token_ids_mask_cpu_tensor is not None:
484
            # False means we don't fill with -inf.
485
            self.allowed_token_ids_mask_cpu_tensor[req_index].fill_(False)
486
        self.bad_words_token_ids.pop(req_index, None)
487
488
        return req_index

489
490
491
    def swap_states(self, i1: int, i2: int) -> None:
        """Exchange all per-request state stored at slots ``i1`` and ``i2``.

        Both slots must currently hold a request. For non-pooling models the
        swap is also recorded in ``batch_update_builder.moved``.

        Args:
            i1: Index of the first slot.
            i2: Index of the second slot.
        """
        old_id_i1 = self._req_ids[i1]
        old_id_i2 = self._req_ids[i2]
        self._req_ids[i1], self._req_ids[i2] = self._req_ids[i2], self._req_ids[i1]  # noqa
        self.req_output_token_ids[i1], self.req_output_token_ids[i2] = (
            self.req_output_token_ids[i2],
            self.req_output_token_ids[i1],
        )
        self.spec_token_ids[i1], self.spec_token_ids[i2] = (
            self.spec_token_ids[i2],
            self.spec_token_ids[i1],
        )
        assert old_id_i1 is not None and old_id_i2 is not None
        self.req_id_to_index[old_id_i1], self.req_id_to_index[old_id_i2] = (
            self.req_id_to_index[old_id_i2],
            self.req_id_to_index[old_id_i1],
        )
        # The buffers below are 1-D, so indexing yields scalars (not views)
        # and a plain tuple swap is safe.
        self.num_tokens[i1], self.num_tokens[i2] = (
            self.num_tokens[i2],
            self.num_tokens[i1],
        )
        self.num_tokens_no_spec[i1], self.num_tokens_no_spec[i2] = (
            self.num_tokens_no_spec[i2],
            self.num_tokens_no_spec[i1],
        )
        self.num_prompt_tokens[i1], self.num_prompt_tokens[i2] = (
            self.num_prompt_tokens[i2],
            self.num_prompt_tokens[i1],
        )
        self.num_computed_tokens_cpu[i1], self.num_computed_tokens_cpu[i2] = (
            self.num_computed_tokens_cpu[i2],
            self.num_computed_tokens_cpu[i1],
        )

        # NOTE: the following is unsafe
        # self.token_ids_cpu[i1, ...], self.token_ids_cpu[i2, ...], =\
        #     self.token_ids_cpu[i2, ...], self.token_ids_cpu[i1, ...]
        # instead, we need to temporarily copy the data for one of the indices
        # TODO(lucas): optimize this by only copying valid indices
        tmp = self.token_ids_cpu[i1, ...].copy()
        self.token_ids_cpu[i1, ...] = self.token_ids_cpu[i2, ...]
        self.token_ids_cpu[i2, ...] = tmp

        # Advanced (list) indexing on the right-hand side produces a copy of
        # both rows, so this row swap is safe.
        self.is_token_ids[[i1, i2], ...] = self.is_token_ids[[i2, i1], ...]

        # Swap prompt embeddings if they exist
        embeds_i1 = self.req_prompt_embeds.get(i1)
        embeds_i2 = self.req_prompt_embeds.get(i2)
        if embeds_i1 is not None:
            self.req_prompt_embeds[i2] = embeds_i1
        else:
            self.req_prompt_embeds.pop(i2, None)
        if embeds_i2 is not None:
            self.req_prompt_embeds[i1] = embeds_i2
        else:
            self.req_prompt_embeds.pop(i1, None)

        self.block_table.swap_row(i1, i2)

        self.request_lora_mapping[i1], self.request_lora_mapping[i2] = (
            self.request_lora_mapping[i2],
            self.request_lora_mapping[i1],
        )

        if self.is_pooling_model:
            # Sampling and logits parameters don't apply to pooling models.
            return

        # For autoregressive models, track detailed request reordering info
        # to support logitsprocs.
        self.batch_update_builder.moved.append((i1, i2, MoveDirectionality.SWAP))

        self.temperature_cpu[i1], self.temperature_cpu[i2] = (
            self.temperature_cpu[i2],
            self.temperature_cpu[i1],
        )
        self.top_p_cpu[i1], self.top_p_cpu[i2] = self.top_p_cpu[i2], self.top_p_cpu[i1]
        self.top_k_cpu[i1], self.top_k_cpu[i2] = self.top_k_cpu[i2], self.top_k_cpu[i1]
        self.frequency_penalties_cpu[i1], self.frequency_penalties_cpu[i2] = (
            self.frequency_penalties_cpu[i2],
            self.frequency_penalties_cpu[i1],
        )
        self.presence_penalties_cpu[i1], self.presence_penalties_cpu[i2] = (
            self.presence_penalties_cpu[i2],
            self.presence_penalties_cpu[i1],
        )
        self.repetition_penalties_cpu[i1], self.repetition_penalties_cpu[i2] = (
            self.repetition_penalties_cpu[i2],
            self.repetition_penalties_cpu[i1],
        )
        self.num_accepted_tokens_cpu[i1], self.num_accepted_tokens_cpu[i2] = (
            self.num_accepted_tokens_cpu[i2],
            self.num_accepted_tokens_cpu[i1],
        )

        swap_dict_values(self.generators, i1, i2)
        swap_dict_values(self.bad_words_token_ids, i1, i2)

        if self.allowed_token_ids_mask_cpu_tensor is not None:
            # Indexing a 2-D tensor with an int returns a view of the row, so
            # a tuple swap would overwrite row i1 before it is read back and
            # leave both rows equal to the old row i2. Advanced indexing on
            # the right-hand side copies both rows first.
            self.allowed_token_ids_mask_cpu_tensor[[i1, i2], ...] = (
                self.allowed_token_ids_mask_cpu_tensor[[i2, i1], ...]
            )
595

596
597
598
599
600
601
602
603
604
    def condense(self) -> None:
        """Slide non-empty requests down into lower, empty indices.

        Any consecutive empty indices at the very end of the list are not
        filled; the per-request lists are trimmed to the batch size instead.

        Nothing is returned. For non-pooling models every move is recorded in
        ``self.batch_update_builder.moved`` as a
        ``(from, to, MoveDirectionality.UNIDIRECTIONAL)`` tuple.
        """
        num_reqs = self.num_reqs

        if not (empty_req_indices := self.batch_update_builder.removed):
            # All removed requests were replaced by added requests, or else no
            # requests were removed at all. No condense() needed
            return
        if num_reqs == 0:
            # The batched states are empty.
            self._req_ids.clear()
            self.req_output_token_ids.clear()
            self.spec_token_ids.clear()
            return

        # NOTE(woosuk): This function assumes that the empty_req_indices
        # is sorted in descending order.
        last_req_index = num_reqs + len(empty_req_indices) - 1
        while empty_req_indices:
            # Find the largest non-empty index.
            while last_req_index in empty_req_indices:
                last_req_index -= 1

            # Find the smallest empty index.
            empty_index = self.batch_update_builder.peek_removed()
            assert empty_index is not None
            if empty_index >= last_req_index:
                # Every remaining hole is at the tail; nothing left to move.
                break

            # Move active request down into empty request
            # index.
            self.batch_update_builder.pop_removed()
            req_id = self._req_ids[last_req_index]
            output_token_ids = self.req_output_token_ids[last_req_index]
            assert req_id is not None
            self._req_ids[empty_index] = req_id
            self._req_ids[last_req_index] = None
            self.req_output_token_ids[empty_index] = output_token_ids
            self.req_output_token_ids[last_req_index] = None
            self.req_id_to_index[req_id] = empty_index

            spec_token_ids = self.spec_token_ids[last_req_index]
            self.spec_token_ids[empty_index] = spec_token_ids
            self.spec_token_ids[last_req_index] = None

            # Only the first num_tokens entries of the row are copied.
            num_tokens = self.num_tokens[last_req_index]
            self.token_ids_cpu[empty_index, :num_tokens] = self.token_ids_cpu[
                last_req_index, :num_tokens
            ]
            self.is_token_ids[empty_index, :num_tokens] = self.is_token_ids[
                last_req_index, :num_tokens
            ]
            moved_embeds = self.req_prompt_embeds.pop(last_req_index, None)
            if moved_embeds is not None:
                self.req_prompt_embeds[empty_index] = moved_embeds
            else:
                # Same handling as swap_states(): the destination slot must
                # not keep embeddings left over from its previous occupant.
                self.req_prompt_embeds.pop(empty_index, None)
            self.num_tokens[empty_index] = num_tokens
            self.num_tokens_no_spec[empty_index] = self.num_tokens_no_spec[
                last_req_index
            ]
            self.num_prompt_tokens[empty_index] = self.num_prompt_tokens[last_req_index]
            self.num_computed_tokens_cpu[empty_index] = self.num_computed_tokens_cpu[
                last_req_index
            ]
            self.block_table.move_row(last_req_index, empty_index)

            self.request_lora_mapping[empty_index] = self.request_lora_mapping[
                last_req_index
            ]

            if self.is_pooling_model:
                last_req_index -= 1
                # Sampling state not used by pooling models.
                continue

            # Autoregressive models require detailed tracking of condense
            # operations to support logitsprocs
            self.batch_update_builder.moved.append(
                (last_req_index, empty_index, MoveDirectionality.UNIDIRECTIONAL)
            )

            self.temperature_cpu[empty_index] = self.temperature_cpu[last_req_index]
            self.top_p_cpu[empty_index] = self.top_p_cpu[last_req_index]
            self.top_k_cpu[empty_index] = self.top_k_cpu[last_req_index]
            self.frequency_penalties_cpu[empty_index] = self.frequency_penalties_cpu[
                last_req_index
            ]
            self.presence_penalties_cpu[empty_index] = self.presence_penalties_cpu[
                last_req_index
            ]
            self.repetition_penalties_cpu[empty_index] = self.repetition_penalties_cpu[
                last_req_index
            ]
            self.num_accepted_tokens_cpu[empty_index] = self.num_accepted_tokens_cpu[
                last_req_index
            ]
            generator = self.generators.pop(last_req_index, None)
            if generator is not None:
                self.generators[empty_index] = generator

            # TODO convert these to LogitsProcessors
            if self.allowed_token_ids_mask_cpu_tensor is not None:
                self.allowed_token_ids_mask_cpu_tensor[empty_index] = (
                    self.allowed_token_ids_mask_cpu_tensor[last_req_index]
                )

            bad_words_token_ids = self.bad_words_token_ids.pop(last_req_index, None)
            if bad_words_token_ids is not None:
                self.bad_words_token_ids[empty_index] = bad_words_token_ids

            # Decrement last_req_index since it is now empty.
            last_req_index -= 1

        # Trim lists to the batch size.
        del self._req_ids[num_reqs:]
        del self.req_output_token_ids[num_reqs:]
        del self.spec_token_ids[num_reqs:]
721

722
    def refresh_metadata(self):
        """Flush pending batch updates and rebuild sampling metadata if needed.

        Pooling models only reset the batch-update tracker. For every other
        model the accumulated batch update is fetched (which also resets the
        tracker) and handed to each logits processor. In both cases the
        sampling metadata is regenerated only when the tracker result is
        truthy, i.e. the batch state changed.
        """
        if self.is_pooling_model:
            # No logits-processor state to maintain for pooling models.
            pending_update = self.batch_update_builder.reset()
        else:
            pending_update = self.batch_update_builder.get_and_reset(self.num_reqs)
            # Every processor sees the update, even a falsy "nothing changed"
            # one, before the rebuild decision is taken.
            for processor in self.logitsprocs.all:
                processor.update_state(pending_update)

        if pending_update:
            self.sampling_metadata = self._make_sampling_metadata()
739
740
741

    def _make_sampling_metadata(self) -> SamplingMetadata:
        """Build a fresh ``SamplingMetadata`` for the requests in the batch.

        Each per-request sampling tensor is copied through ``copy_slice``
        (first ``num_reqs`` entries) only when at least one request needs it;
        otherwise the copy is skipped. The prompt-token tensor is likewise
        built only when penalties are active or some logits processing needs
        token ids.
        """
        batch_size = self.num_reqs

        temperature = None
        if not self.all_greedy:
            temperature = copy_slice(
                self.temperature_cpu_tensor, self.temperature, batch_size
            )

        top_p_unused = self.no_top_p
        if not top_p_unused:
            copy_slice(self.top_p_cpu_tensor, self.top_p, batch_size)

        top_k_unused = self.no_top_k
        if not top_k_unused:
            copy_slice(self.top_k_cpu_tensor, self.top_k, batch_size)

        penalties_unused = self.no_penalties
        if not penalties_unused:
            # Since syncing these tensors is expensive, copy them only when
            # some request actually needs penalties applied during sampling.
            for cpu_source, target in (
                (self.frequency_penalties_cpu_tensor, self.frequency_penalties),
                (self.presence_penalties_cpu_tensor, self.presence_penalties),
                (self.repetition_penalties_cpu_tensor, self.repetition_penalties),
            ):
                copy_slice(cpu_source, target, batch_size)

        # The prompt tokens are used only for applying penalties or step
        # pooling during the sampling/pooling process, so the tensor is built
        # only when some request needs penalties/step_pooler applied.
        if (
            penalties_unused
            and not self.logits_processing_needs_token_ids[:batch_size].any()
        ):
            prompt_token_ids = None
        else:
            prompt_token_ids = self._make_prompt_token_ids_tensor()

        allowed_mask: Optional[torch.Tensor] = None
        if not self.no_allowed_token_ids:
            assert self.allowed_token_ids_mask is not None
            copy_slice(
                self.allowed_token_ids_mask_cpu_tensor,
                self.allowed_token_ids_mask,
                batch_size,
            )
            allowed_mask = self.allowed_token_ids_mask[:batch_size]

        return SamplingMetadata(
            temperature=temperature,
            all_greedy=self.all_greedy,
            all_random=self.all_random,
            top_p=None if top_p_unused else self.top_p[:batch_size],
            top_k=None if top_k_unused else self.top_k[:batch_size],
            generators=self.generators,
            max_num_logprobs=self.max_num_logprobs,
            prompt_token_ids=prompt_token_ids,
            frequency_penalties=self.frequency_penalties[:batch_size],
            presence_penalties=self.presence_penalties[:batch_size],
            repetition_penalties=self.repetition_penalties[:batch_size],
            output_token_ids=cast(list[list[int]], self.req_output_token_ids),
            spec_token_ids=cast(list[list[int]], self.spec_token_ids),
            no_penalties=penalties_unused,
            allowed_token_ids_mask=allowed_mask,
            bad_words_token_ids=self.bad_words_token_ids,
            logitsprocs=self.logitsprocs,
        )

812
813
814
815
816
817
    def get_pooling_params(self) -> list[PoolingParams]:
        """Return the pooling params of each request, ordered like ``req_ids``.

        Asserts that ``req_ids`` and ``pooling_params`` have the same number
        of entries before looking each request id up.
        """
        ordered_ids = self.req_ids
        params_by_id = self.pooling_params
        assert len(ordered_ids) == len(params_by_id)
        return [params_by_id[request_id] for request_id in ordered_ids]

    def get_pooling_metadata(self) -> PoolingMetadata:
        """Assemble the ``PoolingMetadata`` for the requests in the batch.

        Prompt lengths are taken from the first ``num_reqs`` entries of
        ``num_prompt_tokens``; the prompt token ids are reused from the
        current sampling metadata.
        """
        params = self.get_pooling_params()
        active_prompt_lens = self.num_prompt_tokens[: self.num_reqs]
        return PoolingMetadata(
            prompt_lens=torch.from_numpy(active_prompt_lens),
            prompt_token_ids=self.sampling_metadata.prompt_token_ids,
            pooling_params=params,
        )

825
    def _make_prompt_token_ids_tensor(self) -> torch.Tensor:
826
827
        num_reqs = self.num_reqs
        max_prompt_len = self.num_prompt_tokens[:num_reqs].max()
828
829
830
831
        prompt_token_ids_cpu_tensor = torch.empty(
            (self.num_reqs, max_prompt_len),
            device="cpu",
            dtype=torch.int64,
832
833
            pin_memory=self.pin_memory,
        )
834
        prompt_token_ids = prompt_token_ids_cpu_tensor.numpy()
835
        prompt_token_ids[:] = self.token_ids_cpu[:num_reqs, :max_prompt_len]
836
837
        # Use the value of vocab_size as a pad since we don't have a
        # token_id of this value.
838
        for i in range(num_reqs):
839
840
            prompt_token_ids[i, self.num_prompt_tokens[i] :] = self.vocab_size
        return prompt_token_ids_cpu_tensor.to(device=self.device, non_blocking=True)
841

842
843
    def make_lora_inputs(
        self, num_scheduled_tokens: np.ndarray
    ) -> tuple[tuple[int, ...], tuple[int, ...], set[LoRARequest]]:
        """
        Build the datastructures used to activate the current LoRAs from the
        number of scheduled tokens of each request in the batch.

        Returns:
            1. prompt_lora_mapping: A tuple of size self.num_reqs where
               prompt_lora_mapping[i] is the LoRA id to use for the ith prompt.
            2. token_lora_mapping: A tuple of size np.sum(num_scheduled_tokens)
               where token_lora_mapping[i] is the LoRA id to use for the ith
               token.
            3. lora_requests: Set of relevant LoRA requests.
        """
        per_request_ids = self.request_lora_mapping[: self.num_reqs]
        # Expand each request's LoRA id once per token scheduled for it.
        per_token_ids = np.repeat(per_request_ids, num_scheduled_tokens)
        return (
            tuple(per_request_ids),
            tuple(per_token_ids),
            set(self.lora_id_to_lora_request.values()),
        )

865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
    @property
    def num_reqs(self) -> int:
        """Number of entries in ``req_id_to_index``."""
        index_by_req_id = self.req_id_to_index
        return len(index_by_req_id)

    @property
    def all_greedy(self) -> bool:
        """True when ``random_reqs`` holds no request."""
        return not self.random_reqs

    @property
    def all_random(self) -> bool:
        """True when ``greedy_reqs`` holds no request."""
        return not self.greedy_reqs

    @property
    def no_top_p(self) -> bool:
        """True when ``top_p_reqs`` holds no request."""
        return not self.top_p_reqs

    @property
    def no_top_k(self) -> bool:
        """True when ``top_k_reqs`` holds no request."""
        return not self.top_k_reqs

885
886
    @property
    def no_penalties(self) -> bool:
        """True when no request uses a presence, frequency or repetition penalty."""
        any_penalty_requested = (
            self.presence_penalties_reqs
            or self.frequency_penalties_reqs
            or self.repetition_penalties_reqs
        )
        return not any_penalty_requested
892

893
    @property
    def max_num_logprobs(self) -> Optional[int]:
        """Largest value stored in ``num_logprobs``, or None when it is empty."""
        if not self.num_logprobs:
            return None
        return max(self.num_logprobs.values())
896
897
898

    @property
    def no_prompt_logprob(self) -> bool:
        """True when ``num_prompt_logprobs`` is empty (falsy)."""
        any_prompt_logprobs = bool(self.num_prompt_logprobs)
        return not any_prompt_logprobs
900
901
902
903

    @property
    def no_allowed_token_ids(self) -> bool:
        """True when ``has_allowed_token_ids`` holds no request."""
        return not self.has_allowed_token_ids