import math
import os
import sys

import torch

from deepspeed.pipe import PipelineModule, LayerSpec

from .common import get_test_path


def get_megatron_version():
    """Return the installed Megatron-LM major version as a float."""
    p = os.popen("pip list --format=columns | grep megatron-lm")
    pip_list = p.read()
    assert 'megatron-lm' in pip_list, 'Please install Megatron-LM before getting its version'
    # `pip list` prints "megatron-lm  <version>"; keep only the major component
    # so callers can compare against e.g. 2.0 without caring about patch levels.
    ver_str = pip_list.split()[1]
    return float(ver_str.split('.')[0])

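
# Hedged usage sketch (an assumption, not part of the original file): the major
# version reported above is typically used to branch or skip tests when
# Megatron-LM 1.x and 2.x expose different APIs. The helper name below is
# illustrative only.
def _example_megatron_is_v2_or_newer():
    """Illustrative only: True when the installed Megatron-LM major version is >= 2."""
    return get_megatron_version() >= 2.0
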

def get_gpt2_model(args_others, mp_size=1):
    from megatron.model import GPT2Model
    from megatron.initialize import initialize_megatron

    args_defaults = {
        'vocab_file': get_test_path('gpt2-vocab.json'),
        'merge_file': get_test_path('gpt2-merges.txt'),
        'tokenizer_type': 'GPT2BPETokenizer',
    }

    args_defaults.update(args_others)

    # Set "make-vocab-size-divisible-by" to 1 so the word-embedding size does not
    # change during the resize tests.
    sys.argv.extend([
        '--model-parallel-size',
        str(mp_size),
        '--make-vocab-size-divisible-by',
        str(1)
    ])

    initialize_megatron(args_defaults=args_defaults, ignore_unknown_args=True)
    model = GPT2Model(num_tokentypes=0, parallel_output=False)
    model.cuda()
    # Wrap the model in torch DDP over Megatron's data-parallel group so that
    # gradients are reduced only across data-parallel replicas.
    from torch.nn.parallel.distributed import DistributedDataParallel as torchDDP
    from megatron import mpu
    i = torch.cuda.current_device()
    model = torchDDP(model,
                     device_ids=[i],
                     output_device=i,
                     process_group=mpu.get_data_parallel_group())

    return model

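
# Hedged usage sketch (an assumption, not taken from the original file): a test
# would normally call get_gpt2_model() under a distributed launcher with a small
# model configuration passed through ``args_others``. The argument names below
# follow Megatron-LM's argparse destinations and are illustrative only.
def _example_build_tiny_gpt2():
    """Illustrative only: build a tiny GPT2 model for checkpoint/resize tests."""
    args_others = {
        'num_layers': 2,
        'hidden_size': 128,
        'num_attention_heads': 8,
        'max_position_embeddings': 128,
    }
    # Assumes torch.distributed (and CUDA) have already been initialized by the
    # test harness before initialize_megatron() runs inside get_gpt2_model().
    return get_gpt2_model(args_others, mp_size=1)
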

class MockGPT2ModelPipe(PipelineModule):
    def __init__(self, num_layers, mp_size, args_others, topo, **kwargs):
        from megatron.initialize import initialize_megatron

        args_defaults = {
            'vocab_file': get_test_path('gpt2-vocab.json'),
            'merge_file': get_test_path('gpt2-merges.txt'),
            'tokenizer_type': 'GPT2BPETokenizer',
        }

        args_defaults.update(args_others)

        # Set "make-vocab-size-divisible-by" to 1 so the word-embedding size does not
        # change during the resize tests.
        sys.argv.extend([
            '--model-parallel-size',
            str(mp_size),
            '--make-vocab-size-divisible-by',
            str(1)
        ])

        initialize_megatron(args_defaults=args_defaults, ignore_unknown_args=True)

        from megatron.model.transformer import ParallelTransformerLayer

        class ParallelTransformerLayerPipe(ParallelTransformerLayer):
            """Adapt ParallelTransformerLayer to the single-argument forward
            signature that PipelineModule stages expect."""
            def forward(self, args):
                # Hard-code a trivial attention mask for testing; pipeline
                # parallelism requires the attention mask to be stashed inside
                # the layer rather than passed between stages.
                attention_mask = torch.tensor([[True]],
                                              device=torch.cuda.current_device())
                return super().forward(args, attention_mask)

        layers = []
        for layer_idx in range(num_layers):
            layers.append(
                LayerSpec(ParallelTransformerLayerPipe,
                          self.gpt2_attention_mask_func,
                          self.init_method_normal(0.02),
                          self.scaled_init_method_normal(0.02, num_layers),
                          layer_idx))
        super().__init__(layers=layers,
                         loss_fn=torch.nn.CrossEntropyLoss(),
                         topology=topo,
                         **kwargs)

    def gpt2_attention_mask_func(self, attention_scores, ltor_mask):
        """Fill masked-out positions of the attention scores with a large negative value."""
        attention_scores.masked_fill_(ltor_mask, -10000.0)
        return attention_scores

    def init_method_normal(self, sigma):
        """Init method based on N(0, sigma)."""
        def init_(tensor):
            return torch.nn.init.normal_(tensor, mean=0.0, std=sigma)

        return init_

    def scaled_init_method_normal(self, sigma, num_layers):
        """Init method based on N(0, sigma/sqrt(2*num_layers)."""
        std = sigma / math.sqrt(2.0 * num_layers)

        def init_(tensor):
            return torch.nn.init.normal_(tensor, mean=0.0, std=std)

        return init_
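

# Hedged usage sketch (an assumption, not part of the original file): the mock
# pipeline model is meant to be built on top of a DeepSpeed pipe/data topology
# so pipeline-parallel checkpointing can be exercised without the full GPT2
# stack. PipeDataParallelTopology is DeepSpeed's built-in two-axis topology;
# the stage counts below are illustrative only.
def _example_build_mock_pipe(args_others, num_layers=8, mp_size=1):
    """Illustrative only: construct MockGPT2ModelPipe over a 2-stage pipeline."""
    from deepspeed.runtime.pipe.topology import PipeDataParallelTopology

    topo = PipeDataParallelTopology(num_pp=2, num_dp=1)
    return MockGPT2ModelPipe(num_layers=num_layers,
                             mp_size=mp_size,
                             args_others=args_others,
                             topo=topo)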