initialize.py 4.8 KB
Newer Older
Mohammad's avatar
Mohammad committed
1
# coding=utf-8
Mohammad's avatar
Mohammad committed
2
# Copyright (c) 2020, NVIDIA CORPORATION.  All rights reserved.
Mohammad's avatar
Mohammad committed
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Megatron initialization."""

import random
import os

21
import numpy as np
Mohammad's avatar
Mohammad committed
22
23
import torch

24
25
26
from megatron import get_adlr_autoresume
from megatron import get_args
from megatron import get_tensorboard_writer
Mohammad's avatar
Mohammad committed
27
from megatron import mpu
28
from megatron.global_vars import set_global_variables
Mohammad's avatar
Mohammad committed
29
30


31
def initialize_megatron(extra_args_provider=None, args_defaults={},
Raul Puri's avatar
Raul Puri committed
32
                        ignore_unknown_args=False, allow_no_cuda=False):
Mohammad's avatar
Mohammad committed
33
    """Set global variables, initialize distributed, and
Raul Puri's avatar
Raul Puri committed
34
35
36
37
    set autoresume and random seeds.
    `allow_no_cuda` should not be set unless using megatron for cpu only 
    data processing. In general this arg should not be set unless you know 
    what you are doing."""
Raul Puri's avatar
Raul Puri committed
38
39
40
    if not allow_no_cuda:
        # Make sure cuda is available.
        assert torch.cuda.is_available(), 'Megatron requires CUDA.'
Mohammad's avatar
Mohammad committed
41

42
43
44
45
46
47
    # This is temporary WAR to make simple case like pytest calling with same args twice
    # Need to implement clean factory init.
    if mpu.model_parallel_is_initialized():
        return
    
    
Mohammad's avatar
Mohammad committed
48
49
    # Parse args, build tokenizer, and set adlr-autoresume,
    # tensorboard-writer, and timers.
Mohammad's avatar
Mohammad committed
50
    set_global_variables(extra_args_provider=extra_args_provider,
51
52
                         args_defaults=args_defaults,
                         ignore_unknown_args=ignore_unknown_args)
Mohammad's avatar
Mohammad committed
53
54
55
56
57
58
59

    # Pytorch distributed.
    _initialize_distributed()

    # Autoresume.
    _init_autoresume()

60
    # Random seeds for reproducibility.
Mohammad's avatar
Mohammad committed
61
62
63
64
65
    args = get_args()
    if args.rank == 0:
        print('> setting random seeds to {} ...'.format(args.seed))
    _set_random_seed(args.seed)

Mohammad's avatar
Mohammad committed
66
67
68
    # Write arguments to tensorboard.
    _write_args_to_tensorboard()

Mohammad's avatar
Mohammad committed
69
70
71
72
73

def _initialize_distributed():
    """Initialize torch.distributed and mpu."""
    args = get_args()

Raul Puri's avatar
Raul Puri committed
74
    device_count = torch.cuda.device_count()
Mohammad's avatar
Mohammad committed
75
76
77
78
79
80
81
    if torch.distributed.is_initialized():

        if args.rank == 0:
            print('torch distributed is already initialized, '
                  'skipping initialization ...', flush=True)
        args.rank = torch.distributed.get_rank()
        args.world_size = torch.distributed.get_world_size()
82
83
        if device_count > 0:
            device = torch.cuda.current_device()
Raul Puri's avatar
Raul Puri committed
84
            local_rank = args.rank % device_count
85
86
            assert local_rank == device, \
                'expected local-rank to be the same as rank % device-count.'
Mohammad's avatar
Mohammad committed
87
88
89
90
91
92

    else:

        if args.rank == 0:
            print('> initializing torch distributed ...', flush=True)
        # Manually set the device ids.
93
        if device_count > 0:
Raul Puri's avatar
Raul Puri committed
94
            device = args.rank % device_count
95
96
97
98
99
100
            if args.local_rank is not None:
                assert args.local_rank == device, \
                    'expected local-rank to be the same as rank % device-count.'
            else:
                args.local_rank = device
            torch.cuda.set_device(device)
Mohammad's avatar
Mohammad committed
101
102
103
104
105
106
107
108
109
110
111
        # Call the init process
        init_method = 'tcp://'
        master_ip = os.getenv('MASTER_ADDR', 'localhost')
        master_port = os.getenv('MASTER_PORT', '6000')
        init_method += master_ip + ':' + master_port
        torch.distributed.init_process_group(
            backend=args.distributed_backend,
            world_size=args.world_size, rank=args.rank,
            init_method=init_method)

    # Set the model-parallel / data-parallel communicators.
112
113
    if device_count > 0:
        mpu.initialize_model_parallel(args.model_parallel_size)
Mohammad's avatar
Mohammad committed
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130


def _init_autoresume():
    """Set autoresume start time."""
    autoresume = get_adlr_autoresume()
    if autoresume:
        torch.distributed.barrier()
        autoresume.init()
        torch.distributed.barrier()


def _set_random_seed(seed):
    """Set random seed for reproducability."""
    if seed is not None and seed > 0:
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
131
132
        if torch.cuda.device_count() > 0:
            mpu.model_parallel_cuda_manual_seed(seed)
Mohammad's avatar
Mohammad committed
133
134
    else:
        raise ValueError('Seed ({}) should be a positive integer.'.format(seed))
Mohammad's avatar
Mohammad committed
135
136
137
138
139
140
141
142
143


def _write_args_to_tensorboard():
    """Write arguments to tensorboard."""
    args = get_args()
    writer = get_tensorboard_writer()
    if writer:
        for arg in vars(args):
            writer.add_text(arg, str(getattr(args, arg)))