sequence.py 19.4 KB
Newer Older
1
"""Sequence and its related classes."""
2
import copy
Woosuk Kwon's avatar
Woosuk Kwon committed
3
import enum
4
from dataclasses import dataclass
5
from typing import TYPE_CHECKING, Dict, List, Optional, Union
Woosuk Kwon's avatar
Woosuk Kwon committed
6

Woosuk Kwon's avatar
Woosuk Kwon committed
7
from vllm.block import LogicalTokenBlock
8
from vllm.lora.request import LoRARequest
9
from vllm.sampling_params import SamplingParams
Woosuk Kwon's avatar
Woosuk Kwon committed
10

11
12
if TYPE_CHECKING:
    import torch
13

14
15
    from vllm.spec_decode.metrics import SpecDecodeWorkerMetrics

16
17
18

@dataclass
class Logprob:
    """Infos for supporting OpenAI compatible logprobs and token ranks.

    Attributes:
        logprob: The logprob of chosen token
        rank: The vocab rank of chosen token (>=1)
        decoded_token: The decoded chosen token, as a string
    """
    logprob: float
    rank: Optional[int] = None
    decoded_token: Optional[str] = None


# Logprobs attached to a prompt: a list whose entries each map a token id to
# its Logprob, or are None (presumably one entry per prompt position, with
# None where no logprob is available -- confirm against the producer).
PromptLogprobs = List[Optional[Dict[int, Logprob]]]
# Logprobs attached to generated output: a list whose entries each map a
# token id to its Logprob (Sequence appends one entry per appended token).
SampleLogprobs = List[Dict[int, Logprob]]
33

Woosuk Kwon's avatar
Woosuk Kwon committed
34
35

class SequenceStatus(enum.Enum):
    """Status of a sequence."""
    WAITING = enum.auto()
    RUNNING = enum.auto()
    SWAPPED = enum.auto()
    FINISHED_STOPPED = enum.auto()
    FINISHED_LENGTH_CAPPED = enum.auto()
    FINISHED_ABORTED = enum.auto()
    FINISHED_IGNORED = enum.auto()

    @staticmethod
    def is_finished(status: "SequenceStatus") -> bool:
        """Return True if `status` is one of the FINISHED_* states."""
        return status in [
            SequenceStatus.FINISHED_STOPPED,
            SequenceStatus.FINISHED_LENGTH_CAPPED,
            SequenceStatus.FINISHED_ABORTED,
            SequenceStatus.FINISHED_IGNORED,
        ]

    @staticmethod
    def get_finished_reason(status: "SequenceStatus") -> Union[str, None]:
        """Map a status to its finish-reason string.

        Returns:
            "stop", "length" or "abort" for the FINISHED_* states, and None
            for any other status.
        """
        if status == SequenceStatus.FINISHED_STOPPED:
            finish_reason = "stop"
        elif status == SequenceStatus.FINISHED_LENGTH_CAPPED:
            finish_reason = "length"
        elif status == SequenceStatus.FINISHED_ABORTED:
            finish_reason = "abort"
        elif status == SequenceStatus.FINISHED_IGNORED:
            # The ignored sequences are the sequences whose prompt lengths
            # are longer than the model's length cap. Therefore, the stop
            # reason should also be "length" as in OpenAI API.
            finish_reason = "length"
        else:
            finish_reason = None
        return finish_reason
Woosuk Kwon's avatar
Woosuk Kwon committed
70

71

72
73
74
75
@dataclass
class RequestMetrics:
    """Metrics associated with a request.

    Attributes:
        arrival_time: The time when the request arrived.
        last_token_time: The timestamp of the most recent token-latency
            measurement. SequenceGroup initialises it to the arrival time
            and overwrites it on every get_last_latency() call.
        first_scheduled_time: The time when the request was first scheduled.
        first_token_time: The time when the first token was generated.
        time_in_queue: The time the request spent in the queue.
        finished_time: The time when the request was finished.
    """
    arrival_time: float
    last_token_time: float
    first_scheduled_time: Optional[float]
    first_token_time: Optional[float]
    time_in_queue: Optional[float]
    finished_time: Optional[float] = None


91
class SequenceData:
    """Data associated with a sequence.

    Args:
        prompt_token_ids: The token IDs of the prompt.
        output_token_ids: The token IDs of the output. Set to an empty list if
            None.

    Attributes:
        prompt_token_ids: The token IDs of the prompt.
        output_token_ids: The token IDs of the output.
        cumulative_logprob: The cumulative log probability of the output.
    """

    def __init__(
        self,
        prompt_token_ids: List[int],
        output_token_ids: Optional[List[int]] = None,
    ) -> None:
        if output_token_ids is None:
            output_token_ids = []

        # The lists are stored by reference, not copied.
        self.prompt_token_ids = prompt_token_ids
        self.output_token_ids = output_token_ids
        self.cumulative_logprob = 0.0

    def append_token_id(self, token_id: int, logprob: float) -> None:
        """Append one output token and add its logprob to the running sum."""
        self.output_token_ids.append(token_id)
        self.cumulative_logprob += logprob

    def get_len(self) -> int:
        """Return the total number of tokens (prompt + output)."""
        return len(self.output_token_ids) + len(self.prompt_token_ids)

    def get_prompt_len(self) -> int:
        """Return the number of prompt tokens."""
        return len(self.prompt_token_ids)

    def get_output_len(self) -> int:
        """Return the number of output tokens."""
        return len(self.output_token_ids)

    def get_token_ids(self) -> List[int]:
        """Return a new list holding the prompt tokens then the output tokens."""
        return self.prompt_token_ids + self.output_token_ids

    def get_last_token_id(self) -> int:
        """Return the last output token, or the last prompt token if there
        is no output yet.

        Raises:
            IndexError: If both the prompt and the output are empty.
        """
        if not self.output_token_ids:
            return self.prompt_token_ids[-1]
        return self.output_token_ids[-1]

    def get_prompt_token_ids(self) -> List[int]:
        """Return the stored prompt token list (the same object, not a copy)."""
        return self.prompt_token_ids

    def get_output_token_ids(self) -> List[int]:
        """Return the stored output token list (the same object, not a copy)."""
        return self.output_token_ids

    def __repr__(self) -> str:
        return (f"SequenceData("
                f"prompt_token_ids={self.prompt_token_ids}, "
                f"output_token_ids={self.output_token_ids}, "
                f"cumulative_logprob={self.cumulative_logprob})")
149
150


Woosuk Kwon's avatar
Woosuk Kwon committed
151
class Sequence:
    """Stores the data, status, and block information of a sequence.

    Args:
        seq_id: The ID of the sequence.
        prompt: The prompt of the sequence.
        prompt_token_ids: The token IDs of the prompt.
        block_size: The block size of the sequence. Should be the same as the
            block size used by the block manager and cache engine.
        eos_token_id: The end-of-sequence token ID, if any. It is only stored
            on the sequence; no method of this class reads it.
        lora_request: LoRA request.
    """

    def __init__(
        self,
        seq_id: int,
        prompt: str,
        prompt_token_ids: List[int],
        block_size: int,
        eos_token_id: Optional[int] = None,
        lora_request: Optional[LoRARequest] = None,
    ) -> None:
        self.seq_id = seq_id
        self.prompt = prompt
        self.block_size = block_size
        self.eos_token_id = eos_token_id
        self.lora_request = lora_request

        self.data = SequenceData(prompt_token_ids)
        self.output_logprobs: SampleLogprobs = []
        self.output_text = ""

        self.logical_token_blocks: List[LogicalTokenBlock] = []
        # Initialize the logical token blocks with the prompt token ids.
        self._append_tokens_to_blocks(prompt_token_ids)
        self.status = SequenceStatus.WAITING

        # Used for incremental detokenization
        self.prefix_offset = 0
        self.read_offset = 0
        # Input + output tokens
        self.tokens: Optional[List[str]] = None

    @property
    def lora_int_id(self) -> int:
        """The LoRA integer ID, or 0 when no LoRA request is attached."""
        return self.lora_request.lora_int_id if self.lora_request else 0

    def hash_of_block(self, logical_idx: int) -> int:
        """Hash every token from the start of the sequence through the end
        of block `logical_idx`, together with the LoRA ID."""
        # Compute the number of tokens in the sequence
        # TODO: The current hashing function is O(L^2). We should optimize
        # this in the future.
        num_tokens = self.num_hashed_tokens_of_block(logical_idx)
        return hash(
            (tuple(self.data.get_token_ids()[0:num_tokens]), self.lora_int_id))

    def num_hashed_tokens_of_block(self, logical_idx: int) -> int:
        """Return the number of tokens covered by blocks 0..logical_idx,
        i.e. (logical_idx + 1) * block_size."""
        return logical_idx * self.block_size + self.block_size

    def _append_logical_block(self) -> None:
        """Append a new, empty logical block numbered after the current
        last block."""
        block = LogicalTokenBlock(
            block_number=len(self.logical_token_blocks),
            block_size=self.block_size,
        )
        self.logical_token_blocks.append(block)

    def _append_tokens_to_blocks(self, token_ids: List[int]) -> None:
        """Distribute `token_ids` over the logical blocks, filling the last
        block first and allocating new blocks as needed."""
        cursor = 0
        while cursor < len(token_ids):
            if not self.logical_token_blocks:
                self._append_logical_block()

            last_block = self.logical_token_blocks[-1]
            if last_block.is_full():
                self._append_logical_block()
                last_block = self.logical_token_blocks[-1]

            num_empty_slots = last_block.get_num_empty_slots()
            # The slice is clamped by Python, so the cursor may overshoot
            # len(token_ids) on the final chunk; that just ends the loop.
            last_block.append_tokens(token_ids[cursor:cursor +
                                               num_empty_slots])
            cursor += num_empty_slots

    def append_token_id(
        self,
        token_id: int,
        logprobs: Dict[int, Logprob],
    ) -> None:
        """Append one generated token and its logprobs to the sequence.

        `logprobs` must contain an entry for `token_id`; that entry's
        logprob is added to the cumulative logprob.
        """
        assert token_id in logprobs
        self._append_tokens_to_blocks([token_id])
        self.output_logprobs.append(logprobs)
        self.data.append_token_id(token_id, logprobs[token_id].logprob)

    def get_len(self) -> int:
        return self.data.get_len()

    def get_prompt_len(self) -> int:
        return self.data.get_prompt_len()

    def get_output_len(self) -> int:
        return self.data.get_output_len()

    def get_token_ids(self) -> List[int]:
        return self.data.get_token_ids()

    def get_prompt_token_ids(self) -> List[int]:
        return self.data.get_prompt_token_ids()

    def get_last_token_id(self) -> int:
        return self.data.get_last_token_id()

    def get_output_token_ids(self) -> List[int]:
        return self.data.output_token_ids

    def get_cumulative_logprob(self) -> float:
        return self.data.cumulative_logprob

    def get_beam_search_score(self,
                              length_penalty: float = 1.0,
                              seq_len: Optional[int] = None,
                              eos_token_id: Optional[int] = None) -> float:
        """Calculate the beam search score with length penalty.

        Adapted from

        https://github.com/huggingface/transformers/blob/ccb92be23def445f2afdea94c31286f84b89eb5b/src/transformers/generation/beam_search.py#L938

        Args:
            length_penalty: Exponent applied to the sequence length.
            seq_len: Length to normalise by. Defaults to the current
                sequence length.
            eos_token_id: If given, `seq_len` was not given, and the last
                token equals it, that token is excluded from the length.

        Returns:
            cumulative_logprob / (seq_len ** length_penalty).
        """
        if seq_len is None:
            seq_len = self.get_len()
            # NOTE: HF implementation does not count the EOS token
            # towards the length, we align with that here for testing.
            if (eos_token_id is not None
                    and self.get_last_token_id() == eos_token_id):
                seq_len -= 1
        return self.get_cumulative_logprob() / (seq_len**length_penalty)

    def is_finished(self) -> bool:
        return SequenceStatus.is_finished(self.status)

    def fork(self, new_seq_id: int) -> "Sequence":
        """Return a deep copy of this sequence carrying `new_seq_id`."""
        new_seq = copy.deepcopy(self)
        new_seq.seq_id = new_seq_id
        return new_seq

    def __repr__(self) -> str:
        return (f"Sequence(seq_id={self.seq_id}, "
                f"status={self.status.name}, "
                f"num_blocks={len(self.logical_token_blocks)})")
Woosuk Kwon's avatar
Woosuk Kwon committed
296

Woosuk Kwon's avatar
Woosuk Kwon committed
297

Nick Hill's avatar
Nick Hill committed
298
299
300
301
302
303
304
305
@dataclass
class SequenceGroupState:
    """Mutable state tied to a specific sequence group"""

    # torch.Generator used in seeded sampling
    # (string forward reference: torch is imported only under TYPE_CHECKING)
    generator: Optional["torch.Generator"] = None


Woosuk Kwon's avatar
Woosuk Kwon committed
306
class SequenceGroup:
    """A group of sequences that are generated from the same prompt.

    Args:
        request_id: The ID of the request.
        seqs: The list of sequences.
        sampling_params: The sampling parameters used to generate the outputs.
        arrival_time: The arrival time of the request.
        lora_request: LoRA request.
    """

    def __init__(
        self,
        request_id: str,
        seqs: List[Sequence],
        sampling_params: SamplingParams,
        arrival_time: float,
        lora_request: Optional[LoRARequest] = None,
    ) -> None:
        self.request_id = request_id
        # Sequences keyed by their seq_id.
        self.seqs_dict: Dict[int, Sequence] = {
            seq.seq_id: seq
            for seq in seqs
        }
        self.sampling_params = sampling_params
        # last_token_time starts at the arrival time so that the first
        # get_last_latency() call measures from arrival.
        self.metrics = RequestMetrics(arrival_time=arrival_time,
                                      last_token_time=arrival_time,
                                      first_scheduled_time=None,
                                      first_token_time=None,
                                      time_in_queue=None)
        self.lora_request = lora_request
        self.prompt_logprobs: Optional[PromptLogprobs] = None
        self.state = SequenceGroupState()

    @property
    def prompt(self) -> str:
        # All sequences in the group should have the same prompt.
        # We use the prompt of an arbitrary sequence.
        return next(iter(self.seqs_dict.values())).prompt

    @property
    def prompt_token_ids(self) -> List[int]:
        # All sequences in the group should have the same prompt.
        # We use the prompt of an arbitrary sequence.
        return next(iter(self.seqs_dict.values())).data.prompt_token_ids

    @property
    def lora_int_id(self) -> int:
        """The LoRA integer ID, or 0 when no LoRA request is attached."""
        return self.lora_request.lora_int_id if self.lora_request else 0

    def get_last_latency(self, now: float) -> float:
        """Gets last token latency for Request level timings.

        Also records `now` as the new last-token time, so consecutive calls
        measure consecutive intervals.
        """
        latency = now - self.metrics.last_token_time
        self.metrics.last_token_time = now
        return latency

    def maybe_set_first_token_time(self, time: float) -> None:
        """Sets the first token time for Request level timings."""
        if self.metrics.first_token_time is None:
            self.metrics.first_token_time = time

    def maybe_set_first_scheduled_time(self, time: float) -> None:
        """Sets the first scheduled time and time in queue for Request
        level timings."""
        if self.metrics.first_scheduled_time is None:
            self.metrics.first_scheduled_time = time
            self.metrics.time_in_queue = time - self.metrics.arrival_time

    def set_finished_time(self, time: Optional[float]) -> None:
        """Sets the finished time for Request level timings."""
        self.metrics.finished_time = time

    def get_max_num_running_seqs(self) -> int:
        """The maximum number of sequences running in parallel in the remaining
        lifetime of the request."""
        if self.sampling_params.use_beam_search:
            # For beam search, maximally there will always be `best_of` beam
            # candidates running in the future.
            return self.sampling_params.best_of
        else:
            if self.sampling_params.best_of > self.num_seqs():
                # At prompt stage, the sequence group is not yet filled up
                # and only have one sequence running. However, in the
                # generation stage, we will have `best_of` sequences running.
                return self.sampling_params.best_of
            # At sampling stages, return the number of actual sequences
            # that are not finished yet.
            return self.num_unfinished_seqs()

    def get_seqs(
        self,
        status: Optional[SequenceStatus] = None,
    ) -> List[Sequence]:
        """Return all sequences, or only those whose status equals `status`."""
        return list(self.seqs_dict.values()) if status is None else [
            seq for seq in self.seqs_dict.values() if seq.status == status
        ]

    def get_unfinished_seqs(self) -> List[Sequence]:
        return [
            seq for seq in self.seqs_dict.values() if not seq.is_finished()
        ]

    def get_finished_seqs(self) -> List[Sequence]:
        return [seq for seq in self.seqs_dict.values() if seq.is_finished()]

    def num_seqs(self, status: Optional[SequenceStatus] = None) -> int:
        return len(self.get_seqs(status))

    def num_unfinished_seqs(self) -> int:
        return len(self.get_unfinished_seqs())

    def num_finished_seqs(self) -> int:
        return len(self.get_finished_seqs())

    def find(self, seq_id: int) -> Sequence:
        """Return the sequence with `seq_id`.

        Raises:
            ValueError: If no such sequence is in the group.
        """
        if seq_id not in self.seqs_dict:
            raise ValueError(f"Sequence {seq_id} not found.")
        return self.seqs_dict[seq_id]

    def add(self, seq: Sequence) -> None:
        """Add `seq` to the group.

        Raises:
            ValueError: If a sequence with the same ID is already present.
        """
        if seq.seq_id in self.seqs_dict:
            raise ValueError(f"Sequence {seq.seq_id} already exists.")
        self.seqs_dict[seq.seq_id] = seq

    def remove(self, seq_id: int) -> None:
        """Remove the sequence with `seq_id` from the group.

        Raises:
            ValueError: If no such sequence is in the group.
        """
        if seq_id not in self.seqs_dict:
            raise ValueError(f"Sequence {seq_id} not found.")
        del self.seqs_dict[seq_id]

    def is_finished(self) -> bool:
        """Return True when every sequence in the group is finished
        (vacuously True for an empty group)."""
        return all(seq.is_finished() for seq in self.get_seqs())

    def __repr__(self) -> str:
        return (f"SequenceGroup(request_id={self.request_id}, "
                f"sampling_params={self.sampling_params}, "
                f"num_seqs={len(self.seqs_dict)})")
439
440


441
class SequenceGroupMetadata:
    """Metadata for a sequence group. Used to create `AttentionMetadata`.

    Args:
        request_id: The ID of the request.
        is_prompt: Whether the request is at prompt stage.
        seq_data: The sequence data. (Seq id -> sequence data)
        sampling_params: The sampling parameters used to generate the outputs.
        block_tables: The block tables. (Seq id -> list of physical block
            numbers)
        lora_request: LoRA request.
        computed_block_nums: Optional list of block numbers, stored as given
            (presumably the blocks whose contents are already computed --
            confirm against the caller).
        state: Internal state tied to this sequence group. A fresh
            SequenceGroupState is created when None.
    """

    def __init__(
        self,
        request_id: str,
        is_prompt: bool,
        seq_data: Dict[int, SequenceData],
        sampling_params: SamplingParams,
        block_tables: Dict[int, List[int]],
        lora_request: Optional[LoRARequest] = None,
        computed_block_nums: Optional[List[int]] = None,
        state: Optional[SequenceGroupState] = None,
    ) -> None:
        self.request_id = request_id
        self.is_prompt = is_prompt
        self.seq_data = seq_data
        self.sampling_params = sampling_params
        self.block_tables = block_tables
        self.lora_request = lora_request
        self.computed_block_nums = computed_block_nums
        self.state = SequenceGroupState() if state is None else state

    @property
    def lora_int_id(self) -> int:
        """The LoRA integer ID, or 0 when no LoRA request is attached."""
        return self.lora_request.lora_int_id if self.lora_request else 0

479

Zhuohan Li's avatar
Zhuohan Li committed
480
class SequenceOutput:
    """The model output associated with a sequence.

    Args:
        parent_seq_id: The ID of the parent sequence (for forking in beam
            search).
        output_token: The output token ID.
        logprobs: The logprobs of the output token.
            (Token id -> logP(x_i+1 | x_0, ..., x_i))
    """

    def __init__(
        self,
        parent_seq_id: int,
        output_token: int,
        logprobs: Dict[int, Logprob],
    ) -> None:
        self.parent_seq_id = parent_seq_id
        self.output_token = output_token
        self.logprobs = logprobs

    def __repr__(self) -> str:
        return (f"SequenceOutput(parent_seq_id={self.parent_seq_id}, "
                f"output_token={self.output_token}, "
                f"logprobs={self.logprobs})")

    def __eq__(self, other: object) -> bool:
        """Compare parent ID, output token and logprobs.

        Returns NotImplemented for objects that are not SequenceOutput, so
        `==` against a foreign type evaluates to False instead of raising.
        """
        if not isinstance(other, SequenceOutput):
            return NotImplemented
        equal = (self.parent_seq_id == other.parent_seq_id
                 and self.output_token == other.output_token)
        log_probs_equal = other.logprobs == self.logprobs
        return equal and log_probs_equal
513
514


Zhuohan Li's avatar
Zhuohan Li committed
515
516
class SequenceGroupOutput:
    """The model output associated with a sequence group.

    Args:
        samples: The sampled outputs for this group.
        prompt_logprobs: The prompt logprobs, or None.
    """

    def __init__(
        self,
        samples: List[SequenceOutput],
        prompt_logprobs: Optional[PromptLogprobs],
    ) -> None:
        self.samples = samples
        self.prompt_logprobs = prompt_logprobs

    def __repr__(self) -> str:
        return (f"SequenceGroupOutput(samples={self.samples}, "
                f"prompt_logprobs={self.prompt_logprobs})")

    def __eq__(self, other: object) -> bool:
        """Compare samples and prompt logprobs.

        Returns NotImplemented for objects that are not SequenceGroupOutput,
        so `==` against a foreign type evaluates to False instead of raising.
        """
        if not isinstance(other, SequenceGroupOutput):
            return NotImplemented
        return (self.samples == other.samples
                and self.prompt_logprobs == other.prompt_logprobs)

536

537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
@dataclass
class SamplerOutput:
    """Sampler result holding one SequenceGroupOutput per sequence group.

    Each SequenceGroupOutput carries the candidate next tokens for its
    group. The container forwards indexing, item assignment and len() to
    `outputs`, so it can stand in for a plain list, while the optional
    fields below can carry device tensors and metrics alongside.
    """

    # One entry per sequence group.
    outputs: List[SequenceGroupOutput]

    # On-device tensor containing probabilities of each token.
    sampled_token_probs: Optional["torch.Tensor"] = None

    # On-device tensor containing the sampled token ids.
    sampled_token_ids: Optional["torch.Tensor"] = None

    # Spec decode metrics populated by workers.
    spec_decode_worker_metrics: Optional["SpecDecodeWorkerMetrics"] = None

    def __len__(self):
        return len(self.outputs)

    def __getitem__(self, idx: int):
        return self.outputs[idx]

    def __setitem__(self, idx: int, value):
        self.outputs[idx] = value

    def __eq__(self, other: object):
        # Only `outputs` takes part in equality; the optional tensor and
        # metrics fields are ignored.
        if not isinstance(other, self.__class__):
            return False
        return self.outputs == other.outputs