Commit 22c0e300 authored by Jared Casper

Merge branch 'staging' into 'master'

April 2020 Release

See merge request ADLR/megatron-lm!69
parents 70174ae3 acf8780b
# coding=utf-8
-# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
+# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,140 +13,113 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-"""Pretrain utilities"""
+"""Pretrain utilities."""
from datetime import datetime
import math
+import sys

import torch
from torch.nn.parallel.distributed import DistributedDataParallel as torchDDP
from apex.optimizers import FusedAdam as Adam

-from arguments import get_args
+from megatron import get_args
+from megatron import get_timers
+from megatron import get_tensorboard_writer
from megatron import mpu
+from megatron import print_rank_0
+from megatron.checkpointing import load_checkpoint
+from megatron.checkpointing import save_checkpoint
from megatron.fp16 import FP16_Module
from megatron.fp16 import FP16_Optimizer
+from megatron.initialize import initialize_megatron
from megatron.learning_rates import AnnealingLR
from megatron.model import DistributedDataParallel as LocalDDP
from megatron.model import get_params_for_weight_decay_optimization
from megatron.utils import check_adlr_autoresume_termination
-from megatron.utils import enable_adlr_autoresume
-from megatron.utils import get_tensorboard_writer
-from megatron.utils import initialize_distributed
-from megatron.utils import load_checkpoint
-from megatron.utils import print_args
-from megatron.utils import print_rank_0
+from megatron.utils import make_data_loader
from megatron.utils import report_memory
-from megatron.utils import save_checkpoint
-from megatron.utils import set_random_seed
-from megatron.utils import Timers
-def run(top_level_message, train_val_test_data_provider,
-        model_provider, forward_step_func):
+def pretrain(train_valid_test_dataset_provider, model_provider,
+             forward_step_func, extra_args_provider=None, args_defaults={}):
    """Main training program.

    This function will run the following in the order provided:
-        1) get input arguments.
-        2) initialize distributed and seeds.
-        3) call train_val_test_data_provider to get train/val/test datasets.
-        4) setup model, optimizer and lr schedule using the model_provider.
-        5) train the model using the forward_step_func.
+        1) initialize Megatron.
+        2) setup model, optimizer and lr schedule using the model_provider.
+        3) call train_valid_test_dataset_provider to get train/valid/test
+           datasets.
+        4) train the model using the forward_step_func.

    Arguments:
-        top_level_message: a message to print at the top of the run.
-        train_val_test_data_provider: a function that takes `args` as input
-            and returns `train, val, test` dataloaders. Note that args are
-            passed and can be modified in case we need to use some parameters
-            later. For example, we can set vocab size using
-                args.vocab_size = ...
-            and later use this value in `model_provider`.
-        model_provider: a function that takes `args` and returns a vanilla
-            version of the model. By vanilla we mean a simple model on cpu
-            with no fp16 or ddp.
-        forward_step_func: a function that takes a `data iterator`, `model`,
-            `args`, and `timers` and returns a `loss` scalar with a dictionary
-            with key:values being the info we would like to monitor during
-            training, for example `lm-loss: value`. We also require that this
-            function add `batch generator` to the timers class.
+        train_valid_test_dataset_provider: a function that takes the size of
+            train/valid/test datasets and returns `train, valid, test`
+            datasets.
+        model_provider: a function that returns a vanilla version of the
+            model. By vanilla we mean a simple model on cpu with no fp16 or
+            ddp.
+        forward_step_func: a function that takes a `data iterator` and
+            `model`, and returns a `loss` scalar with a dictionary with
+            key:values being the info we would like to monitor during
+            training, for example `lm-loss: value`. We also require that this
+            function add `batch generator` to the timers class.
+        extra_args_provider: a function that takes a parser and adds
+            arguments to it. It is used for programs to add their own
+            arguments.
+        args_defaults: a dictionary from argument-name to argument-value.
+            It is used to set default values for already-parsed arguments.
    """
-    # Arguments.
+    # Initialize and get arguments, timers, and Tensorboard writer.
+    initialize_megatron(extra_args_provider=extra_args_provider,
+                        args_defaults=args_defaults)
    args = get_args()
+    timers = get_timers()

-    # Timer.
-    timers = Timers()
-
-    # Tensorboard writer
-    writer = get_tensorboard_writer(args)
-
-    # Initialize.
-    initialize_megatron(top_level_message, args, writer)
+    # Model, optimizer, and learning rate.
+    timers('model and optimizer').start()
+    model, optimizer, lr_scheduler = setup_model_and_optimizer(model_provider)
+    timers('model and optimizer').stop()

    # Data stuff.
-    train_data, val_data, test_data = train_val_test_data_provider(args)
-
-    # Model, optimizer, and learning rate.
-    model, optimizer, lr_scheduler = setup_model_and_optimizer(model_provider,
-                                                               args)
-
-    # Train, validation, and test data.
-    train_data_iterator, val_data_iterator, \
-        test_data_iterator = get_train_val_test_data_iterators(train_data,
-                                                               val_data,
-                                                               test_data,
-                                                               args)
+    timers('train/valid/test data iterators').start()
+    train_data_iterator, valid_data_iterator, test_data_iterator \
+        = build_train_valid_test_data_iterators(
+            train_valid_test_dataset_provider)
+    timers('train/valid/test data iterators').stop()
+
+    # Print setup timing.
+    print_rank_0('done with setups ...')
+    timers.log(['model and optimizer', 'train/valid/test data iterators'])
+    print_rank_0('training ...')

    iteration = 0
-    if args.train_iters > 0:
-        if args.do_train:
-            iteration, _ = train(forward_step_func, model,
-                                 optimizer, lr_scheduler,
-                                 train_data_iterator, val_data_iterator,
-                                 timers, args, writer)
-        if args.do_valid:
-            prefix = 'the end of training for val data'
-            evaluate_and_print_results(prefix, forward_step_func,
-                                       val_data_iterator, model,
-                                       args, writer, iteration,
-                                       timers, False)
+    if args.do_train and args.train_iters > 0:
+        iteration, _ = train(forward_step_func,
+                             model, optimizer, lr_scheduler,
+                             train_data_iterator, valid_data_iterator)
+
+    if args.do_valid:
+        prefix = 'the end of training for val data'
+        evaluate_and_print_results(prefix, forward_step_func,
+                                   valid_data_iterator, model,
+                                   iteration, False)

    if args.save and iteration != 0:
-        save_checkpoint(iteration, model, optimizer,
-                        lr_scheduler, args)
+        save_checkpoint(iteration, model, optimizer, lr_scheduler)

    if args.do_test:
        # Run on test data.
        prefix = 'the end of training for test data'
        evaluate_and_print_results(prefix, forward_step_func,
                                   test_data_iterator, model,
-                                   args, None, 0, timers, True)
+                                   0, True)
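
# A minimal sketch (not part of this merge) of how a pretraining script is
# expected to drive pretrain(). The toy model, datasets, and loss below are
# hypothetical placeholders; see pretrain_bert.py further down for the real
# providers.
import torch

from megatron import get_timers
from megatron import print_rank_0
from megatron.training import pretrain
from megatron.utils import reduce_losses


def toy_model_provider():
    """Return a vanilla CPU model; pretrain() adds fp16/DDP wrapping."""
    print_rank_0('building toy model ...')
    return torch.nn.Linear(8, 1)


def toy_datasets_provider(train_val_test_num_samples):
    """Return (train, valid, test) datasets of the requested sizes."""
    def make(num_samples):
        inputs = torch.randn(num_samples, 8)
        targets = torch.randn(num_samples, 1)
        return torch.utils.data.TensorDataset(inputs, targets)
    return tuple(make(n) for n in train_val_test_num_samples)


def toy_forward_step(data_iterator, model):
    """Return a loss scalar plus a dict of reduced values to monitor."""
    timers = get_timers()
    # pretrain() requires forward_step to time its batch generation.
    timers('batch generator').start()
    inputs, targets = next(data_iterator)
    timers('batch generator').stop()
    loss = torch.nn.functional.mse_loss(model(inputs.cuda()), targets.cuda())
    reduced = reduce_losses([loss])
    return loss, {'mse loss': reduced[0]}


if __name__ == '__main__':
    pretrain(toy_datasets_provider, toy_model_provider, toy_forward_step)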
-def initialize_megatron(message, args, writer):
-    """Initialize distributed, random seed, and autoresume."""
-
-    # Pytorch distributed.
-    initialize_distributed(args)
-    if torch.distributed.get_rank() == 0:
-        print(message, flush=True)
-        print_args(args, writer)
-
-    # Autoresume.
-    torch.distributed.barrier()
-    if args.adlr_autoresume:
-        enable_adlr_autoresume(args)
-
-    # Random seeds for reproducibility.
-    set_random_seed(args.seed)
-
-
-def get_model(model_provider_func, args):
+def get_model(model_provider_func):
    """Build the model."""
+    args = get_args()

    # Build model on cpu.
-    model = model_provider_func(args)
+    model = model_provider_func()
    # Print number of parameters.
    if mpu.get_data_parallel_rank() == 0:
@@ -164,26 +137,23 @@ def get_model(model_provider_func, args):
    # Wrap model for distributed training.
    if args.DDP_impl == 'torch':
        i = torch.cuda.current_device()
-        args.DDP_type = torchDDP
-        model = args.DDP_type(model, device_ids=[i], output_device=i,
-                              process_group=mpu.get_data_parallel_group())
+        model = torchDDP(model, device_ids=[i], output_device=i,
+                         process_group=mpu.get_data_parallel_group())
        return model
    if args.DDP_impl == 'local':
-        args.DDP_type = LocalDDP
-        model = args.DDP_type(model)
+        model = LocalDDP(model)
        return model

-    print_rank_0('Unknown DDP implementation specified: {}. '
-                 'Exiting.'.format(args.DDP_impl))
-    exit()
-    return model
+    raise NotImplementedError('Unknown DDP implementation specified: {}. '
+                              'Exiting.'.format(args.DDP_impl))
-def get_optimizer(model, args):
+def get_optimizer(model):
    """Set up the optimizer."""
+    args = get_args()

    # Build parameter groups (weight decay and non-decay).
-    while isinstance(model, (args.DDP_type, FP16_Module)):
+    while isinstance(model, (torchDDP, LocalDDP, FP16_Module)):
        model = model.module
    param_groups = get_params_for_weight_decay_optimization(model)
@@ -194,8 +164,7 @@ def get_optimizer(model, args):
            param.model_parallel = False

    # Use Adam.
-    optimizer = Adam(param_groups,
-                     lr=args.lr, weight_decay=args.weight_decay)
+    optimizer = Adam(param_groups, lr=args.lr, weight_decay=args.weight_decay)

    # Wrap into fp16 optimizer.
    if args.fp16:
@@ -204,14 +173,15 @@ def get_optimizer(model, args):
            dynamic_loss_scale=args.dynamic_loss_scale,
            dynamic_loss_args={
                'scale_window': args.loss_scale_window,
-                'min_scale':args.min_scale,
+                'min_scale': args.min_scale,
                'delayed_shift': args.hysteresis})

    return optimizer
-def get_learning_rate_scheduler(optimizer, args):
+def get_learning_rate_scheduler(optimizer):
    """Build the learning rate scheduler."""
+    args = get_args()

    # Add linear learning rate scheduler.
    if args.lr_decay_iters is not None:
@@ -219,13 +189,13 @@ def get_learning_rate_scheduler(optimizer, args):
    else:
        num_iters = args.train_iters
    num_iters = max(1, num_iters)
-    init_step = -1
+    init_step = 0
    warmup_iter = args.warmup * num_iters
    lr_scheduler = AnnealingLR(
        optimizer,
        start_lr=args.lr,
        warmup_iter=warmup_iter,
-        num_iters=num_iters,
+        total_iters=num_iters,
        decay_style=args.lr_decay_style,
        last_iter=init_step,
        min_lr=args.min_lr,
@@ -235,26 +205,29 @@ def get_learning_rate_scheduler(optimizer, args):

    return lr_scheduler
-def setup_model_and_optimizer(model_provider_func, args):
+def setup_model_and_optimizer(model_provider_func):
    """Setup model and optimizer."""
+    args = get_args()

-    model = get_model(model_provider_func, args)
-    optimizer = get_optimizer(model, args)
-    lr_scheduler = get_learning_rate_scheduler(optimizer, args)
+    model = get_model(model_provider_func)
+    optimizer = get_optimizer(model)
+    lr_scheduler = get_learning_rate_scheduler(optimizer)

    if args.load is not None:
-        args.iteration = load_checkpoint(model, optimizer, lr_scheduler, args)
+        args.iteration = load_checkpoint(model, optimizer, lr_scheduler)
    else:
        args.iteration = 0

    return model, optimizer, lr_scheduler
-def backward_step(optimizer, model, loss, args, timers):
+def backward_step(optimizer, model, loss):
    """Backward step."""
+    args = get_args()
+    timers = get_timers()

    # Backward pass.
-    optimizer.zero_grad()
+    optimizer.zero_grad(set_grads_to_None=True)
    if args.fp16:
        optimizer.backward(loss, update_master_grads=False)
    else:
@@ -279,18 +252,20 @@ def backward_step(optimizer, model, loss, args, timers):
        optimizer.clip_master_grads(args.clip_grad)
-def train_step(forward_step_func, data_iterator, model, optimizer, lr_scheduler,
-               args, timers):
+def train_step(forward_step_func, data_iterator,
+               model, optimizer, lr_scheduler):
    """Single training step."""
+    args = get_args()
+    timers = get_timers()

    # Forward model for one step.
    timers('forward').start()
-    loss, loss_reduced = forward_step_func(data_iterator, model, args, timers)
+    loss, loss_reduced = forward_step_func(data_iterator, model)
    timers('forward').stop()

    # Calculate gradients, reduce across processes, and clip.
    timers('backward').start()
-    backward_step(optimizer, model, loss, args, timers)
+    backward_step(optimizer, model, loss)
    timers('backward').stop()

    # Update parameters.
@@ -309,7 +284,11 @@ def train_step(forward_step_func, data_iterator, model, optimizer, lr_scheduler,
def training_log(loss_dict, total_loss_dict, learning_rate, iteration,
-                 loss_scale, report_memory_flag, writer, args, timers):
+                 loss_scale, report_memory_flag):
+    """Log training information such as losses, timing, ...."""
+    args = get_args()
+    timers = get_timers()
+    writer = get_tensorboard_writer()

    # Update losses.
    for key in loss_dict:
@@ -317,6 +296,7 @@ def training_log(loss_dict, total_loss_dict, learning_rate, iteration,

    # Logging.
    timers_to_log = []

    def add_to_logging(name):
        if name in timers.timers:
            timers_to_log.append(name)
@@ -365,8 +345,10 @@ def training_log(loss_dict, total_loss_dict, learning_rate, iteration,
def train(forward_step_func, model, optimizer, lr_scheduler,
-          train_data_iterator, val_data_iterator, timers, args, writer):
+          train_data_iterator, valid_data_iterator):
    """Train the model function."""
+    args = get_args()
+    timers = get_timers()

    # Turn on training mode which enables dropout.
    model.train()
@@ -385,51 +367,52 @@
                                                     train_data_iterator,
                                                     model,
                                                     optimizer,
-                                                     lr_scheduler,
-                                                     args, timers)
+                                                     lr_scheduler)
        skipped_iters += skipped_iter
        iteration += 1

        # Logging.
+        loss_scale = None
+        if args.fp16:
+            loss_scale = optimizer.loss_scale
        report_memory_flag = training_log(loss_dict, total_loss_dict,
                                          optimizer.param_groups[0]['lr'],
-                                          iteration, optimizer.loss_scale,
-                                          report_memory_flag, writer, args,
-                                          timers)
+                                          iteration, loss_scale,
+                                          report_memory_flag)

        # Autoresume
-        if (iteration % args.adlr_autoresume_interval == 0) and \
-           args.adlr_autoresume:
+        if args.adlr_autoresume and \
+           (iteration % args.adlr_autoresume_interval == 0):
            check_adlr_autoresume_termination(iteration, model, optimizer,
-                                              lr_scheduler, args)
+                                              lr_scheduler)

        # Checkpointing
        if args.save and args.save_interval and \
           iteration % args.save_interval == 0:
-            save_checkpoint(iteration, model, optimizer, lr_scheduler, args)
+            save_checkpoint(iteration, model, optimizer, lr_scheduler)

        # Evaluation
        if args.eval_interval and iteration % args.eval_interval == 0 and \
           args.do_valid:
            prefix = 'iteration {}'.format(iteration)
            evaluate_and_print_results(prefix, forward_step_func,
-                                       val_data_iterator, model, args,
-                                       writer, iteration, timers, False)
+                                       valid_data_iterator, model,
+                                       iteration, False)

        if args.exit_interval and iteration % args.exit_interval == 0:
            torch.distributed.barrier()
            time_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            rank = torch.distributed.get_rank()
-            print('rank: {} | time: {} | exiting the program at iteration {}'.
-                  format(rank, time_str, iteration), flush=True)
-            exit()
+            print_rank_0('rank: {} | time: {} | exiting the program at '
                         'iteration {}'.format(rank, time_str, iteration))
+            sys.exit()

    return iteration, skipped_iters
-def evaluate(forward_step_func, data_iterator, model,
-             args, timers, verbose=False):
+def evaluate(forward_step_func, data_iterator, model, verbose=False):
    """Evaluation."""
+    args = get_args()

    # Turn on evaluation mode which disables dropout.
    model.eval()
@@ -444,12 +427,11 @@ def evaluate(forward_step_func, data_iterator, model,
                print_rank_0('Evaluating iter {}/{}'.format(iteration,
                                                            args.eval_iters))
            # Forward evaluation.
-            _, loss_dict = forward_step_func(data_iterator, model,
-                                             args, timers)
+            _, loss_dict = forward_step_func(data_iterator, model)
            # Reduce across processes.
            for key in loss_dict:
                total_loss_dict[key] = total_loss_dict.get(key, 0.) + \
                    loss_dict[key]

    # Move model back to the train mode.
    model.train()
@@ -461,11 +443,11 @@
def evaluate_and_print_results(prefix, forward_step_func,
                               data_iterator, model,
-                               args, writer, iteration,
-                               timers, verbose=False):
+                               iteration, verbose=False):
    """Helper function to evaluate and dump results on screen."""
+    writer = get_tensorboard_writer()

-    total_loss_dict = evaluate(forward_step_func, data_iterator, model,
-                               args, timers, verbose)
+    total_loss_dict = evaluate(forward_step_func, data_iterator, model, verbose)
    string = ' validation loss at {} | '.format(prefix)
    for key in total_loss_dict:
        string += '{} value: {:.6E} | '.format(key, total_loss_dict[key].item())
@@ -483,37 +465,87 @@ def evaluate_and_print_results(prefix, forward_step_func,
    print_rank_0('-' * length)
-def get_train_val_test_data_iterators(train_data, val_data, test_data, args):
-    """Build train/validation/test iterators."""
-
-    # If resume is on, shift the start iterations.
-    if args.resume_dataloader:
-        if train_data is not None:
-            train_data.batch_sampler.start_iter = args.iteration % \
-                len(train_data)
-            print_rank_0('setting training data start iteration to {}'.
-                         format(train_data.batch_sampler.start_iter))
-        if val_data is not None:
-            start_iter_val = (args.iteration // args.eval_interval) * \
-                args.eval_iters
-            val_data.batch_sampler.start_iter = start_iter_val % \
-                len(val_data)
-            print_rank_0('setting validation data start iteration to {}'.
-                         format(val_data.batch_sampler.start_iter))
-
-    if train_data is not None:
-        train_data_iterator = iter(train_data)
-    else:
-        train_data_iterator = None
-    if val_data is not None:
-        val_data_iterator = iter(val_data)
-    else:
-        val_data_iterator = None
-    if test_data is not None:
-        test_data_iterator = iter(test_data)
-    else:
-        test_data_iterator = None
-
-    return train_data_iterator, val_data_iterator, test_data_iterator
+def build_train_valid_test_data_iterators(
+        build_train_valid_test_datasets_provider):
+    """Build the train/valid/test data iterators."""
+    args = get_args()
+
+    (train_dataloader, valid_dataloader, test_dataloader) = (None, None, None)
+
+    print_rank_0('> building train, validation, and test datasets ...')
+    # Data loader only on rank 0 of each model parallel group.
+    if mpu.get_model_parallel_rank() == 0:
+        # Rank, size, and global batch size.
+        data_parallel_size = mpu.get_data_parallel_world_size()
+        global_batch_size = args.batch_size * data_parallel_size
+
+        # Number of train/valid/test samples.
+        train_iters = args.train_iters
+        eval_iters = (train_iters // args.eval_interval + 1) * args.eval_iters
+        test_iters = args.eval_iters
+        train_val_test_num_samples = [train_iters * global_batch_size,
+                                      eval_iters * global_batch_size,
+                                      test_iters * global_batch_size]
+        print_rank_0(' > datasets target sizes (minimum size):')
+        print_rank_0('    train:      {}'.format(train_val_test_num_samples[0]))
+        print_rank_0('    validation: {}'.format(train_val_test_num_samples[1]))
+        print_rank_0('    test:       {}'.format(train_val_test_num_samples[2]))
+
+        # Build the datasets.
+        train_ds, valid_ds, test_ds = build_train_valid_test_datasets_provider(
+            train_val_test_num_samples)
+
+        # Build dataloders.
+        train_dataloader = make_data_loader(train_ds)
+        valid_dataloader = make_data_loader(valid_ds)
+        test_dataloader = make_data_loader(test_ds)
+
+        # Flags to know if we need to do training/validation/testing.
+        do_train = train_dataloader is not None and args.train_iters > 0
+        do_valid = valid_dataloader is not None and args.eval_iters > 0
+        do_test = test_dataloader is not None and args.eval_iters > 0
+        # Need to broadcast num_tokens and num_type_tokens.
+        flags = torch.cuda.LongTensor(
+            [int(do_train), int(do_valid), int(do_test)])
+    else:
+        flags = torch.cuda.LongTensor([0, 0, 0])
+
+    # Broadcast num tokens.
+    torch.distributed.broadcast(flags,
+                                mpu.get_model_parallel_src_rank(),
+                                group=mpu.get_model_parallel_group())
+    args.do_train = flags[0].item()
+    args.do_valid = flags[1].item()
+    args.do_test = flags[2].item()
+
+    # Shift the start iterations.
+    if train_dataloader is not None:
+        train_dataloader.batch_sampler.start_iter = args.iteration % \
+            len(train_dataloader)
+        print_rank_0('setting training data start iteration to {}'.
+                     format(train_dataloader.batch_sampler.start_iter))
+    if valid_dataloader is not None:
+        start_iter_val = (args.iteration // args.eval_interval) * \
+            args.eval_iters
+        valid_dataloader.batch_sampler.start_iter = start_iter_val % \
+            len(valid_dataloader)
+        print_rank_0('setting validation data start iteration to {}'.
+                     format(valid_dataloader.batch_sampler.start_iter))
+
+    # Build iterators.
+    if train_dataloader is not None:
+        train_data_iterator = iter(train_dataloader)
+    else:
+        train_data_iterator = None
+    if valid_dataloader is not None:
+        valid_data_iterator = iter(valid_dataloader)
+    else:
+        valid_data_iterator = None
+    if test_dataloader is not None:
+        test_data_iterator = iter(test_dataloader)
+    else:
+        test_data_iterator = None
+
+    return train_data_iterator, valid_data_iterator, test_data_iterator
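
# Worked example (illustrative numbers, not from this merge) of the sample
# arithmetic above: with batch_size=8 per GPU, data-parallel size 8,
# train_iters=500000, eval_interval=1000, and eval_iters=100:
#
#     global_batch_size = 8 * 8 = 64
#     train samples = 500000 * 64 = 32,000,000
#     eval iterations = (500000 // 1000 + 1) * 100 = 50,100
#     valid samples = 50100 * 64 = 3,206,400
#     test samples = 100 * 64 = 6,400
#
# The '+ 1' reserves enough validation samples for one final evaluation
# after the last training iteration.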
# coding=utf-8
-# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
+# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,22 +13,106 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-"""Utilities for logging and serialization"""
+"""General utilities."""
-import os
-import random
-import time
-import numpy as np
-import torch
-from torch.nn.parallel.distributed import DistributedDataParallel as torchDDP
-from apex.optimizers import FusedAdam as Adam
+import sys

+import torch

+from megatron import get_args
+from megatron import get_adlr_autoresume
from megatron import mpu
-from megatron.fp16 import FP16_Module
+from megatron import print_rank_0
+from megatron.checkpointing import save_checkpoint
+from megatron.data.samplers import DistributedBatchSampler
from megatron.fp16 import FP16_Optimizer
-from megatron.model import DistributedDataParallel as LocalDDP
-from megatron.model import get_params_for_weight_decay_optimization
def reduce_losses(losses):
"""Reduce a tensor of losses across all GPUs."""
reduced_losses = torch.cat(
[loss.clone().detach().view(1) for loss in losses])
torch.distributed.all_reduce(reduced_losses)
reduced_losses = reduced_losses / torch.distributed.get_world_size()
return reduced_losses
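
# Semantics, with hypothetical values: for a world size of 2 where rank 0
# computes lm_loss=2.0 and rank 1 computes lm_loss=4.0,
# reduce_losses([lm_loss]) returns tensor([3.0]) on every rank -- the
# all-reduced sum divided by the world size.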
def report_memory(name):
"""Simple GPU memory report."""
mega_bytes = 1024.0 * 1024.0
string = name + ' memory (MB)'
string += ' | allocated: {}'.format(
torch.cuda.memory_allocated() / mega_bytes)
string += ' | max allocated: {}'.format(
torch.cuda.max_memory_allocated() / mega_bytes)
string += ' | cached: {}'.format(torch.cuda.memory_cached() / mega_bytes)
string += ' | max cached: {}'.format(
torch.cuda.max_memory_cached() / mega_bytes)
print_rank_0(string)
def print_params_min_max_norm(optimizer, iteration):
"""Print min, max, and norm of all parameters."""
index = 0
rank = torch.distributed.get_rank()
    string = 'iteration, rank, index, model-parallel, min, max, norm\n'
optimizer_ = optimizer
if isinstance(optimizer, FP16_Optimizer):
optimizer_ = optimizer.optimizer
for param_group in optimizer_.param_groups:
for param in param_group['params']:
index += 1
min_ = param.data.min()
max_ = param.data.max()
norm = param.data.norm()
string += '{:7d}, {:4d}, {:4d}, {:2d}, '.format(
iteration, rank, index, int(param.model_parallel))
string += '{:.6E}, {:.6E}, {:.6E}\n'.format(min_, max_, norm)
print(string, flush=True)
def check_adlr_autoresume_termination(iteration, model,
optimizer, lr_scheduler):
"""Check for autoresume signal and exit if it is received."""
args = get_args()
autoresume = get_adlr_autoresume()
    # Add barrier to ensure consistency.
torch.distributed.barrier()
if autoresume.termination_requested():
if args.save:
save_checkpoint(iteration, model, optimizer, lr_scheduler)
print_rank_0(">>> autoresume termination request found!")
if torch.distributed.get_rank() == 0:
autoresume.request_resume()
print_rank_0(">>> training terminated. Returning")
sys.exit(0)
def make_data_loader(dataset):
"""Buld dataloader given an input dataset."""
if dataset is None:
return None
args = get_args()
# Data parallel arguments.
world_size = mpu.get_data_parallel_world_size()
rank = mpu.get_data_parallel_rank()
global_batch_size = args.batch_size * world_size
num_workers = args.num_workers
# Use a simple sampler with distributed batch sampler.
sampler = torch.utils.data.SequentialSampler(dataset)
batch_sampler = DistributedBatchSampler(sampler=sampler,
batch_size=global_batch_size,
drop_last=True,
rank=rank,
world_size=world_size)
# Torch dataloader.
return torch.utils.data.DataLoader(dataset,
batch_sampler=batch_sampler,
num_workers=num_workers,
pin_memory=True)
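
# A minimal usage sketch (assumed setup, not part of this merge): once
# megatron and torch.distributed are initialized, any map-style dataset can
# be wrapped, and each data-parallel rank sees its own slice of every
# global batch:
#
#     dataset = torch.utils.data.TensorDataset(torch.arange(1024).view(-1, 1))
#     loader = make_data_loader(dataset)
#     for (batch,) in loader:
#         ...  # batch holds args.batch_size rows local to this rank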
def get_ltor_masks_and_position_ids(data,
@@ -79,474 +163,13 @@ def get_ltor_masks_and_position_ids(data,
                i = eod_index[j]
                # Mask attention loss.
                if reset_attention_mask:
-                    attention_mask[b, 0, (i+1):, :(i+1)] = 0
+                    attention_mask[b, 0, (i + 1):, :(i + 1)] = 0
                # Reset positions.
                if reset_position_ids:
-                    position_ids[b, (i+1):] -= (i + 1 - prev_index)
+                    position_ids[b, (i + 1):] -= (i + 1 - prev_index)
                    prev_index = i + 1

+    # Convert attention mask to binary:
+    attention_mask = (attention_mask < 0.5)
+
    return attention_mask, loss_mask, position_ids
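
# Toy illustration (values assumed; the full function definition is elided
# above): for a single sequence containing an end-of-document token,
#
#     data = [[5, 3, EOD, 7, 9]]
#
# reset_position_ids=True restarts the position ids after the EOD,
#
#     position_ids = [[0, 1, 2, 0, 1]]
#
# and reset_attention_mask=True additionally zeroes the causal-mask entries
# that would let tokens 7 and 9 attend across the EOD boundary.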
def reduce_losses(losses):
reduced_losses = torch.cat(
[loss.clone().detach().view(1) for loss in losses])
torch.distributed.all_reduce(reduced_losses)
reduced_losses = reduced_losses / torch.distributed.get_world_size()
return reduced_losses
def get_tensorboard_writer(args):
writer = None
if hasattr(args, 'tensorboard_dir') and \
args.tensorboard_dir and args.rank == 0:
try:
from torch.utils.tensorboard import SummaryWriter
writer = SummaryWriter(log_dir=args.tensorboard_dir)
except ModuleNotFoundError:
print_rank_0('WARNING: TensorBoard writing requested but is not '
'available (are you using PyTorch 1.1.0 or later?), '
'no TensorBoard logs will be written.')
writer = None
return writer
def print_rank_0(message):
if torch.distributed.is_initialized():
if torch.distributed.get_rank() == 0:
print(message, flush=True)
else:
print(message, flush=True)
def enable_adlr_autoresume(args):
print_rank_0('enabling autoresume ...')
import sys
sys.path.append(os.environ.get('SUBMIT_SCRIPTS', '.'))
try:
from userlib.auto_resume import AutoResume
except:
print_rank_0('ADLR autoresume is not available, exiting ...')
exit()
args.AutoResume = AutoResume
args.AutoResume.init()
def check_adlr_autoresume_termination(iteration, model, optimizer,
lr_scheduler, args):
    # Add barrier to ensure consistency.
torch.distributed.barrier()
if args.AutoResume.termination_requested():
if args.save:
save_checkpoint(iteration, model, optimizer, lr_scheduler, args)
print_rank_0(">>> autoresume termination request found!")
if torch.distributed.get_rank() == 0:
args.AutoResume.request_resume()
print_rank_0(">>> training terminated. Returning")
exit(0)
def print_args(args, writer=None):
"""Print arguments."""
print('arguments:', flush=True)
for arg in vars(args):
dots = '.' * (29 - len(arg))
print(' {} {} {}'.format(arg, dots, getattr(args, arg)), flush=True)
if writer:
writer.add_text(arg, str(getattr(args, arg)))
def print_params_min_max_norm(optimizer, iteration):
"""Print min, max, and norm of all parameters."""
index = 0
rank = torch.distributed.get_rank()
    string = 'iteration, rank, index, model-parallel, min, max, norm\n'
optimizer_ = optimizer
if isinstance(optimizer, FP16_Optimizer):
optimizer_ = optimizer.optimizer
for param_group in optimizer_.param_groups:
for param in param_group['params']:
index += 1
min_ = param.data.min()
max_ = param.data.max()
norm = param.data.norm()
string += '{:7d}, {:4d}, {:4d}, {:2d}, '.format(
iteration, rank, index, int(param.model_parallel))
string += '{:.6E}, {:.6E}, {:.6E}\n'.format(min_, max_, norm)
print(string, flush=True)
class Timers:
"""Group of timers."""
class Timer:
"""Timer."""
def __init__(self, name):
self.name_ = name
self.elapsed_ = 0.0
self.started_ = False
self.start_time = time.time()
def start(self):
"""Start the timer."""
assert not self.started_, 'timer has already been started'
torch.cuda.synchronize()
self.start_time = time.time()
self.started_ = True
def stop(self):
"""Stop the timer."""
assert self.started_, 'timer is not started'
torch.cuda.synchronize()
self.elapsed_ += (time.time() - self.start_time)
self.started_ = False
def reset(self):
"""Reset timer."""
self.elapsed_ = 0.0
self.started_ = False
def elapsed(self, reset=True):
"""Calculate the elapsed time."""
started_ = self.started_
            # If timing is in progress, end it first.
if self.started_:
self.stop()
# Get the elapsed time.
elapsed_ = self.elapsed_
# Reset the elapsed time
if reset:
self.reset()
# If timing was in progress, set it back.
if started_:
self.start()
return elapsed_
def __init__(self):
self.timers = {}
def __call__(self, name):
if name not in self.timers:
self.timers[name] = self.Timer(name)
return self.timers[name]
def write(self, names, writer, iteration, normalizer=1.0, reset=False):
"""Write timers to a tensorboard writer"""
# currently when using add_scalars,
# torch.utils.add_scalars makes each timer its own run, which
        # pollutes the runs list, so we just add each as a scalar
assert normalizer > 0.0
for name in names:
value = self.timers[name].elapsed(reset=reset) / normalizer
writer.add_scalar(name + '_time', value, iteration)
def log(self, names, normalizer=1.0, reset=True):
"""Log a group of timers."""
assert normalizer > 0.0
string = 'time (ms)'
for name in names:
elapsed_time = self.timers[name].elapsed(
                reset=reset) * 1000.0 / normalizer
string += ' | {}: {:.2f}'.format(name, elapsed_time)
print_rank_0(string)
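
# A short usage sketch (illustrative only) of the Timers group:
#
#     timers = Timers()
#     timers('data loader').start()
#     batch = get_next_batch()     # hypothetical work being timed
#     timers('data loader').stop()
#     timers.log(['data loader'])  # prints: time (ms) | data loader: ...
#
# timers('name') lazily creates the named timer, and elapsed() resets it by
# default, so each log() call reports only the interval since the last one.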
def report_memory(name):
"""Simple GPU memory report."""
mega_bytes = 1024.0 * 1024.0
string = name + ' memory (MB)'
string += ' | allocated: {}'.format(
torch.cuda.memory_allocated() / mega_bytes)
string += ' | max allocated: {}'.format(
torch.cuda.max_memory_allocated() / mega_bytes)
string += ' | cached: {}'.format(torch.cuda.memory_cached() / mega_bytes)
string += ' | max cached: {}'.format(
        torch.cuda.max_memory_cached() / mega_bytes)
print_rank_0(string)
def vocab_size_with_padding(num_tokens, args):
after = num_tokens
multiple = args.make_vocab_size_divisible_by * \
mpu.get_model_parallel_world_size()
while (after % multiple) != 0:
after += 1
print_rank_0('> padded vocab (size: {}) with {} dummy '
'tokens (new size: {})'.format(
num_tokens, after - num_tokens, after))
return after
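
# Worked example (typical GPT-2 numbers, assumed here): for a 50257-token
# vocabulary with make_vocab_size_divisible_by=128 and a model-parallel
# world size of 1, the multiple is 128, so the vocab is padded with 47
# dummy tokens to 50304 = 128 * 393, keeping every model-parallel vocab
# shard the same size.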
def initialize_distributed(args):
"""Initialize torch.distributed."""
# Manually set the device ids.
device = args.rank % torch.cuda.device_count()
if args.local_rank is not None:
device = args.local_rank
torch.cuda.set_device(device)
# Call the init process
init_method = 'tcp://'
master_ip = os.getenv('MASTER_ADDR', 'localhost')
master_port = os.getenv('MASTER_PORT', '6000')
init_method += master_ip + ':' + master_port
torch.distributed.init_process_group(
backend=args.distributed_backend,
world_size=args.world_size, rank=args.rank,
init_method=init_method)
# Set the model-parallel / data-parallel communicators.
mpu.initialize_model_parallel(args.model_parallel_size)
def set_random_seed(seed):
"""Set random seed for reproducability."""
if seed is not None and seed > 0:
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
mpu.model_parallel_cuda_manual_seed(seed)
def get_checkpoint_name(checkpoints_path, iteration, release=False,
mp_rank=None):
if release:
d = 'release'
else:
d = 'iter_{:07d}'.format(iteration)
return os.path.join(checkpoints_path, d,
'mp_rank_{:02d}'.format(
mpu.get_model_parallel_rank() if mp_rank is None \
else mp_rank),
'model_optim_rng.pt')
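
# Example layout (illustrative): with checkpoints_path='checkpoints/gpt2',
# iteration=5000, and model-parallel rank 0 this resolves to
#
#     checkpoints/gpt2/iter_0005000/mp_rank_00/model_optim_rng.pt
#
# while release=True swaps the iteration directory for 'release'.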
def ensure_directory_exists(filename):
dirname = os.path.dirname(filename)
if not os.path.exists(dirname):
os.makedirs(dirname)
def get_checkpoint_tracker_filename(checkpoints_path):
return os.path.join(checkpoints_path, 'latest_checkpointed_iteration.txt')
def save_checkpoint(iteration, model, optimizer,
lr_scheduler, args):
"""Save a model checkpoint."""
    # Only rank zero of the data parallel group writes to the disk.
if isinstance(model, torchDDP):
model = model.module
if mpu.get_data_parallel_rank() == 0:
checkpoint_name = get_checkpoint_name(args.save, iteration)
print('global rank {} is saving checkpoint at iteration {:7d} to {}'.
format(torch.distributed.get_rank(), iteration, checkpoint_name))
sd = {}
sd['iteration'] = iteration
sd['model'] = model.state_dict_for_save_checkpoint()
# Optimizer stuff.
if not args.no_save_optim:
if optimizer is not None:
sd['optimizer'] = optimizer.state_dict()
if lr_scheduler is not None:
sd['lr_scheduler'] = lr_scheduler.state_dict()
# rng states.
if not args.no_save_rng:
sd['random_rng_state'] = random.getstate()
sd['np_rng_state'] = np.random.get_state()
sd['torch_rng_state'] = torch.get_rng_state()
sd['cuda_rng_state'] = torch.cuda.get_rng_state()
sd['rng_tracker_states'] = mpu.get_cuda_rng_tracker().get_states()
ensure_directory_exists(checkpoint_name)
torch.save(sd, checkpoint_name)
print(' successfully saved {}'.format(checkpoint_name))
# Wait so everyone is done (necessary)
torch.distributed.barrier()
# And update the latest iteration
if torch.distributed.get_rank() == 0:
tracker_filename = get_checkpoint_tracker_filename(args.save)
with open(tracker_filename, 'w') as f:
f.write(str(iteration))
# Wait so everyone is done (not necessary)
torch.distributed.barrier()
def load_checkpoint(model, optimizer, lr_scheduler, args):
"""Load a model checkpoint."""
if isinstance(model, torchDDP):
model = model.module
# Read the tracker file and set the iteration.
tracker_filename = get_checkpoint_tracker_filename(args.load)
if not os.path.isfile(tracker_filename):
print_rank_0('WARNING: could not find the metadata file {} '.format(
tracker_filename))
print_rank_0(' will not load any checkpoints and will start from '
'random')
return 0
iteration = 0
release = False
with open(tracker_filename, 'r') as f:
metastring = f.read().strip()
try:
iteration = int(metastring)
except ValueError:
release = metastring == 'release'
if not release:
print_rank_0('ERROR: Invalid metadata file {}. Exiting'.format(
tracker_filename))
exit()
assert iteration > 0 or release, 'error parsing metadata file {}'.format(
tracker_filename)
# Checkpoint.
checkpoint_name = get_checkpoint_name(args.load, iteration, release)
if mpu.get_data_parallel_rank() == 0:
print('global rank {} is loading checkpoint {}'.format(
torch.distributed.get_rank(), checkpoint_name))
# Load the checkpoint.
try:
sd = torch.load(checkpoint_name, map_location='cpu')
except ModuleNotFoundError:
# For backward compatibility.
print_rank_0(' > deserializing using the old code structure ...')
import sys
sys.modules['fp16.loss_scaler'] = sys.modules[
'megatron.fp16.loss_scaler']
sd = torch.load(checkpoint_name, map_location='cpu')
sys.modules.pop('fp16.loss_scaler', None)
except:
print_rank_0('could not load the checkpoint')
exit()
# Iterations.
if args.finetune or release:
iteration = 0
else:
try:
iteration = sd['iteration']
except KeyError:
try: # Backward compatible with older checkpoints
iteration = sd['total_iters']
except KeyError:
                print_rank_0('A metadata file exists but unable to load '
                             'iteration from checkpoint {}, exiting'.format(checkpoint_name))
exit()
# Model.
try:
model.load_state_dict(sd['model'])
except KeyError:
print_rank_0('A metadata file exists but unable to load model '
'from checkpoint {}, exiting'.format(checkpoint_name))
exit()
# Optimizer.
if not release and not args.finetune and not args.no_load_optim:
try:
if optimizer is not None:
optimizer.load_state_dict(sd['optimizer'])
if lr_scheduler is not None:
lr_scheduler.load_state_dict(sd['lr_scheduler'])
except KeyError:
print_rank_0('Unable to load optimizer from checkpoint {}, exiting. '
'Specify --no-load-optim or --finetune to prevent '
'attempting to load the optimizer '
'state.'.format(checkpoint_name))
exit()
# rng states.
if not release and not args.finetune and not args.no_load_rng:
try:
random.setstate(sd['random_rng_state'])
np.random.set_state(sd['np_rng_state'])
torch.set_rng_state(sd['torch_rng_state'])
torch.cuda.set_rng_state(sd['cuda_rng_state'])
mpu.get_cuda_rng_tracker().set_states(sd['rng_tracker_states'])
except KeyError:
        print_rank_0('Unable to load rng state from checkpoint {}, exiting. '
                     'Specify --no-load-rng or --finetune to prevent '
                     'attempting to load the rng '
                     'state.'.format(checkpoint_name))
exit()
torch.distributed.barrier()
if mpu.get_data_parallel_rank() == 0:
print(' successfully loaded {}'.format(checkpoint_name))
return iteration
def load_weights(src, dst, dst2src=False):
"""
Loads weights from src to dst via in place copy.
src is a huggingface gpt2model, while dst is one of our models.
dst2src=True loads parameters from our models into huggingface's.
^dst2src is still untested
"""
conv_layer = 'Conv1D' in str(type(src))
for n, p in src.named_parameters():
if dst2src:
data = dst._parameters[n].data
load = p.data
else:
data = p.data
load = dst._parameters[n].data
if conv_layer and 'weight' in n:
data = data.t().contiguous()
load.copy_(data)
# dst._parameters[n].data.copy_(data)
def load_mlp(our, oai, dst2src=False):
load_weights(oai.c_fc, our.dense_h_to_4h, dst2src)
load_weights(oai.c_proj, our.dense_4h_to_h, dst2src)
def load_attention(our, oai, dst2src=False):
load_weights(oai.c_attn, our.query_key_value, dst2src)
load_weights(oai.c_proj, our.dense, dst2src)
def load_transformer_layer(our, oai, dst2src=False):
load_weights(oai.ln_1, our.input_layernorm, dst2src)
load_weights(oai.ln_2, our.post_attention_layernorm, dst2src)
load_mlp(our.mlp, oai.mlp, dst2src)
load_attention(our.attention, oai.attn, dst2src)
def move_weights(our, oai, dst2src=False):
"""
Loads weights from `oai` to `our` via in place copy.
`oai` is a huggingface gpt2model, while `our` is one of our models.
dst2src=True loads parameters from our models into huggingface's.
^dst2src=True is still untested
"""
# while isinstance(our, (torchDDP, model.distributed.DistributedDataParallel, FP16_Module)):
# our=our.module
transformer_model = oai.transformer
load_weights(transformer_model.ln_f, our.transformer.final_layernorm, dst2src)
load_weights(transformer_model.wte, our.word_embeddings, dst2src)
load_weights(transformer_model.wpe, our.position_embeddings, dst2src)
for our_layer, oai_layer in zip(our.transformer.layers, oai.transformer.h):
load_transformer_layer(our_layer, oai_layer, dst2src)
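
# A usage sketch (assumptions: the huggingface `transformers` package is
# installed and `our_model` is an already-built Megatron GPT-2 model with
# matching layer sizes):
#
#     from transformers import GPT2LMHeadModel
#
#     hf_model = GPT2LMHeadModel.from_pretrained('gpt2')
#     move_weights(our_model, hf_model)                # HF -> Megatron
#     move_weights(our_model, hf_model, dst2src=True)  # reverse (untested)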
def merge_parallel_state_dicts(state_dicts):
    """Group values for each key across a list of state dicts (unfinished)."""
    temp_sd = {}
    for sd in state_dicts:
        for k, v in sd.items():
            # Accumulate each rank's tensor under its key.
            temp_sd.setdefault(k, []).append(v)
    return temp_sd
def merge_parallel_checkpoints(checkpoint_dir, model_parallel_size):
pass
# coding=utf-8
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import numpy as np
import time
import os
import sys
from tokenizer import Tokenizer
def tokenize_corpus(filename, np_filename, print_interval=10000):
print(' > tokenizing {}'.format(filename))
tokenizer = Tokenizer(cache_dir='./cache')
tokenized_docs = []
num_docs = 0
num_tokens = 0
start_time = time.time()
with open(filename, 'r') as f:
for line in f:
try:
myjson = json.loads(line)
url = myjson['url']
sample = myjson['text']
tokens = tokenizer.tokenize_document(sample)
tokenized_docs.append(np.array(tokens, dtype=np.uint16))
num_docs += 1
num_tokens += len(tokens)
if num_docs % print_interval == 0:
print(' processed {:9d} documents in {:.2f} (s) so far'.
format(num_docs, time.time() - start_time),
flush=True)
except Exception as e:
print(' skipping ', line, e)
    print(' >> processed {} documents with a total of {} tokens ...'.format(
num_docs, num_tokens))
tokenized_docs = np.array(tokenized_docs, dtype=object)
np.save(np_filename, tokenized_docs, allow_pickle=True)
    print(' >> saved the tokenized documents to {} ...'.format(np_filename))
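
# Expected input format (field values are made up): one JSON document per
# line, with at least 'url' and 'text' keys,
#
#     {"url": "http://example.com/a", "text": "first document ..."}
#     {"url": "http://example.com/b", "text": "second document ..."}
#
# Lines that fail to parse or lack these keys are skipped and logged.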
if __name__ == '__main__':
print('building gpt2 dataset ...')
path = sys.argv[1]
shard = sys.argv[2]
input_filename = os.path.join(path,
'shards/shard_{:04d}'.format(int(shard)))
output_filename = os.path.join(path,
'npys/shard_{:04d}.npy'.format(int(shard)))
print('will be reading {}'.format(input_filename))
print('and will write the results to {}'.format(output_filename))
tokenize_corpus(input_filename, output_filename)
import glob
import json
import os
import time
import sys
import numpy as np
if __name__ == '__main__':
print('building the shard sizes ...')
path = sys.argv[1]
print('> reading numpy files from {}'.format(path))
npy_files = glob.glob(path + '/*.npy')
npy_files.sort()
print(' found {} numpy files'.format(len(npy_files)))
size_dict = {}
counter = 0
start_time = time.time()
for filename in npy_files:
data = np.load(filename, allow_pickle=True)
size = np.hstack(data).size
np_filename = os.path.basename(filename)
size_dict[np_filename] = size
counter += 1
if counter % 10 == 0:
print(' processed {} files in {:.2f} seconds'.format(
counter, time.time() - start_time))
output_filename = os.path.join(path, 'sizes.txt')
with open(output_filename, 'w') as f:
json.dump(size_dict, f)
print('> wrote sizes to {}'.format(output_filename))
#!/bin/bash
echo "processing gpt2 data ..."
DIR="/raid/mpatwary/redownload_v0/0-21"
for thread in {0..3}; do
echo " launching thread "$thread && python make_gpt2_dataset.py $DIR $thread > $DIR/logs/shard_$thread.log 2>&1 &
done
# coding=utf-8
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Pretrain ALBERT"""
import torch
import torch.nn.functional as F
from megatron import mpu
from megatron.model import BertModel
from megatron.utils import print_rank_0
from megatron.utils import reduce_losses
from megatron.utils import vocab_size_with_padding
from megatron.training import run
from megatron.data.albert_dataset import build_train_valid_test_datasets
from megatron.data_utils.samplers import DistributedBatchSampler
def model_provider(args):
"""Build the model."""
print_rank_0('building BERT model ...')
model = BertModel(
num_layers=args.num_layers,
vocab_size=args.vocab_size,
hidden_size=args.hidden_size,
num_attention_heads=args.num_attention_heads,
embedding_dropout_prob=args.hidden_dropout,
attention_dropout_prob=args.attention_dropout,
output_dropout_prob=args.hidden_dropout,
max_sequence_length=args.max_position_embeddings,
checkpoint_activations=args.checkpoint_activations,
checkpoint_num_layers=args.checkpoint_num_layers,
add_binary_head=True,
layernorm_epsilon=args.layernorm_epsilon,
num_tokentypes=args.tokentype_size,
parallel_output=True,
apply_query_key_layer_scaling=args.apply_query_key_layer_scaling,
attention_softmax_in_fp32=args.attention_softmax_in_fp32)
return model
def get_batch(data_iterator, timers):
# Items and their type.
keys = ['text', 'types', 'labels', 'is_random', 'loss_mask', 'padding_mask']
datatype = torch.int64
# Broadcast data.
timers('data loader').start()
if data_iterator is not None:
data = next(data_iterator)
else:
data = None
timers('data loader').stop()
data_b = mpu.broadcast_data(keys, data, datatype)
# Unpack.
tokens = data_b['text'].long()
types = data_b['types'].long()
sentence_order = data_b['is_random'].long()
loss_mask = data_b['loss_mask'].float()
lm_labels = data_b['labels'].long()
padding_mask = data_b['padding_mask'].long()
return tokens, types, sentence_order, loss_mask, lm_labels, padding_mask
def forward_step(data_iterator, model, args, timers):
"""Forward step."""
# Get the batch.
timers('batch generator').start()
tokens, types, sentence_order, loss_mask, lm_labels, padding_mask \
= get_batch(data_iterator, timers)
timers('batch generator').stop()
# Forward model.
lm_logits, sop_logits = model(tokens, padding_mask, tokentype_ids=types)
sop_loss = F.cross_entropy(sop_logits.view(-1, 2).contiguous().float(),
sentence_order.view(-1).contiguous(),
ignore_index=-1)
lm_loss_ = mpu.vocab_parallel_cross_entropy(lm_logits.contiguous().float(),
lm_labels.contiguous())
lm_loss = torch.sum(
lm_loss_.view(-1) * loss_mask.reshape(-1)) / loss_mask.sum()
loss = lm_loss + sop_loss
reduced_losses = reduce_losses([lm_loss, sop_loss])
return loss, {'lm loss': reduced_losses[0], 'sop loss': reduced_losses[1]}
def get_train_val_test_data(args):
"""Load the data on rank zero and boradcast number of tokens to all GPUS."""
(train_data, valid_data, test_data) = (None, None, None)
# Data loader only on rank 0 of each model parallel group.
if mpu.get_model_parallel_rank() == 0:
print_rank_0('> building train, validation, and test datasets '
'for ALBERT ...')
if args.data_loader is None:
args.data_loader = 'binary'
if args.data_loader != 'binary':
print('Unsupported {} data loader for ALBERT.'.format(
args.data_loader))
exit(1)
if not args.data_path:
print('ALBERT only supports a unified dataset specified '
'with --data-path')
exit(1)
data_parallel_size = mpu.get_data_parallel_world_size()
data_parallel_rank = mpu.get_data_parallel_rank()
global_batch_size = args.batch_size * data_parallel_size
# Number of train/valid/test samples.
train_iters = args.train_iters
eval_iters = (train_iters // args.eval_interval + 1) * args.eval_iters
test_iters = args.eval_iters
train_val_test_num_samples = [args.train_iters * global_batch_size,
eval_iters * global_batch_size,
test_iters * global_batch_size]
print_rank_0(' > datasets target sizes (minimum size):')
print_rank_0(' train: {}'.format(train_val_test_num_samples[0]))
print_rank_0(' validation: {}'.format(train_val_test_num_samples[1]))
print_rank_0(' test: {}'.format(train_val_test_num_samples[2]))
assert len(args.data_path) == 1
train_ds, valid_ds, test_ds = build_train_valid_test_datasets(
vocab_file=args.vocab,
data_prefix=args.data_path[0],
data_impl=args.data_impl,
splits_string=args.split,
train_valid_test_num_samples=train_val_test_num_samples,
max_seq_length=args.seq_length,
masked_lm_prob=args.mask_prob,
short_seq_prob=args.short_seq_prob,
seed=args.seed,
skip_warmup=args.skip_mmap_warmup)
print_rank_0("> finished creating ALBERT datasets ...")
def make_data_loader_(dataset):
if not dataset:
return None
# Use a simple sampler with distributed batch sampler.
sampler = torch.utils.data.SequentialSampler(dataset)
batch_sampler = DistributedBatchSampler(
sampler=sampler,
batch_size=global_batch_size,
drop_last=True,
rank=data_parallel_rank,
world_size=data_parallel_size)
# Torch dataloader.
return torch.utils.data.DataLoader(dataset,
batch_sampler=batch_sampler,
num_workers=args.num_workers,
pin_memory=True)
train_data = make_data_loader_(train_ds)
valid_data = make_data_loader_(valid_ds)
test_data = make_data_loader_(test_ds)
do_train = train_data is not None and args.train_iters > 0
do_valid = valid_data is not None and args.eval_iters > 0
do_test = test_data is not None and args.eval_iters > 0
# Need to broadcast num_tokens and num_type_tokens.
num_tokens = vocab_size_with_padding(train_ds.num_tokens(), args)
token_counts = torch.cuda.LongTensor([num_tokens,
2, # hard coded num_type_tokens
int(do_train),
int(do_valid),
int(do_test)])
else:
token_counts = torch.cuda.LongTensor([0, 0, 0, 0, 0])
# Broadcast num tokens.
torch.distributed.broadcast(token_counts,
mpu.get_model_parallel_src_rank(),
group=mpu.get_model_parallel_group())
args.vocab_size = token_counts[0].item()
args.tokentype_size = token_counts[1].item()
args.do_train = token_counts[2].item()
args.do_valid = token_counts[3].item()
args.do_test = token_counts[4].item()
return train_data, valid_data, test_data
if __name__ == "__main__":
    run('Pretrain ALBERT model', get_train_val_test_data,
model_provider, forward_step)
# coding=utf-8
-# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
+# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -18,81 +18,69 @@
import torch
import torch.nn.functional as F

-from configure_data import configure_data
+from megatron import get_args
+from megatron import get_timers
from megatron import mpu
+from megatron import print_rank_0
+from megatron.data.bert_dataset import build_train_valid_test_datasets
from megatron.model import BertModel
-from megatron.utils import print_rank_0
+from megatron.training import pretrain
from megatron.utils import reduce_losses
-from megatron.utils import vocab_size_with_padding
-from megatron.training import run
-def model_provider(args):
+def model_provider():
    """Build the model."""

    print_rank_0('building BERT model ...')

    model = BertModel(
-        num_layers=args.num_layers,
-        vocab_size=args.vocab_size,
-        hidden_size=args.hidden_size,
-        num_attention_heads=args.num_attention_heads,
-        embedding_dropout_prob=args.hidden_dropout,
-        attention_dropout_prob=args.attention_dropout,
-        output_dropout_prob=args.hidden_dropout,
-        max_sequence_length=args.max_position_embeddings,
-        checkpoint_activations=args.checkpoint_activations,
-        checkpoint_num_layers=args.checkpoint_num_layers,
+        num_tokentypes=2,
        add_binary_head=True,
-        layernorm_epsilon=args.layernorm_epsilon,
-        num_tokentypes=args.tokentype_size,
-        parallel_output=True,
-        apply_query_key_layer_scaling=args.apply_query_key_layer_scaling,
-        attention_softmax_in_fp32=args.attention_softmax_in_fp32)
+        parallel_output=True)

    return model
-def get_batch(data_iterator, timers):
+def get_batch(data_iterator):
+    """Build the batch."""

    # Items and their type.
-    keys = ['text', 'types', 'is_random', 'mask', 'mask_labels', 'pad_mask']
+    keys = ['text', 'types', 'labels', 'is_random', 'loss_mask', 'padding_mask']
    datatype = torch.int64

    # Broadcast data.
-    timers('data loader').start()
    if data_iterator is not None:
        data = next(data_iterator)
    else:
        data = None
-    timers('data loader').stop()
    data_b = mpu.broadcast_data(keys, data, datatype)

    # Unpack.
    tokens = data_b['text'].long()
    types = data_b['types'].long()
-    next_sentence = data_b['is_random'].long()
-    loss_mask = data_b['mask'].float()
-    lm_labels = data_b['mask_labels'].long()
-    padding_mask = data_b['pad_mask'].long()
+    sentence_order = data_b['is_random'].long()
+    loss_mask = data_b['loss_mask'].float()
+    lm_labels = data_b['labels'].long()
+    padding_mask = data_b['padding_mask'].long()

-    return tokens, types, next_sentence, loss_mask, lm_labels, padding_mask
+    return tokens, types, sentence_order, loss_mask, lm_labels, padding_mask
def forward_step(data_iterator, model, args, timers): def forward_step(data_iterator, model):
"""Forward step.""" """Forward step."""
timers = get_timers()
# Get the batch. # Get the batch.
timers('batch generator').start() timers('batch generator').start()
tokens, types, next_sentence, loss_mask, lm_labels, padding_mask \ tokens, types, sentence_order, loss_mask, lm_labels, padding_mask \
= get_batch(data_iterator, timers) = get_batch(data_iterator)
timers('batch generator').stop() timers('batch generator').stop()
# Forward model. # Forward model.
lm_logits, nsp_logits = model(tokens, 1-padding_mask, tokentype_ids=types) lm_logits, sop_logits = model(tokens, padding_mask, tokentype_ids=types)
nsp_loss = F.cross_entropy(nsp_logits.view(-1, 2).contiguous().float(), sop_loss = F.cross_entropy(sop_logits.view(-1, 2).contiguous().float(),
next_sentence.view(-1).contiguous(), sentence_order.view(-1).contiguous(),
ignore_index=-1) ignore_index=-1)
lm_loss_ = mpu.vocab_parallel_cross_entropy(lm_logits.contiguous().float(), lm_loss_ = mpu.vocab_parallel_cross_entropy(lm_logits.contiguous().float(),
...@@ -100,57 +88,35 @@ def forward_step(data_iterator, model, args, timers): ...@@ -100,57 +88,35 @@ def forward_step(data_iterator, model, args, timers):
lm_loss = torch.sum( lm_loss = torch.sum(
lm_loss_.view(-1) * loss_mask.reshape(-1)) / loss_mask.sum() lm_loss_.view(-1) * loss_mask.reshape(-1)) / loss_mask.sum()
loss = lm_loss + nsp_loss loss = lm_loss + sop_loss
reduced_losses = reduce_losses([lm_loss, nsp_loss]) reduced_losses = reduce_losses([lm_loss, sop_loss])
return loss, {'lm loss': reduced_losses[0], 'nsp loss': reduced_losses[1]} return loss, {'lm loss': reduced_losses[0], 'sop loss': reduced_losses[1]}
def get_train_val_test_data(args):
"""Load the data on rank zero and boradcast number of tokens to all GPUS."""
(train_data, val_data, test_data) = (None, None, None)
# Data loader only on rank 0 of each model parallel group.
if mpu.get_model_parallel_rank() == 0:
if (args.data_loader == 'raw'
or args.data_loader == 'lazy'
or args.data_loader == 'tfrecords'):
data_config = configure_data()
ds_type = 'BERT'
data_config.set_defaults(data_set_type=ds_type, transpose=False)
(train_data, val_data, test_data), tokenizer = data_config.apply(args)
num_tokens = vocab_size_with_padding(tokenizer.num_tokens, args)
# Need to broadcast num_tokens and num_type_tokens.
token_counts = torch.cuda.LongTensor([num_tokens,
tokenizer.num_type_tokens,
int(args.do_train),
int(args.do_valid),
int(args.do_test)])
else:
print("Unsupported data loader for BERT.")
exit(1)
else:
token_counts = torch.cuda.LongTensor([0, 0, 0, 0, 0])
# Broadcast num tokens. def train_valid_test_datasets_provider(train_val_test_num_samples):
torch.distributed.broadcast(token_counts, """Build train, valid, and test datasets."""
mpu.get_model_parallel_src_rank(), args = get_args()
group=mpu.get_model_parallel_group())
num_tokens = token_counts[0].item()
num_type_tokens = token_counts[1].item()
args.do_train = token_counts[2].item()
args.do_valid = token_counts[3].item()
args.do_test = token_counts[4].item()
args.vocab_size = num_tokens print_rank_0('> building train, validation, and test datasets '
args.tokentype_size = num_type_tokens 'for BERT ...')
train_ds, valid_ds, test_ds = build_train_valid_test_datasets(
data_prefix=args.data_path,
data_impl=args.data_impl,
splits_string=args.split,
train_valid_test_num_samples=train_val_test_num_samples,
max_seq_length=args.seq_length,
masked_lm_prob=args.mask_prob,
short_seq_prob=args.short_seq_prob,
seed=args.seed,
skip_warmup=(not args.mmap_warmup))
print_rank_0("> finished creating BERT datasets ...")
return train_data, val_data, test_data return train_ds, valid_ds, test_ds
if __name__ == "__main__": if __name__ == "__main__":
run('Pretrain BERT model', get_train_val_test_data, pretrain(train_valid_test_datasets_provider, model_provider, forward_step,
model_provider, forward_step) args_defaults={'tokenizer_type': 'BertWordPieceLowerCase'})
# coding=utf-8
-# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
+# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -17,53 +17,41 @@
import torch

-from configure_data import configure_data
-from gpt2_data_loader import make_gpt2_dataloaders
+from megatron import get_args
+from megatron import get_timers
+from megatron import get_tokenizer
from megatron import mpu
+from megatron import print_rank_0
+from megatron.data.gpt2_dataset import build_train_valid_test_datasets
from megatron.model import GPT2Model
+from megatron.training import pretrain
from megatron.utils import get_ltor_masks_and_position_ids
-from megatron.utils import print_rank_0
from megatron.utils import reduce_losses
-from megatron.utils import vocab_size_with_padding
-from megatron.training import run


-def model_provider(args):
+def model_provider():
    """Build the model."""

    print_rank_0('building GPT2 model ...')

-    model = GPT2Model(num_layers=args.num_layers,
-                      vocab_size=args.vocab_size,
-                      hidden_size=args.hidden_size,
-                      num_attention_heads=args.num_attention_heads,
-                      embedding_dropout_prob=args.hidden_dropout,
-                      attention_dropout_prob=args.attention_dropout,
-                      output_dropout_prob=args.hidden_dropout,
-                      max_sequence_length=args.max_position_embeddings,
-                      checkpoint_activations=args.checkpoint_activations,
-                      checkpoint_num_layers=args.checkpoint_num_layers,
-                      layernorm_epsilon=args.layernorm_epsilon,
-                      parallel_output=True,
-                      apply_query_key_layer_scaling=args.apply_query_key_layer_scaling,
-                      attention_softmax_in_fp32=args.attention_softmax_in_fp32)
+    model = GPT2Model(num_tokentypes=0, parallel_output=True)

    return model


-def get_batch(data_iterator, args, timers):
+def get_batch(data_iterator):
    """Generate a batch"""
+    args = get_args()
+    tokenizer = get_tokenizer()

    # Items and their type.
    keys = ['text']
    datatype = torch.int64

    # Broadcast data.
-    timers('data loader').start()
    if data_iterator is not None:
        data = next(data_iterator)
    else:
        data = None
-    timers('data loader').stop()
    data_b = mpu.broadcast_data(keys, data, datatype)

    # Unpack.
@@ -74,24 +62,22 @@ def get_batch(data_iterator, args, timers):
    # Get the masks and postition ids.
    attention_mask, loss_mask, position_ids = get_ltor_masks_and_position_ids(
        tokens,
-        args.eod_token,
+        tokenizer.eod,
        args.reset_position_ids,
        args.reset_attention_mask,
        args.eod_mask_loss)

-    # Convert
-    if args.fp16:
-        attention_mask = attention_mask.half()
-
    return tokens, labels, loss_mask, attention_mask, position_ids


-def forward_step(data_iterator, model, args, timers):
+def forward_step(data_iterator, model):
    """Forward step."""
+    timers = get_timers()

    # Get the batch.
    timers('batch generator').start()
    tokens, labels, loss_mask, attention_mask, position_ids = get_batch(
-        data_iterator, args, timers)
+        data_iterator)
    timers('batch generator').stop()

    # Forward model.
@@ -107,60 +93,26 @@ def forward_step(data_iterator, model, args, timers):
    return loss, {'lm loss': reduced_loss[0]}


-def get_train_val_test_data(args):
-    """Load the data on rank zero and boradcast number of tokens to all GPUS."""
-
-    (train_data, val_data, test_data) = (None, None, None)
-
-    # Data loader only on rank 0 of each model parallel group.
-    if mpu.get_model_parallel_rank() == 0:
-        if args.data_loader == 'numpy':
-            assert len(args.train_data) == 1
-            args.train_data = args.train_data[0]
-            assert len(args.valid_data) == 1
-            args.valid_data = args.valid_data[0]
-            assert len(args.test_data) == 1
-            args.test_data = args.test_data[0]
-            (train_data, val_data, test_data), num_tokens, \
-                eod_token = make_gpt2_dataloaders(args)
-        elif args.data_loader == 'raw' or args.data_loader == 'lazy':
-            data_config = configure_data()
-            data_config.set_defaults(data_set_type='GPT2', transpose=False)
-            (train_data, val_data, test_data), tokenizer = data_config.apply(
-                args)
-            num_tokens = tokenizer.num_tokens
-            eod_token = tokenizer.get_command('eos').Id
-            assert eod_token == tokenizer.get_command('pad').Id
-        else:
-            print("Unsupported data loader for GPT2.")
-            exit(1)
-        # pad.
-        num_tokens = vocab_size_with_padding(num_tokens, args)
-        print_rank_0('> found end-of-document token: {}'.format(eod_token))
-        token_counts = torch.cuda.LongTensor([num_tokens, eod_token,
-                                              int(args.do_train),
-                                              int(args.do_valid),
-                                              int(args.do_test)])
-    else:
-        token_counts = torch.cuda.LongTensor([0, 0, 0, 0, 0])
-
-    # Broadcast num tokens.
-    torch.distributed.broadcast(token_counts,
-                                mpu.get_model_parallel_src_rank(),
-                                group=mpu.get_model_parallel_group())
-    num_tokens = token_counts[0].item()
-    eod_token = token_counts[1].item()
-    args.do_train = token_counts[2].item()
-    args.do_valid = token_counts[3].item()
-    args.do_test = token_counts[4].item()
-    args.vocab_size = num_tokens
-    args.eod_token = eod_token
-
-    return train_data, val_data, test_data
+def train_valid_test_datasets_provider(train_val_test_num_samples):
+    """Build train, valid, and test datasets."""
+    args = get_args()
+
+    print_rank_0('> building train, validation, and test datasets '
+                 'for GPT2 ...')
+    train_ds, valid_ds, test_ds = build_train_valid_test_datasets(
+        data_prefix=args.data_path,
+        data_impl=args.data_impl,
+        splits_string=args.split,
+        train_valid_test_num_samples=train_val_test_num_samples,
+        seq_length=args.seq_length,
+        seed=args.seed,
+        skip_warmup=(not args.mmap_warmup))
+    print_rank_0("> finished creating GPT2 datasets ...")
+
+    return train_ds, valid_ds, test_ds


if __name__ == "__main__":

-    run('Pretrain GPT-2 model', get_train_val_test_data,
-        model_provider, forward_step)
+    pretrain(train_valid_test_datasets_provider, model_provider, forward_step,
+             args_defaults={'tokenizer_type': 'GPT2BPETokenizer'})
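# A minimal sketch of the provider contract assumed by pretrain() (the sample
# counts below are made-up): the framework works out how many train/valid/test
# samples the run needs and passes the three counts as a single list, e.g.
#   train_valid_test_datasets_provider([1000000, 10000, 1000])
#   -> (train_ds, valid_ds, test_ds)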
#!/bin/bash
CHECKPOINT_PATH=checkpoints/gpt2_345m/
MPSIZE=1
NLAYERS=12
NHIDDEN=768
NATT=12
MAXSEQLEN=1024
# Sampling args
TEMP=0.9
# If TOPK and TOPP are both 0, decoding defaults to greedy; a nonzero top-k
# also overrides top-p.
TOPK=0
TOPP=0
python generate_samples.py \
--model-parallel-size $MPSIZE \
--num-layers $NLAYERS \
--hidden-size $NHIDDEN \
--load $CHECKPOINT_PATH \
--num-attention-heads $NATT \
--max-position-embeddings 1024 \
--tokenizer-type GPT2BPETokenizer \
--fp16 \
--cache-dir cache \
--out-seq-length $MAXSEQLEN \
--temperature $TEMP \
--top_k $TOPK \
--genfile dbg_unconditional.json \
--num-samples 10 \
--top_p $TOPP \
--recompute
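# Hypothetical variations (not part of this script): TOPK=40 TOPP=0 gives
# top-k sampling, TOPK=0 TOPP=0.9 gives nucleus (top-p) sampling, and leaving
# both at 0, as above, keeps decoding greedy.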
#!/bin/bash
RANK=0
WORLD_SIZE=1
python pretrain_albert.py \
--num-layers 12 \
--hidden-size 768 \
--num-attention-heads 12 \
--batch-size 4 \
--seq-length 512 \
--max-preds-per-seq 80 \
--max-position-embeddings 512 \
--train-iters 10000 \
--save checkpoints/albert_117m \
--load checkpoints/albert_117m \
--resume-dataloader \
--data-path data/megatron/bc_rn_owt_sto_wiki_dedup_shuf_cleaned_0.7_mmap \
--vocab data/megatron/vocab.txt \
--split 949,50,1 \
--distributed-backend nccl \
--lr 0.0001 \
--lr-decay-style linear \
--lr-decay-iters 990000 \
--weight-decay 1e-2 \
--clip-grad 1.0 \
--warmup .01 \
--fp16 \
--fp32-layernorm \
--fp32-embedding \
--skip-mmap-warmup \
--num-workers 0
#!/bin/bash
GPUS_PER_NODE=2
# Change for multinode config
MASTER_ADDR=localhost
MASTER_PORT=6000
NNODES=1
NODE_RANK=0
WORLD_SIZE=$(($GPUS_PER_NODE*$NNODES))
DISTRIBUTED_ARGS="--nproc_per_node $GPUS_PER_NODE --nnodes $NNODES --node_rank $NODE_RANK --master_addr $MASTER_ADDR --master_port $MASTER_PORT"
python -m torch.distributed.launch $DISTRIBUTED_ARGS \
pretrain_albert.py \
--num-layers 12 \
--hidden-size 768 \
--num-attention-heads 12 \
--batch-size 4 \
--seq-length 512 \
--max-preds-per-seq 80 \
--max-position-embeddings 512 \
--train-iters 10000 \
--save checkpoints/albert_117m \
--load checkpoints/albert_117m \
--resume-dataloader \
--data-path data/megatron/bc_rn_owt_sto_wiki_dedup_shuf_cleaned_0.7_mmap \
--vocab data/megatron/vocab.txt \
--split 949,50,1 \
--distributed-backend nccl \
--lr 0.0001 \
--lr-decay-style linear \
--lr-decay-iters 990000 \
--weight-decay 1e-2 \
--clip-grad 1.0 \
--warmup .01 \
--fp16 \
--fp32-layernorm \
--fp32-embedding \
--skip-mmap-warmup \
--num-workers 0
#!/bin/bash
GPUS_PER_NODE=8
# Change for multinode config
MASTER_ADDR=localhost
MASTER_PORT=6000
NNODES=1
NODE_RANK=0
WORLD_SIZE=$(($GPUS_PER_NODE*$NNODES))
DISTRIBUTED_ARGS="--nproc_per_node $GPUS_PER_NODE --nnodes $NNODES --node_rank $NODE_RANK --master_addr $MASTER_ADDR --master_port $MASTER_PORT"
python -m torch.distributed.launch $DISTRIBUTED_ARGS \
pretrain_bert.py \
--model-parallel-size 2 \
--num-layers 24 \
--hidden-size 1024 \
--num-attention-heads 16 \
--batch-size 4 \
--seq-length 512 \
--max-preds-per-seq 80 \
--max-position-embeddings 512 \
--train-iters 1000000 \
--save checkpoints/bert_345m_mp2 \
--load checkpoints/bert_345m_mp2 \
--resume-dataloader \
--train-data wikipedia \
--lazy-loader \
--tokenizer-type BertWordPieceTokenizer \
--tokenizer-model-type bert-large-uncased \
--presplit-sentences \
--cache-dir cache \
--split 949,50,1 \
--distributed-backend nccl \
--lr 0.0001 \
--lr-decay-style linear \
--lr-decay-iters 990000 \
--weight-decay 1e-2 \
--clip-grad 1.0 \
--warmup .01 \
--fp16 \
--fp32-layernorm \
--fp32-embedding
#!/bin/bash
RANK=0
WORLD_SIZE=1
python pretrain_bert.py \
--num-layers 24 \
--hidden-size 1024 \
--num-attention-heads 16 \
--batch-size 4 \
--seq-length 512 \
--max-preds-per-seq 80 \
--max-position-embeddings 512 \
--train-iters 1000000 \
--save checkpoints/bert_345m \
--load checkpoints/bert_345m \
--resume-dataloader \
--train-data wikipedia \
--lazy-loader \
--tokenizer-type SentencePieceTokenizer \
--tokenizer-model-type bpe \
--tokenizer-path tokenizer.model \
--presplit-sentences \
--cache-dir cache \
--split 949,50,1 \
--distributed-backend nccl \
--lr 0.0001 \
--lr-decay-style linear \
--lr-decay-iters 990000 \
--weight-decay 1e-2 \
--clip-grad 1.0 \
--warmup .01 \
--fp16 \
--fp32-layernorm \
--fp32-embedding
#!/bin/bash
GPUS_PER_NODE=8
# Change for multinode config
MASTER_ADDR=localhost
MASTER_PORT=6000
NNODES=1
NODE_RANK=0
WORLD_SIZE=$(($GPUS_PER_NODE*$NNODES))
DISTRIBUTED_ARGS="--nproc_per_node $GPUS_PER_NODE --nnodes $NNODES --node_rank $NODE_RANK --master_addr $MASTER_ADDR --master_port $MASTER_PORT"
python -m torch.distributed.launch $DISTRIBUTED_ARGS \
pretrain_bert.py \
--num-layers 24 \
--hidden-size 1024 \
--num-attention-heads 16 \
--batch-size 4 \
--seq-length 512 \
--max-preds-per-seq 80 \
--max-position-embeddings 512 \
--train-iters 1000000 \
--save checkpoints/bert_345m \
--load checkpoints/bert_345m \
--resume-dataloader \
--use-tfrecords \
--train-data <TF Record 1> <TFRecord 2> \
--valid-data <TF Record 3> \
--test-data <TF Record 4> \
--tokenizer-type BertWordPieceTokenizer \
--tokenizer-model-type bert-large-uncased \
--presplit-sentences \
--cache-dir cache \
--split 949,50,1 \
--distributed-backend nccl \
--lr 0.0001 \
--lr-decay-style linear \
--lr-decay-iters 990000 \
--weight-decay 1e-2 \
--clip-grad 1.0 \
--warmup .01 \
--fp16 \
--fp32-layernorm \
--fp32-embedding
#! /bin/bash
# Runs the "345M" parameter model
GPUS_PER_NODE=8
# Change for multinode config
MASTER_ADDR=localhost
MASTER_PORT=6000
NNODES=1
NODE_RANK=0
WORLD_SIZE=$(($GPUS_PER_NODE*$NNODES))
DISTRIBUTED_ARGS="--nproc_per_node $GPUS_PER_NODE --nnodes $NNODES --node_rank $NODE_RANK --master_addr $MASTER_ADDR --master_port $MASTER_PORT"
python -m torch.distributed.launch $DISTRIBUTED_ARGS \
pretrain_gpt2.py \
--model-parallel-size 2 \
--num-layers 24 \
--hidden-size 1024 \
--num-attention-heads 16 \
--batch-size 8 \
--seq-length 1024 \
--max-position-embeddings 1024 \
--train-iters 320000 \
--save checkpoints/gpt2_345m_mp2 \
--load checkpoints/gpt2_345m_mp2 \
--resume-dataloader \
--train-data wikipedia \
--lazy-loader \
--tokenizer-type GPT2BPETokenizer \
--cache-dir cache \
--split 949,50,1 \
--distributed-backend nccl \
--lr 0.00015 \
--lr-decay-style cosine \
--weight-decay 1e-2 \
--clip-grad 1.0 \
--warmup .01 \
--checkpoint-activations \
--fp16
set +x
"""
example usage:
python scripts/run_gpt2_eval.py \
--model-parallel-size 1 \
--num-layers 12 \
--hidden-size 768 \
--num-attention-heads 12 \
--model-path <gpt2_117_path> \
--data-path <wikitext_tokens_test_path> \
--batch-size 16 \
--cache-dir <cache dir path>
"""
import argparse
import subprocess
parser = argparse.ArgumentParser('run zero shot GPT2 eval')
parser.add_argument('--model-path', type=str, required=True,
help='Saved model path for evaluation')
parser.add_argument('--batch-size', type=int, default=4,
help='batch size to use for evaluation')
parser.add_argument('--num-attention-heads', type=int, default=12,
help='num of transformer attention heads')
parser.add_argument('--hidden-size', type=int, default=768,
                    help='transformer hidden size')
parser.add_argument('--num-layers', type=int, default=12,
help='num decoder layers')
parser.add_argument('--data-path', type=str, required=True,
help='Data path for evaluation data')
parser.add_argument('--cloze-eval', action='store_true',
help='Run lambada cloze eval instead of perplexity eval.')
parser.add_argument('--easy-lambada', action='store_true',
help='use easier formulation of lambada')
parser.add_argument('--webtext-eval', action='store_true',
help='Run webtext PPL eval instead of wikitext PPL eval.')
parser.add_argument('--eval-iters', default=5000, type=int,
help='number of iterations to run webtext evaluation')
parser.add_argument('--model-parallel-size', type=int, default=1,
help='model parallel size to use')
parser.add_argument('--load-openai', action='store_true',
help='Load weights from saved openai/hf checkpoints')
parser.add_argument('--cache-dir', type=str, default='cache',
help='directory to cache gpt2 tokenizers')
parser.add_argument('--make-vocab-size-divisible-by', type=int, default=128,
                    help='Pad the vocab size to be divisible by this value. '
                         'This is added for computational efficiency reasons.')
args = parser.parse_args()
multinode_args = ''
if args.model_parallel_size > 1:
multinode_args += ' -m torch.distributed.launch --nproc_per_node {} '.format(args.model_parallel_size)
CMD = ' --model-parallel-size {model_par} \
--num-layers {nlayers} \
--hidden-size {hidden} \
--log-interval 100 \
--load {model} \
--eval-batch-size {batch} \
--num-attention-heads {natt} \
--seq-length 1024 \
--max-position-embeddings 1024 \
--tokenizer-type GPT2BPETokenizer \
--text-key text \
--distributed-backend nccl \
--hidden-dropout 0.1 \
--attention-dropout 0.1 \
--fp16 \
--overlapping-eval 32 \
--make-vocab-size-divisible-by {make_vocab_size_divisible_by} \
--cache-dir {cache} '.format(model_par=args.model_parallel_size,
nlayers=args.num_layers,
hidden=args.hidden_size,
model=args.model_path,
batch=args.batch_size,
natt=args.num_attention_heads,
make_vocab_size_divisible_by=args.make_vocab_size_divisible_by,
cache=args.cache_dir)
if args.load_openai:
CMD += ' --load-openai '
if args.cloze_eval:
CMD += ' --valid-data {} '.format(args.data_path)
CMD += ' --cloze-eval '
if not args.easy_lambada:
CMD += ' --strict-lambada '
CMD = 'evaluate_gpt2.py' + CMD
print('Running Lambada Eval Command:', flush=True)
elif args.webtext_eval:
CMD += '--train-iters 0 --eval-iters {} --test-data {} --loose-json '.format(args.eval_iters, args.data_path)
CMD = 'pretrain_gpt2.py' + CMD
print('Running Webtext Eval Command:', flush=True)
else:
CMD += ' --valid-data {} '.format(args.data_path)
CMD = 'evaluate_gpt2.py' + CMD
print('Running PPL Eval Command:', flush=True)
CMD = 'python3 '+multinode_args+CMD
print(CMD, flush=True)
subprocess.call(CMD.split())
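# Hypothetical LAMBADA cloze-eval invocation, mirroring the docstring example
# above (paths are placeholders):
#   python scripts/run_gpt2_eval.py --cloze-eval \
#       --model-path <gpt2_345m_path> --data-path <lambada_test_path> \
#       --num-layers 24 --hidden-size 1024 --num-attention-heads 16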
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Tasks data utility."""
import re
import numpy as np
def clean_text(text):
"""Remove new lines and multiple spaces and adjust end of sentence dot."""
text = text.replace("\n", " ")
text = re.sub(r'\s+', ' ', text)
for _ in range(3):
text = text.replace(' . ', '. ')
return text
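# Quick sanity check of the normalization above (hypothetical input):
#   clean_text("Hello\n  world . ") -> "Hello world. "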
def build_sample(ids, types, paddings, label, unique_id):
"""Convert to numpy and return a sample consumed by the batch producer."""
ids_np = np.array(ids, dtype=np.int64)
types_np = np.array(types, dtype=np.int64)
paddings_np = np.array(paddings, dtype=np.int64)
sample = ({'text': ids_np,
'types': types_np,
'padding_mask': paddings_np,
'label': int(label),
'uid': int(unique_id)})
return sample
def build_tokens_types_paddings_from_text(text_a, text_b,
tokenizer, max_seq_length):
"""Build token types and paddings, trim if needed, and pad if needed."""
text_a_ids = tokenizer.tokenize(text_a)
text_b_ids = None
if text_b is not None:
text_b_ids = tokenizer.tokenize(text_b)
return build_tokens_types_paddings_from_ids(text_a_ids, text_b_ids,
max_seq_length, tokenizer.cls,
tokenizer.sep, tokenizer.pad)
def build_tokens_types_paddings_from_ids(text_a_ids, text_b_ids, max_seq_length,
cls_id, sep_id, pad_id):
"""Build token types and paddings, trim if needed, and pad if needed."""
ids = []
types = []
paddings = []
# [CLS].
ids.append(cls_id)
types.append(0)
paddings.append(1)
# A.
len_text_a = len(text_a_ids)
ids.extend(text_a_ids)
types.extend([0] * len_text_a)
paddings.extend([1] * len_text_a)
# [SEP].
ids.append(sep_id)
types.append(0)
paddings.append(1)
# B.
if text_b_ids is not None:
len_text_b = len(text_b_ids)
ids.extend(text_b_ids)
types.extend([1] * len_text_b)
paddings.extend([1] * len_text_b)
# Cap the size.
trimmed = False
if len(ids) >= max_seq_length:
max_seq_length_m1 = max_seq_length - 1
ids = ids[0:max_seq_length_m1]
types = types[0:max_seq_length_m1]
paddings = paddings[0:max_seq_length_m1]
trimmed = True
# [SEP].
if (text_b_ids is not None) or trimmed:
ids.append(sep_id)
if text_b_ids is None:
types.append(0)
else:
types.append(1)
paddings.append(1)
# Padding.
padding_length = max_seq_length - len(ids)
if padding_length > 0:
ids.extend([pad_id] * padding_length)
types.extend([pad_id] * padding_length)
paddings.extend([0] * padding_length)
return ids, types, paddings
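# Worked example with made-up ids (cls_id=101, sep_id=102, pad_id=0,
# max_seq_length=8), showing the [CLS] A [SEP] B [SEP] layout plus padding:
#   build_tokens_types_paddings_from_ids([7, 8], [9], 8, 101, 102, 0)
#   -> ids      = [101, 7, 8, 102, 9, 102, 0, 0]
#      types    = [  0, 0, 0,   0, 1,   1, 0, 0]
#      paddings = [  1, 1, 1,   1, 1,   1, 0, 0]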
import os
import argparse
import collections
import numpy as np
import torch
def process_files(args):
all_predictions = collections.OrderedDict()
all_labels = collections.OrderedDict()
all_uid = collections.OrderedDict()
for path in args.paths:
path = os.path.join(path, args.prediction_name)
try:
data = torch.load(path)
for dataset in data:
name, d = dataset
predictions, labels, uid = d
if name not in all_predictions:
all_predictions[name] = np.array(predictions)
if args.labels is None:
args.labels = [i for i in range(all_predictions[name].shape[1])]
if args.eval:
all_labels[name] = np.array(labels)
all_uid[name] = np.array(uid)
else:
all_predictions[name] += np.array(predictions)
assert np.allclose(all_uid[name], np.array(uid))
except Exception as e:
print(e)
continue
return all_predictions, all_labels, all_uid
def get_threshold(all_predictions, all_labels, one_threshold=False):
if one_threshold:
        all_predictions = {'combined': np.concatenate(list(all_predictions.values()))}
        all_labels = {'combined': np.concatenate(list(all_labels.values()))}
out_thresh = []
for dataset in all_predictions:
preds = all_predictions[dataset]
labels = all_labels[dataset]
out_thresh.append(calc_threshold(preds, labels))
return out_thresh
def calc_threshold(p, l):
    trials = [i / 100. for i in range(100)]
best_acc = float('-inf')
best_thresh = 0
for t in trials:
acc = ((apply_threshold(p, t).argmax(-1) == l).astype(float)).mean()
if acc > best_acc:
best_acc = acc
best_thresh = t
return best_thresh
def apply_threshold(preds, t):
assert (np.allclose(preds.sum(-1), np.ones(preds.shape[0])))
prob = preds[:, -1]
thresholded = (prob >= t).astype(int)
preds = np.zeros_like(preds)
preds[np.arange(len(thresholded)), thresholded.reshape(-1)] = 1
return preds
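# Example with toy probabilities (each row sums to 1, as the assertion
# requires); class 1 is picked whenever its probability clears the threshold:
#   apply_threshold(np.array([[0.7, 0.3], [0.4, 0.6]]), 0.5)
#   -> array([[1., 0.], [0., 1.]])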
def threshold_predictions(all_predictions, threshold):
    if len(threshold) != len(all_predictions):
        # Extend the threshold list by repeating its last value so that
        # every dataset gets a threshold.
        threshold = list(threshold) + \
            [threshold[-1]] * (len(all_predictions) - len(threshold))
for i, dataset in enumerate(all_predictions):
thresh = threshold[i]
preds = all_predictions[dataset]
all_predictions[dataset] = apply_threshold(preds, thresh)
return all_predictions
def postprocess_predictions(all_predictions, all_labels, args):
for d in all_predictions:
all_predictions[d] = all_predictions[d] / len(args.paths)
if args.calc_threshold:
args.threshold = get_threshold(all_predictions, all_labels, args.one_threshold)
print('threshold', args.threshold)
if args.threshold is not None:
all_predictions = threshold_predictions(all_predictions, args.threshold)
return all_predictions, all_labels
def write_predictions(all_predictions, all_labels, all_uid, args):
all_correct = 0
count = 0
for dataset in all_predictions:
preds = all_predictions[dataset]
preds = np.argmax(preds, -1)
        if args.eval:
            correct = (preds == all_labels[dataset]).sum()
            num = len(all_labels[dataset])
            accuracy = correct / num
            count += num
            all_correct += correct
            print(accuracy)
if not os.path.exists(os.path.join(args.outdir, dataset)):
os.makedirs(os.path.join(args.outdir, dataset))
outpath = os.path.join(
args.outdir, dataset, os.path.splitext(
args.prediction_name)[0] + '.tsv')
with open(outpath, 'w') as f:
f.write('id\tlabel\n')
f.write('\n'.join(str(uid) + '\t' + str(args.labels[p])
for uid, p in zip(all_uid[dataset], preds.tolist())))
if args.eval:
print(all_correct / count)
def ensemble_predictions(args):
all_predictions, all_labels, all_uid = process_files(args)
all_predictions, all_labels = postprocess_predictions(all_predictions, all_labels, args)
write_predictions(all_predictions, all_labels, all_uid, args)
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--paths', required=True, nargs='+',
help='paths to checkpoint directories used in ensemble')
parser.add_argument('--eval', action='store_true',
help='compute accuracy metrics against labels (dev set)')
parser.add_argument('--outdir',
help='directory to place ensembled predictions in')
parser.add_argument('--prediction-name', default='test_predictions.pt',
help='name of predictions in checkpoint directories')
parser.add_argument('--calc-threshold', action='store_true',
help='calculate threshold classification')
parser.add_argument('--one-threshold', action='store_true',
                        help='use one threshold for all subdatasets')
parser.add_argument('--threshold', nargs='+', default=None, type=float,
help='user supplied threshold for classification')
parser.add_argument('--labels', nargs='+', default=None,
help='whitespace separated list of label names')
args = parser.parse_args()
ensemble_predictions(args)
if __name__ == '__main__':
main()
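# Hypothetical invocation (script and checkpoint names are placeholders):
#   python ensemble_classifier.py --paths ckpt_seed1 ckpt_seed2 \
#       --prediction-name dev_predictions.pt --outdir ensembled --eval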
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Evaluation utilities."""
import os
import time
import torch
from megatron import get_args
from megatron import mpu
from megatron import print_rank_0
from tasks.finetune_utils import build_data_loader
from tasks.finetune_utils import process_batch
def accuracy_func_provider(single_dataset_provider):
"""Provide function that calculates accuracies."""
args = get_args()
# Build dataloaders.
datapaths = args.valid_data
dataloaders = []
for datapath in datapaths:
dataset = single_dataset_provider(datapath)
dataloader = build_data_loader(
dataset, args.batch_size, num_workers=args.num_workers,
drop_last=(mpu.get_data_parallel_world_size() > 1))
dataloaders.append((dataset.dataset_name, dataloader))
def metrics_func(model, epoch, output_predictions=False):
print_rank_0('calculating metrics ...')
correct = 0
total = 0
if output_predictions:
assert mpu.get_data_parallel_world_size() == 1
named_predictions = []
names = 'predictions'
for name, dataloader in dataloaders:
output = calculate_correct_answers(name, model, dataloader,
epoch, output_predictions)
if not output_predictions:
correct_ans, total_count = output
else:
correct_ans, total_count, predictions = output
named_predictions.append((name, predictions))
names += '_' + name
correct += correct_ans
total += total_count
percent = float(correct) * 100.0 / float(total)
print_rank_0(' >> |epoch: {}| overall: correct / total = {} / {} = '
'{:.4f} %'.format(epoch, correct, total, percent))
if output_predictions and torch.distributed.get_rank() == 0:
assert args.load is not None
filename = os.path.join(args.load, names + '.pt')
torch.save(named_predictions, filename)
return metrics_func
def calculate_correct_answers(name, model, dataloader,
epoch, output_predictions):
"""Calculate correct over total answers and return prediction if the
`output_predictions` is true."""
start_time = time.time()
model.eval()
with torch.no_grad():
# For all the batches in the dataset.
total = 0
correct = 0
if output_predictions:
# This option is only possible when data parallel size is 1.
assert mpu.get_data_parallel_world_size() == 1
softmaxes = []
labels = []
ids = []
for _, batch in enumerate(dataloader):
# Run the model forward.
tokens, types, labels_, attention_mask = process_batch(batch)
logits = model(tokens, attention_mask, types)
# Add output predictions.
if output_predictions:
softmaxes.extend(torch.nn.Softmax(dim=-1)(
logits.float()).data.cpu().numpy().tolist())
labels.extend(labels_.data.cpu().numpy().tolist())
ids.extend(batch['uid'].cpu().numpy().tolist())
# Compute the correct answers.
predicted = torch.argmax(logits, dim=-1)
corrects = (predicted == labels_)
# Add to the counters.
total += labels_.size(0)
correct += corrects.sum().item()
model.train()
# Reduce.
unreduced = torch.cuda.LongTensor([correct, total])
torch.distributed.all_reduce(unreduced,
group=mpu.get_data_parallel_group())
# Print on screen.
correct_ans = unreduced[0].item()
total_count = unreduced[1].item()
percent = float(correct_ans) * 100.0 / float(total_count)
elapsed_time = time.time() - start_time
print_rank_0(' > |epoch: {}| metrics for {}: correct / total '
'= {} / {} = {:.4f} %, elapsed time (sec): {:.3f}'.format(
epoch, name, correct_ans, total_count,
percent, elapsed_time))
if output_predictions:
return correct_ans, total_count, (softmaxes, labels, ids)
return correct_ans, total_count
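# Sketch of the intended wiring (the dataset provider name is hypothetical):
# finetuning code builds the metrics hook once, then calls it each epoch:
#   metrics_func = accuracy_func_provider(my_single_dataset_provider)
#   metrics_func(model, epoch, output_predictions=False)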