Commit 6c521eb1 authored by Jared Casper

Merge branch 'staging_args' into 'staging'

Major refactor: args, global variables, tokenizer

See merge request ADLR/megatron-lm!36
parents 37ff534f b07f1d0a
......@@ -36,7 +36,7 @@ from megatron.utils import Timers
from megatron.utils import load_checkpoint
from megatron.utils import report_memory
from megatron.utils import print_params_min_max_norm
from megatron.utils import print_rank_0
from megatron import print_rank_0
from megatron.data_utils import make_tokenizer
......
......@@ -37,7 +37,7 @@ from megatron import mpu
from megatron.fp16 import FP16_Module
from megatron.model import GPT2Model
from megatron.model import DistributedDataParallel as DDP
from megatron.utils import print_rank_0
from megatron import print_rank_0
def get_model(args):
"""Build the model."""
......
# coding=utf-8
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import torch
from .global_vars import get_args
from .global_vars import get_tokenizer
from .global_vars import get_tensorboard_writer
from .global_vars import get_adlr_autoresume
from .global_vars import get_timers
def print_rank_0(message):
"""If distributed is initialized print only on rank 0."""
if torch.distributed.is_initialized():
if torch.distributed.get_rank() == 0:
print(message, flush=True)
else:
print(message, flush=True)
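A minimal usage sketch (the message below is illustrative; it assumes torch.distributed has been initialized elsewhere, e.g. via init_process_group): after this refactor, print_rank_0 is imported from the megatron package itself rather than from megatron.utils.

from megatron import print_rank_0

# Prints once on rank 0 when torch.distributed is initialized; otherwise
# every process prints (e.g. a single-GPU run before initialization).
print_rank_0('building GPT2 model ...')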
......@@ -13,141 +13,182 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""argparser configuration"""
"""Megatron arguments."""
import argparse
import os
import torch
def add_model_config_args(parser):
"""Model arguments"""
def parse_args(extra_args_provider=None, defaults={}):
"""Parse all arguments."""
parser = argparse.ArgumentParser(description='Megatron-LM Arguments')
# Standard arguments.
parser = _add_network_size_args(parser)
parser = _add_regularization_args(parser)
parser = _add_training_args(parser)
parser = _add_initialization_args(parser)
parser = _add_learning_rate_args(parser)
parser = _add_checkpointing_args(parser)
parser = _add_mixed_precision_args(parser)
parser = _add_distributed_args(parser)
parser = _add_validation_args(parser)
parser = _add_data_args(parser)
parser = _add_autoresume_args(parser)
# TODO: Refactor
parser = _add_gpt2_args(parser)
# Custom arguments.
if extra_args_provider is not None:
parser = extra_args_provider(parser)
group = parser.add_argument_group('model', 'model configuration')
# Parse.
args = parser.parse_args()
group.add_argument('--pretrained-bert', action='store_true',
help='use a pretrained bert-large-uncased model instead'
'of initializing from scratch. See '
'--tokenizer-model-type to specify which pretrained '
'BERT model to use')
group.add_argument('--attention-dropout', type=float, default=0.1,
help='dropout probability for attention weights')
group.add_argument('--num-attention-heads', type=int, default=16,
help='num of transformer attention heads')
group.add_argument('--hidden-size', type=int, default=1024,
help='transformer hidden size')
group.add_argument('--intermediate-size', type=int, default=None,
help='transformer embedding dimension for FFN'
'set to 4*`--hidden-size` if it is None')
group.add_argument('--num-layers', type=int, default=24,
help='num decoder layers')
group.add_argument('--layernorm-epsilon', type=float, default=1e-5,
help='layer norm epsilon')
group.add_argument('--hidden-dropout', type=float, default=0.1,
help='dropout probability for hidden state transformer')
group.add_argument('--max-position-embeddings', type=int, default=512,
help='maximum number of position embeddings to use')
group.add_argument('--vocab-size', type=int, default=None,
help='vocab size to use for non-character-level '
'tokenization. This value will only be used when '
'creating a tokenizer')
group.add_argument('--deep-init', action='store_true',
help='initialize bert model similar to gpt2 model.'
'scales initialization of projection layers by a '
'factor of 1/sqrt(2N). Necessary to train bert '
'models larger than BERT-Large.')
# Set input defaults.
for key in defaults:
# For default to be valid, it should not be provided in the
# arguments that are passed to the program. We check this by
# ensuring the arg is set to None.
assert getattr(args, key) is None, \
'defaults can only be overwritten for args with None values.'
setattr(args, key, defaults[key])
# Distributed args.
args.rank = int(os.getenv('RANK', '0'))
args.world_size = int(os.getenv("WORLD_SIZE", '1'))
args.model_parallel_size = min(args.model_parallel_size, args.world_size)
if args.rank == 0:
print('using world size: {} and model-parallel size: {} '.format(
args.world_size, args.model_parallel_size))
# Fp16 loss scaling.
args.dynamic_loss_scale = False
if args.loss_scale is None:
args.dynamic_loss_scale = True
# Checks.
assert args.hidden_size % args.num_attention_heads == 0
assert args.max_position_embeddings >= args.seq_length
assert args.min_lr <= args.lr
if args.save is not None:
assert args.save_interval is not None
_print_args(args)
return args
def _print_args(args):
"""Print arguments."""
if args.rank == 0:
print('-------------------- arguments --------------------', flush=True)
str_list = []
for arg in vars(args):
dots = '.' * (32 - len(arg))
str_list.append(' {} {} {}'.format(arg, dots, getattr(args, arg)))
for arg in sorted(str_list, key=lambda x: x.lower()):
print(arg, flush=True)
print('---------------- end of arguments ----------------', flush=True)
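A hedged sketch of driving the new entry point; the module path megatron.arguments, the extra flag, and the default override are illustrative assumptions rather than part of this diff. Required arguments (e.g. --num-layers, --lr, --batch-size) still have to come from the command line.

from megatron.arguments import parse_args  # assumed module path

def extra_args(parser):
    # Application-specific arguments are appended via extra_args_provider.
    group = parser.add_argument_group(title='my application')
    group.add_argument('--my-extra-flag', action='store_true',
                       help='Hypothetical application-specific flag.')
    return parser

# `defaults` may only override arguments whose parsed value is still None,
# e.g. --train-iters (default=None).
args = parse_args(extra_args_provider=extra_args,
                  defaults={'train_iters': 500000})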
def _add_network_size_args(parser):
group = parser.add_argument_group(title='network size')
group.add_argument('--num-layers', type=int, required=True,
help='Number of transformer layers.')
group.add_argument('--hidden-size', type=int, required=True,
help='Transformer hidden size.')
group.add_argument('--num-attention-heads', type=int, required=True,
help='Number of transformer attention heads.')
group.add_argument('--max-position-embeddings', type=int, required=True,
help='Maximum number of position embeddings to use. '
'This is the size of position embedding.')
group.add_argument('--make-vocab-size-divisible-by', type=int, default=128,
help='Pad the vocab size to be divisible by this value. '
'This is added for computational efficiency reasons.')
group.add_argument('--layernorm-epsilon', type=float, default=1e-5,
help='Layer norm epsilon.')
group.add_argument('--apply-residual-connection-post-layernorm',
action='store_true',
help='If set, use original BERT residual connection '
'ordering.')
return parser
def add_fp16_config_args(parser):
"""Mixed precision arguments."""
def _add_regularization_args(parser):
group = parser.add_argument_group(title='regularization')
group = parser.add_argument_group('fp16', 'fp16 configurations')
group.add_argument('--fp16', action='store_true',
help='Run model in fp16 mode')
group.add_argument('--apply-query-key-layer-scaling', action='store_true',
help='Scale Q * K^T by 1 / layer-number. If this flag '
'is set, then it will automatically set '
'attention-softmax-in-fp32 to true')
group.add_argument('--attention-softmax-in-fp32', action='store_true',
help='Run attention masking and softmax in fp32.')
group.add_argument('--fp32-embedding', action='store_true',
help='embedding in fp32')
group.add_argument('--fp32-layernorm', action='store_true',
help='layer norm in fp32')
group.add_argument('--fp32-tokentypes', action='store_true',
help='embedding token types in fp32')
group.add_argument('--fp32-allreduce', action='store_true',
help='all-reduce in fp32')
group.add_argument('--hysteresis', type=int, default=2,
help='hysteresis for dynamic loss scaling')
group.add_argument('--loss-scale', type=float, default=None,
help='Static loss scaling, positive power of 2 '
'values can improve fp16 convergence. If None, dynamic'
'loss scaling is used.')
group.add_argument('--loss-scale-window', type=float, default=1000,
help='Window over which to raise/lower dynamic scale')
group.add_argument('--min-scale', type=float, default=1,
help='Minimum loss scale for dynamic loss scale')
group.add_argument('--attention-dropout', type=float, default=0.1,
help='Post attention dropout probability.')
group.add_argument('--hidden-dropout', type=float, default=0.1,
help='Dropout probability for hidden state transformer.')
group.add_argument('--weight-decay', type=float, default=0.01,
help='Weight decay coefficient for L2 regularization.')
group.add_argument('--clip-grad', type=float, default=1.0,
help='Gradient clipping based on global L2 norm.')
return parser
def add_training_args(parser):
"""Training arguments."""
group = parser.add_argument_group('train', 'training configurations')
def _add_training_args(parser):
group = parser.add_argument_group(title='training')
group.add_argument('--batch-size', type=int, default=4,
help='Data Loader batch size')
group.add_argument('--weight-decay', type=float, default=0.01,
help='weight decay coefficient for L2 regularization')
group.add_argument('--batch-size', type=int, required=True,
help='Batch size per model instance (local batch size). '
'Global batch size is local batch size times data '
'parallel size.')
group.add_argument('--checkpoint-activations', action='store_true',
help='checkpoint activation to allow for training '
'with larger models and sequences')
help='Checkpoint activation to allow for training '
'with larger models, sequences, and batch sizes.')
group.add_argument('--checkpoint-num-layers', type=int, default=1,
help='chunk size (number of layers) for checkpointing')
group.add_argument('--clip-grad', type=float, default=1.0,
help='gradient clipping')
group.add_argument('--train-iters', type=int, default=1000000,
help='total number of iterations to train over all training runs')
help='chunk size (number of layers) for checkpointing.')
group.add_argument('--train-iters', type=int, default=None,
help='Total number of iterations to train over all '
'training runs.')
group.add_argument('--log-interval', type=int, default=100,
help='report interval')
help='Report loss and timing interval.')
group.add_argument('--exit-interval', type=int, default=None,
help='Exit the program after this many new iterations.')
help='Exit the program when the iteration number is '
'divisible by this value.')
group.add_argument('--tensorboard-dir', type=str, default=None,
help='Write TensorBoard logs to this directory')
help='Write TensorBoard logs to this directory.')
return parser
def _add_initialization_args(parser):
group = parser.add_argument_group(title='initialization')
group.add_argument('--seed', type=int, default=1234,
help='random seed')
# Batch producer arguments
group.add_argument('--reset-position-ids', action='store_true',
help='Reset position ids after end-of-document token.')
group.add_argument('--reset-attention-mask', action='store_true',
help='Reset self attention mask after '
'end-of-document token.')
group.add_argument('--eod-mask-loss', action='store_true',
help='Mask loss for the end of document tokens')
help='Random seed used for python, numpy, '
'pytorch, and cuda.')
group.add_argument('--init-method-std', type=float, default=0.02,
help='Standard deviation of the zero mean normal '
'distribution used for weight initialization.')
# Learning rate.
group.add_argument('--lr-decay-iters', type=int, default=None,
help='number of iterations to decay LR over,'
' If None defaults to `--train-iters`*`--epochs`')
return parser
def _add_learning_rate_args(parser):
group = parser.add_argument_group(title='learning rate')
group.add_argument('--lr', type=float, required=True,
help='Initial learning rate. Depending on decay style '
'and initial warmup, the learning rate at each '
'iteration would be different.')
group.add_argument('--lr-decay-style', type=str, default='linear',
choices=['constant', 'linear', 'cosine', 'exponential'],
help='learning rate decay function')
group.add_argument('--lr', type=float, default=1.0e-4,
help='initial learning rate')
help='Learning rate decay function.')
group.add_argument('--lr-decay-iters', type=int, default=None,
help='Number of iterations to decay learning rate over. '
'If None, defaults to `--train-iters`.')
group.add_argument('--min-lr', type=float, default=0.0,
help='Minimum value for learning rate. The scheduler '
'clips values below this threshold.')
group.add_argument('--warmup', type=float, default=0.01,
help='percentage of data to warmup on (.01 = 1% of all '
'training iters). Default 0.01')
help='Percentage of total iterations to warmup on '
'(.01 = 1 percent of all training iters).')
group.add_argument('--override-lr-scheduler', action='store_true',
help='Reset the values of the scheduler (learning rate,'
'warmup iterations, minimum learning rate, maximum '
......@@ -158,20 +199,24 @@ def add_training_args(parser):
help='Use checkpoint to set the values of the scheduler '
'(learning rate, warmup iterations, minimum learning '
'rate, maximum number of iterations, and decay style '
'from input arguments and ignore values from '
'checkpoints. Note that all the above values will be '
'reset.')
# model checkpointing
'from checkpoint and ignore input arguments.')
return parser
def _add_checkpointing_args(parser):
group = parser.add_argument_group(title='checkpointing')
group.add_argument('--save', type=str, default=None,
help='Output directory to save checkpoints to.')
group.add_argument('--save-interval', type=int, default=5000,
help='number of iterations between saves')
group.add_argument('--save-interval', type=int, default=None,
help='Number of iterations between checkpoint saves.')
group.add_argument('--no-save-optim', action='store_true',
help='Do not save current optimizer.')
group.add_argument('--no-save-rng', action='store_true',
help='Do not save current rng state.')
group.add_argument('--load', type=str, default=None,
help='Path to a directory containing a model checkpoint.')
help='Directory containing a model checkpoint.')
group.add_argument('--no-load-optim', action='store_true',
help='Do not load optimizer when loading checkpoint.')
group.add_argument('--no-load-rng', action='store_true',
......@@ -180,29 +225,136 @@ def add_training_args(parser):
help='Load model for finetuning. Do not load optimizer '
'or rng state from checkpoint and set iteration to 0. '
'Assumed when loading a release checkpoint.')
group.add_argument('--resume-dataloader', action='store_true',
help='Resume the dataloader when resuming training. '
'Does not apply to tfrecords dataloader, try resuming'
'with a different seed in this case.')
# distributed training args
return parser
def _add_mixed_precision_args(parser):
group = parser.add_argument_group(title='mixed precision')
group.add_argument('--fp16', action='store_true',
help='Run model in fp16 mode.')
group.add_argument('--apply-query-key-layer-scaling', action='store_true',
help='Scale Q * K^T by 1 / layer-number. If this flag '
'is set, then it will automatically set '
'attention-softmax-in-fp32 to true')
group.add_argument('--attention-softmax-in-fp32', action='store_true',
help='Run attention masking and softmax in fp32.')
group.add_argument('--fp32-allreduce', action='store_true',
help='All-reduce in fp32')
group.add_argument('--hysteresis', type=int, default=2,
help='hysteresis for dynamic loss scaling')
group.add_argument('--loss-scale', type=float, default=None,
help='Static loss scaling, positive power of 2 '
'values can improve fp16 convergence. If None, dynamic'
'loss scaling is used.')
group.add_argument('--loss-scale-window', type=float, default=1000,
help='Window over which to raise/lower dynamic scale.')
group.add_argument('--min-scale', type=float, default=1,
help='Minimum loss scale for dynamic loss scale.')
return parser
def _add_distributed_args(parser):
group = parser.add_argument_group(title='distributed')
group.add_argument('--model-parallel-size', type=int, default=1,
help='Size of the model parallel group.')
group.add_argument('--distributed-backend', default='nccl',
help='which backend to use for distributed '
'training. One of [gloo, nccl]')
choices=['nccl', 'gloo'],
help='Which backend to use for distributed training.')
group.add_argument('--DDP-impl', default='local',
choices=['local', 'torch'],
help='which DistributedDataParallel implementation '
'to use. One of [local, torch]')
'to use.')
group.add_argument('--local_rank', type=int, default=None,
help='local rank passed from distributed launcher')
# autoresume
help='local rank passed from distributed launcher.')
return parser
def _add_validation_args(parser):
group = parser.add_argument_group(title='validation')
group.add_argument('--eval-iters', type=int, default=100,
help='Number of iterations to run for evaluation on '
'validation/test sets.')
group.add_argument('--eval-interval', type=int, default=1000,
help='Interval between running evaluation on '
'validation set.')
return parser
def _add_data_args(parser):
group = parser.add_argument_group(title='data and dataloader')
group.add_argument('--data-path', type=str, default=None,
help='Path to combined dataset to split.')
group.add_argument('--split', type=str, default='969, 30, 1',
help='Comma-separated list of proportions for training,'
' validation, and test split. For example the split '
'`90,5,5` will use 90% of data for training, 5% for '
'validation and 5% for test.')
group.add_argument('--vocab-file', type=str, required=True,
help='Path to the vocab file.')
group.add_argument('--merge-file', type=str, default=None,
help='Path to the BPE merge file.')
group.add_argument('--seq-length', type=int, required=True,
help="Maximum sequence length to process.")
group.add_argument('--mask-prob', type=float, default=0.15,
help='Probability of replacing a token with mask.')
group.add_argument('--short-seq-prob', type=float, default=0.1,
help='Probability of producing a short sequence.')
group.add_argument('--mmap-warmup', action='store_true',
help='Warm up mmap files.')
group.add_argument('--num-workers', type=int, default=2,
help="Dataloader number of workers.")
group.add_argument('--tokenizer-type', type=str,
default=None,
choices=['BertWordPieceLowerCase',
'GPT2BPETokenizer'],
help='What type of tokenizer to use.')
group.add_argument('--data-impl', type=str, default='infer',
choices=['lazy', 'cached', 'mmap', 'infer'],
help='Implementation of indexed datasets.')
group.add_argument('--reset-position-ids', action='store_true',
help='Reset position ids after end-of-document token.')
group.add_argument('--reset-attention-mask', action='store_true',
help='Reset self attention mask after '
'end-of-document token.')
group.add_argument('--eod-mask-loss', action='store_true',
help='Mask loss for the end of document tokens.')
return parser
def _add_autoresume_args(parser):
group = parser.add_argument_group(title='autoresume')
group.add_argument('--adlr-autoresume', action='store_true',
help='enable autoresume on adlr cluster.')
help='Enable autoresume on adlr cluster.')
group.add_argument('--adlr-autoresume-interval', type=int, default=1000,
help='intervals over which check for autoresume'
help='Interval over which to check for the autoresume '
'termination signal.')
return parser
########################################################################
def _add_gpt2_args(parser):
group = parser.add_argument_group(title='gpt2')
group.add_argument('--input-data-sizes-file', type=str, default='sizes.txt',
help='The filename containing all the shard '
'sizes for the numpy data loader.')
return parser
def add_evaluation_args(parser):
"""Evaluation arguments."""
......@@ -211,11 +363,6 @@ def add_evaluation_args(parser):
group.add_argument('--eval-batch-size', type=int, default=None,
help='Data Loader batch size for evaluation datasets.'
'Defaults to `--batch-size`')
group.add_argument('--eval-iters', type=int, default=100,
help='number of iterations to run for evaluation'
'validation/test for')
group.add_argument('--eval-interval', type=int, default=1000,
help='interval between running evaluation on validation set')
group.add_argument('--eval-seq-length', type=int, default=None,
help='Maximum sequence length to process for '
'evaluation. Defaults to `--seq-length`')
......@@ -263,154 +410,15 @@ def add_text_generate_args(parser):
return parser
def add_data_args(parser):
def add_data_args_(parser):
"""Train/valid/test data arguments."""
group = parser.add_argument_group('data', 'data configurations')
group.add_argument('--model-parallel-size', type=int, default=1,
help='size of the model parallel.')
group.add_argument('--shuffle', action='store_true',
help='Shuffle data. Shuffling is deterministic '
'based on seed and current epoch.')
group.add_argument('--data-loader', type=str, default=None,
choices=['raw', 'lazy', 'tfrecords', 'numpy', 'binary'],
help='Which data loader to use. Default varies by model.')
group.add_argument('--train-data', nargs='+', default=None,
help='Whitespace separated paths or corpora names '
'for training.')
group.add_argument('--valid-data', nargs='*', default=None,
help='path(s) to the validation data.')
group.add_argument('--test-data', nargs='*', default=None,
help='path(s) to the testing data.')
group.add_argument('--data-path', nargs='+', default=None,
help='path to combined dataset to split')
group.add_argument('--split', default='1000,1,1',
help='comma-separated list of proportions for training,'
' validation, and test split')
group.add_argument('--seq-length', type=int, default=512,
help="Maximum sequence length to process")
group.add_argument('--max-preds-per-seq', type=int, default=None,
help='Maximum number of predictions to use per sequence.'
'Defaults to math.ceil(`--seq-length`*.15/10)*10.'
'MUST BE SPECIFIED IF `--data-loader tfrecords`.')
# arguments for binary data loader
parser.add_argument('--vocab', type=str, default='vocab.txt',
help='path to vocab file')
parser.add_argument('--data-impl', type=str, default='infer',
help='implementation of indexed datasets',
choices=['lazy', 'cached', 'mmap', 'infer'])
parser.add_argument('--max-num-samples', type=int, default=None,
help='Maximum number of samples to plan for, defaults to total iters * batch-size.')
parser.add_argument('--data-epochs', type=int, default=None,
help='Number of epochs to plan for, defaults to using --max-num-samples')
parser.add_argument('--mask-prob', default=0.15, type=float,
help='probability of replacing a token with mask')
parser.add_argument('--short-seq-prob', default=0.1, type=float,
help='probability of producing a short sequence')
parser.add_argument('--skip-mmap-warmup', action='store_true',
help='skip warming up mmap files')
# arguments for numpy data loader
group.add_argument('--input-data-sizes-file', type=str, default='sizes.txt',
help='the filename containing all the shards sizes for numpy data loader')
# arguments for raw/tfrecords data loader
group.add_argument('--delim', default=',',
help='delimiter used to parse csv data files')
group.add_argument('--text-key', default='sentence',
help='key to use to extract text from json/csv')
group.add_argument('--eval-text-key', default=None,
help='key to use to extract text from '
'json/csv evaluation datasets')
group.add_argument('--loose-json', action='store_true',
help='Use loose json (one json-formatted string per '
'newline), instead of tight json (data file is one '
'json string)')
group.add_argument('--presplit-sentences', action='store_true',
help='Dataset content consists of documents where '
'each document consists of newline separated sentences')
group.add_argument('--num-workers', type=int, default=2,
help="""Number of workers to use for dataloading""")
group.add_argument('--tokenizer-model-type', type=str,
default='bert-large-uncased',
help="Model type to use for sentencepiece tokenization \
(one of ['bpe', 'char', 'unigram', 'word']) or \
bert vocab to use for BertWordPieceTokenizer (one of \
['bert-large-uncased', 'bert-large-cased', etc.])")
group.add_argument('--tokenizer-path', type=str, default='tokenizer.model',
help='path used to save/load sentencepiece tokenization '
'models')
group.add_argument('--tokenizer-type', type=str,
default='BertWordPieceLowerCase',
choices=['CharacterLevelTokenizer',
'SentencePieceTokenizer',
'BertWordPieceLowerCase',
'GPT2BPETokenizer'],
help='what type of tokenizer to use')
group.add_argument("--cache-dir", default=None, type=str,
help="Where to store pre-trained BERT downloads")
return parser
def get_args(extra_args_provider=None):
"""Parse all the args."""
parser = argparse.ArgumentParser(description='PyTorch BERT Model')
parser = add_model_config_args(parser)
parser = add_fp16_config_args(parser)
parser = add_training_args(parser)
parser = add_evaluation_args(parser)
parser = add_text_generate_args(parser)
parser = add_data_args(parser)
if extra_args_provider is not None:
parser = extra_args_provider(parser)
args = parser.parse_args()
if not args.train_data and not args.data_path:
print('WARNING: No training data specified')
args.cuda = torch.cuda.is_available()
args.rank = int(os.getenv('RANK', '0'))
args.world_size = int(os.getenv("WORLD_SIZE", '1'))
if os.getenv('OMPI_COMM_WORLD_LOCAL_RANK'):
# We are using (OpenMPI) mpirun for launching distributed data parallel processes
local_rank = int(os.getenv('OMPI_COMM_WORLD_LOCAL_RANK'))
local_size = int(os.getenv('OMPI_COMM_WORLD_LOCAL_SIZE'))
# Possibly running with Slurm
num_nodes = int(os.getenv('SLURM_JOB_NUM_NODES', '1'))
nodeid = int(os.getenv('SLURM_NODEID', '0'))
args.local_rank = local_rank
args.rank = nodeid*local_size + local_rank
args.world_size = num_nodes*local_size
args.model_parallel_size = min(args.model_parallel_size, args.world_size)
if args.rank == 0:
print('using world size: {} and model-parallel size: {} '.format(
args.world_size, args.model_parallel_size))
args.dynamic_loss_scale = False
if args.loss_scale is None:
args.dynamic_loss_scale = True
if args.rank == 0:
print(' > using dynamic loss scaling')
# The fp32_* and fp16_* args are only meant to be active when the
# fp16 arg is set, so by default they should all
# be false.
if not args.fp16:
args.fp32_embedding = False
args.fp32_tokentypes = False
args.fp32_layernorm = False
return args
# coding=utf-8
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Input/output checkpointing."""
import os
import random
import sys
import numpy as np
import torch
from torch.nn.parallel.distributed import DistributedDataParallel as torchDDP
from megatron import mpu
from megatron import get_args
from megatron import print_rank_0
def check_checkpoint_args(checkpoint_args):
"""Ensure fixed arguments for a model are the same for the input
arguments and the one retreived frm checkpoint."""
args = get_args()
def _compare(arg_name):
checkpoint_value = getattr(checkpoint_args, arg_name)
args_value = getattr(args, arg_name)
error_message = '{} value from checkpoint ({}) is not equal to the ' \
'input argument value ({}).'.format(
arg_name, checkpoint_value, args_value)
assert checkpoint_value == args_value, error_message
_compare('num_layers')
_compare('hidden_size')
_compare('num_attention_heads')
_compare('max_position_embeddings')
_compare('make_vocab_size_divisible_by')
_compare('padded_vocab_size')
_compare('tokenizer_type')
_compare('model_parallel_size')
def ensure_directory_exists(filename):
"""Build filename's path if it does not already exists."""
dirname = os.path.dirname(filename)
if not os.path.exists(dirname):
os.makedirs(dirname)
def get_checkpoint_name(checkpoints_path, iteration,
release=False, mp_rank=None):
"""A unified checkpoint name."""
if release:
directory = 'release'
else:
directory = 'iter_{:07d}'.format(iteration)
return os.path.join(checkpoints_path, directory,
'mp_rank_{:02d}'.format(
mpu.get_model_parallel_rank() if mp_rank is None \
else mp_rank),
'model_optim_rng.pt')
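For illustration, the format strings above yield paths like the following (a sketch; the megatron.checkpointing module path and the /checkpoints/gpt2 directory are assumptions):

from megatron.checkpointing import get_checkpoint_name  # assumed module path

# Passing mp_rank explicitly keeps the sketch independent of mpu state.
get_checkpoint_name('/checkpoints/gpt2', 5000, mp_rank=0)
# -> '/checkpoints/gpt2/iter_0005000/mp_rank_00/model_optim_rng.pt'
get_checkpoint_name('/checkpoints/gpt2', 0, release=True, mp_rank=1)
# -> '/checkpoints/gpt2/release/mp_rank_01/model_optim_rng.pt'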
def get_checkpoint_tracker_filename(checkpoints_path):
"""Tracker file rescords the latest chckpoint during
training to restart from."""
return os.path.join(checkpoints_path, 'latest_checkpointed_iteration.txt')
def save_checkpoint(iteration, model, optimizer, lr_scheduler):
"""Save a model checkpoint."""
args = get_args()
# Only rank zero of the data parallel writes to the disk.
if isinstance(model, torchDDP):
model = model.module
if mpu.get_data_parallel_rank() == 0:
# Arguments, iteration, and model.
state_dict = {}
state_dict['args'] = args
state_dict['iteration'] = iteration
state_dict['model'] = model.state_dict_for_save_checkpoint()
# Optimizer stuff.
if not args.no_save_optim:
if optimizer is not None:
state_dict['optimizer'] = optimizer.state_dict()
if lr_scheduler is not None:
state_dict['lr_scheduler'] = lr_scheduler.state_dict()
# RNG states.
if not args.no_save_rng:
state_dict['random_rng_state'] = random.getstate()
state_dict['np_rng_state'] = np.random.get_state()
state_dict['torch_rng_state'] = torch.get_rng_state()
state_dict['cuda_rng_state'] = torch.cuda.get_rng_state()
state_dict['rng_tracker_states'] \
= mpu.get_cuda_rng_tracker().get_states()
# Save.
checkpoint_name = get_checkpoint_name(args.save, iteration)
print('global rank {} is saving checkpoint at iteration {:7d} to {}'.
format(torch.distributed.get_rank(), iteration, checkpoint_name))
ensure_directory_exists(checkpoint_name)
torch.save(state_dict, checkpoint_name)
print(' successfully saved {}'.format(checkpoint_name))
# Wait so everyone is done (necessary)
torch.distributed.barrier()
# And update the latest iteration
if torch.distributed.get_rank() == 0:
tracker_filename = get_checkpoint_tracker_filename(args.save)
with open(tracker_filename, 'w') as f:
f.write(str(iteration))
# Wait so everyone is done (not necessary)
torch.distributed.barrier()
def load_checkpoint(model, optimizer, lr_scheduler):
"""Load a model checkpoint and return the iteration."""
args = get_args()
if isinstance(model, torchDDP):
model = model.module
# Read the tracker file and set the iteration.
tracker_filename = get_checkpoint_tracker_filename(args.load)
# If no tracker file, return iteration zero.
if not os.path.isfile(tracker_filename):
print_rank_0('WARNING: could not find the metadata file {} '.format(
tracker_filename))
print_rank_0(' will not load any checkpoints and will start from '
'random')
return 0
# Otherwise, read the tracker file and either set the iteration or
# mark it as a release checkpoint.
iteration = 0
release = False
with open(tracker_filename, 'r') as f:
metastring = f.read().strip()
try:
iteration = int(metastring)
except ValueError:
release = metastring == 'release'
if not release:
print_rank_0('ERROR: Invalid metadata file {}. Exiting'.format(
tracker_filename))
sys.exit()
assert iteration > 0 or release, 'error parsing metadata file {}'.format(
tracker_filename)
# Checkpoint.
checkpoint_name = get_checkpoint_name(args.load, iteration, release)
if mpu.get_data_parallel_rank() == 0:
print('global rank {} is loading checkpoint {}'.format(
torch.distributed.get_rank(), checkpoint_name))
# Load the checkpoint.
try:
state_dict = torch.load(checkpoint_name, map_location='cpu')
except ModuleNotFoundError:
# For backward compatibility.
print_rank_0(' > deserializing using the old code structure ...')
sys.modules['fp16.loss_scaler'] = sys.modules[
'megatron.fp16.loss_scaler']
state_dict = torch.load(checkpoint_name, map_location='cpu')
sys.modules.pop('fp16.loss_scaler', None)
except:
print_rank_0('could not load the checkpoint')
sys.exit()
# Set iteration.
if args.finetune or release:
iteration = 0
else:
try:
iteration = state_dict['iteration']
except KeyError:
try: # Backward compatible with older checkpoints
iteration = state_dict['total_iters']
except KeyError:
print_rank_0('A metadata file exists but unable to load '
'iteration from checkpoint {}, exiting'.format(
checkpoint_name))
sys.exit()
# Check arguments.
if 'args' in state_dict:
checkpoint_args = state_dict['args']
check_checkpoint_args(checkpoint_args)
else:
print_rank_0('could not find arguments in the checkpoint ...')
# Model.
model.load_state_dict(state_dict['model'])
# Optimizer.
if not release and not args.finetune and not args.no_load_optim:
try:
if optimizer is not None:
optimizer.load_state_dict(state_dict['optimizer'])
if lr_scheduler is not None:
lr_scheduler.load_state_dict(state_dict['lr_scheduler'])
except KeyError:
print_rank_0('Unable to load optimizer from checkpoint {}. '
'Specify --no-load-optim or --finetune to prevent '
'attempting to load the optimizer state, '
'exiting ...'.format(checkpoint_name))
sys.exit()
# rng states.
if not release and not args.finetune and not args.no_load_rng:
try:
random.setstate(state_dict['random_rng_state'])
np.random.set_state(state_dict['np_rng_state'])
torch.set_rng_state(state_dict['torch_rng_state'])
torch.cuda.set_rng_state(state_dict['cuda_rng_state'])
mpu.get_cuda_rng_tracker().set_states(
state_dict['rng_tracker_states'])
except KeyError:
print_rank_0('Unable to load rng state from checkpoint {}. '
'Specify --no-load-rng or --finetune to prevent '
'attempting to load the rng state, '
'exiting ...'.format(checkpoint_name))
sys.exit()
torch.distributed.barrier()
if mpu.get_data_parallel_rank() == 0:
print(' successfully loaded {}'.format(checkpoint_name))
return iteration
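A hedged sketch of how save_checkpoint and load_checkpoint fit into a training loop; model, optimizer, lr_scheduler, and train_step are placeholders for objects built elsewhere, and the loop scaffolding is illustrative rather than part of this diff.

from megatron import get_args

args = get_args()

# Resume: read <load>/latest_checkpointed_iteration.txt and restore model,
# optimizer, and rng state, subject to --no-load-optim / --no-load-rng.
iteration = load_checkpoint(model, optimizer, lr_scheduler) if args.load else 0

while iteration < args.train_iters:
    train_step()          # placeholder for the forward/backward/update step
    iteration += 1
    if args.save and iteration % args.save_interval == 0:
        # Data-parallel rank 0 writes the checkpoint; the tracker file is
        # updated last so a restart picks up the new iteration.
        save_checkpoint(iteration, model, optimizer, lr_scheduler)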
from . import indexed_dataset
from .bert_tokenization import FullTokenizer as FullBertTokenizer
......@@ -22,24 +22,19 @@ import numpy as np
import torch
from torch.utils.data import Dataset
from megatron import get_tokenizer
from megatron import mpu
from megatron.data import helpers
from megatron.data import FullBertTokenizer
from megatron.data.dataset_utils import build_training_sample
from megatron.data.indexed_dataset import make_dataset as make_indexed_dataset
from megatron.utils import print_rank_0
from megatron import print_rank_0
def build_train_valid_test_datasets(vocab_file, data_prefix, data_impl,
splits_string, train_valid_test_num_samples,
def build_train_valid_test_datasets(data_prefix, data_impl, splits_string,
train_valid_test_num_samples,
max_seq_length, masked_lm_prob,
short_seq_prob, seed, skip_warmup):
# Tokenizer is the same
tokenizer = FullBertTokenizer(vocab_file, do_lower_case=True)
print_rank_0(' > using full BERT tokenizer with vocabulary size: {}'.format(
tokenizer.vocab_size()))
# Indexed dataset.
indexed_dataset = get_indexed_dataset_(data_prefix,
data_impl,
......@@ -82,7 +77,6 @@ def build_train_valid_test_datasets(vocab_file, data_prefix, data_impl,
dataset = BertDataset(
name=name,
indexed_dataset=indexed_dataset,
tokenizer=tokenizer,
data_prefix=data_prefix,
num_epochs=None,
max_num_samples=train_valid_test_num_samples[index],
......@@ -107,7 +101,7 @@ def build_train_valid_test_datasets(vocab_file, data_prefix, data_impl,
class BertDataset(Dataset):
def __init__(self, name, indexed_dataset, tokenizer, data_prefix,
def __init__(self, name, indexed_dataset, data_prefix,
num_epochs, max_num_samples, masked_lm_prob,
max_seq_length, short_seq_prob, seed):
......@@ -117,8 +111,7 @@ class BertDataset(Dataset):
self.masked_lm_prob = masked_lm_prob
self.max_seq_length = max_seq_length
# Tokenizer and dataset.
self.tokenizer = tokenizer
# Dataset.
self.indexed_dataset = indexed_dataset
......@@ -133,16 +126,13 @@ class BertDataset(Dataset):
self.name)
# Vocab stuff.
self.vocab_id_list = list(self.tokenizer.inv_vocab.keys())
self.vocab_id_to_token_dict = self.tokenizer.inv_vocab
self.cls_id = self.tokenizer.vocab['[CLS]']
self.sep_id = self.tokenizer.vocab['[SEP]']
self.mask_id = self.tokenizer.vocab['[MASK]']
self.pad_id = self.tokenizer.vocab['[PAD]']
def num_tokens(self):
return self.tokenizer.vocab_size()
tokenizer = get_tokenizer()
self.vocab_id_list = list(tokenizer.inv_vocab.keys())
self.vocab_id_to_token_dict = tokenizer.inv_vocab
self.cls_id = tokenizer.cls
self.sep_id = tokenizer.sep
self.mask_id = tokenizer.mask
self.pad_id = tokenizer.pad
def __len__(self):
......
......@@ -13,71 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""GPT2 dataset."""
import json
import os
import numpy as np
import torch
from torch.multiprocessing import Lock
from torch.utils.data import Dataset
from megatron import mpu
from megatron.data_utils.samplers import DistributedBatchSampler
from megatron.data_utils.tokenization_gpt2 import GPT2Tokenizer
def make_gpt2_dataloaders(args):
# Input parameters.
input_data_sizes_file = args.input_data_sizes_file
seq_length = args.seq_length
initial_seed = args.seed
# Data parallel arguments.
world_size = mpu.get_data_parallel_world_size()
rank = mpu.get_data_parallel_rank()
global_batch_size = args.batch_size * world_size
num_workers = args.num_workers
def make_data_loader_(data_path):
# Build the dataset.
dataset = GPT2Dataset(data_path, input_data_sizes_file,
seq_length, initial_seed)
# Use a simple sampler with distributed batch sampler.
sampler = torch.utils.data.SequentialSampler(dataset)
batch_sampler = DistributedBatchSampler(sampler=sampler,
batch_size=global_batch_size,
drop_last=True,
rank=rank,
world_size=world_size)
# Torch dataloader.
return torch.utils.data.DataLoader(dataset,
batch_sampler=batch_sampler,
num_workers=num_workers,
pin_memory=True)
train = make_data_loader_(args.train_data)
valid = make_data_loader_(args.valid_data)
test = make_data_loader_(args.test_data)
args.do_train = False
args.do_valid = False
args.do_test = False
if train is not None:
args.do_train = True
if valid is not None:
args.do_valid = True
if test is not None:
args.do_test = True
# Tokenizer.
tokenizer = GPT2Tokenizer.from_pretrained('gpt2', cache_dir=args.cache_dir)
eod_token = tokenizer.encoder['<|endoftext|>']
num_tokens = eod_token + 1
return (train, valid, test), num_tokens, eod_token
class GPT2Dataset(Dataset):
......@@ -89,8 +33,6 @@ class GPT2Dataset(Dataset):
self.seq_length = seq_length
self.initial_seed = initial_seed
self.max_epochs = max_epochs
# Lock for building the dataset.
self.lock = Lock()
# Shard stuff.
# Dictionary from shard name to its size (number of elements).
......@@ -120,13 +62,11 @@ class GPT2Dataset(Dataset):
# data index in the shard.
data_idx = idx - self.shards_start_index[shard_index]
# Load the shard if it is not in memory.
#self.lock.acquire()
if self.shards_data[shard_index] is None:
print('global rank {} is building data for shard index {} ...'.
format(torch.distributed.get_rank(), shard_index))
self.build_dataset_(shard_index)
#assert self.shards_data[shard_index] is not None
#self.lock.release()
# Start index.
start_index = self.shards_sample_index[shard_index][data_idx]
# Add one for label shift.
......@@ -194,18 +134,3 @@ class GPT2Dataset(Dataset):
size = self.shard_size_dict[shard]
self.shards_start_index[i] = self.shards_start_index[i-1] + \
size // self.seq_length
'''
if __name__ == '__main__':
print('gpt2 data loader ...')
path = '/raid/mshoeybi/data/gpt2/adlr/reddit_all_ftfy_lg200/npys'
dataset = GPT2Dataset(path, 'sizes.txt', 1024, 1234, 100)
print('dataset contains {} samples'.format(dataset.data_length))
for i in range(len(dataset)):
if i % 512000 == 0:
print(i)
data = dataset[i]
'''
......@@ -18,7 +18,7 @@ from itertools import accumulate
import numpy as np
import torch
from megatron.utils import print_rank_0
from megatron import print_rank_0
def __best_fitting_dtype(vocab_size=None):
if vocab_size is not None and vocab_size < 65500:
......
# coding=utf-8
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Batch samplers that work with either random or sequential data samplers."""
import torch
from torch.utils import data
class RandomSampler(data.sampler.Sampler):
"""Based off of pytorch RandomSampler and DistributedSampler. Essentially
a RandomSampler, but this class lets the user set an epoch like
DistributedSampler Samples elements randomly. If without replacement, then
sample from a shuffled dataset. If with replacement, then user can
specify ``num_samples`` to draw.
Arguments:
data_source (Dataset): dataset to sample from
num_samples (int): number of samples to draw, default=len(dataset)
replacement (bool): samples are drawn with replacement if ``True``,
default=False
"""
def __init__(self, data_source, replacement=False, num_samples=None):
self.data_source = data_source
self.replacement = replacement
self._num_samples = num_samples
self.epoch = -1
if self._num_samples is not None and replacement is False:
raise ValueError("With replacement=False, num_samples should not "
"be specified, since a random permute will be "
"performed.")
if not isinstance(self.num_samples, int) or self.num_samples <= 0:
raise ValueError("num_samples should be a positive integer "
"value, but got num_samples={}".format(
self.num_samples))
if not isinstance(self.replacement, bool):
raise ValueError("replacement should be a boolean value, but got "
"replacement={}".format(self.replacement))
@property
def num_samples(self):
# dataset size might change at runtime
if self._num_samples is None:
return len(self.data_source)
return self._num_samples
def __iter__(self):
n = len(self.data_source)
g = torch.Generator()
if self.epoch >= 0:
g.manual_seed(self.epoch)
if self.replacement:
return iter(torch.randint(high=n, size=(self.num_samples,),
dtype=torch.int64, generator=g).tolist())
return iter(torch.randperm(n, generator=g).tolist())
def __len__(self):
return self.num_samples
def set_epoch(self, epoch):
self.epoch = epoch
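A small usage sketch (the toy data source is illustrative): calling set_epoch with the same value reproduces the same permutation, which is what makes shuffling deterministic across restarts.

# Any object with __len__ works as data_source in the no-replacement case.
toy_data = list(range(8))

sampler = RandomSampler(toy_data)
sampler.set_epoch(0)
first_order = list(iter(sampler))
sampler.set_epoch(0)
assert list(iter(sampler)) == first_order   # same epoch -> same shuffle
sampler.set_epoch(1)                        # new epoch -> new shuffle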
class DistributedBatchSampler(data.sampler.BatchSampler):
"""Similar to normal implementation of distributed sampler, except
implementation is at the batch sampler level, instead of just the
sampler level. This allows wrapping of arbitrary data samplers
(sequential, random, WeightedRandomSampler, etc.) with this batch
sampler."""
def __init__(self, sampler, batch_size, drop_last, rank=-1,
world_size=2, wrap_last=False):
super(DistributedBatchSampler, self).__init__(sampler, batch_size,
drop_last)
if rank == -1:
assert False, 'should not be here'
rank = torch.distributed.get_rank()
self.rank = rank
self.world_size = world_size
self.sampler.wrap_around = 0
self.wrap_around = 0
self.wrap_last = wrap_last
self.start_iter = 0
def __iter__(self):
batch = []
i = 0
for idx in self.data_iterator(self.sampler, wrap_around=False):
batch.append(idx)
if len(batch) == self.batch_size:
tbatch = self._batch(batch)
if i >= self.start_iter:
yield tbatch
self.start_iter = 0
i += 1
batch = []
batch_len = len(batch)
if batch_len > 0 and not self.drop_last:
if self.wrap_last:
self.sampler.wrap_around -= (self.batch_size)
self.wrap_around += (len(batch))
self.wrap_around %= self.batch_size
yield self._batch(batch)
if self.wrap_last:
self.sampler.wrap_around += self.batch_size
def data_iterator(self, _iter, wrap_around=False):
"""iterates through data and handles wrap around"""
for i, idx in enumerate(_iter):
if i < self.wrap_around%self.batch_size:
continue
if wrap_around:
self.wrap_around += 1
self.wrap_around %= self.batch_size
yield idx
def _batch(self, batch):
"""extracts samples only pertaining to this worker's batch"""
start = self.rank*self.batch_size//self.world_size
end = (self.rank+1)*self.batch_size//self.world_size
return batch[start:end]
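A hedged sketch, mirroring the removed make_gpt2_dataloaders above, of wrapping a plain sampler with DistributedBatchSampler so each data-parallel rank receives its slice of every global batch; the toy dataset and the rank/world_size values are illustrative stand-ins for mpu.get_data_parallel_rank() and mpu.get_data_parallel_world_size().

from torch.utils.data import DataLoader, SequentialSampler

toy_data = list(range(1024))           # stand-in dataset
rank, world_size, local_batch_size = 0, 2, 8

sampler = SequentialSampler(toy_data)
batch_sampler = DistributedBatchSampler(sampler=sampler,
                                        batch_size=local_batch_size * world_size,
                                        drop_last=True,
                                        rank=rank,
                                        world_size=world_size)
loader = DataLoader(toy_data, batch_sampler=batch_sampler, num_workers=0)

# Each yielded batch holds this rank's 8 samples out of a global batch of 16.
first_batch = next(iter(loader))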