# batch_expansion.py
from array import array
2
from itertools import chain, count
3
from typing import Iterator, List, Optional, Tuple
4
5
6

import torch

7
from vllm import SamplingParams
8
9
10
from vllm.sequence import (VLLM_TOKEN_ID_ARRAY_TYPE, ExecuteModelRequest,
                           SamplerOutput, SequenceData, SequenceGroupMetadata,
                           get_all_seq_ids)
11
12
from vllm.spec_decode.interfaces import (SpeculativeProposals,
                                         SpeculativeScorer, SpeculativeScores)
13
from vllm.spec_decode.util import (nvtx_range, sampler_output_to_torch,
14
                                   split_batch_by_proposal_len)
15
from vllm.worker.worker_base import WorkerBase
16
17
18
19
20

# Readability aliases used in the signatures below (all are plain ints).
# SeqId: id of a sequence in the original (un-expanded) batch.
SeqId = int
# TargetSeqId: id assigned to a sequence created for the expanded batch.
TargetSeqId = int
# TokenId: a single token id.
TokenId = int

21
22
# Default-constructed sampling params, shared by the non-bonus target
# sequences when the request's temperature is truthy (their sampled tokens
# are not used, so the request-specific params are unnecessary there).
DEFAULT_SIMPLE_SAMPLING_PARAMS = SamplingParams()

23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

class BatchExpansionTop1Scorer(SpeculativeScorer):
    """Implements a speculative scorer that uses batch expansion to get
    probabilities of speculative tokens according to the scoring model.

    Batch expansion converts a list of sequences and multiple query positions
    to a new batch of sequences, each with a single query position. This allows
    for MQA-like scoring in speculative decoding without requiring an MQA
    kernel.

    It is strictly less efficient than MQA scoring.

    It only supports scoring the top1 proposal tokens of the proposer, instead
    of topk/tree.
    """

39
40
    def __init__(self, scorer_worker: WorkerBase, device: str,
                 vocab_size: int):
41
42
43
44
45
46
47
        self._scorer_worker = scorer_worker
        self._device = device
        self._vocab_size = vocab_size

    @nvtx_range("BatchExpansionTop1Scorer.score_proposals")
    def score_proposals(
        self,
        execute_model_req: ExecuteModelRequest,
        proposals: SpeculativeProposals,
    ) -> SpeculativeScores:
        """Score the proposed tokens via the scorer model.

        This converts each speculative input sequence to a set of k+1 target
        sequences. The target sequences have the unique continuations to be
        scored and a unique sequence ID that is different from all input
        sequence ids. The expanded batch is executed once on the scorer
        worker and the result is contracted back to the original batch size.

        Proposal rows that contain a -1 token id are treated as skipped and
        are not expanded.

        Args:
            execute_model_req: The execution request.
            proposals: The speculative proposals to score.

        Returns:
            SpeculativeScores: The probs, token ids, logprobs and (optional)
                hidden states for each position of each input sequence.
        """

        # TODO(cade) perform this on GPU to remove blocking call.
        proposal_lens_list = proposals.proposal_lens.tolist()
        proposal_token_ids_list = proposals.proposal_token_ids.tolist()

        # Filter the list to ignore -1 proposals.
        proposal_token_ids_list_without_skips = [
            token_ids for token_ids in proposal_token_ids_list
            if -1 not in token_ids
        ]

        (spec_indices, non_spec_indices, target_seq_group_metadata_list,
         num_scoring_tokens) = self._expand_batch(
             seq_group_metadata_list=execute_model_req.seq_group_metadata_list,
             proposal_token_ids_list=proposal_token_ids_list_without_skips,
             proposal_lens_list=proposal_lens_list,
         )

        # Run the scorer once over the expanded batch.
        target_sampler_output = self._scorer_worker.execute_model(
            execute_model_req=execute_model_req.clone(
                seq_group_metadata_list=target_seq_group_metadata_list))
        assert len(target_sampler_output) == 1, "expected single-step output"
        target_sampler_output = target_sampler_output[0]

        (all_tokens, all_probs, spec_logprobs,
         all_hidden_states) = self._contract_batch(
             contracted_bs=len(execute_model_req.seq_group_metadata_list),
             target_sampler_output=target_sampler_output,
             proposals=proposals,
             num_scoring_tokens=num_scoring_tokens,
             non_spec_indices=non_spec_indices,
             spec_indices=spec_indices,
             k=execute_model_req.num_lookahead_slots,
         )

        return SpeculativeScores(
            probs=all_probs,
            token_ids=all_tokens,
            logprobs=spec_logprobs,
            hidden_states=all_hidden_states,
        )

    def _expand_batch(
        self,
        seq_group_metadata_list: List[SequenceGroupMetadata],
        proposal_token_ids_list: List[List[TokenId]],
        proposal_lens_list: List[int],
    ) -> Tuple[List[int], List[int], List[SequenceGroupMetadata], int]:
        """Given the input sequences and potentially multiple corresponding
        proposal tokens, create a new batch where each sequence has a single
        query token.

        Args:
            seq_group_metadata_list: The original (un-expanded) batch.
            proposal_token_ids_list: Proposal token ids for the speculative
                sequences only. Entries are looked up by position within the
                speculative subset, so this list must line up with the
                sequences selected as speculative.
            proposal_lens_list: Proposal length for every sequence in the
                original batch.

        Returns:
            A tuple ``(spec_indices, non_spec_indices,
            target_seq_group_metadata_list, num_scoring_tokens)``. The target
            list holds the expanded speculative sequences first, followed by
            the non-speculative sequences; ``num_scoring_tokens`` is the
            number of expanded speculative entries at the front of that list.
        """

        # vLLM currently only supports proposal lens equal to zero or the batch
        # proposal len. This adds some complexity (splitting the batch into spec
        # and non spec sequences) and should be removed in the future. It can be
        # done by supporting per-sequence proposal lens.
        spec_seqs, spec_indices = split_batch_by_proposal_len(
            seq_group_metadata_list,
            proposal_lens_list,
            select_proposal_len_zero=False)
        non_spec_seqs, non_spec_indices = split_batch_by_proposal_len(
            seq_group_metadata_list,
            proposal_lens_list,
            select_proposal_len_zero=True)

        target_seq_group_metadata_list = self._create_scoring_model_input(
            seq_group_metadata_list=spec_seqs,
            proposal_token_ids=proposal_token_ids_list,
            # NOTE: We determine the seq ids in the expanded batch using the
            # full seq_group_metadata_list, instead of only spec_seqs.
            target_seq_ids_iter=self._create_target_seq_id_iterator(
                seq_ids=get_all_seq_ids(seq_group_metadata_list)),
        )

        # Record the size of the speculative part before appending the
        # non-speculative sequences behind it.
        num_scoring_tokens = len(target_seq_group_metadata_list)
        target_seq_group_metadata_list.extend(non_spec_seqs)

        return (spec_indices, non_spec_indices, target_seq_group_metadata_list,
                num_scoring_tokens)
147

148
    def _contract_batch(
        self, contracted_bs: int, target_sampler_output: SamplerOutput,
        proposals: SpeculativeProposals, num_scoring_tokens: int,
        non_spec_indices: List[int], spec_indices: List[int], k: int
    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor,
               Optional[torch.Tensor]]:
        """Contract the expanded batch back into its original size.
        This maps the scores of speculative tokens back to their original
        sequences.

        contracted_bs is the original batch size, and the batch size that the
        target_sampler_output will be contracted to.

        Args:
            contracted_bs: Number of rows in the returned tensors.
            target_sampler_output: Scorer output for the expanded batch. It is
                modified in place by ``_split_scoring_output``.
            proposals: The proposals that were scored; the shape of
                ``proposal_token_ids`` provides the batch size and k.
            num_scoring_tokens: Number of leading entries of the expanded
                batch that belong to speculative sequences.
            non_spec_indices: Row indices of non-speculative sequences.
            spec_indices: Row indices of speculative sequences.
            k: Overwritten below by the second dimension of
                ``proposals.proposal_token_ids``; the passed value is unused.

        Returns:
            ``(all_tokens, all_probs, all_logprobs, all_hidden_states)``.
            Positions that receive no score keep their fill values: -1 for
            tokens, 0 for probs, -inf for logprobs and 0 for hidden states.
            ``all_hidden_states`` is None when the sampler output carries no
            hidden states.
        """
        (target_token_ids, target_probs, target_logprobs, target_hidden_states,
         non_spec_target_token_ids, non_spec_target_probs,
         non_spec_target_logprobs,
         non_spec_target_hidden_states) = self._split_scoring_output(
             target_sampler_output, num_scoring_tokens)

        # Map distinct sequences used to score each token
        # of shape [batch_size * k + 1] back to [batch_size, k + 1].
        expanded_batch_size, k = proposals.proposal_token_ids.shape

        # The number of tokens in the expanded batch used for speculation is
        # equal to the total expanded batch size minus the number of samples for
        # non-speculative sequences.
        non_spec_expanded_bs, _ = non_spec_target_token_ids.shape
        spec_expanded_bs = expanded_batch_size - non_spec_expanded_bs

        target_token_ids = target_token_ids.reshape(spec_expanded_bs, k + 1)
        target_probs = target_probs.reshape(*target_token_ids.shape,
                                            self._vocab_size)
        target_logprobs = target_logprobs.reshape(target_probs.shape)

        if target_hidden_states is not None:
            target_hidden_states = target_hidden_states.reshape(
                spec_expanded_bs, k + 1, target_hidden_states.shape[-1])

        # Allocate the contracted outputs pre-filled with placeholder values.
        all_tokens = target_token_ids.new_full(size=(contracted_bs, k + 1),
                                               fill_value=-1)
        all_probs = target_probs.new_zeros(*all_tokens.shape, self._vocab_size)
        all_logprobs = target_logprobs.new_full(size=all_probs.shape,
                                                fill_value=-float("inf"))

        if target_sampler_output.hidden_states is not None:
            all_hidden_states = target_hidden_states.new_zeros(
                size=(contracted_bs, k + 1, target_hidden_states.shape[-1]))
        else:
            all_hidden_states = None

        # Non-speculative sequences only fill the first position of their row.
        if non_spec_indices:
            all_tokens[non_spec_indices, :1] = non_spec_target_token_ids
            all_probs[non_spec_indices, :1, :] = non_spec_target_probs
            all_logprobs[non_spec_indices, :1, :] = non_spec_target_logprobs

            if all_hidden_states is not None:
                all_hidden_states[
                    non_spec_indices, :1, :] = non_spec_target_hidden_states

        # Speculative sequences fill all k + 1 positions of their row.
        if spec_indices:
            all_tokens[spec_indices] = target_token_ids
            all_probs[spec_indices] = target_probs
            all_logprobs[spec_indices] = target_logprobs

            if all_hidden_states is not None:
                all_hidden_states[spec_indices] = target_hidden_states

        return all_tokens, all_probs, all_logprobs, all_hidden_states
216
217

    def _create_scoring_model_input(
218
219
220
221
        self,
        seq_group_metadata_list: List[SequenceGroupMetadata],
        proposal_token_ids: List[List[TokenId]],  # shape: [batch_size, k]
        target_seq_ids_iter: Iterator[TargetSeqId],
222
223
224
    ) -> List[SequenceGroupMetadata]:
        """Given the original input sequences and proposed tokens from the draft
        model, create a list of target sequences that can be used for scoring.
225
226
227
228

        target_seq_ids_iter provides sequence ids for the expanded batch,
        fulfilling the requirement that no seq id in the expanded batch is equal
        to the seq id in the original batch.
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
        """

        if not seq_group_metadata_list:
            return []

        target_seq_group_metadata = list(
            chain.from_iterable(
                self._create_target_seq_group_metadata(
                    seq_group_metadata,
                    proposal_token_ids,
                    i,
                    target_seq_ids_iter,
                ) for i, seq_group_metadata in enumerate(
                    seq_group_metadata_list)))

        return target_seq_group_metadata

    def _create_target_seq_group_metadata(
        self,
        input_seq_group_metadata: SequenceGroupMetadata,
        proposal_token_ids: List[List[TokenId]],  # shape: [batch_size, k]
        batch_index: int,
        target_seq_ids_iter: Iterator[TargetSeqId],
    ) -> List[SequenceGroupMetadata]:
        """Given an input sequence group metadata and a list of draft tokens,
        create a list of target SequenceGroupMetadata, one for each
        token id that needs to be scored.

        Naive speculative decoding requires K target model scores, one for each
        draft model token. However one can add a bonus token such that if each
        token is accepted, then a final token may be sampled from the model.
        This function creates K+1 target SequenceGroupMetadata to take
        advantage of the bonus token.

        Args:
            input_seq_group_metadata: The sequence group to expand. It must
                not be a prompt and must hold exactly one sequence.
            proposal_token_ids: Proposal token ids for the batch.
            batch_index: Row of ``proposal_token_ids`` that belongs to
                ``input_seq_group_metadata``.
            target_seq_ids_iter: Source of ids; one id is consumed per
                created target sequence.

        Returns:
            The target sequences, ordered from the empty continuation to the
            full proposal.
        """
        assert not input_seq_group_metadata.is_prompt, (
            "Speculating on "
            "prompts not yet supported")
        assert len(input_seq_group_metadata.seq_data) == 1, (
            "Beam search "
            "not supported in speculative decoding")
        input_seq_id = next(iter(input_seq_group_metadata.seq_data.keys()))

        token_ids_to_score = self._get_token_ids_to_score(
            proposal_token_ids[batch_index])

        # Use simpler sampling parameters apart from for final token
        # (in particular don't do seeded sampling) since those sampled tokens
        # aren't used.
        # We don't replace the sampling_params in the greedy case because
        # this also controls whether the probs get modified in the sampler
        # (see use of _modify_greedy_probs_inplace there).
        sampling_params = input_seq_group_metadata.sampling_params
        non_bonus_sampling_params = DEFAULT_SIMPLE_SAMPLING_PARAMS \
            if sampling_params.temperature else sampling_params

        target_seq_group_metadata_list: List[SequenceGroupMetadata] = []
        last_index = len(token_ids_to_score) - 1
        for i, token_ids in enumerate(token_ids_to_score):
            # Only the last (bonus-token) entry keeps the request's own
            # sampling params.
            target_sampling_params = sampling_params if i == last_index \
                else non_bonus_sampling_params
            target_seq_group_metadata_list.append(
                self._create_single_target_seq_group_metadata(
                    input_seq_group_metadata,
                    input_seq_id,
                    next(target_seq_ids_iter),
                    token_ids,
                    sampling_params=target_sampling_params,
                ))

        return target_seq_group_metadata_list

300
    @staticmethod
    def _create_single_target_seq_group_metadata(
        seq_group_metadata: SequenceGroupMetadata,
        seq_id: SeqId,
        target_seq_id: TargetSeqId,
        token_ids: List[TokenId],
        sampling_params: SamplingParams,
    ) -> SequenceGroupMetadata:
        """Create a single target SequenceGroupMetadata.

        Args:
            seq_group_metadata: The metadata for the input sequence.
            seq_id: The input sequence ID.
            target_seq_id: The corresponding target sequence ID.
            token_ids: The list of token ids that are to be appended to the
                input sequence.
            sampling_params: The sampling params to attach to the created
                target sequence group.

        Returns:
            A new SequenceGroupMetadata holding one sequence, keyed by
            ``target_seq_id``, that reuses the input sequence's block table.
        """
        seq_data = seq_group_metadata.seq_data[seq_id]
        prompt_token_ids = seq_data.prompt_token_ids_array
        new_output_token_ids = [*seq_data.get_output_token_ids(), *token_ids]

        new_seq_data_dict = {
            target_seq_id:
            SequenceData(
                prompt_token_ids,
                _output_token_ids=array(VLLM_TOKEN_ID_ARRAY_TYPE,
                                        new_output_token_ids),
            ),
        }
        # This is a hack. Technically, spec decoding should compute
        # num_lookahead slots at one shot, but instead, it expands the batch
        # and evaluate one by one right now. context_len is seq_len - 1 because
        # the kv cache is filled by a previous batch in the batch expansion.
        for data in new_seq_data_dict.values():
            data.update_num_computed_tokens(data.get_len() - 1)

        return SequenceGroupMetadata(
            request_id=seq_group_metadata.request_id,
            is_prompt=seq_group_metadata.is_prompt,
            seq_data=new_seq_data_dict,
            sampling_params=sampling_params,
            block_tables={
                target_seq_id: seq_group_metadata.block_tables[seq_id],
            },
            lora_request=None,
            token_chunk_size=1,
        )

    def _split_scoring_output(
        self, sampler_output: SamplerOutput, num_scoring_tokens: int
    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor,
               Optional[torch.Tensor], torch.Tensor, torch.Tensor,
               torch.Tensor, Optional[torch.Tensor]]:
        """Split the target model output into speculative and non-speculative
        output.

        NOTE: ``sampler_output`` is modified in place. Its probs, token ids,
        logprobs and hidden states are reassigned twice and are left holding
        the non-speculative slices when this method returns.

        Args:
            sampler_output: Scorer output for the expanded batch, with the
                speculative samples first.
            num_scoring_tokens: Number of leading samples that belong to
                speculative scoring.

        Returns:
            ``(target_token_ids, target_probs, target_logprobs,
            target_hidden_states, non_spec_target_token_ids,
            non_spec_target_probs, non_spec_target_logprobs,
            non_spec_target_hidden_states)``. Both hidden-state entries come
            from a None input when the sampler output has no hidden states.
        """

        # vLLM currently only supports proposal lens equal to zero or the batch
        # proposal len. This adds some complexity (splitting the batch into spec
        # and non spec sequences) and should be removed in the future. It can be
        # done by supporting per-sequence proposal lens.
        #
        # First samples are from speculative scoring, latter samples are non-
        # speculative samples.
        split_sizes = [
            num_scoring_tokens,
            sampler_output.sampled_token_ids.numel() - num_scoring_tokens
        ]
        (spec_probs, non_spec_probs
         ) = sampler_output.sampled_token_probs.split(split_sizes)
        (spec_sampled_tokens, non_spec_sampled_tokens
         ) = sampler_output.sampled_token_ids.flatten().split(split_sizes)
        (
            spec_logprobs,
            non_spec_logprobs,
        ) = sampler_output.logprobs.split(split_sizes)

        if sampler_output.hidden_states is not None:
            (
                spec_hidden_states,
                non_spec_hidden_states,
            ) = sampler_output.hidden_states.split(split_sizes)
        else:
            spec_hidden_states, non_spec_hidden_states = None, None

        # Convert scores to tensors.
        sampler_output.sampled_token_probs = spec_probs
        sampler_output.sampled_token_ids = spec_sampled_tokens
        sampler_output.logprobs = spec_logprobs
        sampler_output.hidden_states = spec_hidden_states
        (target_token_ids, target_probs, target_logprobs,
         target_hidden_states) = sampler_output_to_torch([sampler_output],
                                                         True)

        # Convert non-speculative output tokens to tensors.
        sampler_output.sampled_token_probs = non_spec_probs
        sampler_output.sampled_token_ids = non_spec_sampled_tokens
        sampler_output.logprobs = non_spec_logprobs
        sampler_output.hidden_states = non_spec_hidden_states
        (non_spec_target_token_ids, non_spec_target_probs,
         non_spec_target_logprobs,
         non_spec_target_hidden_states) = sampler_output_to_torch(
             [sampler_output], True)

        return (target_token_ids, target_probs, target_logprobs,
                target_hidden_states, non_spec_target_token_ids,
                non_spec_target_probs, non_spec_target_logprobs,
                non_spec_target_hidden_states)
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438

    def _create_target_seq_id_iterator(
            self, seq_ids: List[SeqId]) -> Iterator[TargetSeqId]:
        """Create an iterator for creating target sequence ids.
        Target sequence ids are distinct from sequence ids because we create a
        distinct target sequence id for each proposal token to be scored.

        This implementation increments a counter starting at 1 + max of all
        provided input sequence ids.
        """
        return count(start=max(seq_ids) + 1)

    def _get_token_ids_to_score(
        self,
        full_spec_token_ids: List[TokenId]  # shape: [k]
    ) -> List[List[TokenId]]:
        """Given an int tensor of proposal token ids, return a list of
        token ids that should be scored.

        Returns k+1 output lists. The additional one is used for generating the
        bonus token.

        Example:
            Input: [0, 1, 2, 3] (k=4)
            Output: (k+1 lists)
                []
                [0]
                [0, 1]
                [0, 1, 2]
                [0, 1, 2, 3]
        """
439
        empty_token_ids: List[TokenId] = []
440
441
442
443
444
445
446

        token_ids_to_score = [empty_token_ids]
        token_ids_to_score.extend([
            full_spec_token_ids[:i + 1]
            for i in range(len(full_spec_token_ids))
        ])
        return token_ids_to_score