"tests/models/decoder_only/language/test_hybrid.py" did not exist on "0a4806f0a99880df1f74b10a6dceaf638cd3981c"
dp_utils.py 3.82 KB
Newer Older
Woosuk Kwon's avatar
Woosuk Kwon committed
1
2
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
3
4
from __future__ import annotations

Woosuk Kwon's avatar
Woosuk Kwon committed
5
6
7
import torch
import torch.distributed as dist

8
from vllm.config.compilation import CUDAGraphMode
Woosuk Kwon's avatar
Woosuk Kwon committed
9
from vllm.distributed.parallel_state import get_dp_group
10
11
12
13
from vllm.v1.worker.gpu.cudagraph_utils import (
    BatchExecutionDescriptor,
    CudaGraphManager,
)
Woosuk Kwon's avatar
Woosuk Kwon committed
14
15


16
def sync_cudagraph_and_dp_padding(
17
    cudagraph_manager: CudaGraphManager | None,
18
    desired_batch_desc: BatchExecutionDescriptor,
19
    num_tokens: int,
20
21
    num_reqs: int,
    uniform_token_count: int | None,
22
23
    dp_size: int,
    dp_rank: int,
24
25
26
27
28
29
30
) -> tuple[BatchExecutionDescriptor, torch.Tensor | None]:
    """
    Coordinates the batch descriptor and DP padding across all ranks.

    Returns (synced_batch_desc, num_tokens_across_dp).
    """
    assert dp_size > 1, "DP size must be greater than 1"
Woosuk Kwon's avatar
Woosuk Kwon committed
31
    group = get_dp_group().cpu_group
32
    tensor = torch.zeros(3, dp_size, dtype=torch.int32, device="cpu")
Woosuk Kwon's avatar
Woosuk Kwon committed
33
    tensor[0][dp_rank] = num_tokens
34
35
    tensor[1][dp_rank] = desired_batch_desc.cg_mode.value
    tensor[2][dp_rank] = uniform_token_count or 0  # (0 means None)
Woosuk Kwon's avatar
Woosuk Kwon committed
36
    dist.all_reduce(tensor, group=group)
37

38
39
40
    num_tokens_across_dp = tensor[0]
    cg_mode_across_dp = tensor[1]
    uniform_token_counts_across_dp = tensor[2]
41

42
43
44
45
46
    if torch.all(num_tokens_across_dp == 0).item():
        synced_desc = BatchExecutionDescriptor(
            cg_mode=CUDAGraphMode.NONE, num_tokens=0, num_reqs=0
        )
        return synced_desc, None
47

48
    synced_cg_mode = CUDAGraphMode(int(cg_mode_across_dp.min().item()))
49

50
51
52
53
54
55
56
57
    # If any rank wants to run eager, all ranks run eager
    if synced_cg_mode == CUDAGraphMode.NONE:
        return BatchExecutionDescriptor(
            cg_mode=CUDAGraphMode.NONE,
            num_tokens=num_tokens,
            num_reqs=num_reqs,
        ), num_tokens_across_dp

58
59
60
61
    assert cudagraph_manager is not None, (
        "cudagraph_manager should only be None during profile run, "
        "where synced_cg_mode must be NONE across all DP ranks"
    )
62
63
64
65
66
67
68
69
70
71
72
73
    synced_num_tokens = int(num_tokens_across_dp.max().item())
    synced_uniform_token_count = uniform_token_counts_across_dp[0]
    # If ranks disagree on the uniform token count, or its 0 (means None) set to None
    if synced_uniform_token_count == 0 or not torch.all(
        uniform_token_counts_across_dp == synced_uniform_token_count
    ):
        synced_uniform_token_count = None

    # Dispatch for the final synced values, use num_reqs instead of synced_num_reqs
    # so we don't perform request padding for PIECEWISE graphs
    synced_desc = cudagraph_manager.dispatch(
        num_reqs, synced_num_tokens, synced_uniform_token_count
74
    )
75

76
77
    # Update num_tokens_across_dp to reflect padded size.
    num_tokens_across_dp[:] = synced_desc.num_tokens
78

79
    return synced_desc, num_tokens_across_dp
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117


def dispatch_cg_and_sync_dp(
    cudagraph_manager: CudaGraphManager | None,
    num_reqs: int,
    num_tokens: int,
    uniform_token_count: int | None,
    dp_size: int,
    dp_rank: int,
    need_eager: bool = False,
) -> tuple[BatchExecutionDescriptor, torch.Tensor | None]:
    if need_eager:
        batch_desc = BatchExecutionDescriptor(
            cg_mode=CUDAGraphMode.NONE,
            num_tokens=num_tokens,
            num_reqs=num_reqs,
        )
    else:
        assert cudagraph_manager is not None, (
            "cudagraph_manager should only be None during profile run, "
            "where need_eager must be True"
        )
        batch_desc = cudagraph_manager.dispatch(
            num_reqs, num_tokens, uniform_token_count
        )

    if dp_size == 1:
        return batch_desc, None

    return sync_cudagraph_and_dp_padding(
        cudagraph_manager,
        batch_desc,
        num_tokens,
        num_reqs,
        uniform_token_count,
        dp_size,
        dp_rank,
    )