distributed.py 1.68 KB
Newer Older
Rick Ho's avatar
Rick Ho committed
1
2
3
r"""
distributed support for Megatron
"""
Rick Ho's avatar
Rick Ho committed
4
5
import torch

Rick Ho's avatar
Rick Ho committed
6
7
8
from fmoe.distributed import DistributedGroupedDataParallel


Rick Ho's avatar
Rick Ho committed
9
10
11
12
13
14
_groups = None


def _set_groups(**kwargs):
    global _groups
    _groups = kwargs
Rick Ho's avatar
Rick Ho committed
15
16


Rick Ho's avatar
Rick Ho committed
17
18
19
20
def _init():
    r"""
    Derive the communicator groups FastMoE needs from Megatron's parallel
    state and cache them via ``_set_groups``.
    """
    from megatron import get_args
    from megatron import mpu
    args = get_args()

    # Create a communicator perpendicular to the pipeline group as the gate
    # group: all ranks that belong to the same pipeline stage form one group.
    ranks_per_stage = args.world_size // args.pipeline_model_parallel_size
    for start in range(0, args.world_size, ranks_per_stage):
        stage_ranks = range(start, start + ranks_per_stage)
        # new_group is a collective call: every rank must create every
        # group, but each rank only keeps the group it belongs to.
        comm = torch.distributed.new_group(stage_ranks)
        if args.rank in stage_ranks:
            gate_group = comm

    # Data-parallel and MoE communication both reuse Megatron's
    # data-parallel group.
    _set_groups(
            dp_group=mpu.get_data_parallel_group(),
            moe_group=mpu.get_data_parallel_group(),
            gate_group=gate_group)
34
35


Rick Ho's avatar
Rick Ho committed
36
37
38
39
40
41
42
43
class DistributedDataParallel(DistributedGroupedDataParallel):
    r"""
    Drop-in replacement for the DDP module provided by Megatron, backed by
    FastMoE's DistributedGroupedDataParallel so that FastMoE's sophisticated
    parallelism and reduction strategies are enabled.

    The state-dict methods delegate straight to the wrapped module so that
    checkpoints stay compatible with Megatron's expectations.
    """

    def __init__(self, module):
        # Discover the communicator groups lazily on first construction,
        # then hand them to the FastMoE DDP base class.
        if _groups is None:
            _init()
        super().__init__(module, **_groups)

    def state_dict(self, *args, **kwargs):
        r"""
        Delegate to the wrapped module for consistency with Megatron.
        """
        wrapped = self.module
        return wrapped.state_dict(*args, **kwargs)

    def state_dict_for_save_checkpoint(self, *args, **kwargs):
        r"""
        Delegate to the wrapped module for consistency with Megatron.
        """
        wrapped = self.module
        return wrapped.state_dict_for_save_checkpoint(*args, **kwargs)

    def load_state_dict(self, *args, **kwargs):
        r"""
        Delegate to the wrapped module for consistency with Megatron.
        """
        wrapped = self.module
        return wrapped.load_state_dict(*args, **kwargs)