# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Megatron initialization."""

import random
import os

import numpy as np
import torch

from megatron import get_adlr_autoresume
from megatron import get_args
from megatron import get_tensorboard_writer
from megatron import mpu
from megatron.global_vars import set_global_variables
from megatron.mpu import set_model_parallel_rank, set_model_parallel_world_size

def initialize_megatron(extra_args_provider=None, args_defaults={},
                        ignore_unknown_args=False, allow_no_cuda=False):
    """Set global variables, initialize distributed, and
    set autoresume and random seeds.
    `allow_no_cuda` should not be set unless using megatron for cpu only 
    data processing. In general this arg should not be set unless you know 
    what you are doing.
    Returns a function to finalize distributed env initialization
    (optionally, only when args.lazy_mpu_init == True).
    """
    if not allow_no_cuda:
        # Make sure cuda is available.
        assert torch.cuda.is_available(), 'Megatron requires CUDA.'

    # Parse args, build tokenizer, and set adlr-autoresume,
    # tensorboard-writer, and timers.
    set_global_variables(extra_args_provider=extra_args_provider,
                         args_defaults=args_defaults,
                         ignore_unknown_args=ignore_unknown_args)
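    # Note: when given, `extra_args_provider` is typically a callable that
    # takes the argparse parser and registers extra command-line options
    # before parsing, e.g. (illustrative sketch only; names are placeholders):
    #
    #     def extra_args_provider(parser):
    #         group = parser.add_argument_group(title='my extension')
    #         group.add_argument('--my-flag', action='store_true')
    #         return parser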

    # torch.distributed initialization
    def finish_mpu_init():
        args = get_args()
        # PyTorch distributed.
        _initialize_distributed()
        
        # Random seeds for reproducibility.
        if args.rank == 0:
            print('> setting random seeds to {} ...'.format(args.seed))
        _set_random_seed(args.seed)

    args = get_args()
    if args.lazy_mpu_init:
        # Delayed initialization of DDP-related stuff; we only set the basic
        # DDP globals here.
        set_model_parallel_world_size(args.model_parallel_size)
        set_model_parallel_rank(args.rank)
        # Return a function for the external DDP manager to call once it has
        # DDP initialized.
        return finish_mpu_init
    else:
        # Megatron's MPU is the master. Complete initialization right away.
        finish_mpu_init()
        
        # Autoresume.
        _init_autoresume()
        
        # Write arguments to tensorboard.
        _write_args_to_tensorboard()
        # No continuation function
        return None
        
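# Typical usage, as a rough sketch (the real call sites live in the training
# entry points and may pass different providers/defaults):
#
#     from megatron.initialize import initialize_megatron
#
#     initialize_megatron(extra_args_provider=None,
#                         args_defaults={'tokenizer_type': 'GPT2BPETokenizer'})
#
# When args.lazy_mpu_init is set, the caller must invoke the returned
# finish_mpu_init() once the external DDP manager has torch.distributed up.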

def _initialize_distributed():
    """Initialize torch.distributed and mpu."""
    args = get_args()

    device_count = torch.cuda.device_count()
    if torch.distributed.is_initialized():

        if args.rank == 0:
            print('torch distributed is already initialized, '
                  'skipping initialization ...', flush=True)
        args.rank = torch.distributed.get_rank()
        args.world_size = torch.distributed.get_world_size()

    else:

        if args.rank == 0:
            print('> initializing torch distributed ...', flush=True)
        # Manually set the device ids.
        if device_count > 0:
            device = args.rank % device_count
            if args.local_rank is not None:
                assert args.local_rank == device, \
                    'expected local-rank to be the same as rank % device-count.'
            else:
                args.local_rank = device
            torch.cuda.set_device(device)
        # Call the init process
        init_method = 'tcp://'
        master_ip = os.getenv('MASTER_ADDR', 'localhost')
        master_port = os.getenv('MASTER_PORT', '6000')
        init_method += master_ip + ':' + master_port
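        # MASTER_ADDR / MASTER_PORT (along with the rank and world size) are
        # normally supplied by the process launcher, e.g. something along the
        # lines of (exact launcher and flags depend on the setup):
        #     python -m torch.distributed.launch --nproc_per_node=8 \
        #         pretrain_gpt2.py <training args...>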
        torch.distributed.init_process_group(
            backend=args.distributed_backend,
            world_size=args.world_size, rank=args.rank,
            init_method=init_method)

    # Set the model-parallel / data-parallel communicators.
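    # E.g. with world_size=8 and model_parallel_size=2, this builds four
    # model-parallel groups of 2 GPUs each and two data-parallel groups of
    # 4 GPUs each (adjacent ranks share a model-parallel group).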
    if device_count > 0:
        mpu.initialize_model_parallel(args.model_parallel_size)


def _init_autoresume():
    """Set autoresume start time."""
    autoresume = get_adlr_autoresume()
    if autoresume:
        torch.distributed.barrier()
        autoresume.init()
        torch.distributed.barrier()


def _set_random_seed(seed):
    """Set random seed for reproducability."""
    if seed is not None and seed > 0:
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
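        # In addition to the global seeds above, Megatron keeps a separate
        # CUDA RNG state per model-parallel rank (see mpu.random), so that
        # e.g. dropout in model-parallel regions differs across partitions
        # while remaining reproducible.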
        if torch.cuda.device_count() > 0:
            mpu.model_parallel_cuda_manual_seed(seed)
    else:
        raise ValueError('Seed ({}) should be a positive integer.'.format(seed))


def _write_args_to_tensorboard():
    """Write arguments to tensorboard."""
    args = get_args()
    writer = get_tensorboard_writer()
    if writer:
        for arg in vars(args):
            writer.add_text(arg, str(getattr(args, arg)))