parallel_state.py 16.7 KB
Newer Older
Woosuk Kwon's avatar
Woosuk Kwon committed
1
# Copyright 2023 The vLLM team.
2
3
# Adapted from
# https://github.com/NVIDIA/Megatron-LM/blob/main/megatron/core/parallel_state.py
Zhuohan Li's avatar
Zhuohan Li committed
4
# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
5
"""Tensor and pipeline parallel groups."""
6
7
import contextlib
from multiprocessing import resource_tracker, shared_memory
8
from typing import List, Optional
Zhuohan Li's avatar
Zhuohan Li committed
9
10

import torch
11
from torch.distributed import ProcessGroup
Zhuohan Li's avatar
Zhuohan Li committed
12

13
import vllm.envs as envs
14
15
16
17
from vllm.logger import init_logger

logger = init_logger(__name__)

18
19
# Whether `initialize_model_parallel` should create the custom all-reduce
# communicator. Toggled through `set_custom_all_reduce`.
_ENABLE_CUSTOM_ALL_REDUCE = True

# Tensor model parallel group that the current rank belongs to.
_TP_DEVICE_GROUP: Optional[ProcessGroup] = None
_TP_CPU_GROUP: Optional[ProcessGroup] = None
_TP_PYNCCL_COMMUNICATOR = None
_TP_CA_COMMUNICATOR = None
# Pipeline model parallel group that the current rank belongs to.
_PP_DEVICE_GROUP: Optional[ProcessGroup] = None
_PP_CPU_GROUP: Optional[ProcessGroup] = None
_PP_PYNCCL_COMMUNICATOR = None

# The default (device-specific) world group.
# When people blindly call `torch.distributed.all_reduce` etc,
# it will use this group. It is initialized with the `backend`
# parameter of `init_distributed_environment` below.
# Essentially, this is `torch.distributed.group.WORLD`.
# We leave a line here to note that this is device-specific.
# Note that this variable is not safe to use, because when users
# call `init_distributed_environment` first, and then destroy
# the process group themselves, this variable will keep a reference to the
# destroyed process group, which is not useful.
_DEVICE_WORLD_GROUP = None

# During `init_distributed_environment`, we will also initialize a
# group with `gloo` backend, to allow direct coordination between
# processes through the CPU.
_CPU_WORLD_GROUP = None

# In summary, after calling `init_distributed_environment`, we will
# always have two groups: one for device-specific (and is the default)
# and one for CPU. All processes will be part of both groups.

# A list of global ranks for each pipeline group to ease calculation of the
# source rank when broadcasting from the first or last pipeline stage.
_PP_GLOBAL_RANKS: Optional[List[int]] = None

# Local rank recorded by `init_distributed_environment`; -1 until it is set.
_LOCAL_RANK = -1


57
58
59
60
61
def set_custom_all_reduce(enable: bool):
    """Record whether the custom all-reduce communicator should be used.

    The value is stored in the module-level `_ENABLE_CUSTOM_ALL_REDUCE`
    flag, which `initialize_model_parallel` checks before creating the
    custom all-reduce communicator.
    """
    global _ENABLE_CUSTOM_ALL_REDUCE

    _ENABLE_CUSTOM_ALL_REDUCE = enable


62
63
64
65
66
def get_pp_pynccl_communicator():
    """Return the PyNccl communicator of the pipeline parallel group.

    The value is None when no such communicator has been created.
    """
    # Reading a module-level name needs no `global` declaration.
    return _PP_PYNCCL_COMMUNICATOR


67
68
69
70
71
def get_tp_pynccl_communicator():
    """Return the PyNccl communicator of the tensor parallel group.

    The value is None when no such communicator has been created.
    """
    # Reading a module-level name needs no `global` declaration.
    return _TP_PYNCCL_COMMUNICATOR


72
73
74
75
76
def get_tp_ca_communicator():
    """Return the custom all-reduce communicator of the tensor parallel group.

    The value is None when no such communicator has been created.
    """
    # Reading a module-level name needs no `global` declaration.
    return _TP_CA_COMMUNICATOR


77
78
79
80
def get_local_rank():
    """Return the local rank stored in the module-level `_LOCAL_RANK`.

    The stored value is -1 until `init_distributed_environment` sets it.
    """
    # Reading a module-level name needs no `global` declaration.
    return _LOCAL_RANK

Zhuohan Li's avatar
Zhuohan Li committed
81

82
def init_distributed_environment(
    world_size: int = -1,
    rank: int = -1,
    distributed_init_method: str = "env://",
    local_rank: int = -1,
    backend: str = "nccl",
):
    """Initialize the default process group and the CPU world group.

    Nothing besides the debug log happens when `torch.distributed` is
    already initialized. Otherwise the default process group is created
    with `backend`, a second `gloo` group spanning all ranks is created for
    CPU-side coordination, the local rank is recorded, and a one-element
    all-reduce is run as a warmup.

    Arguments:
        world_size: passed to `torch.distributed.init_process_group`.
        rank: passed to `torch.distributed.init_process_group`.
        distributed_init_method: init method for
            `torch.distributed.init_process_group`; must not be None.
        local_rank: local rank of this process. When left at -1 it is read
            from `envs.LOCAL_RANK` if the init method is "env://", and
            falls back to `rank` otherwise.
        backend: backend of the default (world) process group.
    """
    global _DEVICE_WORLD_GROUP, _CPU_WORLD_GROUP, _LOCAL_RANK

    logger.debug(
        "world_size=%d rank=%d local_rank=%d "
        "distributed_init_method=%s backend=%s", world_size, rank, local_rank,
        distributed_init_method, backend)
    if not torch.distributed.is_initialized():
        assert distributed_init_method is not None, (
            "distributed_init_method must be provided when initializing "
            "distributed environment")
        # this backend is used for WORLD
        torch.distributed.init_process_group(
            backend=backend,
            init_method=distributed_init_method,
            world_size=world_size,
            rank=rank)
        _DEVICE_WORLD_GROUP = torch.distributed.group.WORLD
        ranks = list(range(torch.distributed.get_world_size()))
        _CPU_WORLD_GROUP = torch.distributed.new_group(ranks=ranks,
                                                       backend="gloo")
        # set the local rank
        # local_rank is not available in torch ProcessGroup,
        # see https://github.com/pytorch/pytorch/issues/122816
        if local_rank == -1:
            # local rank not set, this usually happens in single-node
            # setting, where we can use rank as local rank
            if distributed_init_method == "env://":
                local_rank = envs.LOCAL_RANK
            else:
                local_rank = rank
        _LOCAL_RANK = local_rank
        # A small all_reduce for warmup.
        data = torch.zeros(1)
        if torch.cuda.is_available():
            data = data.to(device=f"cuda:{local_rank}")
        torch.distributed.all_reduce(data)
        if torch.cuda.is_available():
            torch.cuda.synchronize()
        del data
128
129


Zhuohan Li's avatar
Zhuohan Li committed
130
131
132
def initialize_model_parallel(
    tensor_model_parallel_size: int = 1,
    pipeline_model_parallel_size: int = 1,
    backend: Optional[str] = None,
) -> None:
    """
    Initialize model parallel groups.

    Arguments:
        tensor_model_parallel_size: number of GPUs used for tensor model
            parallelism.
        pipeline_model_parallel_size: number of GPUs used for pipeline model
            parallelism.
        backend: backend used for the device process groups. When omitted,
            the value of `torch.distributed.get_backend()` is used.

    Raises:
        RuntimeError: if the world size is not equal to
            tensor_model_parallel_size x pipeline_model_parallel_size.
        AssertionError: if `torch.distributed` is not initialized, or the
            tensor / pipeline parallel group is already initialized.

    Let's say we have a total of 8 GPUs denoted by g0 ... g7 and we
    use 2 GPUs to parallelize the model tensor, and 4 GPUs to parallelize
    the model pipeline. The present function will
    create 4 tensor model-parallel groups and 2 pipeline model-parallel groups:
        4 tensor model-parallel groups:
            [g0, g1], [g2, g3], [g4, g5], [g6, g7]
        2 pipeline model-parallel groups:
            [g0, g2, g4, g6], [g1, g3, g5, g7]
    Note that for efficiency, the caller should make sure adjacent ranks
    are on the same DGX box. For example if we are using 2 DGX-1 boxes
    with a total of 16 GPUs, rank 0 to 7 belong to the first box and
    ranks 8 to 15 belong to the second box.
    """
    global _TP_DEVICE_GROUP, _TP_CPU_GROUP
    global _TP_PYNCCL_COMMUNICATOR, _TP_CA_COMMUNICATOR
    global _PP_DEVICE_GROUP, _PP_CPU_GROUP
    global _PP_PYNCCL_COMMUNICATOR
    global _PP_GLOBAL_RANKS

    # Get world size and rank. Ensure some consistencies.
    assert torch.distributed.is_initialized()
    world_size: int = torch.distributed.get_world_size()
    # get the backend of _DEVICE_WORLD_GROUP
    backend = backend or torch.distributed.get_backend()

    if (world_size !=
            tensor_model_parallel_size * pipeline_model_parallel_size):
        raise RuntimeError(
            f"world_size ({world_size}) is not equal to "
            f"tensor_model_parallel_size ({tensor_model_parallel_size}) x "
            f"pipeline_model_parallel_size ({pipeline_model_parallel_size})")

    num_tensor_model_parallel_groups: int = (world_size //
                                             tensor_model_parallel_size)
    num_pipeline_model_parallel_groups: int = (world_size //
                                               pipeline_model_parallel_size)
    rank = torch.distributed.get_rank()

    # Build the tensor model-parallel groups.
    assert _TP_DEVICE_GROUP is None, (
        "tensor model parallel group is already initialized")
    # Every rank creates every group (device + gloo), in the same order,
    # and only keeps the handles of the group it belongs to.
    for i in range(num_tensor_model_parallel_groups):
        ranks = list(
            range(i * tensor_model_parallel_size,
                  (i + 1) * tensor_model_parallel_size))
        group = torch.distributed.new_group(ranks, backend=backend)
        cpu_group = torch.distributed.new_group(ranks, backend="gloo")
        if rank in ranks:
            _TP_DEVICE_GROUP = group
            _TP_CPU_GROUP = cpu_group

    from vllm.distributed.device_communicators.pynccl import PyNcclCommunicator
    if tensor_model_parallel_size > 1:
        _TP_PYNCCL_COMMUNICATOR = PyNcclCommunicator(
            group=_TP_CPU_GROUP,
            device=_LOCAL_RANK,
        )

    # Initialize a custom fast all-reduce implementation.
    if _ENABLE_CUSTOM_ALL_REDUCE:
        from vllm.distributed.device_communicators.custom_all_reduce import (
            CustomAllreduce)
        _TP_CA_COMMUNICATOR = CustomAllreduce(
            group=_TP_CPU_GROUP,
            device=_LOCAL_RANK,
        )

    # Build the pipeline model-parallel groups.
    assert _PP_DEVICE_GROUP is None, (
        "pipeline model parallel group is already initialized")
    # Pipeline group i takes every `num_pipeline_model_parallel_groups`-th
    # rank starting at i.
    for i in range(num_pipeline_model_parallel_groups):
        ranks = list(range(i, world_size, num_pipeline_model_parallel_groups))
        group = torch.distributed.new_group(ranks, backend=backend)
        cpu_group = torch.distributed.new_group(ranks, backend="gloo")
        if rank in ranks:
            _PP_DEVICE_GROUP = group
            _PP_CPU_GROUP = cpu_group
            _PP_GLOBAL_RANKS = ranks

    if pipeline_model_parallel_size > 1:
        _PP_PYNCCL_COMMUNICATOR = PyNcclCommunicator(
            group=_PP_CPU_GROUP,
            device=_LOCAL_RANK,
        )

Zhuohan Li's avatar
Zhuohan Li committed
228

229
230
231
def ensure_model_parallel_initialized(
    tensor_model_parallel_size: int,
    pipeline_model_parallel_size: int,
    backend: Optional[str] = None,
) -> None:
    """Helper to initialize model parallel groups if they are not initialized,
    or ensure tensor-parallel and pipeline-parallel sizes are equal to expected
    values if the model parallel groups are initialized.

    Arguments:
        tensor_model_parallel_size: expected tensor parallel world size.
        pipeline_model_parallel_size: expected pipeline parallel world size.
        backend: backend forwarded to `initialize_model_parallel`. When
            omitted, the value of `torch.distributed.get_backend()` is used.

    Raises:
        AssertionError: if the groups already exist with a different size.
    """
    # get the backend of _DEVICE_WORLD_GROUP
    backend = backend or torch.distributed.get_backend()
    if not model_parallel_is_initialized():
        initialize_model_parallel(tensor_model_parallel_size,
                                  pipeline_model_parallel_size, backend)
        return

    assert (
        get_tensor_model_parallel_world_size() == tensor_model_parallel_size
    ), ("tensor parallel group already initialized, but of unexpected size: "
        f"{get_tensor_model_parallel_world_size()=} vs. "
        f"{tensor_model_parallel_size=}")
    assert (get_pipeline_model_parallel_world_size(
    ) == pipeline_model_parallel_size), (
        "pipeline parallel group already initialized, but of unexpected size: "
        f"{get_pipeline_model_parallel_world_size()=} vs. "
        f"{pipeline_model_parallel_size=}")


Zhuohan Li's avatar
Zhuohan Li committed
257
def model_parallel_is_initialized():
    """Check if tensor and pipeline parallel groups are initialized.

    Returns True only when both `_TP_DEVICE_GROUP` and `_PP_DEVICE_GROUP`
    are set.
    """
    return (_TP_DEVICE_GROUP is not None and _PP_DEVICE_GROUP is not None)
Zhuohan Li's avatar
Zhuohan Li committed
260
261


262
263
264
265
266
267
def get_cpu_world_group():
    """Return the CPU world group.

    Fails an assertion when the group has not been initialized.
    """
    cpu_world = _CPU_WORLD_GROUP
    assert cpu_world is not None, "CPU world group is not initialized"
    return cpu_world


Zhuohan Li's avatar
Zhuohan Li committed
268
269
def get_tensor_model_parallel_group():
    """Get the tensor model parallel group the caller rank belongs to.

    Fails an assertion when the group has not been initialized.
    """
    assert _TP_DEVICE_GROUP is not None, (
        "tensor model parallel group is not initialized")
    return _TP_DEVICE_GROUP


def get_tensor_model_parallel_cpu_group():
    """Return the CPU group of the caller's tensor model parallel group.

    Fails an assertion when the group has not been initialized.
    """
    tp_cpu_group = _TP_CPU_GROUP
    assert tp_cpu_group is not None, \
        "tensor model parallel cpu group is not initialized"
    return tp_cpu_group
Zhuohan Li's avatar
Zhuohan Li committed
280
281
282
283


def get_pipeline_model_parallel_group():
    """Get the pipeline model parallel group the caller rank belongs to.

    Fails an assertion when the group has not been initialized.
    """
    assert _PP_DEVICE_GROUP is not None, (
        "pipeline model parallel group is not initialized")
    return _PP_DEVICE_GROUP
Zhuohan Li's avatar
Zhuohan Li committed
287
288


289
290
291
292
293
294
295
def get_pipeline_model_parallel_cpu_group():
    """Return the CPU group of the caller's pipeline model parallel group.

    Fails an assertion when the group has not been initialized.
    """
    pp_cpu_group = _PP_CPU_GROUP
    assert pp_cpu_group is not None, \
        "pipeline model parallel cpu group is not initialized"
    return pp_cpu_group


Zhuohan Li's avatar
Zhuohan Li committed
296
297
def get_tensor_model_parallel_world_size():
    """Return world size for the tensor model parallel group."""
    return torch.distributed.get_world_size(
        group=get_tensor_model_parallel_group())
Zhuohan Li's avatar
Zhuohan Li committed
300
301
302
303


def get_pipeline_model_parallel_world_size():
    """Return world size for the pipeline model parallel group."""
    return torch.distributed.get_world_size(
        group=get_pipeline_model_parallel_group())
Zhuohan Li's avatar
Zhuohan Li committed
306
307
308
309
310
311
312
313
314


def get_tensor_model_parallel_rank():
    """Return the caller's rank inside its tensor model parallel group."""
    tp_group = get_tensor_model_parallel_group()
    return torch.distributed.get_rank(group=tp_group)


def get_pipeline_model_parallel_rank():
    """Return my rank for the pipeline model parallel group."""
    return torch.distributed.get_rank(
        group=get_pipeline_model_parallel_group())
Zhuohan Li's avatar
Zhuohan Li committed
317
318
319
320
321
322
323
324
325
326
327
328
329


def get_tensor_model_parallel_src_rank():
    """Return the global rank of the first member of the caller's tensor
    model parallel group.

    The global rank is rounded down to the nearest multiple of the tensor
    parallel world size.
    """
    tp_size = get_tensor_model_parallel_world_size()
    my_global_rank = torch.distributed.get_rank()
    # Subtracting the remainder is the same as floor-dividing and then
    # multiplying back by the group size.
    return my_global_rank - my_global_rank % tp_size


def get_pipeline_model_parallel_first_rank():
    """Return the global rank of the first process in the pipeline for the
    current tensor parallel group.

    Fails an assertion when the pipeline parallel group has not been
    initialized.
    """
    assert _PP_GLOBAL_RANKS is not None, (
        "Pipeline parallel group is not initialized")
    return _PP_GLOBAL_RANKS[0]
Zhuohan Li's avatar
Zhuohan Li committed
333
334
335
336
337


def get_pipeline_model_parallel_last_rank():
    """Return the global rank of the last process in the pipeline for the
    current tensor parallel group.

    Fails an assertion when the pipeline parallel group has not been
    initialized.
    """
    assert _PP_GLOBAL_RANKS is not None, (
        "Pipeline parallel group is not initialized")
    last_rank_local = get_pipeline_model_parallel_world_size() - 1
    return _PP_GLOBAL_RANKS[last_rank_local]
Zhuohan Li's avatar
Zhuohan Li committed
342

Zhuohan Li's avatar
Zhuohan Li committed
343

Zhuohan Li's avatar
Zhuohan Li committed
344
345
def get_pipeline_model_parallel_next_rank():
    """Return the global rank that follows the caller in the pipeline.

    The lookup wraps around: the last stage's next rank is the first stage.
    Fails an assertion when the pipeline parallel group has not been
    initialized.
    """
    assert _PP_GLOBAL_RANKS is not None, (
        "Pipeline parallel group is not initialized")
    rank_in_pipeline = get_pipeline_model_parallel_rank()
    world_size = get_pipeline_model_parallel_world_size()
    return _PP_GLOBAL_RANKS[(rank_in_pipeline + 1) % world_size]
Zhuohan Li's avatar
Zhuohan Li committed
351
352
353


def get_pipeline_model_parallel_prev_rank():
    """Return the global rank that precedes the caller in the pipeline.

    The lookup wraps around: the first stage's previous rank is the last
    stage. Fails an assertion when the pipeline parallel group has not been
    initialized.
    """
    assert _PP_GLOBAL_RANKS is not None, (
        "Pipeline parallel group is not initialized")
    rank_in_pipeline = get_pipeline_model_parallel_rank()
    world_size = get_pipeline_model_parallel_world_size()
    return _PP_GLOBAL_RANKS[(rank_in_pipeline - 1) % world_size]
Zhuohan Li's avatar
Zhuohan Li committed
360
361
362


def destroy_model_parallel():
    """Set the groups to none and destroy them.

    Each tensor / pipeline process group that is currently set (device and
    CPU) is destroyed through `torch.distributed.destroy_process_group`.
    Afterwards every module-level handle created by
    `initialize_model_parallel` (groups, communicators, pipeline global
    ranks) is reset to None, so no stale state survives teardown.
    """
    global _TP_DEVICE_GROUP, _TP_CPU_GROUP
    global _TP_PYNCCL_COMMUNICATOR, _TP_CA_COMMUNICATOR
    global _PP_DEVICE_GROUP, _PP_CPU_GROUP
    global _PP_PYNCCL_COMMUNICATOR, _PP_GLOBAL_RANKS

    # Tensor model parallel state.
    if _TP_DEVICE_GROUP is not None:
        torch.distributed.destroy_process_group(_TP_DEVICE_GROUP)
    _TP_DEVICE_GROUP = None
    if _TP_CPU_GROUP is not None:
        torch.distributed.destroy_process_group(_TP_CPU_GROUP)
    _TP_CPU_GROUP = None
    _TP_PYNCCL_COMMUNICATOR = None
    # Drop the custom all-reduce communicator as well; it was built on the
    # TP CPU group that has just been destroyed.
    # NOTE(review): if the communicator needs an explicit close, call it
    # here - its API is not visible from this file.
    _TP_CA_COMMUNICATOR = None

    # Pipeline model parallel state.
    if _PP_DEVICE_GROUP is not None:
        torch.distributed.destroy_process_group(_PP_DEVICE_GROUP)
    _PP_DEVICE_GROUP = None
    # The gloo group is created alongside the device group, so it has to
    # be destroyed alongside it too.
    if _PP_CPU_GROUP is not None:
        torch.distributed.destroy_process_group(_PP_CPU_GROUP)
    _PP_CPU_GROUP = None
    _PP_PYNCCL_COMMUNICATOR = None
    _PP_GLOBAL_RANKS = None
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445


def is_in_the_same_node(pg: ProcessGroup) -> bool:
    """
    This is a collective operation that checks if all processes in the group
    are in the same node. It tests if all processes are attached to the same
    memory system (shared access to shared memory).

    Group rank 0 creates a shared memory segment holding a magic message and
    broadcasts the segment name; every other rank tries to open the segment
    by name and read the message back. Each rank records its own result in
    one slot of a per-rank flag tensor, and the flags are summed across the
    group with an all-reduce.

    Arguments:
        pg: the process group to test. Its backend must not be NCCL.

    Returns:
        True if the summed flags equal the group's world size, i.e. every
        rank set its own flag.
    """
    assert torch.distributed.get_backend(
        pg) != torch.distributed.Backend.NCCL, (
            "is_in_the_same_node should be tested with a non-NCCL group.")
    # local rank inside the group
    rank = torch.distributed.get_rank(group=pg)
    world_size = torch.distributed.get_world_size(group=pg)

    # local tensor in each process to store the result
    # (one slot per rank; a rank only ever sets its own slot to 1)
    is_in_the_same_node = torch.tensor([0] * world_size, dtype=torch.int32)

    # global ranks of the processes in the group
    ranks = torch.distributed.get_process_group_ranks(pg)

    magic_message = b"magic_message"
    shm = None

    try:
        # An OSError raised inside this block (e.g. by SharedMemory) is
        # swallowed; the rank's flag then simply stays 0.
        # NOTE(review): if rank 0 hits an OSError while creating the
        # segment, it skips the broadcast that the other ranks still
        # enter - confirm this cannot leave them waiting.
        with contextlib.suppress(OSError):
            if rank == 0:
                # create a shared memory segment
                shm = shared_memory.SharedMemory(create=True, size=128)
                shm.buf[:len(magic_message)] = magic_message
                # src is a global rank, hence ranks[0] rather than 0
                torch.distributed.broadcast_object_list([shm.name],
                                                        src=ranks[0],
                                                        group=pg)
                is_in_the_same_node[0] = 1
            else:
                # try to open the shared memory segment
                recv = [None]
                torch.distributed.broadcast_object_list(recv,
                                                        src=ranks[0],
                                                        group=pg)
                name = recv[0]
                shm = shared_memory.SharedMemory(name=name)
                if shm.buf[:len(magic_message)] == magic_message:
                    is_in_the_same_node[rank] = 1
    except Exception as e:
        # Any other failure is logged and treated as "flag not set".
        logger.error("Error ignored in is_in_the_same_node: %s", e)
    finally:
        if shm:
            shm.close()

    # Wait for every rank to get here before rank 0 unlinks the segment.
    torch.distributed.barrier(group=pg)

    # clean up the shared memory segment
    with contextlib.suppress(OSError):
        if rank == 0:
            if shm:
                shm.unlink()
        else:
            if shm:
                # fix to https://stackoverflow.com/q/62748654/9191338
                resource_tracker.unregister(
                    shm._name, "shared_memory")  # type: ignore[attr-defined]
    # Sum the per-rank flags so every rank sees the same total.
    torch.distributed.all_reduce(is_in_the_same_node, group=pg)

    return is_in_the_same_node.sum().item() == world_size