"vllm/vscode:/vscode.git/clone" did not exist on "1ac66942975e163d3cc8a4bbb9b1832f9f7edc05"
sequence.py 5.04 KB
Newer Older
1
# SPDX-License-Identifier: Apache-2.0
2
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
3
"""Sequence and its related classes."""
4
5
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Optional, Union
Woosuk Kwon's avatar
Woosuk Kwon committed
6

7
import msgspec
8
9
import torch

10
11
12
# Import the real connector-output type only for static type checkers; at
# runtime the name is bound to `Any` so annotations that mention it still
# evaluate without importing the worker module.
if TYPE_CHECKING:
    from vllm.v1.worker.kv_connector_model_runner_mixin import (
        KVConnectorOutput)
else:
    # NOTE(review): `LoRARequest` has no matching TYPE_CHECKING import in
    # this module; the runtime alias is kept for backward compatibility.
    # Confirm whether anything still relies on it before removing.
    LoRARequest = Any
    KVConnectorOutput = Any

# Type code for token-id arrays ("l" is the `array` module's signed-long
# code; presumably consumed by `array.array` elsewhere -- confirm).
VLLM_TOKEN_ID_ARRAY_TYPE = "l"

# Sentinel value marking an invalid token id.
VLLM_INVALID_TOKEN_ID = -1

21

22
23
24
25
@dataclass
class RequestMetrics:
    """Metrics associated with a request.

    Attributes:
        arrival_time: The time when the request arrived.
        last_token_time: Not described in the original documentation;
                         presumably the time when the most recent token was
                         generated (TODO: confirm against the producer).
        first_scheduled_time: The time when the request was first scheduled.
        first_token_time: The time when the first token was generated.
        time_in_queue: The time the request spent in the queue.
        finished_time: The time when the request was finished.
        scheduler_time: The time spent in the scheduler when this request was
                        being considered by the scheduler.
        model_forward_time: The time spent in the model forward pass when this
                            request was in the batch.
        model_execute_time: The time spent in the model execute function. This
                            will include model forward, block/sync across
                            workers, cpu-gpu sync time and sampling time.
    """
    # Required fields (no defaults); order matters for positional construction.
    arrival_time: float
    last_token_time: float
    first_scheduled_time: Optional[float]
    first_token_time: Optional[float]
    time_in_queue: Optional[float]

    # Optional fields, unset (None) until the corresponding value is recorded.
    finished_time: Optional[float] = None
    scheduler_time: Optional[float] = None
    model_forward_time: Optional[float] = None
    model_execute_time: Optional[float] = None
49
50


51
class PoolingSequenceGroupOutput(
        msgspec.Struct,
        omit_defaults=True,  # type: ignore[call-arg]
        array_like=True,  # type: ignore[call-arg]
):
    """The model output associated with a pooling sequence group."""
    # Annotated as Any to be compatible with msgspec
    # The actual type is in SequenceGroup.pooled_data
    data: Any

    def get_data_nbytes(self) -> int:
        """Return the `nbytes` of `data`, which is treated as a tensor."""
        data: torch.Tensor = self.data
        return data.nbytes

    def __repr__(self) -> str:
        """Return a debug representation showing the wrapped data."""
        return f"PoolingSequenceGroupOutput(data={self.data})"

    def __eq__(self, other: object) -> bool:
        """Compare the wrapped data with another output's data.

        Raises:
            NotImplementedError: If `other` is not a
                `PoolingSequenceGroupOutput`.
        """
        # NOTE(review): raising here (instead of returning `NotImplemented`)
        # is preserved because callers may depend on the exception.
        if not isinstance(other, PoolingSequenceGroupOutput):
            raise NotImplementedError()
        # The result is whatever `==` yields for the underlying data type.
        return self.data == other.data
72
73


74
75
76
# cannot use msgspec.Struct here because Dynamo does not support it
@dataclass
class IntermediateTensors:
    """For all pipeline stages except the last, we need to return the hidden
    states and residuals to be sent to the next stage. This data structure
    contains the hidden states and residuals for a request.

    Each stage also needs to handle its own kv_connector_output.

    Attributes:
        tensors: Mapping from tensor name to tensor.
        kv_connector_output: Optional connector output carried with the
            tensors; `None` when not supplied.
    """

    tensors: dict[str, torch.Tensor]
    kv_connector_output: Optional["KVConnectorOutput"]

    def __init__(
        self,
        tensors: dict[str, torch.Tensor],
        kv_connector_output: Optional["KVConnectorOutput"] = None,
    ) -> None:
        # manually define this function, so that
        # Dynamo knows `IntermediateTensors()` comes from this file.
        # Otherwise, dataclass will generate this function by evaluating
        # a string, and we will lose the information about the source file.
        self.tensors = tensors
        # Always assign the declared field so reading it never raises
        # AttributeError; it defaults to None for existing callers.
        self.kv_connector_output = kv_connector_output

    def __getitem__(self, key: Union[str, slice]):
        """Look up one tensor by name, or slice every tensor.

        A `str` key returns the tensor stored under that name. A `slice` key
        returns a new instance of the same class whose tensors are each
        indexed with that slice; the new instance is built from the tensors
        only, so its `kv_connector_output` is `None`.
        """
        if isinstance(key, str):
            return self.tensors[key]
        elif isinstance(key, slice):
            return self.__class__({k: v[key] for k, v in self.tensors.items()})
        # Any other key type falls through and returns None.

    def __setitem__(self, key: str, value: torch.Tensor):
        """Store `value` under the name `key`."""
        self.tensors[key] = value

    def items(self):
        """Return the (name, tensor) items view of the underlying dict."""
        return self.tensors.items()

    def __len__(self):
        """Return the number of stored tensors."""
        return len(self.tensors)

    def __eq__(self, other: object):
        """Return True when `other` holds the same names with equal tensors.

        Only `tensors` is compared, via `torch.equal` per name.
        """
        if not isinstance(other, self.__class__):
            return False
        if self.tensors.keys() != other.tensors.keys():
            return False
        return all(
            torch.equal(self.tensors[k], other.tensors[k])
            for k in self.tensors)

    def __repr__(self) -> str:
        """Return a debug representation showing the tensor mapping."""
        return f"IntermediateTensors(tensors={self.tensors})"


122
123
124
125
class PoolerOutput(
        msgspec.Struct,
        omit_defaults=True,  # type: ignore[call-arg]
        array_like=True):  # type: ignore[call-arg]
    """The output from a pooling operation in the pooling model."""
    outputs: list[PoolingSequenceGroupOutput]

    def get_data_nbytes(self) -> int:
        """Return the sum of `get_data_nbytes()` over all outputs."""
        return sum(o.get_data_nbytes() for o in self.outputs)

    def __getitem__(self, idx: int) -> PoolingSequenceGroupOutput:
        """Return the output stored at position `idx`."""
        return self.outputs[idx]

    def __setitem__(self, idx: int, value: PoolingSequenceGroupOutput):
        """Replace the output stored at position `idx` with `value`."""
        self.outputs[idx] = value

    def __len__(self):
        """Return the number of outputs."""
        return len(self.outputs)

    def __eq__(self, other: object):
        """Return True when `other` is the same class with equal outputs."""
        return isinstance(other,
                          self.__class__) and self.outputs == other.outputs


146
147
148
149
class ExecuteModelRequest(
        msgspec.Struct,
        array_like=True,  # type: ignore[call-arg]
        omit_defaults=True):  # type: ignore[call-arg]
    """Empty placeholder struct; it declares no fields."""
    # Placeholder. Remove.
    pass