async_utils.py 4.81 KB
Newer Older
Woosuk Kwon's avatar
Woosuk Kwon committed
1
2
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
3
import contextlib
Woosuk Kwon's avatar
Woosuk Kwon committed
4

5
import numpy as np
Woosuk Kwon's avatar
Woosuk Kwon committed
6
7
import torch

8
from vllm.v1.outputs import AsyncModelRunnerOutput, LogprobsTensors, ModelRunnerOutput
9
from vllm.v1.worker.gpu.sample.output import SamplerOutput
Woosuk Kwon's avatar
Woosuk Kwon committed
10
11
12
13
14
15
16


class AsyncOutput(AsyncModelRunnerOutput):
    """Sampler results whose device-to-host copies are issued on a side stream.

    The constructor enqueues non-blocking copies while ``copy_stream`` is the
    current stream and records ``copy_event`` after them. ``get_output`` waits
    on that event and then fills in the wrapped ``ModelRunnerOutput``.
    """

    def __init__(
        self,
        model_runner_output: ModelRunnerOutput,
        sampler_output: SamplerOutput,
        num_sampled_tokens: torch.Tensor,
        main_stream: torch.cuda.Stream,
        copy_stream: torch.cuda.Stream,
        copy_event: torch.cuda.Event,
    ):
        """Start the asynchronous copies of the sampler results.

        Args:
            model_runner_output: Output object that ``get_output`` completes
                and returns.
            sampler_output: Source of ``sampled_token_ids``, and of the
                optional ``logprobs_tensors`` and ``num_nans``.
            num_sampled_tokens: Tensor copied to the host and later used to
                truncate each row of sampled token ids.
            main_stream: Stream restored as current once the copies are queued.
            copy_stream: Stream made current while the copies are queued.
            copy_event: Event recorded on ``copy_stream`` after the copies.
        """
        # NOTE(woosuk): References to the GPU tensors must be retained here,
        # because the copies run on a different CUDA stream than the one on
        # which the tensors were created.
        self.copy_event = copy_event
        self.num_sampled_tokens = num_sampled_tokens
        self.sampler_output = sampler_output
        self.model_runner_output = model_runner_output

        with stream(copy_stream, main_stream):
            # Make the copy stream wait for the work queued on the main stream.
            copy_stream.wait_stream(main_stream)

            self.sampled_token_ids = async_copy_to_np(sampler_output.sampled_token_ids)

            gpu_logprobs = sampler_output.logprobs_tensors
            self.logprobs_tensors: LogprobsTensors | None = (
                None if gpu_logprobs is None else gpu_logprobs.to_cpu_nonblocking()
            )

            gpu_num_nans = sampler_output.num_nans
            self.num_nans: np.ndarray | None = (
                None if gpu_num_nans is None else async_copy_to_np(gpu_num_nans)
            )

            self.num_sampled_tokens_np = async_copy_to_np(num_sampled_tokens)

            # Entries that are None stay None; the others are copied.
            host_prompt_logprobs = {}
            for req_key, tensors in model_runner_output.prompt_logprobs_dict.items():
                host_prompt_logprobs[req_key] = (
                    None if tensors is None else tensors.to_cpu_nonblocking()
                )
            self.prompt_logprobs_dict = host_prompt_logprobs

            # Recorded last so that waiting on the event covers every copy above.
            copy_event.record(copy_stream)

    def get_output(self) -> ModelRunnerOutput:
        """Wait for the copies, then return the completed ``ModelRunnerOutput``.

        Returns:
            The ``model_runner_output`` passed to the constructor, with
            ``sampled_token_ids`` and ``prompt_logprobs_dict`` set, plus
            ``num_nans_in_logits`` and ``logprobs`` when those were available.
        """
        self.copy_event.synchronize()
        output = self.model_runner_output

        # NOTE(woosuk): Python lists are produced here only for compatibility
        # with the existing model runner. Going forward, the data should stay
        # as NumPy arrays rather than being converted to lists.
        token_rows: list[list[int]] = self.sampled_token_ids.tolist()
        row_lengths: list[int] = self.num_sampled_tokens_np.tolist()
        for row, keep in zip(token_rows, row_lengths):
            # Truncate in place to the first `keep` entries of the row.
            del row[keep:]
        output.sampled_token_ids = token_rows

        if self.num_nans is not None:
            # Pair the per-entry NaN counts with the request ids, in order.
            output.num_nans_in_logits = dict(
                zip(output.req_ids, self.num_nans.tolist())
            )

        if self.logprobs_tensors is not None:
            output.logprobs = self.logprobs_tensors.tolists()
        output.prompt_logprobs_dict = self.prompt_logprobs_dict
        return output


73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
class AsyncPoolingOutput(AsyncModelRunnerOutput):
    """Pooler results whose device-to-host copies are issued on a side stream.

    The constructor enqueues non-blocking copies while ``copy_stream`` is the
    current stream and records ``copy_event`` after them. ``get_output`` waits
    on that event before reading the validity flags.
    """

    def __init__(
        self,
        model_runner_output: ModelRunnerOutput,
        pooler_output: torch.Tensor,
        is_valid: torch.Tensor | None,
        main_stream: torch.cuda.Stream,
        copy_stream: torch.cuda.Stream,
        copy_event: torch.cuda.Event,
    ):
        """Start the asynchronous copies of the pooler results.

        Args:
            model_runner_output: Output object that ``get_output`` completes
                and returns.
            pooler_output: Tensor that is copied to the CPU and later split
                along dimension 0.
            is_valid: Optional tensor of per-entry flags; entries whose flag
                is falsy are replaced by ``None`` in the final output.
            main_stream: Stream restored as current once the copies are queued.
            copy_stream: Stream made current while the copies are queued.
            copy_event: Event recorded on ``copy_stream`` after the copies.
        """
        # The source tensors stay referenced on the instance.
        self.copy_event = copy_event
        self.is_valid = is_valid
        self.pooler_output = pooler_output
        self.model_runner_output = model_runner_output

        with stream(copy_stream, main_stream):
            # Make the copy stream wait for the work queued on the main stream.
            copy_stream.wait_stream(main_stream)
            self.pooler_output_cpu = pooler_output.to("cpu", non_blocking=True)
            self.is_valid_cpu = (
                None if is_valid is None else is_valid.to("cpu", non_blocking=True)
            )
            # Recorded last so that waiting on the event covers both copies.
            copy_event.record(copy_stream)

    def get_output(self) -> ModelRunnerOutput:
        """Wait for the copies, then return the completed ``ModelRunnerOutput``.

        Returns:
            The ``model_runner_output`` passed to the constructor, with
            ``pooler_output`` set to a list holding one item per slice along
            dimension 0, or ``None`` where the matching flag is falsy.
        """
        # NOTE(review): the split happens before the event is synchronized.
        # This looks intentional (unbind should only create views, without
        # reading the copied data) -- confirm before reordering.
        rows: list[torch.Tensor | None] = list(self.pooler_output_cpu.unbind(dim=0))
        self.copy_event.synchronize()

        valid_flags = self.is_valid_cpu
        if valid_flags is not None:
            for idx, flag in enumerate(valid_flags.tolist()):
                if not flag:
                    rows[idx] = None

        output = self.model_runner_output
        output.pooler_output = rows
        return output


109
110
def async_copy_to_np(x: torch.Tensor) -> np.ndarray:
    """Request a non-blocking copy of ``x`` to the CPU and wrap it as NumPy.

    The copy is requested with ``non_blocking=True``, so no synchronization
    happens here; a caller that needs the data to be complete has to
    synchronize on its own before reading the array.

    Args:
        x: Tensor to copy.

    Returns:
        The ``.numpy()`` view of the CPU tensor.
    """
    host_tensor = x.to("cpu", non_blocking=True)
    return host_tensor.numpy()
111
112
113
114
115
116
117
118
119
120
121
122


@contextlib.contextmanager
def stream(to_stream: torch.cuda.Stream, from_stream: torch.cuda.Stream):
    """Run the ``with`` body with ``to_stream`` set as the current stream.

    A lightweight stand-in for the ``torch.cuda.stream()`` context manager
    that skips the current-stream and device lookups: the stream to go back
    to is supplied by the caller instead of being queried.

    Args:
        to_stream: Stream passed to ``torch.cuda.set_stream`` on entry.
        from_stream: Stream passed to ``torch.cuda.set_stream`` on exit.
    """
    try:
        torch.cuda.set_stream(to_stream)
        yield
    finally:
        # Runs on normal exit and when the body raises.
        torch.cuda.set_stream(from_stream)