"vllm/vscode:/vscode.git/clone" did not exist on "eb6d3c264d0cd8e44dec16bca7947fbe96415ce9"
async_utils.py 4.81 KB
Newer Older
Woosuk Kwon's avatar
Woosuk Kwon committed
1
2
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
3
import contextlib
Woosuk Kwon's avatar
Woosuk Kwon committed
4

5
import numpy as np
Woosuk Kwon's avatar
Woosuk Kwon committed
6
7
import torch

8
from vllm.v1.outputs import AsyncModelRunnerOutput, LogprobsTensors, ModelRunnerOutput
9
from vllm.v1.worker.gpu.sample.output import SamplerOutput
Woosuk Kwon's avatar
Woosuk Kwon committed
10
11
12
13
14
15
16


class AsyncOutput(AsyncModelRunnerOutput):
    """Sampler results whose GPU -> CPU transfer is performed asynchronously.

    The constructor enqueues non-blocking device-to-host copies on
    ``copy_stream`` (ordered after all work already queued on ``main_stream``)
    and records ``copy_event`` once they are enqueued. ``get_output`` waits on
    that event and then converts the host buffers into the Python structures
    stored on the wrapped ``ModelRunnerOutput``.
    """

    def __init__(
        self,
        model_runner_output: ModelRunnerOutput,
        sampler_output: SamplerOutput,
        num_sampled_tokens: torch.Tensor,
        main_stream: torch.cuda.Stream,
        copy_stream: torch.cuda.Stream,
        copy_event: torch.cuda.Event,
    ):
        # NOTE(woosuk): We must retain references to the GPU tensors,
        # as the copy operations are performed on a different CUDA stream than
        # the one where the tensors were created.
        self.model_runner_output = model_runner_output
        self.sampler_output = sampler_output
        self.num_sampled_tokens = num_sampled_tokens
        self.copy_event = copy_event

        with stream(copy_stream, main_stream):
            # Every copy below must observe the results already produced on
            # main_stream, so make copy_stream wait for it first.
            copy_stream.wait_stream(main_stream)

            self.sampled_token_ids = async_copy_to_np(sampler_output.sampled_token_ids)

            gpu_logprobs = sampler_output.logprobs_tensors
            self.logprobs_tensors: LogprobsTensors | None = (
                None if gpu_logprobs is None else gpu_logprobs.to_cpu_nonblocking()
            )

            gpu_num_nans = sampler_output.num_nans
            self.num_nans: np.ndarray | None = (
                None if gpu_num_nans is None else async_copy_to_np(gpu_num_nans)
            )

            self.num_sampled_tokens_np = async_copy_to_np(num_sampled_tokens)

            host_prompt_logprobs = {}
            for req_id, tensors in self.model_runner_output.prompt_logprobs_dict.items():
                host_prompt_logprobs[req_id] = (
                    None if tensors is None else tensors.to_cpu_nonblocking()
                )
            self.prompt_logprobs_dict = host_prompt_logprobs

            # Recorded after every copy above has been enqueued; get_output
            # synchronizes on it before touching the host buffers.
            self.copy_event.record(copy_stream)

    def get_output(self) -> ModelRunnerOutput:
        """Block until the copies finish, then fill in and return the output.

        Returns:
            The wrapped ``ModelRunnerOutput`` with ``sampled_token_ids``
            (each row truncated to its entry in ``num_sampled_tokens``),
            ``num_nans_in_logits`` (when NaN counts were produced), ``logprobs``
            (when logprobs were produced) and ``prompt_logprobs_dict`` set from
            the host-side copies.
        """
        # The host buffers are not valid until the copy stream reaches the
        # recorded event.
        self.copy_event.synchronize()

        out = self.model_runner_output

        # NOTE(woosuk): The following code is to ensure compatibility with
        # the existing model runner.
        # Going forward, we should keep the data structures as NumPy arrays
        # rather than Python lists.
        rows: list[list[int]] = self.sampled_token_ids.tolist()
        keep_counts: list[int] = self.num_sampled_tokens_np.tolist()
        for row, keep in zip(rows, keep_counts):
            # Drop the trailing slots beyond this request's sampled count.
            del row[keep:]
        out.sampled_token_ids = rows

        nan_counts = self.num_nans
        if nan_counts is not None:
            out.num_nans_in_logits = {
                req_id: count
                for req_id, count in zip(out.req_ids, nan_counts.tolist())
            }

        host_logprobs = self.logprobs_tensors
        if host_logprobs is not None:
            out.logprobs = host_logprobs.tolists()
        out.prompt_logprobs_dict = self.prompt_logprobs_dict
        return out


73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
class AsyncPoolingOutput(AsyncModelRunnerOutput):
    """Pooler results whose GPU -> CPU transfer is performed asynchronously.

    The constructor enqueues non-blocking copies of ``pooler_output`` (and of
    ``is_valid`` when given) on ``copy_stream``, ordered after the work already
    queued on ``main_stream``, and records ``copy_event`` afterwards.
    ``get_output`` waits on that event and splits the pooled tensor along
    dim 0 into per-row tensors, replacing rows flagged invalid with ``None``.
    """

    def __init__(
        self,
        model_runner_output: ModelRunnerOutput,
        pooler_output: torch.Tensor,
        is_valid: torch.Tensor | None,
        main_stream: torch.cuda.Stream,
        copy_stream: torch.cuda.Stream,
        copy_event: torch.cuda.Event,
    ):
        # Keep references to the GPU tensors: the copies run on copy_stream,
        # not on the stream that produced these tensors.
        self.model_runner_output = model_runner_output
        self.pooler_output = pooler_output
        self.is_valid = is_valid
        self.copy_event = copy_event

        with stream(copy_stream, main_stream):
            # Order the copies after everything already queued on main_stream.
            copy_stream.wait_stream(main_stream)
            self.pooler_output_cpu = self.pooler_output.to("cpu", non_blocking=True)
            self.is_valid_cpu: torch.Tensor | None
            if self.is_valid is not None:
                self.is_valid_cpu = self.is_valid.to("cpu", non_blocking=True)
            else:
                self.is_valid_cpu = None
            self.copy_event.record(copy_stream)

    def get_output(self) -> ModelRunnerOutput:
        """Block until the copies finish, then fill in and return the output.

        Returns:
            The wrapped ``ModelRunnerOutput`` whose ``pooler_output`` is a list
            with one entry per row (dim 0) of the pooled tensor; entries whose
            ``is_valid`` flag is falsy are ``None``.
        """
        self.copy_event.synchronize()
        # Tensor.unbind() returns a *tuple*; convert it to a list so invalid
        # rows can be replaced by None (item assignment on a tuple raises
        # TypeError).
        pooler_output: list[torch.Tensor | None] = list(
            self.pooler_output_cpu.unbind(dim=0)
        )
        if self.is_valid_cpu is not None:
            for i, is_valid in enumerate(self.is_valid_cpu.tolist()):
                if not is_valid:
                    pooler_output[i] = None
        self.model_runner_output.pooler_output = pooler_output
        return self.model_runner_output


109
110
def async_copy_to_np(x: torch.Tensor) -> np.ndarray:
    """Start a non-blocking copy of ``x`` to the CPU and return it as NumPy.

    The returned array shares memory with the destination CPU tensor. Because
    the copy is requested with ``non_blocking=True``, a copy from a CUDA
    device may still be in flight when this returns; only read the array
    after synchronizing with the stream that performed the copy.
    """
    host_tensor = x.to("cpu", non_blocking=True)
    return host_tensor.numpy()
111
112
113
114
115
116
117
118
119
120
121
122


@contextlib.contextmanager
def stream(to_stream: torch.cuda.Stream, from_stream: torch.cuda.Stream):
    """Make ``to_stream`` the current CUDA stream for the duration of the block.

    A lighter-weight stand-in for ``torch.cuda.stream()``: the caller supplies
    the stream to restore (``from_stream``) explicitly, so no current-stream or
    device lookup is performed here. ``from_stream`` is reinstated on exit,
    including when the body raises.
    """
    set_current = torch.cuda.set_stream
    try:
        set_current(to_stream)
        yield
    finally:
        set_current(from_stream)