Commit 9350ee08 authored by Neel Kant

Merge staging into ict-stable

parents 9238c57a 63262827
# coding=utf-8
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Pretrain BERT"""
import os
import json
import math
import random
import numpy as np
import torch
from arguments import get_args
from configure_data import configure_data
from megatron.fp16 import FP16_Module
from megatron.fp16 import FP16_Optimizer
from megatron.learning_rates import AnnealingLR
from megatron.model import GPT2Model
from megatron.model import DistributedDataParallel as DDP
from megatron import mpu
from apex.optimizers import FusedAdam as Adam
from megatron.utils import Timers
from megatron.utils import load_checkpoint
from megatron.utils import report_memory
from megatron.utils import print_params_min_max_norm
from megatron.utils import print_rank_0
from megatron.data_utils import make_tokenizer
from detokenizer import *
def get_model(args):
"""Build the model."""
print_rank_0('building GPT2 model ...')
model = GPT2Model(num_layers=args.num_layers,
vocab_size=args.vocab_size,
hidden_size=args.hidden_size,
num_attention_heads=args.num_attention_heads,
embedding_dropout_prob=args.hidden_dropout,
attention_dropout_prob=args.attention_dropout,
output_dropout_prob=args.hidden_dropout,
max_sequence_length=args.max_position_embeddings,
checkpoint_activations=args.checkpoint_activations,
checkpoint_num_layers=args.checkpoint_num_layers,
parallel_output=not args.cloze_eval)
print_rank_0(' > number of parameters: {}'.format(
sum([p.nelement() for p in model.parameters()])))
# GPU allocation.
model.cuda(torch.cuda.current_device())
# Fp16 conversion.
if args.fp16:
model = FP16_Module(model)
# Wrap model for distributed training.
model = DDP(model)
return model
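# A rough point of reference (an estimate added here, not printed by the
# original code): the 345M GPT-2 configuration referenced later in this file
# (num_layers=24, hidden_size=1024, num_attention_heads=16,
# max_position_embeddings=1024, padded vocab of about 50304) gives roughly
# 355 million parameters in the sum above.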
def setup_model(args):
"""Setup model and optimizer."""
model = get_model(args)
if args.load is not None:
_ = load_checkpoint(
model, None, None, args)
return model
def get_masks_and_position_ids(data,
eod_token,
reset_position_ids,
reset_attention_mask):
# Extract batch size and sequence length.
batch_size, seq_length = data.size()
# Attention mask (lower triangular).
if reset_attention_mask:
att_mask_batch = batch_size
else:
att_mask_batch = 1
attention_mask = torch.tril(torch.ones(
(att_mask_batch, seq_length, seq_length), device=data.device)).view(
att_mask_batch, 1, seq_length, seq_length)
# Loss mask.
loss_mask = torch.ones(data.size(), dtype=torch.float, device=data.device)
loss_mask[data == eod_token] = 0.0
# Position ids.
position_ids = torch.arange(seq_length, dtype=torch.long,
device=data.device)
position_ids = position_ids.unsqueeze(0).expand_as(data)
# We need to clone as the ids will be modified based on batch index.
if reset_position_ids:
position_ids = position_ids.clone()
if reset_position_ids or reset_attention_mask:
# Loop through the batches:
for b in range(batch_size):
# Find indices where the EOD tokens are.
eod_index = position_ids[b, data[b] == eod_token]
# Detach indices from positions if we are going to modify positions.
if reset_position_ids:
eod_index = eod_index.clone()
# Loop through EOD indices:
prev_index = 0
for j in range(eod_index.size()[0]):
i = eod_index[j]
# Mask attention so tokens cannot attend across the EOD boundary.
if reset_attention_mask:
attention_mask[b, 0, (i+1):, :(i+1)] = 0
# Reset positions.
if reset_position_ids:
position_ids[b, (i+1):] -= (i + 1 - prev_index)
prev_index = i + 1
return attention_mask, loss_mask, position_ids
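# Worked example of the function above (values are illustrative, not from the
# original source): with eod_token = 0 and a single row data = [5, 7, 0, 9, 4],
# loss_mask comes out as [1, 1, 0, 1, 1] (the EOD position is excluded),
# reset_position_ids turns the default positions [0, 1, 2, 3, 4] into
# [0, 1, 2, 0, 1] (positions restart after the EOD at index 2), and
# reset_attention_mask additionally zeroes attention_mask[0, 0, 3:, :3] so
# tokens of the second document cannot attend to the first.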
def get_batch(data_iterator, args, timers):
'''Build an evaluation batch.
Pulls the next sample from the data iterator (on the ranks that have one),
broadcasts it across the model-parallel group, and splits each
seq_length + 1 token window into model inputs and left-shifted labels.
Returns the tokens, labels, attention mask, position ids, and padding mask
expected by forward_step.
'''
# Items and their type.
keys = ['text', 'pad_mask']
datatype = torch.int64
# Broadcast data.
timers('data loader').start()
if data_iterator is not None:
data = next(data_iterator)
else:
data = None
timers('data loader').stop()
data_b = mpu.broadcast_data(keys, data, datatype)
# Unpack.
tokens_ = data_b['text'].long()
lm_labels = tokens_[:, 1:].contiguous()
tokens = tokens_[:, :-1].contiguous()
padding_mask = data_b['pad_mask'].byte()
# Get the masks and position ids.
attention_mask, loss_mask, position_ids = get_masks_and_position_ids(
tokens,
args.eod_token,
args.reset_position_ids,
args.reset_attention_mask)
# Convert
if args.fp16:
attention_mask = attention_mask.half()
return tokens, lm_labels, attention_mask, position_ids, padding_mask
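# Shape sketch (added for clarity): each dataset sample carries seq_length + 1
# token ids, so tokens and lm_labels above are both [batch, seq_length], with
# lm_labels shifted one position to the left; the model output at position i
# is scored against the token at position i + 1.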
def forward_step(data_iterator, model, args, timers):
"""Forward step."""
# Get the batch.
timers('batch generator').start()
batch = get_batch(data_iterator, args, timers)
if batch is None:
return None
tokens, lm_labels, attention_mask, position_ids, loss_mask = batch
timers('batch generator').stop()
# Forward model.
if args.eval_hf:
output, _ = model(tokens)
else:
output = model(tokens, position_ids, attention_mask)
if not args.cloze_eval:
#losses = torch.nn.CrossEntropyLoss(reduce=False)(
losses = mpu.vocab_parallel_cross_entropy(
output.contiguous().float(), lm_labels.contiguous())
loss_mask = loss_mask.contiguous()
loss_mask = loss_mask.view(-1)
lm_loss = torch.sum(
losses.view(-1) * loss_mask.float())
else:
outputs = torch.argmax(output, -1)
correct = (outputs == lm_labels).float()
correct[(1-loss_mask).bool()] = 1
correct = correct.prod(-1)
lm_loss = correct.sum()
# loss_mask = loss_mask.contiguous().view(-1).float()
# lm_loss = torch.sum(acc * loss_mask)
return lm_loss
def evaluate(data_loader, model, args, timers,
num_iterations=None):
"""Evaluation."""
# Turn on evaluation mode which disables dropout.
model.eval()
total_lm_loss = 0
if num_iterations is not None:
max_iters = num_iterations
else:
if mpu.get_model_parallel_rank() == 0:
max_iters_gpu = torch.cuda.LongTensor([len(data_loader)])
else:
max_iters_gpu = torch.cuda.LongTensor([0])
torch.distributed.broadcast(max_iters_gpu,
mpu.get_model_parallel_src_rank(),
group=mpu.get_model_parallel_group())
max_iters = max_iters_gpu[0].item()
print_rank_0('global rank: {} | max iters: {}'.format(
torch.distributed.get_rank(), max_iters))
if data_loader is not None:
data_iterator = iter(data_loader)
else:
data_iterator = None
with torch.no_grad():
iteration = 0
while iteration < max_iters:
if iteration % args.log_interval == 0:
print_rank_0('global rank: {} | iteration: {}'.format(
torch.distributed.get_rank(), iteration))
# Forward evaluation.
lm_loss = forward_step(data_iterator, model, args, timers)
if lm_loss is None:
break
# Reduce across processes.
if isinstance(model, DDP):
torch.distributed.all_reduce(lm_loss.data)
if args.cloze_eval:
lm_loss.data = lm_loss.data / args.world_size
else:
lm_loss.data = lm_loss.data / args.model_parallel_size
if not args.cloze_eval:
total_lm_loss += lm_loss.data.detach().float().item()/(args.num_tokenized_tokens-1)
else:
total_lm_loss += lm_loss.data.detach().float().item()
iteration += 1
# Move model back to the train mode.
model.train()
return total_lm_loss
def evaluate_and_print_results(prefix, data_iterator, model,
args, timers, num_iterations=None):
"""Helper function to evaluate and dump results on screen."""
if not args.cloze_eval:
lm_loss = evaluate(data_iterator, model, args, timers, num_iterations)
val_loss = lm_loss
ppl = math.exp(min(20, val_loss))
token_ratio = (args.num_tokenized_tokens-1)/(args.num_original_tokens-1)
adjusted_ppl = math.exp(min(20, val_loss*token_ratio))
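# Worked example (numbers are illustrative only): if the averaged loss is 3.0,
# the raw perplexity is exp(3.0) ~= 20.1. With, say, 245k whitespace-delimited
# original tokens and 270k BPE tokens, token_ratio ~= 1.10 and the adjusted
# perplexity exp(3.0 * 1.10) ~= 27.1, i.e. perplexity per original word rather
# than per BPE token.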
print_rank_0('-' * 100)
string = ' validation results on {} | '.format(prefix)
string += 'avg loss: {:.4E} | '.format(val_loss)
string += 'ppl: {:.4E} | '.format(ppl)
string += 'adjusted ppl: {:.4E} | '.format(adjusted_ppl)
string += 'token ratio: {} |'.format(token_ratio)
length = len(string) + 1
print_rank_0('-' * length)
print_rank_0(string)
print_rank_0('-' * length)
return val_loss
else:
num_correct = evaluate(data_iterator, model, args, timers, num_iterations)
acc = num_correct / args.num_examples
print_rank_0('-' * 100)
string = ' validation results on {} | '.format(prefix)
string += 'number correct: {:.4E} | '.format(num_correct)
string += 'total examples: {:.4E} | '.format(args.num_examples)
string += 'avg accuracy: {:.4E}'.format(acc)
length = len(string) + 1
print_rank_0('-' * length)
print_rank_0(string)
print_rank_0('-' * length)
return acc
def initialize_distributed(args):
"""Initialize torch.distributed."""
# Manually set the device ids.
device = args.rank % torch.cuda.device_count()
if args.local_rank is not None:
device = args.local_rank
torch.cuda.set_device(device)
# Call the init process
init_method = 'tcp://'
master_ip = os.getenv('MASTER_ADDR', 'localhost')
master_port = os.getenv('MASTER_PORT', '6000')
init_method += master_ip + ':' + master_port
torch.distributed.init_process_group(
backend=args.distributed_backend,
world_size=args.world_size, rank=args.rank,
init_method=init_method)
# Set the model-parallel / data-parallel communicators.
mpu.initialize_model_parallel(args.model_parallel_size)
def set_random_seed(seed):
"""Set random seed for reproducability."""
if seed is not None and seed > 0:
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
mpu.model_parallel_cuda_manual_seed(seed)
class LM_Eval_Dataset(torch.utils.data.Dataset):
def __init__(self, tokens, seq_len, pad_idx, overalapping_eval=None, **kwargs):
self.tokens = tokens
self.seq_len = seq_len
self.pad_idx = pad_idx
self.overalapping_eval = overalapping_eval
if self.overalapping_eval is None:
self.overalapping_eval = self.seq_len
self.overalapping_eval = max(1, self.overalapping_eval)
self.total_targets = len(self.tokens) - 1
# remove first sequence tokens
targets = max(self.total_targets - self.overalapping_eval, 0)
self.total_sequences = max(math.ceil(targets / self.overalapping_eval)+1, 1)
def __len__(self):
return self.total_sequences
def __getitem__(self, idx):
start_idx = idx * self.overalapping_eval
end_idx = start_idx + self.seq_len
tokens = self.tokens[start_idx:end_idx+1]
num_tokens = len(tokens)
pad_mask = [1]*num_tokens
if num_tokens < self.seq_len+1:
num_pad = (self.seq_len+1-num_tokens)
pad_mask += [0]*(num_pad)
tokens += [self.pad_idx] * num_pad
pad_mask = np.array(pad_mask[1:])
if self.overalapping_eval != self.seq_len and idx!=0:
pad_mask[:-self.overalapping_eval] *= 0
return {'text': np.array(tokens), 'pad_mask': pad_mask}
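# Sliding-window example for the dataset above (illustrative numbers): with
# len(tokens) = 10, seq_len = 4 and overalapping_eval = 2 there are 9 targets,
# and the dataset yields ceil((9 - 2) / 2) + 1 = 5 windows starting at offsets
# 0, 2, 4, 6 and 8. For every window except the first, pad_mask keeps only the
# last overalapping_eval targets, so each of the 9 targets is scored exactly
# once across the windows.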
class Lambada_Eval_Dataset(torch.utils.data.Dataset):
def __init__(self, path, tokenizer, seq_len, strict=False, **kwargs):
self.seq_len = seq_len
self.pad_idx = tokenizer.get_command('pad').Id
self.tokenizer = tokenizer
self.strict = strict
self.tokens = []
self.labels = []
with open(path, 'r') as f:
for line in f.readlines():
text = json.loads(line)['text']
tokens, labels = self.get_tokens(text)
self.tokens.append(tokens)
self.labels.append(labels)
def get_tokens(self, text):
if not self.strict:
tokens = self.tokenizer.EncodeAsIds(text).tokenization
return tokens[:-1], [tokens[-1]]
last_token = text.split()[-1]
start_idx = text.rfind(last_token)
beginning_tokens = self.tokenizer.EncodeAsIds(text[:start_idx].strip()).tokenization
last_token = self.tokenizer.EncodeAsIds(' '+last_token).tokenization
return beginning_tokens, last_token
def __len__(self):
return len(self.tokens)
def __getitem__(self, idx):
tokens = self.tokens[idx]
num_tokens = len(tokens)
pad_mask = [0]*num_tokens
labels = self.labels[idx]
pad_mask += [1]*len(labels)
tokens = tokens+labels
num_tokens = len(tokens)
if num_tokens < self.seq_len+1:
num_pad = (self.seq_len+1-num_tokens)
pad_mask += [0]*(num_pad)
tokens += [self.pad_idx] * num_pad
pad_mask = np.array(pad_mask[1:])
return {'text': np.array(tokens), 'pad_mask': pad_mask}
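# Sketch of the two tokenization modes above (the sentence is made up): for a
# line ending in "... reached the summit", the default mode simply splits off
# the last BPE token of the full encoding as the label, while strict=True
# re-encodes everything before the final whitespace-delimited word as context
# and encodes ' summit' separately, matching the original LAMBADA task of
# predicting the last word.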
def get_tokenizer(args):
tokenizer_args = {
'tokenizer_type': args.tokenizer_type,
'corpus': None,
'model_path': args.tokenizer_path,
'vocab_size': args.vocab_size,
'model_type': args.tokenizer_model_type,
'cache_dir': args.cache_dir}
return make_tokenizer(**tokenizer_args)
def get_eval_data(args):
val_dataloader = None
if mpu.get_model_parallel_rank() == 0:
eval_batch_size = args.eval_batch_size
eval_batch_size = args.batch_size if eval_batch_size is None else eval_batch_size
seq_len = args.seq_length
valid_data = args.valid_data
valid_data = valid_data[0] if isinstance(valid_data, list) else valid_data
tokenizer = get_tokenizer(args)
if not args.cloze_eval:
with open(valid_data, "rb") as reader:
entire_data = reader.read().decode('utf-8')
num_original_tokens = len(entire_data.strip().split(" "))
entire_data = get_detokenizer(valid_data)(entire_data)
tokenized_data = tokenizer.EncodeAsIds(entire_data).tokenization
num_tokenized_tokens = len(tokenized_data)
string = 'Original Tokens: %d, Detokenized tokens: %d' % (num_original_tokens, num_tokenized_tokens)
print_rank_0(string)
eod_token = tokenizer.get_command('pad').Id
val_dataset = LM_Eval_Dataset(tokenized_data, seq_len, eod_token,
args.overlapping_eval)
else:
val_dataset = Lambada_Eval_Dataset(valid_data, tokenizer, seq_len, args.strict_lambada)
num_tokenized_tokens = 0
num_original_tokens = 0
val_dataloader = torch.utils.data.DataLoader(
val_dataset, batch_size=eval_batch_size, drop_last=False)
before = tokenizer.num_tokens
after = before
multiple = args.make_vocab_size_divisible_by * \
mpu.get_model_parallel_world_size()
while (after % multiple) != 0:
after += 1
print_rank_0('> padded vocab (size: {}) with {} dummy tokens (new size: {})'.
format(before, after - before, after))
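# Example of the padding above (illustrative): with the 50257-token GPT-2 BPE
# vocabulary, make_vocab_size_divisible_by=128 and a model-parallel world size
# of 1, the multiple is 128 and the vocab is padded with 47 dummy tokens to
# 50304, so the embedding matrix splits evenly across model-parallel ranks.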
eod_token = tokenizer.get_command('pad').Id
num_examples = len(val_dataset)
token_counts = torch.cuda.LongTensor([after, eod_token, num_examples,
num_original_tokens,
num_tokenized_tokens])
else:
token_counts = torch.cuda.LongTensor([0, 0, 0, 0, 0])
torch.distributed.broadcast(token_counts,
mpu.get_model_parallel_src_rank(),
group=mpu.get_model_parallel_group())
args.vocab_size = token_counts[0].item()
args.eod_token = token_counts[1].item()
args.num_examples = token_counts[2].item()
args.num_original_tokens = token_counts[3].item()
args.num_tokenized_tokens = token_counts[4].item()
print('global rank: {} | vocab size: {} | eod token: {} | '
'num_examples: {} | num_original_tokens: {} | '
'num_tokenized_tokens: {}'.format(
torch.distributed.get_rank(), args.vocab_size,
args.eod_token, args.num_examples, args.num_original_tokens,
args.num_tokenized_tokens ))
return val_dataloader
def main():
"""Main training program."""
print('Evaluate GPT2 model')
# Disable CuDNN.
torch.backends.cudnn.enabled = False
# Timer.
timers = Timers()
# Arguments.
args = get_args()
# Pytorch distributed.
initialize_distributed(args)
# Random seeds for reproducibility.
set_random_seed(args.seed)
# Data stuff.
eval_data = get_eval_data(args)
# Model, optimizer, and learning rate.
if args.eval_hf:
from pytorch_pretrained_bert import GPT2LMHeadModel
from pytorch_pretrained_bert import GPT2Model as HFGPT2Model
if args.num_layers == 24:
model_path = args.load
#model_path = '/home/universal-lm-data.cosmos549/repos/gpt2_mp/models/345M'
hfmodel = HFGPT2Model.from_pretrained(model_path, cache_dir='gpt2_weights', from_tf=True).cuda()
model = GPT2LMHeadModel(hfmodel.config)
model.transformer.load_state_dict(hfmodel.state_dict())
model.cuda()
else:
model = GPT2LMHeadModel.from_pretrained('gpt2', cache_dir='gpt2_weights').cuda()
else:
if args.load_openai:
from megatron.utils import move_weights
model_path = args.load
args.load = None
model = setup_model(args)
from pytorch_pretrained_bert import GPT2LMHeadModel
from pytorch_pretrained_bert import GPT2Model as HFGPT2Model
model_path = 'gpt2'
from_tf = False
print('loading openai weights')
model.cpu()
if args.num_layers == 24:
#model_path = '/home/universal-lm-data.cosmos549/repos/gpt2_mp/models/345M'
hfmodel = HFGPT2Model.from_pretrained(model_path, cache_dir='gpt2_weights', from_tf=True)
gpt2model = GPT2LMHeadModel(hfmodel.config)
gpt2model.transformer.load_state_dict(hfmodel.state_dict())
gpt2model
else:
gpt2model = GPT2LMHeadModel.from_pretrained('gpt2', cache_dir='gpt2_weights')
model2fill = model
while isinstance(model2fill, (DDP, FP16_Module)):
model2fill = model2fill.module
move_weights(model2fill, gpt2model)
model.cuda()
else:
model = setup_model(args)
# Run on test data.
prefix = "wiki" #os.path.basename(args.valid_data)
evaluate_and_print_results(prefix, eval_data,
model, args, timers)
if __name__ == "__main__":
main()
# coding=utf-8
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Sample Generate GPT2"""
from megatron import get_args
from megatron import get_tokenizer
from megatron import print_rank_0
from megatron.checkpointing import load_checkpoint
from megatron.initialize import initialize_megatron
from megatron.model import GPT2Model
from megatron.training import get_model
from megatron.text_generation_utils import generate_and_write_samples_unconditional
from megatron.text_generation_utils import generate_samples_input_from_file
from megatron.text_generation_utils import generate_samples_interactive
def model_provider():
"""Build the model."""
print_rank_0('building GPT2 model ...')
model = GPT2Model(num_tokentypes=0, parallel_output=False)
return model
def add_text_generate_args(parser):
"""Text generation arguments."""
group = parser.add_argument_group(title='text generation')
group.add_argument("--temperature", type=float, default=1.0,
help='Sampling temperature.')
group.add_argument("--greedy", action='store_true', default=False,
help='Use greedy sampling.')
group.add_argument("--top_p", type=float, default=0.0,
help='Top p sampling.')
group.add_argument("--top_k", type=int, default=0,
help='Top k sampling.')
group.add_argument("--out-seq-length", type=int, default=1024,
help='Size of the output generated text.')
group.add_argument("--sample-input-file", type=str, default=None,
help='Get input from file instead of interactive mode, '
'each line is an input.')
group.add_argument("--sample-output-file", type=str, default=None,
help='Output file got from --sample-input-file')
group.add_argument("--num-samples", type=int, default=0,
help='Number of samples to generate unconditionally, '
'defaults to 0 and interactive conditional sampling')
group.add_argument("--genfile", type=str,
help='Output file when generating unconditionally')
group.add_argument("--recompute", action='store_true',
help='During generation recompute all attention '
'instead of using previously computed keys/values.')
return parser
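# Hedged usage sketch (the script name, file names, and model sizes below are
# assumptions, not taken from this repository): generating 10 unconditional
# samples might look like
#   python generate_samples_gpt2.py \
#       --num-layers 24 --hidden-size 1024 --num-attention-heads 16 \
#       --max-position-embeddings 1024 --tokenizer-type GPT2BPETokenizer \
#       --vocab-file gpt2-vocab.json --merge-file gpt2-merges.txt \
#       --load <checkpoint_dir> --num-samples 10 --genfile samples.json \
#       --top_k 40
# Leaving --num-samples at its default of 0 switches to interactive prompting
# unless --sample-input-file is given.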
def main():
"""Main program."""
initialize_megatron(extra_args_provider=add_text_generate_args,
args_defaults={'tokenizer_type': 'GPT2BPETokenizer'})
# Set up model and load checkpoint.
model = get_model(model_provider)
args = get_args()
if args.load is not None:
_ = load_checkpoint(model, None, None)
# Generate samples.
if args.num_samples == 0:
args.batch_size = 1
if args.sample_input_file is not None:
generate_samples_input_from_file(model)
else:
generate_samples_interactive(model)
else:
generate_and_write_samples_unconditional(model)
if __name__ == "__main__":
main()
# coding=utf-8
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import torch
from .global_vars import get_args
from .global_vars import get_tokenizer
from .global_vars import get_tensorboard_writer
from .global_vars import get_adlr_autoresume
from .global_vars import get_timers
def print_rank_0(message):
"""If distributed is initialized print only on rank 0."""
if torch.distributed.is_initialized():
if torch.distributed.get_rank() == 0:
print(message, flush=True)
else:
print(message, flush=True)
@@ -13,141 +13,184 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""argparser configuration"""
"""Megatron arguments."""
import argparse
import os
import torch
def add_model_config_args(parser):
"""Model arguments"""
def parse_args(extra_args_provider=None, defaults={}):
"""Parse all arguments."""
parser = argparse.ArgumentParser(description='Megatron-LM Arguments')
# Standard arguments.
parser = _add_network_size_args(parser)
parser = _add_regularization_args(parser)
parser = _add_training_args(parser)
parser = _add_initialization_args(parser)
parser = _add_learning_rate_args(parser)
parser = _add_checkpointing_args(parser)
parser = _add_mixed_precision_args(parser)
parser = _add_distributed_args(parser)
parser = _add_validation_args(parser)
parser = _add_data_args(parser)
parser = _add_autoresume_args(parser)
# TODO: Refactor
parser = _add_gpt2_args(parser)
# Custom arguments.
if extra_args_provider is not None:
parser = extra_args_provider(parser)
# Parse.
args = parser.parse_args()
group = parser.add_argument_group('model', 'model configuration') # Set input defaults.
for key in defaults:
# For default to be valid, it should not be provided in the
# arguments that are passed to the program. We check this by
# ensuring the arg is set to None.
assert getattr(args, key) is None, \
'defaults can only be overwritten for args with None values.'
setattr(args, key, defaults[key])
group.add_argument('--pretrained-bert', action='store_true', # Distributed args.
help='use a pretrained bert-large-uncased model instead' args.rank = int(os.getenv('RANK', '0'))
'of initializing from scratch. See ' args.world_size = int(os.getenv("WORLD_SIZE", '1'))
'--tokenizer-model-type to specify which pretrained ' args.model_parallel_size = min(args.model_parallel_size, args.world_size)
'BERT model to use') if args.rank == 0:
group.add_argument('--attention-dropout', type=float, default=0.1, print('using world size: {} and model-parallel size: {} '.format(
help='dropout probability for attention weights') args.world_size, args.model_parallel_size))
group.add_argument('--num-attention-heads', type=int, default=16,
help='num of transformer attention heads') # Fp16 loss scaling.
group.add_argument('--hidden-size', type=int, default=1024, args.dynamic_loss_scale = False
help='tansformer hidden size') if args.loss_scale is None:
group.add_argument('--intermediate-size', type=int, default=None, args.dynamic_loss_scale = True
help='transformer embedding dimension for FFN'
'set to 4*`--hidden-size` if it is None') # Checks.
group.add_argument('--num-layers', type=int, default=24, assert args.hidden_size % args.num_attention_heads == 0
help='num decoder layers') if args.seq_length is not None:
group.add_argument('--layernorm-epsilon', type=float, default=1e-5, assert args.max_position_embeddings >= args.seq_length
help='layer norm epsilon') if args.lr is not None:
group.add_argument('--hidden-dropout', type=float, default=0.1, assert args.min_lr <= args.lr
help='dropout probability for hidden state transformer') if args.save is not None:
group.add_argument('--max-position-embeddings', type=int, default=512, assert args.save_interval is not None
help='maximum number of position embeddings to use')
group.add_argument('--vocab-size', type=int, default=None, _print_args(args)
help='vocab size to use for non-character-level ' return args
'tokenization. This value will only be used when '
'creating a tokenizer')
group.add_argument('--deep-init', action='store_true', def _print_args(args):
help='initialize bert model similar to gpt2 model.' """Print arguments."""
'scales initialization of projection layers by a ' if args.rank == 0:
'factor of 1/sqrt(2N). Necessary to train bert ' print('-------------------- arguments --------------------', flush=True)
'models larger than BERT-Large.') str_list = []
for arg in vars(args):
dots = '.' * (32 - len(arg))
str_list.append(' {} {} {}'.format(arg, dots, getattr(args, arg)))
for arg in sorted(str_list, key=lambda x: x.lower()):
print(arg, flush=True)
print('---------------- end of arguments ----------------', flush=True)
def _add_network_size_args(parser):
group = parser.add_argument_group(title='network size')
group.add_argument('--num-layers', type=int, required=True,
help='Number of transformer layers.')
group.add_argument('--hidden-size', type=int, required=True,
help='Transformer hidden size.')
group.add_argument('--num-attention-heads', type=int, required=True,
help='Number of transformer attention heads.')
group.add_argument('--max-position-embeddings', type=int, required=True,
help='Maximum number of position embeddings to use. '
'This is the size of position embedding.')
group.add_argument('--make-vocab-size-divisible-by', type=int, default=128,
help='Pad the vocab size to be divisible by this value. '
'This is added for computational efficiency reasons.')
group.add_argument('--layernorm-epsilon', type=float, default=1e-5,
help='Layer norm epsilon.')
group.add_argument('--apply-residual-connection-post-layernorm',
action='store_true',
help='If set, use original BERT residual connection '
'ordering.')
return parser
def add_fp16_config_args(parser): def _add_regularization_args(parser):
"""Mixed precision arguments.""" group = parser.add_argument_group(title='regularization')
group = parser.add_argument_group('fp16', 'fp16 configurations') group.add_argument('--attention-dropout', type=float, default=0.1,
help='Post attention dropout probability.')
group.add_argument('--fp16', action='store_true', group.add_argument('--hidden-dropout', type=float, default=0.1,
help='Run model in fp16 mode') help='Dropout probability for hidden state transformer.')
group.add_argument('--apply-query-key-layer-scaling', action='store_true', group.add_argument('--weight-decay', type=float, default=0.01,
help='Scale Q * K^T by 1 / layer-number. If this flag ' help='Weight decay coefficient for L2 regularization.')
'is set, then it will automatically set ' group.add_argument('--clip-grad', type=float, default=1.0,
'attention-softmax-in-fp32 to true') help='Gradient clipping based on global L2 norm.')
group.add_argument('--attention-softmax-in-fp32', action='store_true',
help='Run attention masking and softmax in fp32.')
group.add_argument('--fp32-embedding', action='store_true',
help='embedding in fp32')
group.add_argument('--fp32-layernorm', action='store_true',
help='layer norm in fp32')
group.add_argument('--fp32-tokentypes', action='store_true',
help='embedding token types in fp32')
group.add_argument('--fp32-allreduce', action='store_true',
help='all-reduce in fp32')
group.add_argument('--hysteresis', type=int, default=2,
help='hysteresis for dynamic loss scaling')
group.add_argument('--loss-scale', type=float, default=None,
help='Static loss scaling, positive power of 2 '
'values can improve fp16 convergence. If None, dynamic'
'loss scaling is used.')
group.add_argument('--loss-scale-window', type=float, default=1000,
help='Window over which to raise/lower dynamic scale')
group.add_argument('--min-scale', type=float, default=1,
help='Minimum loss scale for dynamic loss scale')
return parser
def add_training_args(parser): def _add_training_args(parser):
"""Training arguments.""" group = parser.add_argument_group(title='training')
group = parser.add_argument_group('train', 'training configurations')
group.add_argument('--batch-size', type=int, default=4, group.add_argument('--batch-size', type=int, default=None,
help='Data Loader batch size') help='Batch size per model instance (local batch size). '
group.add_argument('--weight-decay', type=float, default=0.01, 'Global batch size is local batch size times data '
help='weight decay coefficient for L2 regularization') 'parallel size.')
group.add_argument('--checkpoint-activations', action='store_true', group.add_argument('--checkpoint-activations', action='store_true',
help='checkpoint activation to allow for training ' help='Checkpoint activation to allow for training '
'with larger models and sequences') 'with larger models, sequences, and batch sizes.')
group.add_argument('--checkpoint-num-layers', type=int, default=1, group.add_argument('--checkpoint-num-layers', type=int, default=1,
help='chunk size (number of layers) for checkpointing') help='chunk size (number of layers) for checkpointing.')
group.add_argument('--clip-grad', type=float, default=1.0, group.add_argument('--train-iters', type=int, default=None,
help='gradient clipping') help='Total number of iterations to train over all '
group.add_argument('--train-iters', type=int, default=1000000, 'training runs.')
help='total number of iterations to train over all training runs')
group.add_argument('--log-interval', type=int, default=100, group.add_argument('--log-interval', type=int, default=100,
help='report interval') help='Report loss and timing interval.')
group.add_argument('--exit-interval', type=int, default=None, group.add_argument('--exit-interval', type=int, default=None,
help='Exit the program after this many new iterations.') help='Exit the program after the iteration is divisible '
'by this value.')
group.add_argument('--tensorboard-dir', type=str, default=None, group.add_argument('--tensorboard-dir', type=str, default=None,
help='Write TensorBoard logs to this directory') help='Write TensorBoard logs to this directory.')
return parser
def _add_initialization_args(parser):
group = parser.add_argument_group(title='initialization')
group.add_argument('--seed', type=int, default=1234, group.add_argument('--seed', type=int, default=1234,
help='random seed') help='Random seed used for python, numpy, '
# Batch prodecuer arguments 'pytorch, and cuda.')
group.add_argument('--reset-position-ids', action='store_true', group.add_argument('--init-method-std', type=float, default=0.02,
help='Reset posistion ids after end-of-document token.') help='Standard deviation of the zero mean normal '
group.add_argument('--reset-attention-mask', action='store_true', 'distribution used for weight initialization.')
help='Reset self attention maske after '
'end-of-document token.')
group.add_argument('--eod-mask-loss', action='store_true',
help='Mask loss for the end of document tokens')
# Learning rate. return parser
group.add_argument('--lr-decay-iters', type=int, default=None,
help='number of iterations to decay LR over,'
' If None defaults to `--train-iters`*`--epochs`') def _add_learning_rate_args(parser):
group = parser.add_argument_group(title='learning rate')
group.add_argument('--lr', type=float, default=None,
help='Initial learning rate. Depending on decay style '
'and initial warmup, the learning rate at each '
'iteration would be different.')
group.add_argument('--lr-decay-style', type=str, default='linear', group.add_argument('--lr-decay-style', type=str, default='linear',
choices=['constant', 'linear', 'cosine', 'exponential'], choices=['constant', 'linear', 'cosine', 'exponential'],
help='learning rate decay function') help='Learning rate decay function.')
group.add_argument('--lr', type=float, default=1.0e-4, group.add_argument('--lr-decay-iters', type=int, default=None,
help='initial learning rate') help='number of iterations to decay learning rate over,'
' If None defaults to `--train-iters`')
group.add_argument('--min-lr', type=float, default=0.0,
help='Minimum value for learning rate. The scheduler '
'clips values below this threshold.')
group.add_argument('--warmup', type=float, default=0.01, group.add_argument('--warmup', type=float, default=0.01,
help='percentage of data to warmup on (.01 = 1% of all ' help='Percentage of total iterations to warmup on '
'training iters). Default 0.01') '(.01 = 1 percent of all training iters).')
group.add_argument('--override-lr-scheduler', action='store_true', group.add_argument('--override-lr-scheduler', action='store_true',
help='Reset the values of the scheduler (learning rate,' help='Reset the values of the scheduler (learning rate,'
'warmup iterations, minimum learning rate, maximum ' 'warmup iterations, minimum learning rate, maximum '
@@ -158,20 +201,24 @@ def add_training_args(parser):
help='Use checkpoint to set the values of the scheduler ' help='Use checkpoint to set the values of the scheduler '
'(learning rate, warmup iterations, minimum learning ' '(learning rate, warmup iterations, minimum learning '
'rate, maximum number of iterations, and decay style ' 'rate, maximum number of iterations, and decay style '
'from input arguments and ignore values from ' 'from checkpoint and ignore input arguments.')
'checkpoints. Notethat all the above values will be '
'reset.') return parser
# model checkpointing
def _add_checkpointing_args(parser):
group = parser.add_argument_group(title='checkpointing')
group.add_argument('--save', type=str, default=None, group.add_argument('--save', type=str, default=None,
help='Output directory to save checkpoints to.') help='Output directory to save checkpoints to.')
group.add_argument('--save-interval', type=int, default=5000, group.add_argument('--save-interval', type=int, default=None,
help='number of iterations between saves') help='Number of iterations between checkpoint saves.')
group.add_argument('--no-save-optim', action='store_true', group.add_argument('--no-save-optim', action='store_true',
help='Do not save current optimizer.') help='Do not save current optimizer.')
group.add_argument('--no-save-rng', action='store_true', group.add_argument('--no-save-rng', action='store_true',
help='Do not save current rng state.') help='Do not save current rng state.')
group.add_argument('--load', type=str, default=None, group.add_argument('--load', type=str, default=None,
help='Path to a directory containing a model checkpoint.') help='Directory containing a model checkpoint.')
group.add_argument('--no-load-optim', action='store_true', group.add_argument('--no-load-optim', action='store_true',
help='Do not load optimizer when loading checkpoint.') help='Do not load optimizer when loading checkpoint.')
group.add_argument('--no-load-rng', action='store_true', group.add_argument('--no-load-rng', action='store_true',
@@ -180,235 +227,146 @@ def add_training_args(parser):
help='Load model for finetuning. Do not load optimizer ' help='Load model for finetuning. Do not load optimizer '
'or rng state from checkpoint and set iteration to 0. ' 'or rng state from checkpoint and set iteration to 0. '
'Assumed when loading a release checkpoint.') 'Assumed when loading a release checkpoint.')
group.add_argument('--resume-dataloader', action='store_true',
help='Resume the dataloader when resuming training. ' return parser
'Does not apply to tfrecords dataloader, try resuming'
'with a different seed in this case.')
# distributed training args def _add_mixed_precision_args(parser):
group = parser.add_argument_group(title='mixed precision')
group.add_argument('--fp16', action='store_true',
help='Run model in fp16 mode.')
group.add_argument('--apply-query-key-layer-scaling', action='store_true',
help='Scale Q * K^T by 1 / layer-number. If this flag '
'is set, then it will automatically set '
'attention-softmax-in-fp32 to true')
group.add_argument('--attention-softmax-in-fp32', action='store_true',
help='Run attention masking and softmax in fp32.')
group.add_argument('--fp32-allreduce', action='store_true',
help='All-reduce in fp32')
group.add_argument('--hysteresis', type=int, default=2,
help='hysteresis for dynamic loss scaling')
group.add_argument('--loss-scale', type=float, default=None,
help='Static loss scaling, positive power of 2 '
'values can improve fp16 convergence. If None, dynamic'
'loss scaling is used.')
group.add_argument('--loss-scale-window', type=float, default=1000,
help='Window over which to raise/lower dynamic scale.')
group.add_argument('--min-scale', type=float, default=1,
help='Minimum loss scale for dynamic loss scale.')
return parser
def _add_distributed_args(parser):
group = parser.add_argument_group(title='distributed')
group.add_argument('--model-parallel-size', type=int, default=1,
help='Size of the model parallel.')
group.add_argument('--distributed-backend', default='nccl', group.add_argument('--distributed-backend', default='nccl',
help='which backend to use for distributed ' choices=['nccl', 'gloo'],
'training. One of [gloo, nccl]') help='Which backend to use for distributed training.')
group.add_argument('--DDP-impl', default='local', group.add_argument('--DDP-impl', default='local',
choices=['local', 'torch'],
help='which DistributedDataParallel implementation ' help='which DistributedDataParallel implementation '
'to use. One of [local, torch]') 'to use.')
group.add_argument('--local_rank', type=int, default=None, group.add_argument('--local_rank', type=int, default=None,
help='local rank passed from distributed launcher') help='local rank passed from distributed launcher.')
# autoresume
group.add_argument('--adlr-autoresume', action='store_true',
help='enable autoresume on adlr cluster.')
group.add_argument('--adlr-autoresume-interval', type=int, default=1000,
help='intervals over which check for autoresume'
'termination signal')
return parser
def add_evaluation_args(parser): def _add_validation_args(parser):
"""Evaluation arguments.""" group = parser.add_argument_group(title='validation')
group = parser.add_argument_group('validation', 'validation configurations')
group.add_argument('--eval-batch-size', type=int, default=None,
help='Data Loader batch size for evaluation datasets.'
'Defaults to `--batch-size`')
group.add_argument('--eval-iters', type=int, default=100, group.add_argument('--eval-iters', type=int, default=100,
help='number of iterations to run for evaluation' help='Number of iterations to run for evaluation'
'validation/test for') 'validation/test for.')
group.add_argument('--eval-interval', type=int, default=1000, group.add_argument('--eval-interval', type=int, default=1000,
help='interval between running evaluation on validation set') help='Interval between running evaluation on '
group.add_argument('--eval-seq-length', type=int, default=None, 'validation set.')
help='Maximum sequence length to process for '
'evaluation. Defaults to `--seq-length`')
group.add_argument('--eval-max-preds-per-seq', type=int, default=None,
help='Maximum number of predictions to use for '
'evaluation. Defaults to '
'math.ceil(`--eval-seq-length`*.15/10)*10')
group.add_argument('--overlapping-eval', type=int, default=32,
help='sliding window for overlapping eval ')
group.add_argument('--cloze-eval', action='store_true',
help='Evaluation dataset from `--valid-data` is a cloze task')
group.add_argument('--strict-lambada', action='store_true',
help='use more difficult formulation of lambada')
group.add_argument('--eval-hf', action='store_true',
help='perform evaluation with huggingface openai model.'
'use `--load` to specify weights path to be loaded')
group.add_argument('--load-openai', action='store_true',
help='load openai weights into our model. Use `--load` '
'to specify weights path to be loaded')
return parser
def add_text_generate_args(parser):
"""Text generate arguments."""
group = parser.add_argument_group('Text generation', 'configurations')
group.add_argument("--temperature", type=float, default=1.0)
group.add_argument("--greedy", action='store_true', default=False)
group.add_argument("--top_p", type=float, default=0.0)
group.add_argument("--top_k", type=int, default=0)
group.add_argument("--out-seq-length", type=int, default=1024)
group.add_argument("--sample-input-file", type=str, default="",
help='get input from file instead of interactive mode, '
'each line is an input' )
group.add_argument("--sample-output-file", type=str, default="",
help='output file got from --sample-input-file')
group.add_argument("--num-samples", type=int, default=0,
help='number of samples to generate unconditionally, '
'defaults to 0 and interactive conditional sampling')
group.add_argument("--genfile", type=str,
help='output file when generating unconditionally')
group.add_argument("--recompute", action='store_true',
help='during generation recompute all attention '
'instead of using previously computed keys/values.')
return parser
def add_data_args(parser): def _add_data_args(parser):
"""Train/valid/test data arguments.""" group = parser.add_argument_group(title='data and dataloader')
group.add_argument('--data-path', type=str, default=None,
help='Path to combined dataset to split.')
group.add_argument('--split', type=str, default='969, 30, 1',
help='Comma-separated list of proportions for training,'
' validation, and test split. For example the split '
'`90,5,5` will use 90% of data for training, 5% for '
'validation and 5% for test.')
group.add_argument('--vocab-file', type=str, default=None,
help='Path to the vocab file.')
group.add_argument('--merge-file', type=str, default=None,
help='Path to the BPE merge file.')
group.add_argument('--seq-length', type=int, default=None,
help="Maximum sequence length to process.")
group.add_argument('--mask-prob', type=float, default=0.15,
help='Probability of replacing a token with mask.')
group.add_argument('--short-seq-prob', type=float, default=0.1,
help='Probability of producing a short sequence.')
group.add_argument('--mmap-warmup', action='store_true',
help='Warm up mmap files.')
group.add_argument('--num-workers', type=int, default=2,
help="Dataloader number of workers.")
group.add_argument('--tokenizer-type', type=str,
default=None,
choices=['BertWordPieceLowerCase',
'GPT2BPETokenizer'],
help='What type of tokenizer to use.')
group.add_argument('--data-impl', type=str, default='infer',
choices=['lazy', 'cached', 'mmap', 'infer'],
help='Implementation of indexed datasets.')
group.add_argument('--reset-position-ids', action='store_true',
help='Reset position ids after end-of-document token.')
group.add_argument('--reset-attention-mask', action='store_true',
help='Reset self attention mask after '
'end-of-document token.')
group.add_argument('--eod-mask-loss', action='store_true',
help='Mask loss for the end of document tokens.')
group = parser.add_argument_group('data', 'data configurations') return parser
group.add_argument('--model-parallel-size', type=int, default=1,
help='size of the model parallel.')
group.add_argument('--shuffle', action='store_true',
help='Shuffle data. Shuffling is deterministic '
'based on seed and current epoch.')
group.add_argument('--data-loader', type=str, default=None,
choices=['raw', 'lazy', 'tfrecords', 'numpy', 'binary'],
help='Which data loader to use. Default varies by model.')
group.add_argument('--train-data', nargs='+', default=None, def _add_autoresume_args(parser):
help='Whitespace separated paths or corpora names ' group = parser.add_argument_group(title='autoresume')
'for training.')
group.add_argument('--valid-data', nargs='*', default=None,
help='path(s) to the validation data.')
group.add_argument('--test-data', nargs='*', default=None,
help='path(s) to the testing data.')
group.add_argument('--data-path', nargs='+', default=None,
help='path to combined dataset to split')
group.add_argument('--split', default='1000,1,1',
help='comma-separated list of proportions for training,'
' validation, and test split')
group.add_argument('--seq-length', type=int, default=512,
help="Maximum sequence length to process")
group.add_argument('--max-preds-per-seq', type=int, default=None,
help='Maximum number of predictions to use per sequence.'
'Defaults to math.ceil(`--seq-length`*.15/10)*10.'
'MUST BE SPECIFIED IF `--data-loader tfrecords`.')
# arguments for binary data loader
parser.add_argument('--vocab', type=str, default='vocab.txt',
help='path to vocab file')
parser.add_argument('--data-impl', type=str, default='infer',
help='implementation of indexed datasets',
choices=['lazy', 'cached', 'mmap', 'infer'])
parser.add_argument('--max-num-samples', type=int, default=None,
help='Maximum number of samples to plan for, defaults to total iters * batch-size.')
parser.add_argument('--data-epochs', type=int, default=None,
help='Number of epochs to plan for, defaults to using --max-num-samples')
parser.add_argument('--mask-prob', default=0.15, type=float,
help='probability of replacing a token with mask')
parser.add_argument('--short-seq-prob', default=0.1, type=float,
help='probability of producing a short sequence')
parser.add_argument('--skip-mmap-warmup', action='store_true',
help='skip warming up mmap files')
# arguments for numpy data loader
group.add_argument('--input-data-sizes-file', type=str, default='sizes.txt',
help='the filename containing all the shards sizes for numpy data loader')
# arguments for raw/tfrecords data loader
group.add_argument('--delim', default=',',
help='delimiter used to parse csv data files')
group.add_argument('--text-key', default='sentence',
help='key to use to extract text from json/csv')
group.add_argument('--eval-text-key', default=None,
help='key to use to extract text from '
'json/csv evaluation datasets')
group.add_argument('--loose-json', action='store_true',
help='Use loose json (one json-formatted string per '
'newline), instead of tight json (data file is one '
'json string)')
group.add_argument('--presplit-sentences', action='store_true',
help='Dataset content consists of documents where '
'each document consists of newline separated sentences')
group.add_argument('--num-workers', type=int, default=2, group.add_argument('--adlr-autoresume', action='store_true',
help="""Number of workers to use for dataloading""") help='Enable autoresume on adlr cluster.')
group.add_argument('--tokenizer-model-type', type=str, group.add_argument('--adlr-autoresume-interval', type=int, default=1000,
default='bert-large-uncased', help='Intervals over which check for autoresume'
help="Model type to use for sentencepiece tokenization \ 'termination signal')
(one of ['bpe', 'char', 'unigram', 'word']) or \
bert vocab to use for BertWordPieceTokenizer (one of \
['bert-large-uncased', 'bert-large-cased', etc.])")
group.add_argument('--tokenizer-path', type=str, default='tokenizer.model',
help='path used to save/load sentencepiece tokenization '
'models')
group.add_argument('--tokenizer-type', type=str,
default='BertWordPieceTokenizer',
choices=['CharacterLevelTokenizer',
'SentencePieceTokenizer',
'BertWordPieceTokenizer',
'GPT2BPETokenizer'],
help='what type of tokenizer to use')
group.add_argument("--cache-dir", default=None, type=str,
help="Where to store pre-trained BERT downloads")
return parser return parser
def get_args(): ########################################################################
"""Parse all the args."""
parser = argparse.ArgumentParser(description='PyTorch BERT Model')
parser = add_model_config_args(parser)
parser = add_fp16_config_args(parser)
parser = add_training_args(parser)
parser = add_evaluation_args(parser)
parser = add_text_generate_args(parser)
parser = add_data_args(parser)
args = parser.parse_args() def _add_gpt2_args(parser):
group = parser.add_argument_group(title='gpt2')
if not args.train_data and not args.data_path: group.add_argument('--input-data-sizes-file', type=str, default='sizes.txt',
print('WARNING: No training data specified') help='The filename containing all the shards '
'sizes for numpy data loader')
args.cuda = torch.cuda.is_available() return parser
args.rank = int(os.getenv('RANK', '0'))
args.world_size = int(os.getenv("WORLD_SIZE", '1'))
if os.getenv('OMPI_COMM_WORLD_LOCAL_RANK'):
# We are using (OpenMPI) mpirun for launching distributed data parallel processes
local_rank = int(os.getenv('OMPI_COMM_WORLD_LOCAL_RANK'))
local_size = int(os.getenv('OMPI_COMM_WORLD_LOCAL_SIZE'))
# Possibly running with Slurm def add_data_args_(parser):
num_nodes = int(os.getenv('SLURM_JOB_NUM_NODES', '1')) """Train/valid/test data arguments."""
nodeid = int(os.getenv('SLURM_NODEID', '0'))
args.local_rank = local_rank group = parser.add_argument_group('data', 'data configurations')
args.rank = nodeid*local_size + local_rank
args.world_size = num_nodes*local_size
args.model_parallel_size = min(args.model_parallel_size, args.world_size) group.add_argument('--data-loader', type=str, default=None,
if args.rank == 0: choices=['raw', 'lazy', 'tfrecords', 'numpy', 'binary'],
print('using world size: {} and model-parallel size: {} '.format( help='Which data loader to use. Default varies by model.')
args.world_size, args.model_parallel_size))
args.dynamic_loss_scale = False
if args.loss_scale is None:
args.dynamic_loss_scale = True
if args.rank == 0:
print(' > using dynamic loss scaling')
# The args fp32_* or fp16_* meant to be active when the return parser
# args fp16 is set. So the default behaviour should all
# be false.
if not args.fp16:
args.fp32_embedding = False
args.fp32_tokentypes = False
args.fp32_layernorm = False
return args
# coding=utf-8
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Input/output checkpointing."""
import os
import random
import sys
import numpy as np
import torch
from torch.nn.parallel.distributed import DistributedDataParallel as torchDDP
from megatron import mpu
from megatron import get_args
from megatron import print_rank_0
def check_checkpoint_args(checkpoint_args):
"""Ensure fixed arguments for a model are the same for the input
arguments and the one retreived frm checkpoint."""
args = get_args()
def _compare(arg_name):
checkpoint_value = getattr(checkpoint_args, arg_name)
args_value = getattr(args, arg_name)
error_message = '{} value from checkpoint ({}) is not equal to the ' \
'input argument value ({}).'.format(
arg_name, checkpoint_value, args_value)
assert checkpoint_value == args_value, error_message
_compare('num_layers')
_compare('hidden_size')
_compare('num_attention_heads')
_compare('max_position_embeddings')
_compare('make_vocab_size_divisible_by')
_compare('padded_vocab_size')
_compare('tokenizer_type')
_compare('model_parallel_size')
def ensure_directory_exists(filename):
"""Build filename's path if it does not already exists."""
dirname = os.path.dirname(filename)
if not os.path.exists(dirname):
os.makedirs(dirname)
def get_checkpoint_name(checkpoints_path, iteration,
release=False, mp_rank=None):
"""A unified checkpoint name."""
if release:
directory = 'release'
else:
directory = 'iter_{:07d}'.format(iteration)
return os.path.join(checkpoints_path, directory,
'mp_rank_{:02d}'.format(
mpu.get_model_parallel_rank() if mp_rank is None \
else mp_rank),
'model_optim_rng.pt')
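# For example (path prefix is illustrative), get_checkpoint_name('/ckpt', 5000)
# on model-parallel rank 0 resolves to
#   /ckpt/iter_0005000/mp_rank_00/model_optim_rng.pt
# while get_checkpoint_name('/ckpt', 0, release=True) resolves to
#   /ckpt/release/mp_rank_00/model_optim_rng.pt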
def get_checkpoint_tracker_filename(checkpoints_path):
"""Tracker file rescords the latest chckpoint during
training to restart from."""
return os.path.join(checkpoints_path, 'latest_checkpointed_iteration.txt')
def save_checkpoint(iteration, model, optimizer, lr_scheduler):
"""Save a model checkpoint."""
args = get_args()
# Only rank zero of the data parallel writes to the disk.
if isinstance(model, torchDDP):
model = model.module
if mpu.get_data_parallel_rank() == 0:
# Arguments, iteration, and model.
state_dict = {}
state_dict['args'] = args
state_dict['iteration'] = iteration
state_dict['model'] = model.state_dict_for_save_checkpoint()
# Optimizer stuff.
if not args.no_save_optim:
if optimizer is not None:
state_dict['optimizer'] = optimizer.state_dict()
if lr_scheduler is not None:
state_dict['lr_scheduler'] = lr_scheduler.state_dict()
# RNG states.
if not args.no_save_rng:
state_dict['random_rng_state'] = random.getstate()
state_dict['np_rng_state'] = np.random.get_state()
state_dict['torch_rng_state'] = torch.get_rng_state()
state_dict['cuda_rng_state'] = torch.cuda.get_rng_state()
state_dict['rng_tracker_states'] \
= mpu.get_cuda_rng_tracker().get_states()
# Save.
checkpoint_name = get_checkpoint_name(args.save, iteration)
print('global rank {} is saving checkpoint at iteration {:7d} to {}'.
format(torch.distributed.get_rank(), iteration, checkpoint_name))
ensure_directory_exists(checkpoint_name)
torch.save(state_dict, checkpoint_name)
print(' successfully saved {}'.format(checkpoint_name))
# Wait so everyone is done (necessary)
torch.distributed.barrier()
# And update the latest iteration
if torch.distributed.get_rank() == 0:
tracker_filename = get_checkpoint_tracker_filename(args.save)
with open(tracker_filename, 'w') as f:
f.write(str(iteration))
# Wait so everyone is done (not necessary)
torch.distributed.barrier()
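# After a successful save at iteration 5000, args.save would contain (layout
# per the helpers above):
#   iter_0005000/mp_rank_00/model_optim_rng.pt
#   latest_checkpointed_iteration.txt   (a single line containing "5000")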
def load_checkpoint(model, optimizer, lr_scheduler):
"""Load a model checkpoint and return the iteration."""
args = get_args()
if isinstance(model, torchDDP):
model = model.module
# Read the tracker file and set the iteration.
tracker_filename = get_checkpoint_tracker_filename(args.load)
# If no tracker file, return iteration zero.
if not os.path.isfile(tracker_filename):
print_rank_0('WARNING: could not find the metadata file {} '.format(
tracker_filename))
print_rank_0(' will not load any checkpoints and will start from '
'random')
return 0
# Otherwise, read the tracker file and either set the iteration or
# mark it as a release checkpoint.
iteration = 0
release = False
with open(tracker_filename, 'r') as f:
metastring = f.read().strip()
try:
iteration = int(metastring)
except ValueError:
release = metastring == 'release'
if not release:
print_rank_0('ERROR: Invalid metadata file {}. Exiting'.format(
tracker_filename))
sys.exit()
assert iteration > 0 or release, 'error parsing metadata file {}'.format(
tracker_filename)
# Checkpoint.
checkpoint_name = get_checkpoint_name(args.load, iteration, release)
if mpu.get_data_parallel_rank() == 0:
print('global rank {} is loading checkpoint {}'.format(
torch.distributed.get_rank(), checkpoint_name))
# Load the checkpoint.
try:
state_dict = torch.load(checkpoint_name, map_location='cpu')
except ModuleNotFoundError:
# For backward compatibility.
print_rank_0(' > deserializing using the old code structure ...')
sys.modules['fp16.loss_scaler'] = sys.modules[
'megatron.fp16.loss_scaler']
state_dict = torch.load(checkpoint_name, map_location='cpu')
sys.modules.pop('fp16.loss_scaler', None)
except:
print_rank_0('could not load the checkpoint')
sys.exit()
# Set iteration.
if args.finetune or release:
iteration = 0
else:
try:
iteration = state_dict['iteration']
except KeyError:
try: # Backward compatible with older checkpoints
iteration = state_dict['total_iters']
except KeyError:
print_rank_0('A metadata file exists but unable to load '
'iteration from checkpoint {}, exiting'.format(
checkpoint_name))
sys.exit()
# Check arguments.
if 'args' in state_dict:
checkpoint_args = state_dict['args']
check_checkpoint_args(checkpoint_args)
else:
print_rank_0('could not find arguments in the checkpoint ...')
# Model.
model.load_state_dict(state_dict['model'])
# Optimizer.
if not release and not args.finetune and not args.no_load_optim:
try:
if optimizer is not None:
optimizer.load_state_dict(state_dict['optimizer'])
if lr_scheduler is not None:
lr_scheduler.load_state_dict(state_dict['lr_scheduler'])
except KeyError:
print_rank_0('Unable to load optimizer from checkpoint {}. '
'Specify --no-load-optim or --finetune to prevent '
'attempting to load the optimizer state, '
'exiting ...'.format(checkpoint_name))
sys.exit()
# rng states.
if not release and not args.finetune and not args.no_load_rng:
try:
random.setstate(state_dict['random_rng_state'])
np.random.set_state(state_dict['np_rng_state'])
torch.set_rng_state(state_dict['torch_rng_state'])
torch.cuda.set_rng_state(state_dict['cuda_rng_state'])
mpu.get_cuda_rng_tracker().set_states(
state_dict['rng_tracker_states'])
except KeyError:
            print_rank_0('Unable to load the random state from checkpoint {}. '
                         'Specify --no-load-rng or --finetune to prevent '
                         'attempting to load the random state, '
                         'exiting ...'.format(checkpoint_name))
sys.exit()
torch.distributed.barrier()
if mpu.get_data_parallel_rank() == 0:
print(' successfully loaded {}'.format(checkpoint_name))
return iteration
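# Illustrative sketch (not part of this module): how the save/load routines
# above are typically wired into a training loop. The save routine's exact
# signature is not visible in this excerpt, so save_checkpoint(iteration,
# model, optimizer, lr_scheduler) below is an assumption; args.train_iters,
# args.save_interval, and train_step are likewise placeholders taken from the
# surrounding training script.
def _example_checkpoint_loop(model, optimizer, lr_scheduler, train_step, args):
    # Resume from the latest tracked iteration if args.load points somewhere.
    iteration = load_checkpoint(model, optimizer, lr_scheduler)
    while iteration < args.train_iters:
        train_step()
        iteration += 1
        # Periodically write a checkpoint; only data-parallel rank 0 touches
        # disk, but every rank must reach the barriers inside the save routine.
        if args.save and iteration % args.save_interval == 0:
            save_checkpoint(iteration, model, optimizer, lr_scheduler)
    return iteration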
from . import indexed_dataset
from .bert_tokenization import FullTokenizer as FullBertTokenizer
from .albert_dataset import AlbertDataset
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-"""ALBERT Style dataset."""
+"""BERT Style dataset."""
 import os
 import time
@@ -22,24 +22,19 @@ import numpy as np
 import torch
 from torch.utils.data import Dataset
+from megatron import get_tokenizer
 from megatron import mpu
 from megatron.data import helpers
-from megatron.data import FullBertTokenizer
 from megatron.data.dataset_utils import build_training_sample
 from megatron.data.indexed_dataset import make_dataset as make_indexed_dataset
-from megatron.utils import print_rank_0
+from megatron import print_rank_0
-def build_train_valid_test_datasets(vocab_file, data_prefix, data_impl,
-                                    splits_string, train_valid_test_num_samples,
-                                    max_seq_length, masked_lm_prob,
-                                    short_seq_prob, seed, skip_warmup):
-    # Tokenizer is the same
-    tokenizer = FullBertTokenizer(vocab_file, do_lower_case=True)
-    print_rank_0(' > using full BERT tokenizer with vocabulary size: {}'.format(
-        tokenizer.vocab_size()))
+def build_train_valid_test_datasets(data_prefix, data_impl, splits_string,
+                                    train_valid_test_num_samples,
+                                    max_seq_length, masked_lm_prob,
+                                    short_seq_prob, seed, skip_warmup):
     # Indexed dataset.
     indexed_dataset = get_indexed_dataset_(data_prefix,
                                            data_impl,
@@ -79,10 +74,9 @@ def build_train_valid_test_datasets(vocab_file, data_prefix, data_impl,
         # New doc_idx view.
         indexed_dataset.set_doc_idx(doc_idx_ptr[start_index:end_index])
         # Build the dataset accordingly.
-        dataset = AlbertDataset(
+        dataset = BertDataset(
             name=name,
             indexed_dataset=indexed_dataset,
-            tokenizer=tokenizer,
             data_prefix=data_prefix,
             num_epochs=None,
             max_num_samples=train_valid_test_num_samples[index],
@@ -105,9 +99,9 @@ def build_train_valid_test_datasets(vocab_file, data_prefix, data_impl,
     return (train_dataset, valid_dataset, test_dataset)
-class AlbertDataset(Dataset):
+class BertDataset(Dataset):
-    def __init__(self, name, indexed_dataset, tokenizer, data_prefix,
+    def __init__(self, name, indexed_dataset, data_prefix,
                  num_epochs, max_num_samples, masked_lm_prob,
                  max_seq_length, short_seq_prob, seed):
@@ -117,8 +111,7 @@ class AlbertDataset(Dataset):
         self.masked_lm_prob = masked_lm_prob
         self.max_seq_length = max_seq_length
-        # Tokenizer and dataset.
-        self.tokenizer = tokenizer
+        # Dataset.
         self.indexed_dataset = indexed_dataset
@@ -133,16 +126,13 @@ class AlbertDataset(Dataset):
                                                   self.name)
         # Vocab stuff.
-        self.vocab_id_list = list(self.tokenizer.inv_vocab.keys())
-        self.vocab_id_to_token_dict = self.tokenizer.inv_vocab
-        self.cls_id = self.tokenizer.vocab['[CLS]']
-        self.sep_id = self.tokenizer.vocab['[SEP]']
-        self.mask_id = self.tokenizer.vocab['[MASK]']
-        self.pad_id = self.tokenizer.vocab['[PAD]']
+        tokenizer = get_tokenizer()
+        self.vocab_id_list = list(tokenizer.inv_vocab.keys())
+        self.vocab_id_to_token_dict = tokenizer.inv_vocab
+        self.cls_id = tokenizer.cls
+        self.sep_id = tokenizer.sep
+        self.mask_id = tokenizer.mask
+        self.pad_id = tokenizer.pad
-    def num_tokens(self):
-        return self.tokenizer.vocab_size()
     def __len__(self):
...
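# Illustrative sketch (not part of the diff above): the new BertDataset pulls
# special-token ids from the shared megatron tokenizer instead of constructing
# its own FullBertTokenizer from a vocab file. get_tokenizer() and the
# .cls/.sep/.mask/.pad attributes appear in the diff; everything else here is
# a hypothetical usage example.
def _example_special_token_ids():
    from megatron import get_tokenizer
    tokenizer = get_tokenizer()
    # Vocabulary id <-> token maps plus the ids used to build training samples.
    vocab_id_list = list(tokenizer.inv_vocab.keys())
    return (vocab_id_list, tokenizer.cls, tokenizer.sep,
            tokenizer.mask, tokenizer.pad)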
@@ -13,71 +13,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
+"""GPT2 dataset."""
 import json
 import os
 import numpy as np
 import torch
-from torch.multiprocessing import Lock
 from torch.utils.data import Dataset
-from megatron import mpu
-from megatron.data_utils.samplers import DistributedBatchSampler
-from megatron.data_utils.tokenization_gpt2 import GPT2Tokenizer
-def make_gpt2_dataloaders(args):
-    # Input parameters.
-    input_data_sizes_file = args.input_data_sizes_file
-    seq_length = args.seq_length
-    initial_seed = args.seed
-    # Data parallel arguments.
-    world_size = mpu.get_data_parallel_world_size()
-    rank = mpu.get_data_parallel_rank()
-    global_batch_size = args.batch_size * world_size
-    num_workers = args.num_workers
-    def make_data_loader_(data_path):
-        # Build the dataset.
-        dataset = GPT2Dataset(data_path, input_data_sizes_file,
-                              seq_length, initial_seed)
-        # Use a simple sampler with distributed batch sampler.
-        sampler = torch.utils.data.SequentialSampler(dataset)
-        batch_sampler = DistributedBatchSampler(sampler=sampler,
-                                                batch_size=global_batch_size,
-                                                drop_last=True,
-                                                rank=rank,
-                                                world_size=world_size)
-        # Torch dataloader.
-        return torch.utils.data.DataLoader(dataset,
-                                           batch_sampler=batch_sampler,
-                                           num_workers=num_workers,
-                                           pin_memory=True)
-    train = make_data_loader_(args.train_data)
-    valid = make_data_loader_(args.valid_data)
-    test = make_data_loader_(args.test_data)
-    args.do_train = False
-    args.do_valid = False
-    args.do_test = False
-    if train is not None:
-        args.do_train = True
-    if valid is not None:
-        args.do_valid = True
-    if test is not None:
-        args.do_test = True
-    # Tokenizer.
-    tokenizer = GPT2Tokenizer.from_pretrained('gpt2', cache_dir=args.cache_dir)
-    eod_token = tokenizer.encoder['<|endoftext|>']
-    num_tokens = eod_token + 1
-    return (train, valid, test), num_tokens, eod_token
 class GPT2Dataset(Dataset):
@@ -89,8 +33,6 @@ class GPT2Dataset(Dataset):
         self.seq_length = seq_length
         self.initial_seed = initial_seed
         self.max_epochs = max_epochs
-        # Lock for building the dataset.
-        self.lock = Lock()
         # Shard stuff.
         # Dictionary from shard nameto its size (number of element).
@@ -120,13 +62,11 @@ class GPT2Dataset(Dataset):
         # data index in the shard.
         data_idx = idx - self.shards_start_index[shard_index]
         # Load the shard if it is not in memory.
-        #self.lock.acquire()
         if self.shards_data[shard_index] is None:
             print('global rank {} is building data for shard index {} ...'.
                   format(torch.distributed.get_rank(), shard_index))
             self.build_dataset_(shard_index)
             #assert self.shards_data[shard_index] is not None
-        #self.lock.release()
         # Start index.
         start_index = self.shards_sample_index[shard_index][data_idx]
         # Add one for label shift.
@@ -194,18 +134,3 @@ class GPT2Dataset(Dataset):
             size = self.shard_size_dict[shard]
             self.shards_start_index[i] = self.shards_start_index[i-1] + \
                                          size // self.seq_length
-'''
-if __name__ == '__main__':
-    print('gpt2 data loader ...')
-    path = '/raid/mshoeybi/data/gpt2/adlr/reddit_all_ftfy_lg200/npys'
-    dataset = GPT2Dataset(path, 'sizes.txt', 1024, 1234, 100)
-    print('dataset contains {} samples'.format(dataset.data_length))
-    for i in range(len(dataset)):
-        if i % 512000 == 0:
-            print(i)
-        data = dataset[i]
-'''
@@ -18,7 +18,7 @@ from itertools import accumulate
 import numpy as np
 import torch
-from megatron.utils import print_rank_0
+from megatron import print_rank_0
 def __best_fitting_dtype(vocab_size=None):
     if vocab_size is not None and vocab_size < 65500:
...
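# Illustrative sketch (not from this repo) of the idea behind
# __best_fitting_dtype in the hunk above: token ids are stored in the
# narrowest integer dtype that can hold the vocabulary, which roughly halves
# the on-disk index size for sub-65k vocabularies. The wider fallback dtype
# below is an assumption, not taken from the diff.
import numpy as np

def _example_best_fitting_dtype(vocab_size=None):
    if vocab_size is not None and vocab_size < 65500:
        return np.uint16  # two bytes per token id is enough for this vocab
    return np.int32  # assumed wider fallback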
# coding=utf-8
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Batch samplers that work with either random or sequential data samplers."""
import torch
from torch.utils import data
class RandomSampler(data.sampler.Sampler):
"""Based off of pytorch RandomSampler and DistributedSampler. Essentially
a RandomSampler, but this class lets the user set an epoch like
DistributedSampler Samples elements randomly. If without replacement, then
sample from a shuffled dataset. If with replacement, then user can
specify ``num_samples`` to draw.
Arguments:
data_source (Dataset): dataset to sample from
num_samples (int): number of samples to draw, default=len(dataset)
replacement (bool): samples are drawn with replacement if ``True``,
default=False
"""
def __init__(self, data_source, replacement=False, num_samples=None):
self.data_source = data_source
self.replacement = replacement
self._num_samples = num_samples
self.epoch = -1
if self._num_samples is not None and replacement is False:
raise ValueError("With replacement=False, num_samples should not "
"be specified, since a random permute will be "
"performed.")
if not isinstance(self.num_samples, int) or self.num_samples <= 0:
raise ValueError("num_samples should be a positive integer "
"value, but got num_samples={}".format(
self.num_samples))
if not isinstance(self.replacement, bool):
raise ValueError("replacement should be a boolean value, but got "
"replacement={}".format(self.replacement))
@property
def num_samples(self):
# dataset size might change at runtime
if self._num_samples is None:
return len(self.data_source)
return self._num_samples
def __iter__(self):
n = len(self.data_source)
g = torch.Generator()
if self.epoch >= 0:
g.manual_seed(self.epoch)
if self.replacement:
return iter(torch.randint(high=n, size=(self.num_samples,),
dtype=torch.int64, generator=g).tolist())
return iter(torch.randperm(n, generator=g).tolist())
def __len__(self):
return self.num_samples
def set_epoch(self, epoch):
self.epoch = epoch
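# Illustrative usage sketch (not part of this module): set_epoch() seeds the
# torch.Generator used in __iter__, so for a given epoch every data-parallel
# rank shuffles an identically sized dataset into the same order, keeping runs
# reproducible. `dataset` and `num_epochs` are placeholders.
def _example_random_sampler_usage(dataset, num_epochs):
    sampler = RandomSampler(dataset)
    orders = []
    for epoch in range(num_epochs):
        sampler.set_epoch(epoch)
        orders.append(list(sampler))  # same order on every rank for this epoch
    return orders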
class DistributedBatchSampler(data.sampler.BatchSampler):
"""Similar to normal implementation of distributed sampler, except
implementation is at the batch sampler level, instead of just the
sampler level. This allows wrapping of arbitrary data samplers
(sequential, random, WeightedRandomSampler, etc.) with this batch
sampler."""
def __init__(self, sampler, batch_size, drop_last, rank=-1,
world_size=2, wrap_last=False):
super(DistributedBatchSampler, self).__init__(sampler, batch_size,
drop_last)
if rank == -1:
assert False, 'should not be here'
rank = torch.distributed.get_rank()
self.rank = rank
self.world_size = world_size
self.sampler.wrap_around = 0
self.wrap_around = 0
self.wrap_last = wrap_last
self.start_iter = 0
def __iter__(self):
batch = []
i = 0
for idx in self.data_iterator(self.sampler, wrap_around=False):
batch.append(idx)
if len(batch) == self.batch_size:
tbatch = self._batch(batch)
if i >= self.start_iter:
yield tbatch
self.start_iter = 0
i += 1
batch = []
batch_len = len(batch)
if batch_len > 0 and not self.drop_last:
if self.wrap_last:
self.sampler.wrap_around -= (self.batch_size)
self.wrap_around += (len(batch))
self.wrap_around %= self.batch_size
yield self._batch(batch)
if self.wrap_last:
self.sampler.wrap_around += self.batch_size
def data_iterator(self, _iter, wrap_around=False):
"""iterates through data and handles wrap around"""
for i, idx in enumerate(_iter):
if i < self.wrap_around%self.batch_size:
continue
if wrap_around:
self.wrap_around += 1
self.wrap_around %= self.batch_size
yield idx
def _batch(self, batch):
"""extracts samples only pertaining to this worker's batch"""
start = self.rank*self.batch_size//self.world_size
end = (self.rank+1)*self.batch_size//self.world_size
return batch[start:end]
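# Illustrative sketch (not part of this module): wrapping a plain sampler with
# DistributedBatchSampler so each data-parallel rank receives its slice of
# every global batch, then handing the batch sampler to a torch DataLoader.
# The mpu-based rank/world-size lookup mirrors how the removed GPT-2
# dataloader helper used this class; `dataset`, `batch_size`, and
# `num_workers` are placeholders.
def _example_distributed_dataloader(dataset, batch_size, num_workers):
    from megatron import mpu
    world_size = mpu.get_data_parallel_world_size()
    rank = mpu.get_data_parallel_rank()
    # Every rank iterates the same global batch; _batch() slices out its part.
    global_batch_size = batch_size * world_size
    sampler = torch.utils.data.SequentialSampler(dataset)
    batch_sampler = DistributedBatchSampler(sampler=sampler,
                                            batch_size=global_batch_size,
                                            drop_last=True,
                                            rank=rank,
                                            world_size=world_size)
    return torch.utils.data.DataLoader(dataset,
                                       batch_sampler=batch_sampler,
                                       num_workers=num_workers,
                                       pin_memory=True)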