Commit f1ad8c94 authored by Neel Kant

Merge staging into ict-stable

parents 8ba76558 c99fa80c
......@@ -65,7 +65,7 @@ def set_global_variables(extra_args_provider=None, args_defaults={}):
"""Set args, tokenizer, tensorboard-writer, adlr-autoresume, and timers."""
args = _parse_args(extra_args_provider=extra_args_provider,
defaults=args_defaults)
_build_tokenizer(args)
_ = _build_tokenizer(args)
_set_tensorboard_writer(args)
_set_adlr_autoresume(args)
_set_timers()
......@@ -85,6 +85,13 @@ def _build_tokenizer(args):
global _GLOBAL_TOKENIZER
_ensure_var_is_not_initialized(_GLOBAL_TOKENIZER, 'tokenizer')
_GLOBAL_TOKENIZER = build_tokenizer(args)
return _GLOBAL_TOKENIZER
def rebuild_tokenizer(args):
global _GLOBAL_TOKENIZER
_GLOBAL_TOKENIZER = None
return _build_tokenizer(args)
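# A hedged sketch of why rebuild_tokenizer exists (values are illustrative):
# the padded vocab size depends on args.model_parallel_size, so a script that
# changes that value must rebuild the tokenizer rather than reuse the global:
#
#     args.model_parallel_size = 1
#     tokenizer = rebuild_tokenizer(args)   # padding for a single partition
#     args.model_parallel_size = 8
#     tokenizer = rebuild_tokenizer(args)   # padding recomputed for 8 partitions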
def _set_tensorboard_writer(args):
......
......@@ -59,6 +59,7 @@ def _initialize_distributed():
"""Initialize torch.distributed and mpu."""
args = get_args()
device_count = torch.cuda.device_count()
if torch.distributed.is_initialized():
if args.rank == 0:
......@@ -66,23 +67,25 @@ def _initialize_distributed():
'skipping initialization ...', flush=True)
args.rank = torch.distributed.get_rank()
args.world_size = torch.distributed.get_world_size()
device = torch.cuda.current_device()
local_rank = args.rank % torch.cuda.device_count()
assert local_rank == device, \
'expected local-rank to be the same as rank % device-count.'
if device_count > 0:
device = torch.cuda.current_device()
local_rank = args.rank % device_count
assert local_rank == device, \
'expected local-rank to be the same as rank % device-count.'
else:
if args.rank == 0:
print('> initializing torch distributed ...', flush=True)
# Manually set the device ids.
device = args.rank % torch.cuda.device_count()
if args.local_rank is not None:
assert args.local_rank == device, \
'expected local-rank to be the same as rank % device-count.'
else:
args.local_rank = device
torch.cuda.set_device(device)
if device_count > 0:
device = args.rank % device_count
if args.local_rank is not None:
assert args.local_rank == device, \
'expected local-rank to be the same as rank % device-count.'
else:
args.local_rank = device
torch.cuda.set_device(device)
# Call the init process
init_method = 'tcp://'
master_ip = os.getenv('MASTER_ADDR', 'localhost')
......@@ -94,7 +97,8 @@ def _initialize_distributed():
init_method=init_method)
# Set the model-parallel / data-parallel communicators.
mpu.initialize_model_parallel(args.model_parallel_size)
if device_count > 0:
mpu.initialize_model_parallel(args.model_parallel_size)
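# Sketch of the rank-to-device mapping assumed above (standalone, with a
# hypothetical cluster of 2 nodes and 4 GPUs each): global ranks 0..7 land on
# local devices 0..3 on each node via rank % device_count.
#
#     device_count = 4
#     for rank in range(8):
#         print('rank {} -> cuda:{}'.format(rank, rank % device_count))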
def _init_autoresume():
......@@ -112,7 +116,8 @@ def _set_random_seed(seed):
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
mpu.model_parallel_cuda_manual_seed(seed)
if torch.cuda.device_count() > 0:
mpu.model_parallel_cuda_manual_seed(seed)
else:
raise ValueError('Seed ({}) should be a positive integer.'.format(seed))
......
......@@ -102,6 +102,7 @@ class ParallelSelfAttention(MegatronModule):
output_layer_init_method, layer_number):
super(ParallelSelfAttention, self).__init__()
args = get_args()
self.fp16 = args.fp16
self.attention_mask_func = attention_mask_func
self.apply_query_key_layer_scaling = args.apply_query_key_layer_scaling
......@@ -244,7 +245,7 @@ class ParallelSelfAttention(MegatronModule):
query_layer, key_layer)
# fp32 conversion.
if self.attention_softmax_in_fp32:
if self.fp16 and self.attention_softmax_in_fp32:
attention_scores = attention_scores.float()
# Apply attention mask. [b, np, s, s]
......@@ -267,7 +268,7 @@ class ParallelSelfAttention(MegatronModule):
attention_probs = self._get_attention_probs(attention_scores)
# fp16 conversion
if self.attention_softmax_in_fp32:
if self.fp16 and self.attention_softmax_in_fp32:
attention_probs = attention_probs.half()
# Context layer. [b, s, hp]
......
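# A minimal standalone sketch of the fp16-guarded softmax-in-fp32 pattern the
# two changes above implement (the function name is illustrative, not the
# module's API):
#
#     import torch
#     import torch.nn.functional as F
#
#     def softmax_maybe_fp32(scores, fp16, in_fp32):
#         # Upcast only when the model actually runs in fp16; for fp32 models
#         # the extra conversions would be wasted work, which the guard avoids.
#         if fp16 and in_fp32:
#             return F.softmax(scores.float(), dim=-1).half()
#         return F.softmax(scores, dim=-1)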
......@@ -37,11 +37,12 @@ from megatron.learning_rates import AnnealingLR
from megatron.model import DistributedDataParallel as LocalDDP
from megatron.model import get_params_for_weight_decay_optimization
from megatron.utils import check_adlr_autoresume_termination
from megatron.utils import make_data_loader
from megatron.utils import report_memory
def pretrain(train_val_test_data_provider, model_provider, forward_step_func,
extra_args_provider=None, args_defaults={}):
def pretrain(train_valid_test_dataset_provider, model_provider,
forward_step_func, extra_args_provider=None, args_defaults={}):
"""Main training program.
This function will run the following in the order provided:
......@@ -51,9 +52,9 @@ def pretrain(train_val_test_data_provider, model_provider, forward_step_func,
4) train the model using the forward_step_func.
Arguments:
train_val_test_data_provider: a function that builds datasets
and returns `train, val, test` dataloaders.
model_provider: a function that returns a vanilla version of the
train_valid_test_dataset_provider: a function that takes the size of
train/valid/test dataset and returns `train, valid, test` datasets.
model_provider: a function that returns a vanilla version of the
model. By vanilla we mean a simple model on cpu with no fp16 or ddp.
forward_step_func: a function that takes a `data iterator` and `model`,
and returns a `loss` scalar with a dictionary with key:values being
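# A hedged sketch of the provider signatures pretrain() expects, with toy
# stand-in datasets (everything below is illustrative, not the repo's API):
#
#     def train_valid_test_datasets_provider(train_val_test_num_samples):
#         train_n, valid_n, test_n = train_val_test_num_samples
#         return range(train_n), range(valid_n), range(test_n)
#
#     def forward_step(data_iterator, model):
#         loss = model(next(data_iterator))
#         return loss, {'lm loss': loss}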
......@@ -78,35 +79,28 @@ def pretrain(train_val_test_data_provider, model_provider, forward_step_func,
timers('model and optimizer').stop()
# Data stuff.
timers('train/valid/test dataset').start()
train_data, val_data, test_data = train_val_test_data_provider()
timers('train/valid/test dataset').stop()
# Train, validation, and test data.
timers('train/valid/test dataloader').start()
train_data_iterator, val_data_iterator, \
test_data_iterator = get_train_val_test_data_iterators(train_data,
val_data,
test_data)
timers('train/valid/test dataloader').stop()
timers('train/valid/test data iterators').start()
train_data_iterator, valid_data_iterator, test_data_iterator \
= build_train_valid_test_data_iterators(
train_valid_test_dataset_provider)
timers('train/valid/test data iterators').stop()
# Print setup timing.
print_rank_0('done with setups ...')
timers.log(['model and optimizer', 'train/valid/test dataset',
'train/valid/test dataloader'])
timers.log(['model and optimizer', 'train/valid/test data iterators'])
print_rank_0('training ...')
iteration = 0
if args.do_train and args.train_iters > 0:
iteration, _ = train(forward_step_func,
model, optimizer, lr_scheduler,
train_data_iterator, val_data_iterator)
train_data_iterator, valid_data_iterator)
if args.do_valid:
prefix = 'the end of training for val data'
evaluate_and_print_results(prefix, forward_step_func,
val_data_iterator, model,
valid_data_iterator, model,
iteration, False)
if args.save and iteration != 0:
......@@ -151,8 +145,7 @@ def get_model(model_provider_func):
return model
raise NotImplementedError('Unknown DDP implementation specified: {}. '
'Exiting.'.format(args.DDP_impl))
sys.exit()
'Exiting.'.format(args.DDP_impl))
def get_optimizer(model):
......@@ -354,7 +347,7 @@ def training_log(loss_dict, total_loss_dict, learning_rate, iteration,
def train(forward_step_func, model, optimizer, lr_scheduler,
train_data_iterator, val_data_iterator):
train_data_iterator, valid_data_iterator):
"""Train the model function."""
args = get_args()
timers = get_timers()
......@@ -381,9 +374,12 @@ def train(forward_step_func, model, optimizer, lr_scheduler,
iteration += 1
# Logging.
loss_scale = None
if args.fp16:
loss_scale = optimizer.loss_scale
report_memory_flag = training_log(loss_dict, total_loss_dict,
optimizer.param_groups[0]['lr'],
iteration, optimizer.loss_scale,
iteration, loss_scale,
report_memory_flag)
# Autoresume
......@@ -402,7 +398,7 @@ def train(forward_step_func, model, optimizer, lr_scheduler,
args.do_valid:
prefix = 'iteration {}'.format(iteration)
evaluate_and_print_results(prefix, forward_step_func,
val_data_iterator, model,
valid_data_iterator, model,
iteration, False)
if args.exit_interval and iteration % args.exit_interval == 0:
......@@ -471,37 +467,87 @@ def evaluate_and_print_results(prefix, forward_step_func,
print_rank_0('-' * length)
def get_train_val_test_data_iterators(train_data, val_data, test_data):
"""Build train/validation/test iterators"""
def build_train_valid_test_data_iterators(
build_train_valid_test_datasets_provider):
"""XXX"""
args = get_args()
(train_dataloader, valid_dataloader, test_dataloader) = (None, None, None)
print_rank_0('> building train, validation, and test datasets ...')
# Data loader only on rank 0 of each model parallel group.
if mpu.get_model_parallel_rank() == 0:
# Rank, size, and global batch size.
data_parallel_size = mpu.get_data_parallel_world_size()
global_batch_size = args.batch_size * data_parallel_size
# Number of train/valid/test samples.
train_iters = args.train_iters
eval_iters = (train_iters // args.eval_interval + 1) * args.eval_iters
test_iters = args.eval_iters
train_val_test_num_samples = [train_iters * global_batch_size,
eval_iters * global_batch_size,
test_iters * global_batch_size]
print_rank_0(' > datasets target sizes (minimum size):')
print_rank_0(' train: {}'.format(train_val_test_num_samples[0]))
print_rank_0(' validation: {}'.format(train_val_test_num_samples[1]))
print_rank_0(' test: {}'.format(train_val_test_num_samples[2]))
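# Worked example with hypothetical settings: batch_size=8 and
# data_parallel_size=4 give global_batch_size=32; with train_iters=1000,
# eval_interval=100 and eval_iters=10 the targets come out as
#   train: 1000 * 32               = 32000 samples
#   valid: (1000//100 + 1)*10 * 32 = 3520 samples
#   test:  10 * 32                 = 320 samples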
# Build the datasets.
train_ds, valid_ds, test_ds = build_train_valid_test_datasets_provider(
train_val_test_num_samples)
# Build dataloaders.
train_dataloader = make_data_loader(train_ds)
valid_dataloader = make_data_loader(valid_ds)
test_dataloader = make_data_loader(test_ds)
# Flags to know if we need to do training/validation/testing.
do_train = train_dataloader is not None and args.train_iters > 0
do_valid = valid_dataloader is not None and args.eval_iters > 0
do_test = test_dataloader is not None and args.eval_iters > 0
# Pack the do-train/valid/test flags for broadcast to all ranks.
flags = torch.cuda.LongTensor(
[int(do_train), int(do_valid), int(do_test)])
else:
flags = torch.cuda.LongTensor([0, 0, 0])
# Broadcast the flags.
torch.distributed.broadcast(flags,
mpu.get_model_parallel_src_rank(),
group=mpu.get_model_parallel_group())
args.do_train = flags[0].item()
args.do_valid = flags[1].item()
args.do_test = flags[2].item()
# Shift the start iterations.
if train_data is not None:
train_data.batch_sampler.start_iter = args.iteration % \
len(train_data)
if train_dataloader is not None:
train_dataloader.batch_sampler.start_iter = args.iteration % \
len(train_dataloader)
print_rank_0('setting training data start iteration to {}'.
format(train_data.batch_sampler.start_iter))
if val_data is not None:
format(train_dataloader.batch_sampler.start_iter))
if valid_dataloader is not None:
start_iter_val = (args.iteration // args.eval_interval) * \
args.eval_iters
val_data.batch_sampler.start_iter = 0
valid_dataloader.batch_sampler.start_iter = start_iter_val % \
len(valid_dataloader)
print_rank_0('setting validation data start iteration to {}'.
format(val_data.batch_sampler.start_iter))
format(valid_dataloader.batch_sampler.start_iter))
if train_data is not None:
train_data_iterator = iter(train_data)
# Build iterators.
if train_dataloader is not None:
train_data_iterator = iter(train_dataloader)
else:
train_data_iterator = None
if val_data is not None:
val_data_iterator = iter(val_data)
if valid_dataloader is not None:
valid_data_iterator = iter(valid_dataloader)
else:
val_data_iterator = None
valid_data_iterator = None
if test_data is not None:
test_data_iterator = iter(test_data)
if test_dataloader is not None:
test_data_iterator = iter(test_dataloader)
else:
test_data_iterator = None
return train_data_iterator, val_data_iterator, test_data_iterator
return train_data_iterator, valid_data_iterator, test_data_iterator
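# Sketch of the flag-broadcast protocol used above (standalone, hypothetical
# two-rank setup): rank 0 of each model-parallel group decides the flags and
# the other ranks receive them, so everyone agrees on do_train/do_valid/do_test:
#
#     if mpu.get_model_parallel_rank() == 0:
#         flags = torch.cuda.LongTensor([1, 1, 0])
#     else:
#         flags = torch.cuda.LongTensor([0, 0, 0])
#     torch.distributed.broadcast(flags, mpu.get_model_parallel_src_rank(),
#                                 group=mpu.get_model_parallel_group())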
# coding=utf-8
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import numpy as np
import time
import os
import sys
from tokenizer import Tokenizer
def tokenize_corpus(filename, np_filename, print_interval=10000):
print(' > tokenizing {}'.format(filename))
tokenizer = Tokenizer(cache_dir='./cache')
tokenized_docs = []
num_docs = 0
num_tokens = 0
start_time = time.time()
with open(filename, 'r') as f:
for line in f:
try:
myjson = json.loads(line)
url = myjson['url']
sample = myjson['text']
tokens = tokenizer.tokenize_document(sample)
tokenized_docs.append(np.array(tokens, dtype=np.uint16))
num_docs += 1
num_tokens += len(tokens)
if num_docs % print_interval == 0:
print(' processed {:9d} documents in {:.2f} (s) so far'.
format(num_docs, time.time() - start_time),
flush=True)
except Exception as e:
print(' skipping ', line, e)
print(' >> processed {} documents with a total of {} tokens ...'.format(
num_docs, num_tokens))
tokenized_docs = np.array(tokenized_docs, dtype=object)
np.save(np_filename, tokenized_docs, allow_pickle=True)
print(' >> saved the tokenized documents to {} ...'.format(np_filename))
if __name__ == '__main__':
print('building gpt2 dataset ...')
path = sys.argv[1]
shard = sys.argv[2]
input_filename = os.path.join(path,
'shards/shard_{:04d}'.format(int(shard)))
output_filename = os.path.join(path,
'npys/shard_{:04d}.npy'.format(int(shard)))
print('will be reading {}'.format(input_filename))
print('and will write the results to {}'.format(output_filename))
tokenize_corpus(input_filename, output_filename)
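# Reading a shard back later is symmetric (a sketch; the path is hypothetical):
#     docs = np.load('npys/shard_0000.npy', allow_pickle=True)
#     tokens = docs[0]   # np.uint16 token ids, ending with the <|endoftext|> id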
import glob
import json
import os
import time
import sys
import numpy as np
if __name__ == '__main__':
print('building the shard sizes ...')
path = sys.argv[1]
print('> reading numpy files from {}'.format(path))
npy_files = glob.glob(path + '/*.npy')
npy_files.sort()
print(' found {} numpy files'.format(len(npy_files)))
size_dict = {}
counter = 0
start_time = time.time()
for filename in npy_files:
data = np.load(filename, allow_pickle=True)
size = np.hstack(data).size
np_filename = os.path.basename(filename)
size_dict[np_filename] = size
counter += 1
if counter % 10 == 0:
print(' processed {} files in {:.2f} seconds'.format(
counter, time.time() - start_time))
output_filename = os.path.join(path, 'sizes.txt')
with open(output_filename, 'w') as f:
json.dump(size_dict, f)
print('> wrote sizes to {}'.format(output_filename))
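# The sizes file can be read back with json (sketch):
#     with open(os.path.join(path, 'sizes.txt')) as f:
#         size_dict = json.load(f)   # e.g. {'shard_0000.npy': 1234567, ...}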
#!/bin/bash
echo "processing gpt2 data ..."
DIR="/raid/mpatwary/redownload_v0/0-21"
for thread in {0..3}; do
echo " launching thread "$thread && python make_gpt2_dataset.py $DIR $thread > $DIR/logs/shard_$thread.log 2>&1 &
done
# coding=utf-8
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
sys.path.append('..')
from megatron.data_utils.tokenization_gpt2 import GPT2Tokenizer
class Tokenizer:
def __init__(self, cache_dir=None):
self.tokenizer = GPT2Tokenizer.from_pretrained('gpt2',
cache_dir=cache_dir)
self.tokenizer.max_len = int(1e12)
self.eod_token = self.tokenizer.encoder['<|endoftext|>']
assert self.eod_token < 65535, 'vocab size will not fit in uint16'
print('> GPT2 tokenizer with {} vocab size and eod token {} ...'.format(
len(self.tokenizer.encoder), self.eod_token))
def tokenize_document(self, document):
tokens = self.tokenizer.encode(document)
tokens.append(self.eod_token)
return tokens
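# Example usage (a sketch; the actual ids depend on the GPT-2 vocab):
#     tok = Tokenizer(cache_dir='./cache')
#     ids = tok.tokenize_document('Hello world.')
#     # ids is a list of ints whose last element is tok.eod_token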
......@@ -25,13 +25,11 @@ from megatron import print_rank_0
from megatron.data.bert_dataset import build_train_valid_test_datasets
from megatron.model import BertModel
from megatron.training import pretrain
from megatron.utils import make_data_loader
from megatron.utils import reduce_losses
def model_provider():
"""Build the model."""
args = get_args()
print_rank_0('building BERT model ...')
......@@ -44,6 +42,7 @@ def model_provider():
def get_batch(data_iterator):
"""Build the batch."""
# Items and their type.
keys = ['text', 'types', 'labels', 'is_random', 'loss_mask', 'padding_mask']
......@@ -96,70 +95,28 @@ def forward_step(data_iterator, model):
return loss, {'lm loss': reduced_losses[0], 'sop loss': reduced_losses[1]}
def get_train_val_test_data():
"""Load the data on rank zero and boradcast number of tokens to all GPUS."""
def train_valid_test_datasets_provider(train_val_test_num_samples):
"""Build train, valid, and test datasets."""
args = get_args()
(train_data, valid_data, test_data) = (None, None, None)
# Data loader only on rank 0 of each model parallel group.
if mpu.get_model_parallel_rank() == 0:
print_rank_0('> building train, validation, and test datasets '
'for BERT ...')
data_parallel_size = mpu.get_data_parallel_world_size()
data_parallel_rank = mpu.get_data_parallel_rank()
global_batch_size = args.batch_size * data_parallel_size
# Number of train/valid/test samples.
train_iters = args.train_iters
eval_iters = (train_iters // args.eval_interval + 1) * args.eval_iters
test_iters = args.eval_iters
train_val_test_num_samples = [train_iters * global_batch_size,
eval_iters * global_batch_size,
test_iters * global_batch_size]
print_rank_0(' > datasets target sizes (minimum size):')
print_rank_0(' train: {}'.format(train_val_test_num_samples[0]))
print_rank_0(' validation: {}'.format(train_val_test_num_samples[1]))
print_rank_0(' test: {}'.format(train_val_test_num_samples[2]))
train_ds, valid_ds, test_ds = build_train_valid_test_datasets(
data_prefix=args.data_path,
data_impl=args.data_impl,
splits_string=args.split,
train_valid_test_num_samples=train_val_test_num_samples,
max_seq_length=args.seq_length,
masked_lm_prob=args.mask_prob,
short_seq_prob=args.short_seq_prob,
seed=args.seed,
skip_warmup=(not args.mmap_warmup))
print_rank_0("> finished creating BERT datasets ...")
train_data = make_data_loader(train_ds)
valid_data = make_data_loader(valid_ds)
test_data = make_data_loader(test_ds)
do_train = train_data is not None and args.train_iters > 0
do_valid = valid_data is not None and args.eval_iters > 0
do_test = test_data is not None and args.eval_iters > 0
# Pack the do-train/valid/test flags for broadcast to all ranks.
flags = torch.cuda.LongTensor(
[int(do_train), int(do_valid), int(do_test)])
else:
flags = torch.cuda.LongTensor([0, 0, 0])
# Broadcast the flags.
torch.distributed.broadcast(flags,
mpu.get_model_parallel_src_rank(),
group=mpu.get_model_parallel_group())
args.do_train = flags[0].item()
args.do_valid = flags[1].item()
args.do_test = flags[2].item()
print_rank_0('> building train, validation, and test datasets '
'for BERT ...')
train_ds, valid_ds, test_ds = build_train_valid_test_datasets(
data_prefix=args.data_path,
data_impl=args.data_impl,
splits_string=args.split,
train_valid_test_num_samples=train_val_test_num_samples,
max_seq_length=args.seq_length,
masked_lm_prob=args.mask_prob,
short_seq_prob=args.short_seq_prob,
seed=args.seed,
skip_warmup=(not args.mmap_warmup))
print_rank_0("> finished creating BERT datasets ...")
return train_data, valid_data, test_data
return train_ds, valid_ds, test_ds
if __name__ == "__main__":
pretrain(get_train_val_test_data, model_provider, forward_step,
pretrain(train_valid_test_datasets_provider, model_provider, forward_step,
args_defaults={'tokenizer_type': 'BertWordPieceLowerCase'})
......@@ -15,8 +15,6 @@
"""Pretrain GPT2"""
import os
import torch
from megatron import get_args
......@@ -24,17 +22,15 @@ from megatron import get_timers
from megatron import get_tokenizer
from megatron import mpu
from megatron import print_rank_0
from megatron.data.gpt2_dataset import GPT2Dataset
from megatron.data.gpt2_dataset import build_train_valid_test_datasets
from megatron.model import GPT2Model
from megatron.training import pretrain
from megatron.utils import get_ltor_masks_and_position_ids
from megatron.utils import make_data_loader
from megatron.utils import reduce_losses
def model_provider():
"""Build the model."""
args = get_args()
print_rank_0('building GPT2 model ...')
model = GPT2Model(num_tokentypes=0, parallel_output=True)
......@@ -98,71 +94,26 @@ def forward_step(data_iterator, model):
return loss, {'lm loss': reduced_loss[0]}
def make_gpt2_dataloaders():
"""Build gpt2 dataloders."""
args = get_args()
# Input parameters.
input_data_sizes_file = args.input_data_sizes_file
seq_length = args.seq_length
initial_seed = args.seed
# Build the datasets.
def _build_dataset(name):
return GPT2Dataset(os.path.join(args.data_path, name),
args.input_data_sizes_file,
args.seq_length, args.seed)
train_ds = _build_dataset('train')
valid_ds = _build_dataset('valid')
test_ds = _build_dataset('test')
# Dataloaders
train = make_data_loader(train_ds)
valid = make_data_loader(valid_ds)
test = make_data_loader(test_ds)
args.do_train = False
args.do_valid = False
args.do_test = False
if train is not None:
args.do_train = True
if valid is not None:
args.do_valid = True
if test is not None:
args.do_test = True
return (train, valid, test)
def get_train_val_test_data():
"""Load the data on rank zero and boradcast number of tokens to all GPUS."""
def train_valid_test_datasets_provider(train_val_test_num_samples):
"""Build train, valid, and test datasets."""
args = get_args()
(train_data, val_data, test_data) = (None, None, None)
# Data loader only on rank 0 of each model parallel group.
if mpu.get_model_parallel_rank() == 0:
(train_data, val_data, test_data) = make_gpt2_dataloaders()
flags = torch.cuda.LongTensor([int(args.do_train),
int(args.do_valid),
int(args.do_test)])
else:
flags = torch.cuda.LongTensor([0, 0, 0])
# Broadcast num tokens.
torch.distributed.broadcast(flags,
mpu.get_model_parallel_src_rank(),
group=mpu.get_model_parallel_group())
args.do_train = flags[0].item()
args.do_valid = flags[1].item()
args.do_test = flags[2].item()
print_rank_0('> building train, validation, and test datasets '
'for GPT2 ...')
train_ds, valid_ds, test_ds = build_train_valid_test_datasets(
data_prefix=args.data_path,
data_impl=args.data_impl,
splits_string=args.split,
train_valid_test_num_samples=train_val_test_num_samples,
seq_length=args.seq_length,
seed=args.seed,
skip_warmup=(not args.mmap_warmup))
print_rank_0("> finished creating GPT2 datasets ...")
return train_data, val_data, test_data
return train_ds, valid_ds, test_ds
if __name__ == "__main__":
pretrain(get_train_val_test_data, model_provider, forward_step,
pretrain(train_valid_test_datasets_provider, model_provider, forward_step,
args_defaults={'tokenizer_type': 'GPT2BPETokenizer'})
import os
import argparse
import collections
import numpy as np
import torch
def process_files(args):
all_predictions = collections.OrderedDict()
all_labels = collections.OrderedDict()
all_uid = collections.OrderedDict()
for path in args.paths:
path = os.path.join(path, args.prediction_name)
try:
data = torch.load(path)
for dataset in data:
name, d = dataset
predictions, labels, uid = d
if name not in all_predictions:
all_predictions[name] = np.array(predictions)
if args.labels is None:
args.labels = [i for i in range(all_predictions[name].shape[1])]
if args.eval:
all_labels[name] = np.array(labels)
all_uid[name] = np.array(uid)
else:
all_predictions[name] += np.array(predictions)
assert np.allclose(all_uid[name], np.array(uid))
except Exception as e:
print(e)
continue
return all_predictions, all_labels, all_uid
def get_threshold(all_predictions, all_labels, one_threshold=False):
if one_threshold:
all_predictions = {'combined': np.concatenate(list(all_predictions.values()))}
all_labels = {'combined': np.concatenate(list(all_labels.values()))}
out_thresh = []
for dataset in all_predictions:
preds = all_predictions[dataset]
labels = all_labels[dataset]
out_thresh.append(calc_threshold(preds, labels))
return out_thresh
def calc_threshold(p, l):
trials = [i / 100. for i in range(100)]
best_acc = float('-inf')
best_thresh = 0
for t in trials:
acc = ((apply_threshold(p, t).argmax(-1) == l).astype(float)).mean()
if acc > best_acc:
best_acc = acc
best_thresh = t
return best_thresh
def apply_threshold(preds, t):
assert (np.allclose(preds.sum(-1), np.ones(preds.shape[0])))
prob = preds[:,-1]
thresholded = (prob >= t).astype(int)
preds = np.zeros_like(preds)
preds[np.arange(len(thresholded)), thresholded.reshape(-1)] = 1
return preds
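# Worked example (assumes rows are 2-class softmax outputs summing to 1):
#     preds = np.array([[0.7, 0.3], [0.2, 0.8]])
#     apply_threshold(preds, 0.5)
#     # class-1 probs are [0.3, 0.8]; only the second row clears the 0.5
#     # threshold, so the result is [[1, 0], [0, 1]]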
def threshold_predictions(all_predictions, threshold):
if len(threshold) != len(all_predictions):
threshold = threshold + [threshold[-1]] * (len(all_predictions) - len(threshold))
for i, dataset in enumerate(all_predictions):
thresh = threshold[i]
preds = all_predictions[dataset]
all_predictions[dataset] = apply_threshold(preds, thresh)
return all_predictions
def postprocess_predictions(all_predictions, all_labels, args):
for d in all_predictions:
all_predictions[d] = all_predictions[d]/len(args.paths)
if args.calc_threshold:
args.threshold = get_threshold(all_predictions, all_labels, args.one_threshold)
print('threshold', args.threshold)
if args.threshold is not None:
all_predictions = threshold_predictions(all_predictions, args.threshold)
return all_predictions, all_labels
def write_predictions(all_predictions, all_labels, all_uid, args):
all_correct = 0
count = 0
for dataset in all_predictions:
preds = all_predictions[dataset]
preds = np.argmax(preds, -1)
if args.eval:
correct = (preds == all_labels[dataset]).sum()
num = len(all_labels[dataset])
accuracy = correct / num
count += num
all_correct += correct
print(accuracy)
if not os.path.exists(os.path.join(args.outdir, dataset)):
os.makedirs(os.path.join(args.outdir, dataset))
outpath = os.path.join(args.outdir, dataset, os.path.splitext(args.prediction_name)[0]+'.tsv')
with open(outpath, 'w') as f:
f.write('id\tlabel\n')
f.write('\n'.join(str(uid)+'\t'+str(args.labels[p]) for uid, p in zip(all_uid[dataset], preds.tolist())))
if args.eval:
print(all_correct/count)
def ensemble_predictions(args):
all_predictions, all_labels, all_uid = process_files(args)
all_predictions, all_labels = postprocess_predictions(all_predictions, all_labels, args)
write_predictions(all_predictions, all_labels, all_uid, args)
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--paths', required=True, nargs='+',
help='paths to checkpoint directories used in ensemble')
parser.add_argument('--eval', action='store_true',
help='compute accuracy metrics against labels (dev set)')
parser.add_argument('--outdir',
help='directory to place ensembled predictions in')
parser.add_argument('--prediction-name', default='test_predictions.pt',
help='name of predictions in checkpoint directories')
parser.add_argument('--calc-threshold', action='store_true',
help='calculate threshold classification')
parser.add_argument('--one-threshold', action='store_true',
help='use one threshold for all subdatasets')
parser.add_argument('--threshold', nargs='+', default=None, type=float,
help='user supplied threshold for classification')
parser.add_argument('--labels', nargs='+', default=None,
help='whitespace separated list of label names')
args = parser.parse_args()
ensemble_predictions(args)
if __name__ == '__main__':
main()
\ No newline at end of file
......@@ -15,6 +15,11 @@
"""Sample Generate GPT2"""
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__),
os.path.pardir)))
from megatron import get_args
from megatron import get_tokenizer
from megatron import print_rank_0
......
# coding=utf-8
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Merge model parallel partitions."""
import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__),
os.path.pardir)))
import torch
from arguments import get_args
from megatron import mpu
from megatron.utils import ensure_directory_exists
from megatron.utils import get_checkpoint_name
from megatron.utils import get_checkpoint_tracker_filename
from megatron.utils import vocab_size_with_padding
from megatron.checkpointing import ensure_directory_exists
from megatron.checkpointing import get_checkpoint_name
from megatron.checkpointing import get_checkpoint_tracker_filename
from megatron.global_vars import rebuild_tokenizer
from megatron.global_vars import _parse_args
def split_into_partitions(tensor, num_partitions, partition_dim, stride):
......@@ -84,21 +104,26 @@ def merge_partitions(merged, partitions, partition_dim, stride):
return
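# Sketch of what merging amounts to for a weight split across ranks along one
# dimension (illustrative; the real code also handles strided partitions):
#     import torch
#     shards = [torch.randn(4, 16) for _ in range(2)]   # per-rank pieces
#     full = torch.cat(shards, dim=0)                   # merged [8, 16] weight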
def get_model(model_type, args):
def get_model(model_type):
if model_type == 'BERT':
from pretrain_albert import model_provider
args.tokentype_size = 2
elif model_type == 'GPT':
from pretrain_bert import model_provider
elif model_type == 'GPT2':
from pretrain_gpt2 import model_provider
elif model_type == 'RACE':
from tasks.race.finetune import model_provider
elif model_type in ['MNLI', 'QQP']:
num_classes = 2
if model_type == 'MNLI':
num_classes = 3
from megatron.model.classification import Classification
def model_provider():
return Classification(num_classes=num_classes, num_tokentypes=2)
else:
raise Exception('unrecognized model type: {}'.format(model_type))
orig_vocab_size = args.vocab_size
args.vocab_size = vocab_size_with_padding(args.vocab_size, args)
model = model_provider(args)
model = model_provider()
model = model.half()
args.vocab_size = orig_vocab_size
return model
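# Worked example of the vocab padding this save/restore dance guards against
# (assuming a hypothetical divisor of 128): GPT-2's 50257-token vocab padded to
# a multiple of 128 * model_parallel_size becomes
#     mp=1: ceil(50257/128)  * 128  = 50304
#     mp=8: ceil(50257/1024) * 1024 = 51200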
......@@ -147,17 +172,32 @@ def test_split_merge():
print(' > max error (should be zero): {}'.format(max_error))
def main(model_type):
def get_mp_merge_args(parser):
"""Provide extra arguments required for merging."""
group = parser.add_argument_group(title='mp merge')
group.add_argument('--model-type', type=str, required=True,
choices=['BERT', 'GPT2', 'RACE', 'MNLI', 'QQP'],
help='Type of the model.')
return parser
def main():
# Args
args = get_args()
args = _parse_args(extra_args_provider=get_mp_merge_args)
model_type = args.model_type
orig_model_parallel_size = args.model_parallel_size
args.model_parallel_size = 1
tokenizer = rebuild_tokenizer(args)
print('\n merging model parallel partitions ...')
assert args.vocab_size is not None
print(' > number of partitions: {}'.format(args.model_parallel_size))
print(' > number of partitions: {}'.format(orig_model_parallel_size))
print(' > checkpoint path: {}'.format(args.load))
print(' > model parameters:')
print(' number of tokens ................ {} '.format(args.vocab_size))
print(' number of tokens ................ {} '.format(
tokenizer.vocab_size))
print(' number of layers ................ {}'.format(args.num_layers))
print(' hidden size ..................... {}'.format(args.hidden_size))
print(' number of attention heads ....... {}'.format(
......@@ -169,17 +209,19 @@ def main(model_type):
print('> building the full model ...')
mpu.initialize.set_model_parallel_world_size(1)
mpu.initialize.set_model_parallel_rank(0)
merged_model = get_model(model_type, args)
merged_model = get_model(model_type)
# Build and load partitions.
partitions = []
iteration = 0
args.model_parallel_size = orig_model_parallel_size
tokenizer = rebuild_tokenizer(args)
mpu.initialize.set_model_parallel_world_size(args.model_parallel_size)
for rank in range(args.model_parallel_size):
mpu.initialize.set_model_parallel_rank(rank)
checkpoint_name, iteration = get_parallel_checkpoint_name(args.load)
print('> loading {} ...'.format(checkpoint_name))
model_ = get_model(model_type, args)
model_ = get_model(model_type)
sd = torch.load(checkpoint_name, map_location='cpu')
model_.load_state_dict(sd['model'])
partitions.append(model_)
......@@ -225,6 +267,7 @@ def main(model_type):
# Save the model.
args.model_parallel_size = 1
mpu.initialize.set_model_parallel_rank(0)
sd = {}
sd['model'] = merged_model.state_dict_for_save_checkpoint()
......@@ -240,4 +283,4 @@ def main(model_type):
if __name__ == '__main__':
main('BERT')
main()