Commit abe36e2e authored by Raul Puri's avatar Raul Puri
Browse files

large update including model parallelism and gpt2


Co-authored-by: default avatarshoeybi <shoeybim@gmail.com>
Co-authored-by: default avatarraulpuric <raulpuric@berkeley.edu>
Co-authored-by: default avatarjaredcasper <jaredcasper@gmail.com>
Co-authored-by: default avatarmpatwary <mostofa.patwary@gmail.com>
Co-authored-by: default avatarplegresl <plegresl@gmail.com>
parent 0399d32c
# coding=utf-8
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
import numpy as np
import torch
from torch.multiprocessing import Lock
from torch.utils.data import Dataset
import mpu
from data_utils.samplers import DistributedBatchSampler
from data_utils.tokenization_gpt2 import GPT2Tokenizer
def make_gpt2_dataloaders(args):
# Input parameters.
input_data_sizes_file = args.input_data_sizes_file
seq_length = args.seq_length
initial_seed = args.seed
# Data parallel arguments.
world_size = mpu.get_data_parallel_world_size()
rank = mpu.get_data_parallel_rank()
global_batch_size = args.batch_size * world_size
num_workers = args.num_workers
def make_data_loader_(data_path):
# Build the dataset.
dataset = GPT2Dataset(data_path, input_data_sizes_file,
seq_length, initial_seed)
# Use a simple sampler with distributed batch sampler.
sampler = torch.utils.data.SequentialSampler(dataset)
batch_sampler = DistributedBatchSampler(sampler=sampler,
batch_size=global_batch_size,
drop_last=True,
rank=rank,
world_size=world_size)
# Torch dataloader.
return torch.utils.data.DataLoader(dataset,
batch_sampler=batch_sampler,
num_workers=num_workers,
pin_memory=True)
train = make_data_loader_(args.train_data_path)
valid = make_data_loader_(args.val_data_path)
test = make_data_loader_(args.test_data_path)
# Tokenizer.
tokenizer = GPT2Tokenizer.from_pretrained('gpt2', cache_dir=args.cache_dir)
eod_token = tokenizer.encoder['<|endoftext|>']
num_tokens = eod_token + 1
return (train, valid, test), num_tokens, eod_token
class GPT2Dataset(Dataset):
def __init__(self, data_path, sizes_filename, seq_length,
initial_seed, max_epochs=100):
# Input parameters.
self.data_path = data_path
self.sizes_filename = sizes_filename
self.seq_length = seq_length
self.initial_seed = initial_seed
self.max_epochs = max_epochs
# Lock for building the dataset.
self.lock = Lock()
# Shard stuff.
# Dictionary from shard nameto its size (number of element).
self.master_shard_size_dict = None
# Dictionary from shard name to modified size so it is
# divisible by self.seq_length.
self.shard_size_dict = None
# Long array (self.max_epochs * num-shards) populated
# randomly with shard names.
self.shards_name = None
# Start index of the data for a shard.
self.shards_start_index = None
self.build_shard_mappings_()
self.data_length = self.shards_start_index[-1]
# Data.
self.shards_data = [None]*self.shards_name.size
self.shards_sample_index = [None]*self.shards_name.size
def __len__(self):
return self.data_length
def __getitem__(self, idx):
# Find which shard we need.
shard_index = np.searchsorted(self.shards_start_index,
idx, side='right') - 1
# data index in the shard.
data_idx = idx - self.shards_start_index[shard_index]
# Load the shard if it is not in memory.
#self.lock.acquire()
if self.shards_data[shard_index] is None:
print('global rank {} is building data for shard index {} ...'.
format(torch.distributed.get_rank(), shard_index))
self.build_dataset_(shard_index)
#assert self.shards_data[shard_index] is not None
#self.lock.release()
# Start index.
start_index = self.shards_sample_index[shard_index][data_idx]
# Add one for label shift.
end_index = start_index + self.seq_length + 1
data = self.shards_data[shard_index][start_index:end_index]
return {'text': np.array(data, dtype=np.int64)}
def build_dataset_(self, shard_index):
# Garbage collect so we don't use a lot of memory.
# Leave the last one in case other threads have not catche up yet.
for i in range(shard_index - 1):
self.shards_data[i] = None
self.shards_sample_index[i] = None
# Read the shard.
filename = os.path.join(self.data_path, self.shards_name[shard_index])
print('loading {}'.format(filename))
data = np.load(filename, allow_pickle=True)
# Shuffle the data
rng = np.random.RandomState(self.initial_seed + shard_index)
rng.shuffle(data)
# Flatten.
data = np.hstack(data)
size = (data.shape[0] - 1) // self.seq_length
last_index = size * self.seq_length + 1
data = data[0:last_index]
self.shards_data[shard_index] = data
indices = np.arange(size) * self.seq_length
rng.shuffle(indices)
self.shards_sample_index[shard_index] = indices
def build_shard_mappings_(self):
# Load the sizes file.
sizes_filename = os.path.join(self.data_path, self.sizes_filename)
if torch.distributed.get_rank() == 0:
print(' > loading sizes from {}'.format(sizes_filename))
with open(sizes_filename, 'r') as f:
self.master_shard_size_dict = json.load(f)
if torch.distributed.get_rank() == 0:
print(' found {} shards'.format(len(self.master_shard_size_dict)))
# Adjust sizes to be a multiple of seq_length.
self.shard_size_dict = self.master_shard_size_dict.copy()
total_samples = 0
for shard in self.shard_size_dict:
size = self.shard_size_dict[shard]
size = ((size - 1) // self.seq_length) * self.seq_length
total_samples += size // self.seq_length
self.shard_size_dict[shard] = size
if torch.distributed.get_rank() == 0:
print(' found {} samples in the dataset'.format(total_samples))
# Build a list of shards.
shards_ = np.sort(np.array(list(self.shard_size_dict.keys())))
rng = np.random.RandomState(self.initial_seed)
self.shards_name = np.copy(shards_)
rng.shuffle(self.shards_name)
for i in range(1, self.max_epochs):
shards_c = np.copy(shards_)
rng.shuffle(shards_c)
self.shards_name = np.append(self.shards_name, shards_c)
# Build the global indexing.
self.shards_start_index = np.zeros(self.shards_name.size, dtype=np.int)
self.shards_start_index[0] = 0
for i in range(1, self.shards_name.size):
shard = str(self.shards_name[i-1])
size = self.shard_size_dict[shard]
self.shards_start_index[i] = self.shards_start_index[i-1] + \
size // self.seq_length
'''
if __name__ == '__main__':
print('gpt2 data loader ...')
path = '/raid/mshoeybi/data/gpt2/adlr/reddit_all_ftfy_lg200/npys'
dataset = GPT2Dataset(path, 'sizes.txt', 1024, 1234, 100)
print('dataset contains {} samples'.format(dataset.data_length))
for i in range(len(dataset)):
if i % 512000 == 0:
print(i)
data = dataset[i]
'''
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
# limitations under the License. # limitations under the License.
"""PyTorch DataLoader for TFRecords""" """PyTorch DataLoader for TFRecords"""
import torch
from torch.optim.lr_scheduler import _LRScheduler from torch.optim.lr_scheduler import _LRScheduler
import math import math
...@@ -30,7 +31,8 @@ class AnnealingLR(_LRScheduler): ...@@ -30,7 +31,8 @@ class AnnealingLR(_LRScheduler):
self.end_iter = num_iters self.end_iter = num_iters
self.decay_style = decay_style.lower() if isinstance(decay_style, str) else None self.decay_style = decay_style.lower() if isinstance(decay_style, str) else None
self.step(self.num_iters) self.step(self.num_iters)
print('learning rate decaying', decay_style) if torch.distributed.get_rank() == 0:
print('learning rate decaying', decay_style)
def get_lr(self): def get_lr(self):
# https://openreview.net/pdf?id=BJYwwY9ll pg. 4 # https://openreview.net/pdf?id=BJYwwY9ll pg. 4
......
...@@ -14,4 +14,7 @@ ...@@ -14,4 +14,7 @@
# limitations under the License. # limitations under the License.
from .distributed import * from .distributed import *
from .model import * from .gpt2_modeling import gpt2_get_params_for_weight_decay_optimization
from .gpt2_modeling import GPT2Model
from .model import BertModel
from .model import get_params_for_weight_decay_optimization
...@@ -19,6 +19,7 @@ import torch.distributed as dist ...@@ -19,6 +19,7 @@ import torch.distributed as dist
from torch.nn.modules import Module from torch.nn.modules import Module
from torch.autograd import Variable from torch.autograd import Variable
import mpu
class DistributedDataParallel(Module): class DistributedDataParallel(Module):
...@@ -27,10 +28,11 @@ class DistributedDataParallel(Module): ...@@ -27,10 +28,11 @@ class DistributedDataParallel(Module):
self.warn_on_half = True if dist._backend == dist.dist_backend.GLOO else False self.warn_on_half = True if dist._backend == dist.dist_backend.GLOO else False
self.module = module self.module = module
self.data_parallel_group = mpu.get_data_parallel_group()
src_rank = mpu.get_model_parallel_rank()
for p in self.module.parameters(): for p in self.module.parameters():
if torch.is_tensor(p): if torch.is_tensor(p):
dist.broadcast(p, 0) dist.broadcast(p, src_rank, group=self.data_parallel_group)
def allreduce_params(reduce_after=True, no_scale=False, fp32_allreduce=False): def allreduce_params(reduce_after=True, no_scale=False, fp32_allreduce=False):
if(self.needs_reduction): if(self.needs_reduction):
...@@ -54,11 +56,11 @@ class DistributedDataParallel(Module): ...@@ -54,11 +56,11 @@ class DistributedDataParallel(Module):
if fp32_allreduce: if fp32_allreduce:
coalesced = coalesced.float() coalesced = coalesced.float()
if not no_scale and not reduce_after: if not no_scale and not reduce_after:
coalesced /= dist.get_world_size() coalesced /= dist.get_world_size(group=self.data_parallel_group)
dist.all_reduce(coalesced) dist.all_reduce(coalesced, group=self.data_parallel_group)
torch.cuda.synchronize() torch.cuda.synchronize()
if not no_scale and reduce_after: if not no_scale and reduce_after:
coalesced /= dist.get_world_size() coalesced /= dist.get_world_size(group=self.data_parallel_group)
for buf, synced in zip(grads, _unflatten_dense_tensors(coalesced, grads)): for buf, synced in zip(grads, _unflatten_dense_tensors(coalesced, grads)):
buf.copy_(synced) buf.copy_(synced)
self.hook_handles = [] self.hook_handles = []
......
# coding=utf-8
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""GPT-2 model."""
import torch
import torch.nn.functional as F
import mpu
def init_method_normal(std=0.02):
"""Init method based on normal distribution.
This is only used for embeddings. The transformer has its
own initializer.
"""
def init_(tensor):
return torch.nn.init.normal_(tensor, mean=0.0, std=std)
return init_
class GPT2Model(torch.nn.Module):
"""GPT-2 Language model.
The output of the forward method are the logits (parallel or
serial depending on the `parallel_output` flag.
"""
def __init__(self,
num_layers,
vocab_size,
hidden_size,
num_attention_heads,
embedding_dropout_prob,
attention_dropout_prob,
output_dropout_prob,
max_sequence_length,
checkpoint_activations,
checkpoint_num_layers=1,
parallel_output=True):
super(GPT2Model, self).__init__()
self.parallel_output = parallel_output
init_method = init_method_normal(std=0.02)
# Word embeddings (parallel).
self.word_embeddings = mpu.VocabParallelEmbedding(
vocab_size, hidden_size, init_method=init_method)
# Position embedding (serial).
self.position_embeddings = torch.nn.Embedding(max_sequence_length,
hidden_size)
# Initialize the position embeddings.
init_method(self.position_embeddings.weight)
# Embeddings dropout
self.embedding_dropout = torch.nn.Dropout(embedding_dropout_prob)
# Transformer
self.transformer = mpu.GPT2ParallelTransformer(num_layers,
hidden_size,
num_attention_heads,
attention_dropout_prob,
output_dropout_prob,
checkpoint_activations,
checkpoint_num_layers)
def forward(self, input_ids, position_ids, attention_mask):
# Embeddings.
words_embeddings = self.word_embeddings(input_ids)
position_embeddings = self.position_embeddings(position_ids)
embeddings = words_embeddings + position_embeddings
# Dropout.
embeddings = self.embedding_dropout(embeddings)
# Transformer.
transformer_output = self.transformer(embeddings, attention_mask)
# Parallel logits.
transformer_output_parallel = mpu.copy_to_model_parallel_region(
transformer_output)
logits_parallel = F.linear(transformer_output_parallel,
self.word_embeddings.weight)
if self.parallel_output:
return logits_parallel
return mpu.gather_from_model_parallel_region(logits_parallel)
def gpt2_get_params_for_weight_decay_optimization(module):
weight_decay_params = {'params': []}
no_weight_decay_params = {'params': [], 'weight_decay': 0.0}
for module_ in module.modules():
if isinstance(module_, (mpu.LayerNorm, torch.nn.LayerNorm)):
no_weight_decay_params['params'].extend(
[p for p in list(module_._parameters.values())
if p is not None])
else:
weight_decay_params['params'].extend(
[p for n, p in list(module_._parameters.items())
if p is not None and n != 'bias'])
no_weight_decay_params['params'].extend(
[p for n, p in list(module_._parameters.items())
if p is not None and n == 'bias'])
return weight_decay_params, no_weight_decay_params
...@@ -18,14 +18,14 @@ ...@@ -18,14 +18,14 @@
import torch import torch
from .modeling import BertConfig from .modeling import BertConfig
from .modeling import BertForPreTraining from .modeling import BertForPreTraining, BertForMaskedLM
from .modeling import BertLayerNorm from .modeling import BertLayerNorm
def get_params_for_weight_decay_optimization(module): def get_params_for_weight_decay_optimization(module):
weight_decay_params = {'params': []} weight_decay_params = {'params': []}
no_weight_decay_params = {'params': [], 'weight_decay': 0} no_weight_decay_params = {'params': [], 'weight_decay': 0.0}
for module_ in module.modules(): for module_ in module.modules():
if isinstance(module_, (BertLayerNorm, torch.nn.LayerNorm)): if isinstance(module_, (BertLayerNorm, torch.nn.LayerNorm)):
no_weight_decay_params['params'].extend( no_weight_decay_params['params'].extend(
...@@ -44,7 +44,7 @@ def get_params_for_weight_decay_optimization(module): ...@@ -44,7 +44,7 @@ def get_params_for_weight_decay_optimization(module):
class BertModel(torch.nn.Module): class BertModel(torch.nn.Module):
def __init__(self, tokenizer, args): def __init__(self, args):
super(BertModel, self).__init__() super(BertModel, self).__init__()
if args.pretrained_bert: if args.pretrained_bert:
self.model = BertForPreTraining.from_pretrained( self.model = BertForPreTraining.from_pretrained(
...@@ -59,7 +59,7 @@ class BertModel(torch.nn.Module): ...@@ -59,7 +59,7 @@ class BertModel(torch.nn.Module):
else: else:
intermediate_size = args.intermediate_size intermediate_size = args.intermediate_size
self.config = BertConfig( self.config = BertConfig(
tokenizer.num_tokens, args.tokenizer_num_tokens,
hidden_size=args.hidden_size, hidden_size=args.hidden_size,
num_hidden_layers=args.num_layers, num_hidden_layers=args.num_layers,
num_attention_heads=args.num_attention_heads, num_attention_heads=args.num_attention_heads,
...@@ -67,11 +67,12 @@ class BertModel(torch.nn.Module): ...@@ -67,11 +67,12 @@ class BertModel(torch.nn.Module):
hidden_dropout_prob=args.hidden_dropout, hidden_dropout_prob=args.hidden_dropout,
attention_probs_dropout_prob=args.attention_dropout, attention_probs_dropout_prob=args.attention_dropout,
max_position_embeddings=args.max_position_embeddings, max_position_embeddings=args.max_position_embeddings,
type_vocab_size=tokenizer.num_type_tokens, type_vocab_size=args.tokenizer_num_type_tokens,
fp32_layernorm=args.fp32_layernorm, fp32_layernorm=args.fp32_layernorm,
fp32_embedding=args.fp32_embedding, fp32_embedding=args.fp32_embedding,
fp32_tokentypes=args.fp32_tokentypes, fp32_tokentypes=args.fp32_tokentypes,
layernorm_epsilon=args.layernorm_epsilon) layernorm_epsilon=args.layernorm_epsilon,
deep_init=args.deep_init)
self.model = BertForPreTraining(self.config) self.model = BertForPreTraining(self.config)
def forward(self, input_tokens, token_type_ids=None, def forward(self, input_tokens, token_type_ids=None,
...@@ -86,3 +87,4 @@ class BertModel(torch.nn.Module): ...@@ -86,3 +87,4 @@ class BertModel(torch.nn.Module):
def load_state_dict(self, state_dict, strict=True): def load_state_dict(self, state_dict, strict=True):
return self.model.load_state_dict(state_dict, strict=strict) return self.model.load_state_dict(state_dict, strict=strict)
...@@ -32,10 +32,26 @@ from torch import nn ...@@ -32,10 +32,26 @@ from torch import nn
import torch.nn.functional as F import torch.nn.functional as F
from torch.nn import CrossEntropyLoss from torch.nn import CrossEntropyLoss
from torch.utils.checkpoint import checkpoint #from torch.utils.checkpoint import checkpoint
from data_utils.file_utils import cached_path from data_utils.file_utils import cached_path
import mpu
def normal_init_method(mean, std):
def init_(tensor):
return torch.nn.init.normal_(tensor, mean=mean, std=std)
return init_
def scaled_init_method(mean, std, num_layers):
"""Init method based on N(0, sigma/sqrt(2*num_layers)."""
std = std / math.sqrt(2.0 * num_layers)
def init_(tensor):
return torch.nn.init.normal_(tensor, mean=mean, std=std)
return init_
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
PRETRAINED_MODEL_ARCHIVE_MAP = { PRETRAINED_MODEL_ARCHIVE_MAP = {
...@@ -141,6 +157,7 @@ class BertConfig(object): ...@@ -141,6 +157,7 @@ class BertConfig(object):
max_position_embeddings=512, max_position_embeddings=512,
type_vocab_size=2, type_vocab_size=2,
initializer_range=0.02, initializer_range=0.02,
deep_init=False,
fp32_layernorm=False, fp32_layernorm=False,
fp32_embedding=False, fp32_embedding=False,
fp32_tokentypes=False, fp32_tokentypes=False,
...@@ -186,6 +203,7 @@ class BertConfig(object): ...@@ -186,6 +203,7 @@ class BertConfig(object):
self.max_position_embeddings = max_position_embeddings self.max_position_embeddings = max_position_embeddings
self.type_vocab_size = type_vocab_size self.type_vocab_size = type_vocab_size
self.initializer_range = initializer_range self.initializer_range = initializer_range
self.deep_init = deep_init
self.fp32_layernorm = fp32_layernorm self.fp32_layernorm = fp32_layernorm
self.fp32_embedding = fp32_embedding self.fp32_embedding = fp32_embedding
self.layernorm_epsilon = layernorm_epsilon self.layernorm_epsilon = layernorm_epsilon
...@@ -221,46 +239,35 @@ class BertConfig(object): ...@@ -221,46 +239,35 @@ class BertConfig(object):
"""Serializes this instance to a JSON string.""" """Serializes this instance to a JSON string."""
return json.dumps(self.to_dict(), indent=2, sort_keys=True) + "\n" return json.dumps(self.to_dict(), indent=2, sort_keys=True) + "\n"
# try: try:
# from apex.normalization.fused_layer_norm import FusedLayerNorm as BertLayerNorm from apex.normalization.fused_layer_norm import FusedLayerNorm as BertLayerNorm
# except ImportError: except ImportError:
# print("Better speed can be achieved with apex installed from https://www.github.com/nvidia/apex.") print("Better speed can be achieved with apex installed from https://www.github.com/nvidia/apex.")
# class BertLayerNorm(nn.Module): class BertLayerNorm(nn.Module):
# def __init__(self, hidden_size, eps=1e-12): def __init__(self, hidden_size, eps=1e-12):
# """Construct a layernorm module in the TF style (epsilon inside the square root). """Construct a layernorm module in the TF style (epsilon inside the square root).
# """ """
# super(BertLayerNorm, self).__init__() super(BertLayerNorm, self).__init__()
# self.weight = nn.Parameter(torch.ones(hidden_size)) self.weight = nn.Parameter(torch.ones(hidden_size))
# self.bias = nn.Parameter(torch.zeros(hidden_size)) self.bias = nn.Parameter(torch.zeros(hidden_size))
# self.variance_epsilon = eps self.variance_epsilon = eps
# def forward(self, x): def forward(self, x):
# u = x.mean(-1, keepdim=True) u = x.mean(-1, keepdim=True)
# s = (x - u).pow(2).mean(-1, keepdim=True) s = (x - u).pow(2).mean(-1, keepdim=True)
# x = (x - u) / torch.sqrt(s + self.variance_epsilon) x = (x - u) / torch.sqrt(s + self.variance_epsilon)
# return self.weight * x + self.bias return self.weight * x + self.bias
class BertLayerNorm(nn.Module):
def __init__(self, hidden_size, eps=1e-12):
"""Construct a layernorm module in the TF style (epsilon inside the square root).
"""
super(BertLayerNorm, self).__init__()
self.weight = nn.Parameter(torch.ones(hidden_size))
self.bias = nn.Parameter(torch.zeros(hidden_size))
self.variance_epsilon = eps
def forward(self, x):
u = x.mean(-1, keepdim=True)
s = (x - u).pow(2).mean(-1, keepdim=True)
x = (x - u) / torch.sqrt(s + self.variance_epsilon)
return self.weight * x + self.bias
class BertEmbeddings(nn.Module): class BertEmbeddings(nn.Module):
"""Construct the embeddings from word, position and token_type embeddings. """Construct the embeddings from word, position and token_type embeddings.
""" """
def __init__(self, config): def __init__(self, config):
super(BertEmbeddings, self).__init__() super(BertEmbeddings, self).__init__()
self.word_embeddings = nn.Embedding(config.vocab_size, config.hidden_size) #self.word_embeddings = nn.Embedding(config.vocab_size, config.hidden_size)
self.word_embeddings = mpu.VocabParallelEmbedding(
config.vocab_size, config.hidden_size,
init_method=normal_init_method(mean=0.0,
std=config.initializer_range))
self.position_embeddings = nn.Embedding(config.max_position_embeddings, config.hidden_size) self.position_embeddings = nn.Embedding(config.max_position_embeddings, config.hidden_size)
self.token_type_embeddings = nn.Embedding(config.type_vocab_size, config.hidden_size) self.token_type_embeddings = nn.Embedding(config.type_vocab_size, config.hidden_size)
...@@ -369,7 +376,20 @@ class BertSelfAttention(nn.Module): ...@@ -369,7 +376,20 @@ class BertSelfAttention(nn.Module):
class BertSelfOutput(nn.Module): class BertSelfOutput(nn.Module):
def __init__(self, config): def __init__(self, config):
super(BertSelfOutput, self).__init__() super(BertSelfOutput, self).__init__()
self.dense = nn.Linear(config.hidden_size, config.hidden_size) if hasattr(config, 'deep_init') and config.deep_init:
init_method = scaled_init_method(mean=0.0,
std=config.initializer_range,
num_layers=config.num_hidden_layers)
else:
init_method = normal_init_method(mean=0.0,
std=config.initializer_range)
self.dense = mpu.RowParallelLinear(
input_size=config.hidden_size,
output_size=config.hidden_size,
bias=True,
input_is_parallel=True,
stride=1,
init_method=init_method)
self.fp32_layernorm = config.fp32_layernorm self.fp32_layernorm = config.fp32_layernorm
self.LayerNorm = BertLayerNorm(config.hidden_size, eps=config.layernorm_epsilon) self.LayerNorm = BertLayerNorm(config.hidden_size, eps=config.layernorm_epsilon)
self.dropout = nn.Dropout(config.hidden_dropout_prob) self.dropout = nn.Dropout(config.hidden_dropout_prob)
...@@ -390,7 +410,13 @@ class BertSelfOutput(nn.Module): ...@@ -390,7 +410,13 @@ class BertSelfOutput(nn.Module):
class BertAttention(nn.Module): class BertAttention(nn.Module):
def __init__(self, config): def __init__(self, config):
super(BertAttention, self).__init__() super(BertAttention, self).__init__()
self.self = BertSelfAttention(config) self.self = mpu.BertParallelSelfAttention(
hidden_size=config.hidden_size,
num_attention_heads=config.num_attention_heads,
dropout_prob=config.attention_probs_dropout_prob,
output_parallel=True,
init_method=normal_init_method(mean=0.0,
std=config.initializer_range))
self.output = BertSelfOutput(config) self.output = BertSelfOutput(config)
def forward(self, input_tensor, attention_mask): def forward(self, input_tensor, attention_mask):
...@@ -402,7 +428,14 @@ class BertAttention(nn.Module): ...@@ -402,7 +428,14 @@ class BertAttention(nn.Module):
class BertIntermediate(nn.Module): class BertIntermediate(nn.Module):
def __init__(self, config): def __init__(self, config):
super(BertIntermediate, self).__init__() super(BertIntermediate, self).__init__()
self.dense = nn.Linear(config.hidden_size, config.intermediate_size) self.dense = mpu.ColumnParallelLinear(
input_size=config.hidden_size,
output_size=config.intermediate_size,
bias=True,
gather_output=False,
stride=1,
init_method=normal_init_method(mean=0.0,
std=config.initializer_range))
self.intermediate_act_fn = ACT2FN[config.hidden_act] \ self.intermediate_act_fn = ACT2FN[config.hidden_act] \
if isinstance(config.hidden_act, str) else config.hidden_act if isinstance(config.hidden_act, str) else config.hidden_act
...@@ -415,7 +448,20 @@ class BertIntermediate(nn.Module): ...@@ -415,7 +448,20 @@ class BertIntermediate(nn.Module):
class BertOutput(nn.Module): class BertOutput(nn.Module):
def __init__(self, config): def __init__(self, config):
super(BertOutput, self).__init__() super(BertOutput, self).__init__()
self.dense = nn.Linear(config.intermediate_size, config.hidden_size) if hasattr(config, 'deep_init') and config.deep_init:
init_method = scaled_init_method(mean=0.0,
std=config.initializer_range,
num_layers=config.num_hidden_layers)
else:
init_method = normal_init_method(mean=0.0,
std=config.initializer_range)
self.dense = mpu.RowParallelLinear(
input_size=config.intermediate_size,
output_size=config.hidden_size,
bias=True,
input_is_parallel=True,
stride=1,
init_method=init_method)
self.fp32_layernorm = config.fp32_layernorm self.fp32_layernorm = config.fp32_layernorm
self.LayerNorm = BertLayerNorm(config.hidden_size, eps=config.layernorm_epsilon) self.LayerNorm = BertLayerNorm(config.hidden_size, eps=config.layernorm_epsilon)
self.dropout = nn.Dropout(config.hidden_dropout_prob) self.dropout = nn.Dropout(config.hidden_dropout_prob)
...@@ -450,8 +496,9 @@ class BertLayer(nn.Module): ...@@ -450,8 +496,9 @@ class BertLayer(nn.Module):
class BertEncoder(nn.Module): class BertEncoder(nn.Module):
def __init__(self, config): def __init__(self, config):
super(BertEncoder, self).__init__() super(BertEncoder, self).__init__()
layer = BertLayer(config) #layer = BertLayer(config)
self.layer = nn.ModuleList([copy.deepcopy(layer) for _ in range(config.num_hidden_layers)]) #self.layer = nn.ModuleList([copy.deepcopy(layer) for _ in range(config.num_hidden_layers)])
self.layer = nn.ModuleList([BertLayer(config) for _ in range(config.num_hidden_layers)])
# def forward(self, hidden_states, attention_mask, output_all_encoded_layers=True): # def forward(self, hidden_states, attention_mask, output_all_encoded_layers=True):
# all_encoder_layers = [] # all_encoder_layers = []
...@@ -476,9 +523,9 @@ class BertEncoder(nn.Module): ...@@ -476,9 +523,9 @@ class BertEncoder(nn.Module):
if checkpoint_activations: if checkpoint_activations:
l = 0 l = 0
num_layers = len(self.layer) num_layers = len(self.layer)
chunk_length = math.ceil(math.sqrt(num_layers)) chunk_length = 1 #math.ceil(math.sqrt(num_layers))
while l < num_layers: while l < num_layers:
hidden_states = checkpoint(custom(l, l+chunk_length), hidden_states, attention_mask*1) hidden_states = mpu.checkpoint(custom(l, l+chunk_length), hidden_states, attention_mask*1)
l += chunk_length l += chunk_length
# decoder layers # decoder layers
else: else:
...@@ -536,11 +583,12 @@ class BertLMPredictionHead(nn.Module): ...@@ -536,11 +583,12 @@ class BertLMPredictionHead(nn.Module):
# The output weights are the same as the input embeddings, but there is # The output weights are the same as the input embeddings, but there is
# an output-only bias for each token. # an output-only bias for each token.
self.decoder = nn.Linear(bert_model_embedding_weights.size(1), #self.decoder = nn.Linear(bert_model_embedding_weights.size(1),
bert_model_embedding_weights.size(0), # bert_model_embedding_weights.size(0),
bias=False) # bias=False)
self.decoder.weight = bert_model_embedding_weights self.decoder_weight = bert_model_embedding_weights
self.bias = nn.Parameter(torch.zeros(bert_model_embedding_weights.size(0))) self.bias = nn.Parameter(torch.zeros(bert_model_embedding_weights.size(0)))
self.bias.model_parallel = True
self.fp32_embedding = config.fp32_embedding self.fp32_embedding = config.fp32_embedding
self.fp32_layernorm = config.fp32_layernorm self.fp32_layernorm = config.fp32_layernorm
def convert_to_type(tensor): def convert_to_type(tensor):
...@@ -560,7 +608,10 @@ class BertLMPredictionHead(nn.Module): ...@@ -560,7 +608,10 @@ class BertLMPredictionHead(nn.Module):
self.transform.LayerNorm.float() self.transform.LayerNorm.float()
hidden_states = self.transform(self.type_converter(hidden_states)) hidden_states = self.transform(self.type_converter(hidden_states))
# hidden_states = self.decoder(hidden_states) + self.bias # hidden_states = self.decoder(hidden_states) + self.bias
hidden_states = F.linear(self.type_converter(hidden_states), self.type_converter(self.decoder.weight), self.type_converter(self.bias)) hidden_states = mpu.copy_to_model_parallel_region(hidden_states)
hidden_states = F.linear(self.type_converter(hidden_states),
self.type_converter(self.decoder_weight),
self.type_converter(self.bias))
return hidden_states return hidden_states
...@@ -896,8 +947,8 @@ class BertForPreTraining(PreTrainedBertModel): ...@@ -896,8 +947,8 @@ class BertForPreTraining(PreTrainedBertModel):
if masked_lm_labels is not None and next_sentence_label is not None: if masked_lm_labels is not None and next_sentence_label is not None:
loss_fct = CrossEntropyLoss(ignore_index=-1) loss_fct = CrossEntropyLoss(ignore_index=-1)
masked_lm_loss = loss_fct(prediction_scores.view(-1, self.config.vocab_size), masked_lm_labels.view(-1)) masked_lm_loss = loss_fct(prediction_scores.view(-1, self.config.vocab_size).float(), masked_lm_labels.view(-1))
next_sentence_loss = loss_fct(seq_relationship_score.view(-1, 2), next_sentence_label.view(-1)) next_sentence_loss = loss_fct(seq_relationship_score.view(-1, 2).float(), next_sentence_label.view(-1))
total_loss = masked_lm_loss + next_sentence_loss total_loss = masked_lm_loss + next_sentence_loss
return total_loss return total_loss
else: else:
...@@ -1212,12 +1263,21 @@ class BertForTokenClassification(PreTrainedBertModel): ...@@ -1212,12 +1263,21 @@ class BertForTokenClassification(PreTrainedBertModel):
self.num_labels = num_labels self.num_labels = num_labels
self.bert = BertModel(config) self.bert = BertModel(config)
self.dropout = nn.Dropout(config.hidden_dropout_prob) self.dropout = nn.Dropout(config.hidden_dropout_prob)
self.classifier = nn.Linear(config.hidden_size, num_labels) #self.classifier = nn.Linear(config.hidden_size, num_labels)
self.classifier = mpu.RowParallelLinear(
input_size=config.hidden_size,
output_size=num_labels,
bias=True,
input_is_parallel=True,
stride=1,
init_method=normal_init_method(mean=0.0,
std=config.initializer_range))
self.apply(self.init_bert_weights) self.apply(self.init_bert_weights)
def forward(self, input_ids, token_type_ids=None, attention_mask=None, labels=None, checkpoint_activations=False): def forward(self, input_ids, token_type_ids=None, attention_mask=None, labels=None, checkpoint_activations=False):
sequence_output, _ = self.bert(input_ids, token_type_ids, attention_mask, output_all_encoded_layers=False, checkpoint_activations=checkpoint_activations) sequence_output, _ = self.bert(input_ids, token_type_ids, attention_mask, output_all_encoded_layers=False, checkpoint_activations=checkpoint_activations)
sequence_output = self.dropout(sequence_output) with mpu.get_cuda_rng_tracker().fork():
sequence_output = self.dropout(sequence_output)
logits = self.classifier(sequence_output) logits = self.classifier(sequence_output)
if labels is not None: if labels is not None:
...@@ -1280,7 +1340,15 @@ class BertForQuestionAnswering(PreTrainedBertModel): ...@@ -1280,7 +1340,15 @@ class BertForQuestionAnswering(PreTrainedBertModel):
self.bert = BertModel(config) self.bert = BertModel(config)
# TODO check with Google if it's normal there is no dropout on the token classifier of SQuAD in the TF version # TODO check with Google if it's normal there is no dropout on the token classifier of SQuAD in the TF version
# self.dropout = nn.Dropout(config.hidden_dropout_prob) # self.dropout = nn.Dropout(config.hidden_dropout_prob)
self.qa_outputs = nn.Linear(config.hidden_size, 2) #self.qa_outputs = nn.Linear(config.hidden_size, 2)
self.qa_outputs = mpu.RowParallelLinear(
input_size=config.hidden_size,
output_size=2,
bias=True,
input_is_parallel=True,
stride=1,
init_method=normal_init_method(mean=0.0,
std=config.initializer_range))
self.apply(self.init_bert_weights) self.apply(self.init_bert_weights)
def forward(self, input_ids, token_type_ids=None, attention_mask=None, start_positions=None, end_positions=None, checkpoint_activations=False): def forward(self, input_ids, token_type_ids=None, attention_mask=None, start_positions=None, end_positions=None, checkpoint_activations=False):
......
# coding=utf-8
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Model parallel utility interface."""
from .cross_entropy import vocab_parallel_cross_entropy
from .data import broadcast_data
from .grads import clip_grad_norm
from .initialize import destroy_model_parallel
from .initialize import get_data_parallel_group
from .initialize import get_data_parallel_rank
from .initialize import get_data_parallel_world_size
from .initialize import get_model_parallel_group
from .initialize import get_model_parallel_rank
from .initialize import get_model_parallel_src_rank
from .initialize import get_model_parallel_world_size
from .initialize import initialize_model_parallel
from .initialize import model_parallel_is_initialized
from .layers import ColumnParallelLinear
from .layers import ParallelEmbedding
from .layers import RowParallelLinear
from .layers import VocabParallelEmbedding
from .mappings import copy_to_model_parallel_region
from .mappings import gather_from_model_parallel_region
from .mappings import reduce_from_model_parallel_region
from .mappings import scatter_to_model_parallel_region
from .random import checkpoint
from .random import get_cuda_rng_tracker
from .random import model_parallel_cuda_manual_seed
from .transformer import BertParallelSelfAttention
from .transformer import BertParallelTransformerLayer
from .transformer import GPT2ParallelTransformer
from .transformer import LayerNorm
# coding=utf-8
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import torch
from .initialize import get_model_parallel_group
from .initialize import get_model_parallel_rank
from .initialize import get_model_parallel_world_size
from .utils import VocabUtility
class _VocabParallelCrossEntropy(torch.autograd.Function):
@staticmethod
def forward(ctx, vocab_parallel_logits, target):
# Copy so the input remains unchanged.
logits = vocab_parallel_logits.clone()
# Maximum value along vocab dimension across all GPUs.
logits_max = torch.max(logits, dim=-1)[0]
torch.distributed.all_reduce(logits_max,
op=torch.distributed.ReduceOp.MAX,
group=get_model_parallel_group())
# Subtract the maximum value.
logits.sub_(logits_max.unsqueeze(dim=-1))
# Sum of exponential of logits along vocab dimension across all GPUs.
exp_logits = logits.exp()
sum_exp_logits = exp_logits.sum(dim=-1)
torch.distributed.all_reduce(sum_exp_logits,
op=torch.distributed.ReduceOp.SUM,
group=get_model_parallel_group())
# Get the partition's vocab indecies
get_vocab_range = VocabUtility.vocab_range_from_per_partition_vocab_size
partition_vocab_size = vocab_parallel_logits.size()[-1]
rank = get_model_parallel_rank()
world_size = get_model_parallel_world_size()
vocab_start_index, vocab_end_index = get_vocab_range(
partition_vocab_size, rank, world_size)
# Create a mask of valid vocab ids (1 means it needs to be masked).
target_mask = (target < vocab_start_index) | (target >= vocab_end_index)
masked_target = target.clone() - vocab_start_index
masked_target[target_mask] = 0
# Get predicted-logits = logits[target].
# For Simplicity, we convert logits to a 2-D tensor with size
# [*, partition-vocab-size] and target to a 1-D tensor of size [*].
logits_2d = logits.view(-1, partition_vocab_size)
masked_target_1d = masked_target.view(-1)
arange_1d = torch.arange(start=0, end=logits_2d.size()[0],
device=logits_2d.device)
predicted_logits_1d = logits_2d[arange_1d, masked_target_1d]
predicted_logits = predicted_logits_1d.view_as(target)
predicted_logits[target_mask] = 0.0
# All reduce is needed to get the chunks from other GPUs.
torch.distributed.all_reduce(predicted_logits,
op=torch.distributed.ReduceOp.SUM,
group=get_model_parallel_group())
# Loss = log(sum(exp(logits))) - predicted-logit.
loss = torch.log(sum_exp_logits) - predicted_logits
# Store softmax, target-mask and masked-target for backward pass.
exp_logits.div_(sum_exp_logits.unsqueeze(dim=-1))
ctx.save_for_backward(exp_logits, target_mask, masked_target_1d)
return loss
@staticmethod
def backward(ctx, grad_output):
# Retreive tensors from the forward path.
softmax, target_mask, masked_target_1d = ctx.saved_tensors
# All the inputs have softmax as thier gradient.
grad_input = softmax
# For simplicity, work with the 2D gradient.
partition_vocab_size = softmax.size()[-1]
grad_2d = grad_input.view(-1, partition_vocab_size)
# Add the gradient from matching classes.
arange_1d = torch.arange(start=0, end=grad_2d.size()[0],
device=grad_2d.device)
grad_2d[arange_1d, masked_target_1d] -= (
1.0 - target_mask.view(-1).float())
# Finally elementwise multiplication with the output gradients.
grad_input.mul_(grad_output.unsqueeze(dim=-1))
return grad_input, None
def vocab_parallel_cross_entropy(vocab_parallel_logits, target):
"""Helper function for the cross entropy."""
return _VocabParallelCrossEntropy.apply(vocab_parallel_logits, target)
# coding=utf-8
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import torch
from .initialize import get_model_parallel_group
from .initialize import get_model_parallel_rank
from .initialize import get_model_parallel_src_rank
_MAX_DATA_DIM = 4
def _check_data_types(keys, data, target_dtype):
"""Check that all the keys have the same target data type."""
for key in keys:
assert data[key].dtype == target_dtype, '{} has data type {} which '\
'is different than {}'.format(key, data[key].dtype, target_dtype)
def _build_key_size_numel_dictionaries(keys, data):
"""Build the size on rank 0 and broadcast."""
max_dim = _MAX_DATA_DIM
sizes = [0 for _ in range(max_dim) for _ in keys]
# Pack the sizes on rank zero.
if get_model_parallel_rank() == 0:
offset = 0
for key in keys:
assert data[key].dim() < max_dim, 'you should increase MAX_DATA_DIM'
size = data[key].size()
for i, s in enumerate(size):
sizes[i + offset] = s
offset += max_dim
# Move to GPU and broadcast.
sizes_cuda = torch.cuda.LongTensor(sizes)
torch.distributed.broadcast(sizes_cuda, get_model_parallel_src_rank(),
group=get_model_parallel_group())
# Move back to cpu and unpack.
sizes_cpu = sizes_cuda.cpu()
key_size = {}
key_numel = {}
total_numel = 0
offset = 0
for key in keys:
i = 0
size = []
numel = 1
while sizes_cpu[offset + i] > 0:
this_size = sizes_cpu[offset + i]
size.append(this_size)
numel *= this_size
i += 1
key_size[key] = size
key_numel[key] = numel
total_numel += numel
offset += max_dim
return key_size, key_numel, total_numel
def broadcast_data(keys, data, datatype):
"""Broadcast data from rank zero of each model parallel group to the
members of the same model parallel group.
Arguments:
keys: list of keys in the data disctionary to be broadcasted
data: data dictionary of string keys and cpu tensor values.
datatype: torch data type of all tensors in data associated
with keys.
"""
# Build (key, size) and (key, number of elements) dictionaries along
# with the total number of elements on all ranks.
key_size, key_numel, total_numel = _build_key_size_numel_dictionaries(keys,
data)
# Pack on rank zero.
if get_model_parallel_rank() == 0:
# Check that all keys have the same data type.
_check_data_types(keys, data, datatype)
# Flatten the data associated with the keys
flatten_data = torch.cat(
[data[key].contiguous().view(-1) for key in keys], dim=0).cuda()
else:
flatten_data = torch.empty(total_numel,
device=torch.cuda.current_device(),
dtype=datatype)
# Boradcast
torch.distributed.broadcast(flatten_data, get_model_parallel_src_rank(),
group=get_model_parallel_group())
# Unpack
output = {}
offset = 0
for key in keys:
size = key_size[key]
numel = key_numel[key]
output[key] = flatten_data.narrow(0, offset, numel).view(size)
offset += numel
return output
# coding=utf-8
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Parts of the code here are adapted from PyTorch
# repo: https://github.com/pytorch/pytorch
import torch
from torch._six import inf
from .initialize import get_model_parallel_group
from .initialize import get_model_parallel_rank
def clip_grad_norm(parameters, max_norm, norm_type=2):
"""Clips gradient norm of an iterable of parameters.
This is adapted from torch.nn.utils.clip_grad.clip_grad_norm_ and
added functionality to handle model parallel parameters. Note that
the gradients are modified in place.
Arguments:
parameters (Iterable[Tensor] or Tensor): an iterable of Tensors or a
single Tensor that will have gradients normalized
max_norm (float or int): max norm of the gradients
norm_type (float or int): type of the used p-norm. Can be ``'inf'`` for
infinity norm.
Returns:
Total norm of the parameters (viewed as a single vector).
"""
if isinstance(parameters, torch.Tensor):
parameters = [parameters]
parameters = list(filter(lambda p: p.grad is not None, parameters))
max_norm = float(max_norm)
norm_type = float(norm_type)
if norm_type == inf:
total_norm = max(p.grad.data.abs().max() for p in parameters)
total_norm_cuda = torch.cuda.FloatTensor([float(total_norm)])
# Take max across all GPUs.
torch.distributed.all_reduce(total_norm_cuda,
op=torch.distributed.ReduceOp.MAX,
group=get_model_parallel_group())
total_norm = total_norm_cuda[0].item()
else:
total_norm = 0
for p in parameters:
if p.model_parallel or (get_model_parallel_rank() == 0):
param_norm = p.grad.data.norm(norm_type)
total_norm += param_norm.item() ** norm_type
# Sum across all model parallel GPUs.
total_norm_cuda = torch.cuda.FloatTensor([float(total_norm)])
torch.distributed.all_reduce(total_norm_cuda,
op=torch.distributed.ReduceOp.SUM,
group=get_model_parallel_group())
total_norm = total_norm_cuda[0].item() ** (1. / norm_type)
clip_coef = max_norm / (total_norm + 1e-6)
if clip_coef < 1:
for p in parameters:
p.grad.data.mul_(clip_coef)
return total_norm
# coding=utf-8
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Model and data parallel groups."""
import torch
from .utils import ensure_divisibility
# Model parallel group that the current rank belongs to.
_MODEL_PARALLEL_GROUP = None
# Data parallel group that the current rank belongs to.
_DATA_PARALLEL_GROUP = None
def initialize_model_parallel(model_parallel_size_):
"""
Initialize model data parallel groups.
Arguments:
model_parallel_size: number of GPUs used to parallelize model.
Let's say we have a total of 8 GPUs denoted by g0 ... g7 and we
use 2 GPUs to parallelize the model. The present function will
create 4 model parallel groups and 2 data parallel grous as:
4 model parallel groups:
[g0, g1], [g2, g3], [g4, g5], [g6, g7]
2 data parallel groups:
[g0, g2, g4, g6], [g1, g3, g5, g7]
Note that for efficiency, the caller should make sure adjacent ranks
are on the same DGX box. For example if we are using 2 DGX-1 boxes
with a total of 16 GPUs, rank 0 to 7 belong to the first box and
ranks 8 to 15 belong to the second box.
"""
if torch.distributed.get_rank() == 0:
print('> initializing model parallel with size {}'.format(
model_parallel_size_))
# Get world size and rank. Ensure some consistencies.
assert torch.distributed.is_initialized()
world_size = torch.distributed.get_world_size()
model_parallel_size = min(model_parallel_size_, world_size)
ensure_divisibility(world_size, model_parallel_size)
rank = torch.distributed.get_rank()
# Build the data parallel groups.
global _DATA_PARALLEL_GROUP
assert _DATA_PARALLEL_GROUP is None, \
'data parallel group is already initialized'
for i in range(model_parallel_size):
ranks = range(i, world_size, model_parallel_size)
group = torch.distributed.new_group(ranks)
if i == (rank % model_parallel_size):
_DATA_PARALLEL_GROUP = group
# Build the model parallel groups.
global _MODEL_PARALLEL_GROUP
assert _MODEL_PARALLEL_GROUP is None, \
'model parallel group is already initialized'
for i in range(world_size // model_parallel_size):
ranks = range(i * model_parallel_size,
(i + 1) * model_parallel_size)
group = torch.distributed.new_group(ranks)
if i == (rank // model_parallel_size):
_MODEL_PARALLEL_GROUP = group
def model_parallel_is_initialized():
"""Check if model and data parallel groups are initialized."""
if _MODEL_PARALLEL_GROUP is None or _DATA_PARALLEL_GROUP is None:
return False
return True
def get_model_parallel_group():
"""Get the model parallel group the caller rank belongs to."""
assert _MODEL_PARALLEL_GROUP is not None, \
'model parallel group is not initialized'
return _MODEL_PARALLEL_GROUP
def get_data_parallel_group():
"""Get the data parallel group the caller rank belongs to."""
assert _DATA_PARALLEL_GROUP is not None, \
'data parallel group is not initialized'
return _DATA_PARALLEL_GROUP
def get_model_parallel_world_size():
"""Return world size for the model parallel group."""
return torch.distributed.get_world_size(group=get_model_parallel_group())
def get_model_parallel_rank():
"""Return my rank for the model parallel group."""
return torch.distributed.get_rank(group=get_model_parallel_group())
def get_model_parallel_src_rank():
"""Calculate the global rank corresponding to a local rank zeor
in the model parallel group."""
global_rank = torch.distributed.get_rank()
local_world_size = get_model_parallel_world_size()
return (global_rank // local_world_size) * local_world_size
def get_data_parallel_world_size():
"""Return world size for the data parallel group."""
return torch.distributed.get_world_size(group=get_data_parallel_group())
def get_data_parallel_rank():
"""Return my rank for the data parallel group."""
return torch.distributed.get_rank(group=get_data_parallel_group())
def destroy_model_parallel():
"""Set the groups to none."""
global _MODEL_PARALLEL_GROUP
_MODEL_PARALLEL_GROUP = None
global _DATA_PARALLEL_GROUP
_DATA_PARALLEL_GROUP = None
# coding=utf-8
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Parts of the code here are adapted from PyTorch
# repo: https://github.com/pytorch/pytorch
import math
import torch
import torch.nn.functional as F
import torch.nn.init as init
from torch.nn.parameter import Parameter
from apex.normalization.fused_layer_norm import FusedLayerNorm as LayerNorm
from .initialize import get_model_parallel_rank
from .initialize import get_model_parallel_world_size
from .mappings import copy_to_model_parallel_region
from .mappings import gather_from_model_parallel_region
from .mappings import reduce_from_model_parallel_region
from .mappings import scatter_to_model_parallel_region
from .random import get_cuda_rng_tracker
from .utils import divide
from .utils import split_tensor_along_last_dim
from .utils import VocabUtility
def _initialize_affine_weight(weight, output_size, input_size,
per_partition_size, partition_dim, init_method,
stride=1, return_master_weight=False):
"""Initialize affine weight for model parallel.
Build the master weight on all processes and scatter
the relevant chunk."""
# If we only use 1 process for model parallelism, bypass scatter.
world_size = get_model_parallel_world_size()
if world_size == 1:
init_method(weight)
if return_master_weight:
return weight
return None
# Initialize master weight
master_weight = torch.empty(output_size, input_size,
dtype=weight.dtype,
requires_grad=False)
init_method(master_weight)
# Split and copy
per_partition_per_stride_size = divide(per_partition_size, stride)
weight_list = torch.split(master_weight, per_partition_per_stride_size,
dim=partition_dim)
rank = get_model_parallel_rank()
my_weight_list = weight_list[rank::world_size]
with torch.no_grad():
torch.cat(my_weight_list, dim=partition_dim, out=weight)
if return_master_weight:
return master_weight
return None
class VocabParallelEmbedding(torch.nn.Module):
"""Embedding parallelized in the vocabulary dimension.
This is mainly adapted from torch.nn.Embedding and all the default
values are kept.
Arguments:
num_embeddings: vocabulary size.
embedding_dim: size of hidden state.
init_method: method to initialize weights.
"""
def __init__(self, num_embeddings, embedding_dim,
init_method=init.xavier_normal_):
super(VocabParallelEmbedding, self).__init__()
# Keep the input dimensions.
self.num_embeddings = num_embeddings
self.embedding_dim = embedding_dim
# Set the detauls for compatibility.
self.padding_idx = None
self.max_norm = None
self.norm_type = 2.
self.scale_grad_by_freq = False
self.sparse = False
self._weight = None
# Divide the weight matrix along the vocaburaly dimension.
self.vocab_start_index, self.vocab_end_index = \
VocabUtility.vocab_range_from_global_vocab_size(
self.num_embeddings, get_model_parallel_rank(),
get_model_parallel_world_size())
self.num_embeddings_per_partition = self.vocab_end_index - \
self.vocab_start_index
# Allocate weights.
self.weight = Parameter(torch.Tensor(self.num_embeddings_per_partition,
self.embedding_dim))
self.weight.model_parallel = True
# And initialize.
_initialize_affine_weight(
self.weight, self.num_embeddings, self.embedding_dim,
self.num_embeddings_per_partition, 0, init_method)
def forward(self, input_):
# Build the mask.
input_mask = (input_ < self.vocab_start_index) | \
(input_ >= self.vocab_end_index)
# Mask the input.
masked_input = input_.clone() - self.vocab_start_index
masked_input[input_mask] = 0
# Get the embeddings.
output_parallel = F.embedding(masked_input, self.weight,
self.padding_idx, self.max_norm,
self.norm_type, self.scale_grad_by_freq,
self.sparse)
# Mask the output embedding.
output_parallel[input_mask, :] = 0.0
# Reduce across all the model parallel GPUs.
output = reduce_from_model_parallel_region(output_parallel)
return output
class ParallelEmbedding(torch.nn.Module):
"""Embedding parallelized in the embedding dimension.
This is mainly adapted from torch.nn.Embedding and all the default
values are kept.
Arguments:
num_embeddings: vocabulary size.
embedding_dim: size of hidden state.
init_method: method to initialize weights.
"""
def __init__(self, num_embeddings, embedding_dim,
init_method=init.xavier_normal_,
keep_master_weight_for_test=False):
super(ParallelEmbedding, self).__init__()
# Keep the input dimensions.
self.num_embeddings = num_embeddings
self.embedding_dim = embedding_dim
# Set some detauls for compatibility.
self.padding_idx = None
self.max_norm = None
self.norm_type = 2.
self.scale_grad_by_freq = False
self.sparse = False
self._weight = None
# Divide the weight matrix along the embedding dimension.
world_size = get_model_parallel_world_size()
self.embedding_dim_per_partition = divide(self.embedding_dim,
world_size)
# Allocate weights.
self.weight = Parameter(torch.Tensor(self.num_embeddings,
self.embedding_dim_per_partition))
self.weight.model_parallel = True
# And initialize.
_initialize_affine_weight(
self.weight, self.num_embeddings, self.embedding_dim,
self.embedding_dim_per_partition, 1, init_method,
stride=1, return_master_weight=False)
def forward(self, input_):
input_parallel = copy_to_model_parallel_region(input_)
output_parallel = F.embedding(input_parallel, self.weight,
self.padding_idx, self.max_norm,
self.norm_type, self.scale_grad_by_freq,
self.sparse)
output = gather_from_model_parallel_region(output_parallel)
return output
class ColumnParallelLinear(torch.nn.Module):
"""Linear layer with column parallelism.
The linear layer is defined as Y = XA + b. A is parallelized along
its second dimension as A = [A_1, ..., A_p].
Arguments:
input_size: first dimension of matrix A.
output_size: second dimension of matrix A.
bias: If true, add bias
gather_output: If true, call all-gether on output and make Y avaiable
to all GPUs, otherwise, every GPU will have its output
which is Y_i = XA_i
init_method: method to initialize weights. Note that bias is always set
to zero.
stride: For the strided linear layers.
keep_master_weight_for_test: This was added for testing and should be
set to False. It returns the master weights
used for initialization.
"""
def __init__(self, input_size, output_size, bias=True, gather_output=True,
init_method=init.xavier_normal_, stride=1,
keep_master_weight_for_test=False):
super(ColumnParallelLinear, self).__init__()
# Keep input parameters
self.input_size = input_size
self.output_size = output_size
self.gather_output = gather_output
# Divide the weight matrix along the last dimension.
world_size = get_model_parallel_world_size()
self.output_size_per_partition = divide(output_size, world_size)
# Parameters.
# Note: torch.nn.functional.linear performs XA^T + b and as a result
# we allocate the transpose.
self.weight = Parameter(torch.Tensor(self.output_size_per_partition,
self.input_size))
self.weight.model_parallel = True
if bias:
self.bias = Parameter(torch.Tensor(self.output_size_per_partition))
self.bias.model_parallel = True
# Always initialize bias to zero.
with torch.no_grad():
self.bias.zero_()
else:
self.register_parameter('bias', None)
# Initialize weight.
self.master_weight = _initialize_affine_weight(
self.weight, self.output_size, self.input_size,
self.output_size_per_partition, 0, init_method,
stride=stride, return_master_weight=keep_master_weight_for_test)
def forward(self, input_):
# Set up backprop all-reduce.
input_parallel = copy_to_model_parallel_region(input_)
# Matrix multiply.
output_parallel = F.linear(input_parallel, self.weight, self.bias)
if self.gather_output:
# All-gather across the partitions.
output = gather_from_model_parallel_region(output_parallel)
else:
output = output_parallel
return output
class RowParallelLinear(torch.nn.Module):
"""Linear layer with row parallelism.
The linear layer is defined as Y = XA + b. A is parallelized along
its first dimension and X along its second dimension as:
- -
| A_1 |
| . |
A = | . | X = [X_1, ..., X_p]
| . |
| A_p |
- -
Arguments:
input_size: first dimension of matrix A.
output_size: second dimension of matrix A.
bias: If true, add bias. Note that bias is not parallelized.
input_is_parallel: If true, we assume that the input is already
split across the GPUs and we do not split
again.
init_method: method to initialize weights. Note that bias is always set
to zero.
stride: For the strided linear layers.
keep_master_weight_for_test: This was added for testing and should be
set to False. It returns the master weights
used for initialization.
"""
def __init__(self, input_size, output_size, bias=True,
input_is_parallel=False,
init_method=init.xavier_normal_, stride=1,
keep_master_weight_for_test=False):
super(RowParallelLinear, self).__init__()
# Keep input parameters
self.input_size = input_size
self.output_size = output_size
self.input_is_parallel = input_is_parallel
# Divide the weight matrix along the last dimension.
world_size = get_model_parallel_world_size()
self.input_size_per_partition = divide(input_size, world_size)
# Parameters.
# Note: torch.nn.functional.linear performs XA^T + b and as a result
# we allocate the transpose.
self.weight = Parameter(torch.Tensor(self.output_size,
self.input_size_per_partition))
self.weight.model_parallel = True
if bias:
self.bias = Parameter(torch.Tensor(self.output_size))
# Always initialize bias to zero.
with torch.no_grad():
self.bias.zero_()
else:
self.register_parameter('bias', None)
# Initialize weight.
self.master_weight = _initialize_affine_weight(
self.weight, self.output_size, self.input_size,
self.input_size_per_partition, 1, init_method,
stride=stride, return_master_weight=keep_master_weight_for_test)
def forward(self, input_):
# Set up backprop all-reduce.
if self.input_is_parallel:
input_parallel = input_
else:
input_parallel = scatter_to_model_parallel_region(input_)
# Matrix multiply.
output_parallel = F.linear(input_parallel, self.weight)
# All-reduce across all the partitions.
output_ = reduce_from_model_parallel_region(output_parallel)
if self.bias is not None:
output = output_ + self.bias
else:
output = output_
return output
# coding=utf-8
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import torch
from .initialize import get_model_parallel_group
from .utils import split_tensor_along_last_dim
def _reduce(input_):
"""All-reduce the the input tensor across model parallel group."""
group = get_model_parallel_group()
# Bypass the function if we are using only 1 GPU.
if torch.distributed.get_world_size(group=group) == 1:
return input_
# All-reduce.
torch.distributed.all_reduce(input_, group=group)
return input_
def _split(input_):
"""Split the tensor along its last dimension and keep the
corresponding slice."""
group = get_model_parallel_group()
# Bypass the function if we are using only 1 GPU.
if torch.distributed.get_world_size(group=group) == 1:
return input_
# Split along last dimension.
world_size = torch.distributed.get_world_size(group=group)
input_list = split_tensor_along_last_dim(input_, world_size)
# Note: torch.split does not create contiguous tensors by default.
rank = torch.distributed.get_rank(group=group)
output = input_list[rank].contiguous()
return output
def _gather(input_):
"""Gather tensors and concatinate along the last dimension."""
group = get_model_parallel_group()
# Bypass the function if we are using only 1 GPU.
if torch.distributed.get_world_size(group=group) == 1:
return input_
# Size and dimension.
last_dim = input_.dim() - 1
rank = torch.distributed.get_rank(group=group)
world_size = torch.distributed.get_world_size(group=group)
tensor_list = [torch.empty_like(input_) for _ in range(world_size)]
tensor_list[rank] = input_
torch.distributed.all_gather(tensor_list, input_, group=group)
# Note: torch.cat already creates a contiguous tensor.
output = torch.cat(tensor_list, dim=last_dim).contiguous()
return output
class _CopyToModelParallelRegion(torch.autograd.Function):
"""Pass the input to the model parallel region."""
@staticmethod
def forward(ctx, input_):
return input_
@staticmethod
def backward(ctx, grad_output):
return _reduce(grad_output)
class _ReduceFromModelParallelRegion(torch.autograd.Function):
"""All-redcue the input from the model parallel region."""
@staticmethod
def forward(ctx, input_):
return _reduce(input_)
@staticmethod
def backward(ctx, grad_output):
return grad_output
class _ScatterToModelParallelRegion(torch.autograd.Function):
"""Split the input and keep only the corresponding chuck to the rank."""
@staticmethod
def forward(ctx, input_):
return _split(input_)
@staticmethod
def backward(ctx, grad_output):
return _gather(grad_output)
class _GatherFromModelParallelRegion(torch.autograd.Function):
"""Gather the input from model parallel region and concatinate."""
@staticmethod
def forward(ctx, input_):
return _gather(input_)
@staticmethod
def backward(ctx, grad_output):
return _split(grad_output)
# -----------------
# Helper functions.
# -----------------
def copy_to_model_parallel_region(input_):
return _CopyToModelParallelRegion.apply(input_)
def reduce_from_model_parallel_region(input_):
return _ReduceFromModelParallelRegion.apply(input_)
def scatter_to_model_parallel_region(input_):
return _ScatterToModelParallelRegion.apply(input_)
def gather_from_model_parallel_region(input_):
return _GatherFromModelParallelRegion.apply(input_)
# coding=utf-8
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Parts of the code here are adapted from PyTorch
# repo: https://github.com/pytorch/pytorch
import contextlib
import torch
from torch import _C
from torch.cuda import _lazy_call, device as device_ctx_manager
from torch.utils.checkpoint import detach_variable
from .initialize import get_data_parallel_rank
from .initialize import get_model_parallel_rank
# Default name for the model parallel rng tracker.
_MODEL_PARALLEL_RNG_TRACKER_NAME = 'model-parallel-rng'
def _set_cuda_rng_state(new_state, device=-1):
"""Sets the random number generator state of the current GPU.
Argumentss:
new_state (torch.ByteTensor): The desired state
This function is adapted from PyTorch repo (torch.cuda.set_rng_state)
with a single change: the input state is not cloned. Cloning caused
major performance issues for +4 GPU cases.
"""
def cb():
with device_ctx_manager(device):
_C._cuda_setRNGState(new_state)
_lazy_call(cb)
class CudaRNGStatesTracker:
"""Tracker for the cuda RNG states.
Using the `add` method, a cuda rng state is initialized based on
the input `seed` and is assigned to `name`. Later, by forking the
rng state, we can perform operations and return to our starting
cuda state.
"""
def __init__(self):
# Map from a string name to the cuda rng state.
self.states_ = {}
# Seeds are just for book keeping and ensure no seed is set twice.
self.seeds_ = set()
def reset(self):
"""Set to the initial state (no tracker)."""
self.states_ = {}
self.seeds_ = set()
def get_states(self):
"""Get rng states. Copy the dictionary so we have direct
pointers to the states, not just a pointer to the dictionary."""
states = {}
for name in self.states_:
states[name] = self.states_[name]
return states
def set_states(self, states):
"""Set the rng states. For efficiency purposes, we do not check
the size of seed for compatibility."""
self.states_ = states
def add(self, name, seed):
"""Track the rng state."""
# Check seed is not already used.
if seed in self.seeds_:
raise Exception('seed {} already exists'.format(seed))
self.seeds_.add(seed)
# Check that state is not already defined.
if name in self.states_:
raise Exception('cuda rng state {} already exists'.format(name))
# Get the current rng state.
orig_rng_state = torch.cuda.get_rng_state()
# Set the new state and store it.
torch.cuda.manual_seed(seed)
self.states_[name] = torch.cuda.get_rng_state()
# Reset rng state to what it was.
_set_cuda_rng_state(orig_rng_state)
@contextlib.contextmanager
def fork(self, name=_MODEL_PARALLEL_RNG_TRACKER_NAME):
"""Fork the cuda rng state, perform operations, and exit with
the original state."""
# Check if we have added the state
if name not in self.states_:
raise Exception('cuda rng state {} is not added'.format(name))
# Store current rng state.
orig_cuda_rng_state = torch.cuda.get_rng_state()
# Set rng state to the desired one
_set_cuda_rng_state(self.states_[name])
# Do the stuff we wanted to do.
try:
yield
finally:
# Update the current rng state for later use.
self.states_[name] = torch.cuda.get_rng_state()
# And set the state to the original state we started with.
_set_cuda_rng_state(orig_cuda_rng_state)
# RNG tracker object.
_CUDA_RNG_STATE_TRACKER = CudaRNGStatesTracker()
def get_cuda_rng_tracker():
"""Get cuda rng tracker."""
return _CUDA_RNG_STATE_TRACKER
def model_parallel_cuda_manual_seed(seed):
"""Initialize model parallel cuda seed.
This function should be called after the model parallel is
initialized. Also, no torch.cuda.manual_seed should be called
after this function. Basically, this is replacement for that
function.
Two set of RNG states are tracked:
default state: This is for data parallelism and is the same among a
set of model parallel GPUs but different across
different model paralle groups. This is used for
example for dropout in the non-model-parallel regions.
model-parallel state: This state is different among a set of model
parallel GPUs, but the same across data parallel
groups. This is used for example for dropout in
model parallel regions.
"""
# 2718 is just for fun and any POSITIVE value will work.
offset = seed + 2718
model_parallel_seed = offset + get_model_parallel_rank()
# Data parallel gets the original sedd.
data_parallel_seed = seed
if torch.distributed.get_rank() == 0:
print('> initializing model parallel cuda seeds on global rank {}, '
'model parallel rank {}, and data parallel rank {} with '
'model parallel seed: {} and data parallel seed: {}'.format(
torch.distributed.get_rank(), get_model_parallel_rank(),
get_data_parallel_rank(), model_parallel_seed,
data_parallel_seed), flush=True)
_CUDA_RNG_STATE_TRACKER.reset()
# Set the default state.
torch.cuda.manual_seed(data_parallel_seed)
# and model parallel state.
_CUDA_RNG_STATE_TRACKER.add(_MODEL_PARALLEL_RNG_TRACKER_NAME,
model_parallel_seed)
class CheckpointFunction(torch.autograd.Function):
"""This function is adapted from torch.utils.checkpoint with
two main changes:
1) torch.cuda.set_rng_state is replaced with `_set_cuda_rng_state`
2) the states in the model parallel tracker are also properly
tracked/set/reset.
"""
@staticmethod
def forward(ctx, run_function, *args):
ctx.run_function = run_function
# Copy the rng states.
ctx.fwd_cpu_rng_state = torch.get_rng_state()
ctx.fwd_cuda_rng_state = torch.cuda.get_rng_state()
ctx.fwd_cuda_rng_state_tracker = get_cuda_rng_tracker().get_states()
ctx.save_for_backward(*args)
with torch.no_grad():
outputs = run_function(*args)
return outputs
@staticmethod
def backward(ctx, *args):
if not torch.autograd._is_checkpoint_valid():
raise RuntimeError("Checkpointing is not compatible with .grad(), "
"please use .backward() if possible")
inputs = ctx.saved_tensors
# Store the current states.
bwd_cpu_rng_state = torch.get_rng_state()
bwd_cuda_rng_state = torch.cuda.get_rng_state()
bwd_cuda_rng_state_tracker = get_cuda_rng_tracker().get_states()
# Set the states to what it used to be before the forward pass.
torch.set_rng_state(ctx.fwd_cpu_rng_state)
_set_cuda_rng_state(ctx.fwd_cuda_rng_state)
get_cuda_rng_tracker().set_states(ctx.fwd_cuda_rng_state_tracker)
# Compute the forward pass.
detached_inputs = detach_variable(inputs)
with torch.enable_grad():
outputs = ctx.run_function(*detached_inputs)
# Set the states back to what it was at the start of this function.
torch.set_rng_state(bwd_cpu_rng_state)
_set_cuda_rng_state(bwd_cuda_rng_state)
get_cuda_rng_tracker().set_states(bwd_cuda_rng_state_tracker)
if isinstance(outputs, torch.Tensor):
outputs = (outputs,)
torch.autograd.backward(outputs, args)
return (None,) + tuple(inp.grad for inp in detached_inputs)
def checkpoint(function, *args):
"""Checkpoint a model or part of the model.
This has been directly copied from torch.utils.checkpoint."""
return CheckpointFunction.apply(function, *args)
# coding=utf-8
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import os
import random
import numpy
import torch
import mpu
class IdentityLayer(torch.nn.Module):
def __init__(self, size, scale=1.0):
super(IdentityLayer, self).__init__()
self.weight = torch.nn.Parameter(scale * torch.randn(size))
def forward(self):
return self.weight
def set_random_seed(seed):
"""Set random seed for reproducability."""
random.seed(seed)
numpy.random.seed(seed)
torch.manual_seed(seed)
mpu.model_parallel_cuda_manual_seed(seed)
def initialize_distributed(backend='nccl'):
"""Initialize torch.distributed."""
# Get local rank in case it is provided.
parser = argparse.ArgumentParser()
parser.add_argument('--local_rank', type=int, default=None,
help='local rank passed from distributed launcher')
args = parser.parse_args()
local_rank = args.local_rank
# Get rank and world size.
rank = int(os.getenv('RANK', '0'))
world_size = int(os.getenv("WORLD_SIZE", '1'))
print('> initializing torch.distributed with local rank: {}, '
'rank: {}, world size: {}'.format(local_rank, rank, world_size))
# Set the device id.
device = rank % torch.cuda.device_count()
if local_rank is not None:
device = local_rank
torch.cuda.set_device(device)
# Call the init process.
init_method = 'tcp://'
master_ip = os.getenv('MASTER_ADDR', 'localhost')
master_port = os.getenv('MASTER_PORT', '6000')
init_method += master_ip + ':' + master_port
torch.distributed.init_process_group(
backend=backend,
world_size=world_size,
rank=rank,
init_method=init_method)
def print_separator(message):
torch.distributed.barrier()
filler_len = (78 - len(message)) // 2
filler = '-' * filler_len
string = '\n' + filler + ' {} '.format(message) + filler
if torch.distributed.get_rank() == 0:
print(string, flush=True)
torch.distributed.barrier()
# coding=utf-8
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import sys
sys.path.append("../..")
import torch
import torch.nn.functional as F
import mpu
from mpu.cross_entropy import vocab_parallel_cross_entropy
from commons import initialize_distributed
from commons import print_separator
from commons import IdentityLayer
from commons import set_random_seed
def torch_cross_entropy(batch_size, seq_length, vocab_size,
logits_scale, seed):
set_random_seed(seed)
identity = IdentityLayer((batch_size, seq_length, vocab_size),
scale=logits_scale).cuda()
logits = identity()
target = torch.cuda.LongTensor(
size=(batch_size, seq_length)).random_(0, vocab_size)
loss = F.cross_entropy(logits.view(-1, logits.size()[-1]),
target.view(-1),
reduction='none').view_as(target).mean()
loss.backward()
return loss, identity.weight.grad
def mpu_cross_entropy(batch_size, seq_length, vocab_size,
logits_scale, seed):
set_random_seed(seed)
identity = IdentityLayer((batch_size, seq_length, vocab_size),
scale=logits_scale).cuda()
logits = identity()
logits_parallel = mpu.scatter_to_model_parallel_region(logits)
target = torch.cuda.LongTensor(
size=(batch_size, seq_length)).random_(0, vocab_size)
loss = vocab_parallel_cross_entropy(logits_parallel, target).mean()
loss.backward()
return loss, identity.weight.grad
def test_cross_entropy(model_parallel_size):
if torch.distributed.get_rank() == 0:
print('> testing cross entropy with model parallel size {} ...'.
format(model_parallel_size))
mpu.initialize_model_parallel(model_parallel_size)
model_parallel_size = mpu.get_model_parallel_world_size()
batch_size = 13
seq_length = 17
vocab_size_per_partition = 11
logits_scale = 1000.0
vocab_size = vocab_size_per_partition * model_parallel_size
seed = 1234
loss_torch, grad_torch = torch_cross_entropy(batch_size, seq_length,
vocab_size, logits_scale,
seed)
loss_mpu, grad_mpu = mpu_cross_entropy(batch_size, seq_length,
vocab_size, logits_scale,
seed)
error = loss_torch.sub_(loss_mpu).abs().max()
print(' max error in loss on global rank {}: {}'.format(
torch.distributed.get_rank(), error))
assert error < 1.0e-6
error = grad_torch.sub_(grad_mpu).abs().max()
print(' max error in grad on global rank {}: {}'.format(
torch.distributed.get_rank(), error))
assert error < 1.0e-6
# Reset groups
mpu.destroy_model_parallel()
torch.distributed.barrier()
if torch.distributed.get_rank() == 0:
print('>> passed the test :-)')
if __name__ == '__main__':
initialize_distributed()
world_size = torch.distributed.get_world_size()
model_parallel_size = 1
while model_parallel_size <= world_size:
print_separator('test cross entropy')
test_cross_entropy(model_parallel_size)
model_parallel_size *= 2
# coding=utf-8
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import operator
import sys
sys.path.append("../..")
import torch
import mpu
from mpu import data as data_utils
from commons import initialize_distributed
from commons import print_separator
def test_boradcast_data(model_parallel_size):
if torch.distributed.get_rank() == 0:
print('> testing boradcast_data with model parallel size {} ...'.
format(model_parallel_size))
mpu.initialize_model_parallel(model_parallel_size)
torch.manual_seed(1234 + mpu.get_data_parallel_rank())
model_parallel_size = mpu.get_model_parallel_world_size()
key_size_t = {'key1': [7, 11],
'key2': [8, 2, 1],
'key3': [13],
'key4': [5, 1, 2],
'key5': [5, 12]}
keys = list(key_size_t.keys())
data = {}
data_t = {}
for key in key_size_t:
data[key] = torch.LongTensor(size=key_size_t[key]).random_(0, 1000)
data_t[key] = data[key].clone()
data['keyX'] = torch.FloatTensor(size=(5, )).random_(0, 1000)
data_t['keyX'] = data['keyX'].clone()
if mpu.get_model_parallel_rank() != 0:
data = None
data_utils._check_data_types(keys, data_t, torch.int64)
key_size, key_numel, \
total_numel = data_utils._build_key_size_numel_dictionaries(keys, data)
for key in keys:
assert key_size[key] == key_size_t[key]
total_numel_t = 0
for key in keys:
target_size = functools.reduce(operator.mul, key_size_t[key], 1)
assert key_numel[key] == target_size
total_numel_t += target_size
assert total_numel == total_numel_t
data_b = data_utils.broadcast_data(keys, data, torch.int64)
for key in keys:
tensor = data_t[key].cuda()
assert data_b[key].sub(tensor).abs().max() == 0
# Reset groups
mpu.destroy_model_parallel()
torch.distributed.barrier()
if torch.distributed.get_rank() == 0:
print('>> passed the test :-)')
if __name__ == '__main__':
initialize_distributed()
world_size = torch.distributed.get_world_size()
model_parallel_size = 1
while model_parallel_size <= world_size:
print_separator('test test boradcast data')
test_boradcast_data(model_parallel_size)
model_parallel_size *= 2
# coding=utf-8
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
sys.path.append("../..")
import torch
import mpu
from commons import initialize_distributed
from commons import print_separator
def test_initialize_model_parallel(model_parallel_size):
if torch.distributed.get_rank() == 0:
print('> testing initialize_model_parallel with size {} ...'.format(
model_parallel_size))
model_parallel_size_ = min(model_parallel_size,
torch.distributed.get_world_size())
assert not mpu.model_parallel_is_initialized()
mpu.initialize_model_parallel(model_parallel_size_)
assert mpu.model_parallel_is_initialized()
# Checks.
def check(group, world_size, rank):
assert world_size == torch.distributed.get_world_size(group=group)
assert rank == torch.distributed.get_rank(group=group)
# Model parallel.
world_size = model_parallel_size_
rank = torch.distributed.get_rank() % model_parallel_size_
assert world_size == mpu.get_model_parallel_world_size()
assert rank == mpu.get_model_parallel_rank()
check(mpu.get_model_parallel_group(), world_size, rank)
# Data parallel.
world_size = torch.distributed.get_world_size() // model_parallel_size_
rank = torch.distributed.get_rank() // model_parallel_size
assert world_size == mpu.get_data_parallel_world_size()
assert rank == mpu.get_data_parallel_rank()
check(mpu.get_data_parallel_group(), world_size, rank)
# Reset groups
mpu.destroy_model_parallel()
torch.distributed.barrier()
if torch.distributed.get_rank() == 0:
print('>> passed the test :-)')
def test_get_model_parallel_src_rank(model_parallel_size_):
if torch.distributed.get_rank() == 0:
print('> testing get_model_parallel_src_rank with size {} ...'.format(
model_parallel_size_))
model_parallel_size = min(model_parallel_size_,
torch.distributed.get_world_size())
assert not mpu.model_parallel_is_initialized()
mpu.initialize_model_parallel(model_parallel_size)
assert mpu.model_parallel_is_initialized()
# Checks
src_rank = torch.distributed.get_rank() - mpu.get_model_parallel_rank()
assert mpu.get_model_parallel_src_rank() == src_rank
# Reset groups
mpu.destroy_model_parallel()
torch.distributed.barrier()
if torch.distributed.get_rank() == 0:
print('>> passed the test :-)')
if __name__ == '__main__':
initialize_distributed()
world_size = torch.distributed.get_world_size()
model_parallel_size = 1
while model_parallel_size <= world_size:
print_separator('test initialize model parallel')
test_initialize_model_parallel(model_parallel_size)
print_separator('test model parallel source rank')
test_get_model_parallel_src_rank(model_parallel_size)
model_parallel_size *= 2
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment