"git@developer.sourcefind.cn:OpenDAS/megatron-lm.git" did not exist on "b07f1d0a60bc702873ba6f75263fc73099f3cb73"
Unverified commit 56254247, authored by Ying Zhang, committed by GitHub

[benchmarks]Add an MOE benchmark (#866)

* Add MOE to lm benchmarks

* linter

* Fix source / target

* address comments

* address comments

* address comments

* add circleci

* fix circleci

* precommit
parent 73187df0
@@ -3,6 +3,7 @@
# This source code is licensed under the BSD license found in the
# LICENSE file in the root directory of this source tree.
from collections import namedtuple
from distutils.version import LooseVersion
import io
import operator
@@ -10,6 +11,7 @@ import tempfile
import torch
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
import torchtext
from torchtext.data.utils import get_tokenizer
from torchtext.utils import download_from_url, extract_archive
@@ -35,9 +37,10 @@ def _get_total_batch_size(benchmark_config, model_specs):
return model_specs["seq_len"] * benchmark_config["batch_size"]
def get_real_dataloaders(args, benchmark_config, model_specs):
"""Return real dataloaders for training, testing and validation."""
DatasetsInfo = namedtuple("DataSetsInfo", ["ntokens", "train_dataset", "valid_dataset", "test_dataset"])
def get_real_datasets():
url = "https://s3.amazonaws.com/research.metamind.io/wikitext/wikitext-2-v1.zip"
tmpdir = tempfile.TemporaryDirectory()
test_filepath, valid_filepath, train_filepath = extract_archive(download_from_url(url, root=tmpdir.name))
@@ -52,30 +55,53 @@ def get_real_dataloaders(args, benchmark_config, model_specs):
train_dataset = data_process(iter(io.open(train_filepath, encoding="utf8")))
valid_dataset = data_process(iter(io.open(valid_filepath, encoding="utf8")))
test_dataset = data_process(iter(io.open(test_filepath, encoding="utf8")))
return DatasetsInfo(len(vocab.stoi), train_dataset, valid_dataset, test_dataset)
def get_dataloaders(datasets_info, benchmark_config, model_specs, num_replicas=1, rank=0):
ntokens, train_dataset, valid_dataset, test_dataset = datasets_info
def batchify(data):
batch_size = benchmark_config["batch_size"]
return _batchify(data, batch_size)
total_batch_size = _get_total_batch_size(benchmark_config, model_specs)
train_dataloader = DataLoader(train_dataset, batch_size=total_batch_size, collate_fn=batchify)
valid_dataloader = DataLoader(valid_dataset, batch_size=total_batch_size, collate_fn=batchify)
test_dataloader = DataLoader(test_dataset, batch_size=total_batch_size, collate_fn=batchify)
return len(vocab.stoi), train_dataloader, valid_dataloader, test_dataloader
train_dataloader = DataLoader(
train_dataset,
sampler=DistributedSampler(train_dataset, num_replicas=num_replicas, rank=rank),
batch_size=total_batch_size,
collate_fn=batchify,
)
valid_dataloader = DataLoader(
valid_dataset,
sampler=DistributedSampler(valid_dataset, num_replicas=num_replicas, rank=rank),
batch_size=total_batch_size,
collate_fn=batchify,
)
test_dataloader = DataLoader(
test_dataset,
sampler=DistributedSampler(test_dataset, num_replicas=num_replicas, rank=rank),
batch_size=total_batch_size,
collate_fn=batchify,
)
return train_dataloader, valid_dataloader, test_dataloader
def get_synthetic_dataloaders(args, benchmark_config, model_specs):
"""Return synthetic dataloaders for training, testing and validation."""
def get_real_dataloaders(args, benchmark_config, model_specs, num_replicas=1, rank=0):
"""Return real dataloaders for training, testing and validation."""
dataset_info = get_real_datasets()
train_dataloader, valid_dataloader, test_dataloader = get_dataloaders(
dataset_info, benchmark_config, model_specs, num_replicas, rank
)
return dataset_info.ntokens, train_dataloader, valid_dataloader, test_dataloader
def batchify(data):
batch_size = benchmark_config["batch_size"]
return _batchify(data, batch_size)
total_batch_size = _get_total_batch_size(benchmark_config, model_specs)
def get_synthetic_datasets():
# vocab_size is 10000 and length of the real data is 2049990.
lm_dataset = torch.randint(1, 10000, (2049990,))
return DatasetsInfo(10000, lm_dataset, lm_dataset, lm_dataset)
lm_dataloader = DataLoader(
lm_dataset, batch_size=total_batch_size, shuffle=True, num_workers=0, collate_fn=batchify
)
return lm_dataloader, lm_dataloader, lm_dataloader
def get_synthetic_dataloaders(args, benchmark_config, model_specs, num_replicas=1, rank=0):
"""Return synthetic dataloaders for training, testing and validation."""
return get_dataloaders(get_synthetic_datasets(), benchmark_config, model_specs, num_replicas, rank)
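For reference, a minimal sketch of how the refactored helpers compose outside the benchmark, assuming this module is importable as `datasets.wikitext2_data` (with explicit `num_replicas`/`rank`, the `DistributedSampler` does not need an initialized process group):

```python
import datasets.wikitext2_data as wikitext2_data

benchmark_config = {"batch_size": 32}
model_specs = {"seq_len": 33}  # total loader batch = seq_len * batch_size

# Synthetic path: a random token stream standing in for WikiText-2.
info = wikitext2_data.get_synthetic_datasets()
train_dl, valid_dl, test_dl = wikitext2_data.get_dataloaders(
    info, benchmark_config, model_specs, num_replicas=1, rank=0
)
print(info.ntokens)  # 10000 for the synthetic vocabulary
```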
@@ -100,3 +100,29 @@ class Pipe:
def get_golden_synthetic_stats():
# TODO(anj-s): Add support for synthetic regression benchmarks
raise NotImplementedError("Synthetic data benchmarks are not supported.")
class MOE:
def get_model_config():
return {
"vocab_size": 10000,
"ninp": 1024, # embedding dimension
"nhid": 4096, # the dimension of the feedforward network model in nn.TransformerEncoder
"nhead": 32, # the number of heads in the multiheadattention models
"dropout": 0,
"initrange": 0.1,
"scaler": GradScaler(),
"clip_value": 0.05,
"num_decoder_layers": 20,
"seq_len": 33, # (seq_len - 1) needs to be divisible by num_local_experts
"is_moe": True,
"num_local_experts": 2,
}
def get_benchmark_config():
return {
"epochs": 1,
"lr": 0.001, # learning rate
"batch_size": 32,
"criterion": nn.CrossEntropyLoss(),
}
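The seq_len comment above is worth spelling out: the training loop drops one token per sequence (see get_batch below), and the MoE layer requires the remaining tokens to split evenly across the local experts. A quick check of the defaults, assuming the config is importable from `golden_configs.lm_wikitext2`:

```python
from golden_configs.lm_wikitext2 import MOE

model_config = MOE.get_model_config()
tokens_per_sequence = model_config["seq_len"] - 1  # get_batch() shifts off one token
assert tokens_per_sequence % model_config["num_local_experts"] == 0  # 32 % 2 == 0
```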
@@ -8,6 +8,9 @@ import math
import torch
import torch.nn as nn
from fairscale.nn.moe.moe_layer import MOELayer
from fairscale.nn.moe.top2gate import Top2Gate
# TODO(anj-s): Identify if we need this initialization logic for the below wrapped layers.
class EmbeddingLayer(nn.Embedding):
@@ -42,11 +45,125 @@ class PositionalEncodingLayer(nn.Module):
return self.dropout(x)
class TransformerDecoderLayer(nn.TransformerEncoderLayer):
"""TransformerDecoder layer which inherits from nn.TransformerEncoderLayer."""
class FeedForwardLayer(nn.Module):
"""FeedForward layer for a given Transformer model."""
def __init__(self, d_model, dim_feedforward, activation, dropout) -> None:
super(FeedForwardLayer, self).__init__()
self.linear1 = nn.Linear(d_model, dim_feedforward)
self.activation = activation
self.dropout1 = nn.Dropout(dropout)
self.linear2 = nn.Linear(dim_feedforward, d_model)
self.dropout2 = nn.Dropout(dropout)
def forward(self, x):
return self.dropout2(self.linear2(self.dropout1(self.activation(self.linear1(x)))))
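FeedForwardLayer is the per-expert block that the MoE layer below replicates; a quick shape check with illustrative dimensions, assuming the class above is in scope (e.g. imported from models.transformer_lm):

```python
import torch
import torch.nn as nn

ffn = FeedForwardLayer(d_model=16, dim_feedforward=64, activation=nn.ReLU(), dropout=0.0)
x = torch.randn(5, 3, 16)       # (seq_len, batch, d_model)
assert ffn(x).shape == x.shape  # the block maps back to d_model
```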
# Forked from https://pytorch.org/docs/stable/_modules/torch/nn/modules/transformer.html#TransformerEncoderLayer.
# Parameters is_moe and num_local_experts are added.
class TransformerEncoderLayer(nn.Module):
r"""TransformerEncoderLayer is made up of self-attn and feedforward network.
This standard encoder layer is based on the paper "Attention Is All You Need".
Ashish Vaswani, Noam Shazeer, Niki Parmar, Jakob Uszkoreit, Llion Jones, Aidan N Gomez,
Lukasz Kaiser, and Illia Polosukhin. 2017. Attention is all you need. In Advances in
Neural Information Processing Systems, pages 6000-6010. Users may modify or implement
in a different way during application.
Args:
d_model: the number of expected features in the input (required).
nhead: the number of heads in the multiheadattention models (required).
dim_feedforward: the dimension of the feedforward network model (default=2048).
dropout: the dropout value (default=0.1).
activation: the activation function of the intermediate layer, can be a string
("relu" or "gelu") or a unary callable. Default: relu
layer_norm_eps: the eps value in layer normalization components (default=1e-5).
norm_first: if ``True``, layer norm is done prior to attention and feedforward
operations, respectively. Otherwise it's done after. Default: ``False`` (after).
is_moe: if ``True``, the feedforward layer will have MOE enabled.
num_local_experts: number of local experts for MOE.
Examples::
>>> encoder_layer = TransformerEncoderLayer(d_model=512, nhead=8)
>>> src = torch.rand(10, 32, 512)
>>> out = encoder_layer(src)
"""
__constants__ = ["norm_first"]
def __init__(
self,
d_model,
nhead,
dim_feedforward=2048,
dropout=0.1,
activation=nn.ReLU(),
layer_norm_eps=1e-5,
norm_first=False,
is_moe=False,
num_local_experts=1,
):
super(TransformerEncoderLayer, self).__init__()
self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
self.norm_first = norm_first
self.norm1 = nn.LayerNorm(d_model, eps=layer_norm_eps)
self.norm2 = nn.LayerNorm(d_model, eps=layer_norm_eps)
self.dropout = nn.Dropout(dropout)
self.is_moe = is_moe
if is_moe:
world_size = 1 if not torch.distributed.is_initialized() else torch.distributed.get_world_size()
num_global_experts = num_local_experts * world_size
self.gate = Top2Gate(d_model, num_global_experts)
experts = nn.ModuleList(
[FeedForwardLayer(d_model, dim_feedforward, activation, dropout) for _ in range(num_local_experts)]
)
self.moe_layer = MOELayer(self.gate, experts)
else:
self.ff_block = FeedForwardLayer(d_model, dim_feedforward, activation, dropout)
def forward(self, src, src_mask=None, src_key_padding_mask=None):
r"""Pass the input through the encoder layer.
Args:
src: the sequence to the encoder layer (required).
src_mask: the mask for the src sequence (optional).
src_key_padding_mask: the mask for the src keys per batch (optional).
Shape:
see the docs in Transformer class.
"""
# see Fig. 1 of https://arxiv.org/pdf/2002.04745v1.pdf
x = src
if self.norm_first:
x = x + self._sa_block(self.norm1(x), src_mask, src_key_padding_mask)
x = x + self._ff_block(self.norm2(x))
else:
x = self.norm1(x + self._sa_block(x, src_mask, src_key_padding_mask))
x = self.norm2(x + self._ff_block(x))
return x
# self-attention block
def _sa_block(self, x, attn_mask, key_padding_mask):
x = self.self_attn(x, x, x, attn_mask=attn_mask, key_padding_mask=key_padding_mask, need_weights=False)[0]
return self.dropout(x)
# feed forward block
def _ff_block(self, x):
if self.is_moe:
return self.moe_layer(x)
else:
return self.ff_block(x)
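A shape check of the forked layer in its dense (is_moe=False) configuration, which needs no process group; the MoE path additionally expects torch.distributed to be initialized, as the benchmark does (NCCL) before building the model. Dimensions are illustrative and the class is assumed to be in scope:

```python
import torch

layer = TransformerEncoderLayer(d_model=32, nhead=4, dim_feedforward=64, dropout=0.0)
src = torch.rand(10, 2, 32)  # (seq_len, batch, d_model) layout, not batch-first
out = layer(src)
assert out.shape == src.shape
```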
class TransformerDecoderLayer(TransformerEncoderLayer):
"""TransformerDecoder layer which inherits from TransformerEncoderLayer."""
def __init__(self, ninp, nhead, nhid, dropout):
super().__init__(ninp, nhead, nhid, dropout)
def __init__(self, ninp, nhead, nhid, dropout, is_moe=False, num_local_experts=1):
super().__init__(ninp, nhead, nhid, dropout, is_moe=is_moe, num_local_experts=num_local_experts)
self.src_mask = None
def _generate_square_subsequent_mask(self, sz):
@@ -78,13 +195,13 @@ class LinearLayer(nn.Linear):
class TransformerLM(nn.Sequential):
"""A GPT-2 based nn.Sequential language model."""
def __init__(self, ntokens, ninp, nhead, nhid, dropout, initrange, ndecoder):
def __init__(self, ntokens, ninp, nhead, nhid, dropout, initrange, ndecoder, is_moe=False, num_local_experts=1):
layers = [
EmbeddingLayer(ntokens, ninp, initrange),
PositionalEncodingLayer(ninp, dropout),
]
for _ in range(ndecoder):
layers.append(TransformerDecoderLayer(ninp, nhead, nhid, dropout))
layers.append(TransformerDecoderLayer(ninp, nhead, nhid, dropout, is_moe, num_local_experts))
layers.append(LinearLayer(ninp, ntokens, initrange))
super(TransformerLM, self).__init__(*layers)
import logging
import math
import time
from golden_configs.lm_wikitext2 import MOE as MOEConfig
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
import utils
MPI_PORT = 29500
def benchmark_single_process(config_class, args):
"""Benchmark a given model using a single process and multiple devices."""
world_size = torch.cuda.device_count() if torch.cuda.is_available() else 1
assert world_size > 0
benchmark_config = utils.create_benchmark_config(args.model_name, config_class)
model_specs = utils.get_model_specs(args.model_name, config_class)
mp.spawn(train, args=(world_size, benchmark_config, model_specs, args), nprocs=world_size, join=True)
def train(rank, world_size, benchmark_config, model_specs, args):
logger = mp.log_to_stderr()
logger.setLevel(logging.DEBUG if args.debug else logging.INFO)
utils.init_random_seed(rank)
init_method_pgroup = "tcp://localhost:{}".format(MPI_PORT)
torch.distributed.init_process_group(
backend="nccl", rank=rank, world_size=world_size, init_method=init_method_pgroup
)
logger.info("train, rank={}".format(rank))
device = torch.device("cuda", rank) if torch.cuda.is_available() else torch.device("cpu")
criterion = benchmark_config["criterion"]
model_config = utils.create_model_config(
args, benchmark_config=benchmark_config, model_specs=model_specs, device=device
)
# vocab_size may change in create_model_config() due to input data
vocab_size = model_specs["vocab_size"]
model = model_config["model"]
model.train()
optimizer = model_config["optimizer"]
optimizer = optimizer(model.parameters())
group = model.group if hasattr(model, "group") else None
utils.log_number_of_parameters(model, logger)
total_loss = 0.0
word_counter = 0
total_tokens = 0
total_tokens_per_log_interval = 0
bptt = 2
total_elapsed = 0.0
model = DDP(model, device_ids=[rank], output_device=rank, broadcast_buffers=False)
lm_dataloader, _, _ = utils.get_data_loader(
model_config["dataset_info"], args, benchmark_config, model_specs, num_replicas=world_size, rank=rank
)
def get_batch(source):
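# Next-token prediction: the first seq_len tokens are the input and the same
# sequence shifted by one is the target, so target[i] follows data[i].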
seq_len = len(source) - 1
data = source[0:seq_len]
target = source[1 : 1 + seq_len]
return data, target
for i, batch in enumerate(lm_dataloader):
if i == 1:
epoch_start_time = time.time()
if args.max_batch and i > args.max_batch:
break
if i > 0:
total_tokens += batch.numel()
start_time = time.time()
optimizer.zero_grad()
source, target = get_batch(batch)
source = source.to(device)
target = target.to(device)
try:
output = model(source)
loss = criterion(output.view(-1, vocab_size), target.view(-1))
total_loss += loss.item()
loss.backward()
torch.nn.utils.clip_grad_value_(model.parameters(), model_specs["clip_value"])
optimizer.step()
except Exception as e:
raise RuntimeError(f"training failed on {torch.distributed.get_rank()}") from e
elapsed = time.time() - start_time
total_elapsed += elapsed
log_interval = 1
total_tokens_per_log_interval += batch.numel()
if i % log_interval == 0 and i > 0:
cur_loss = total_loss / log_interval
logger.debug(
"| batch {:5d} | wps {:5.2f} | loss {:5.2f} | ppl {:8.2f}".format(
i, total_tokens_per_log_interval / elapsed, cur_loss, math.exp(cur_loss)
)
)
total_tokens_per_log_interval = 0
total_loss = 0
wps = total_tokens / total_elapsed
logger.debug("rank {}, wps: {}".format(rank, wps))
logger.debug(
"Peak allocated bytes on cuda:{}: {:1d}".format(
dist.get_rank(), torch.cuda.memory_stats(dist.get_rank())["allocated_bytes.all.peak"]
)
)
if __name__ == "__main__":
args = utils.init_args()
logging.basicConfig(level=logging.INFO if not args.debug else logging.DEBUG)
logging.info(f"Running single process benchmark with args: {args}")
benchmark_single_process(MOEConfig, args)
@@ -3,24 +3,17 @@
# This source code is licensed under the BSD license found in the
# LICENSE file in the root directory of this source tree.
import argparse
from collections import defaultdict
from functools import reduce
import gc
import logging
import math
import operator
import time
from datasets.wikitext2_data import get_real_dataloaders as get_real_wikitext2_dataloaders
from datasets.wikitext2_data import get_synthetic_dataloaders as get_synthetic_wikitext2_dataloaders
from models import transformer_lm
import numpy as np
import torch
import torch.distributed as dist
from torch.distributed import rpc
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.optim import Adam
import utils
from benchmarks.golden_configs.lm_wikitext2 import Pipe as lm_wikitext2
from fairscale.nn import Pipe
@@ -31,55 +24,6 @@ MPI_PORT = 29500
RPC_PORT = 29501
def init_random_seed(seed: int):
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
np.random.seed(seed)
def get_model_and_optimizer(args, device, benchmark_config, model_config):
"""Return instantiated model and optimizer function."""
if args.model_name == "lm":
model = get_lm_model(args, device, model_config)
lr = benchmark_config["lr"]
def make_adam(params):
return Adam(params, lr=lr)
optimizer = make_adam
return model, optimizer
def get_lm_model(args, device, config):
"""Get language model(based on GPT-2) used for sequence prediction."""
ninp = config["ninp"]
nhead = config["nhead"]
initrange = config["initrange"]
dropout = config["dropout"]
vocab_size = config["vocab_size"]
nhid = config["nhid"]
ndecoder = config["num_decoder_layers"]
if args.lazy_construction:
layers = [
LazyModule(lambda: transformer_lm.EmbeddingLayer(vocab_size, ninp, initrange)),
LazyModule(lambda: transformer_lm.PositionalEncodingLayer(ninp, dropout)),
]
for _ in range(ndecoder):
layers.append(LazyModule(lambda: transformer_lm.TransformerDecoderLayer(ninp, nhead, nhid, dropout)))
layers.append(LazyModule(lambda: transformer_lm.LinearLayer(ninp, vocab_size, initrange)))
model = layers
else:
model = transformer_lm.TransformerLM(vocab_size, ninp, nhead, nhid, dropout, initrange, ndecoder).to(device)
return model
def get_tensors_by_size_bucket():
size_buckets = defaultdict(int)
@@ -92,25 +36,6 @@ def get_tensors_by_size_bucket():
return size_buckets
def log_number_of_parameters(model):
num_params = reduce(operator.add, (reduce(operator.mul, x.size()) for x in model.parameters()))
if hasattr(model, "group"):
total = torch.Tensor([num_params])
if torch.cuda.is_available():
total = total.cuda()
torch.distributed.all_reduce(total, group=model.group)
logging.debug(
f"training model, #params = {num_params}, group: {model.group.rank()}, grank:"
f" {torch.distributed.get_rank()}, sizes {model.group.size()}"
)
torch.distributed.barrier()
if model.group.rank() == 0:
logging.debug(f"total #prams = {total.item()}")
else:
logging.debug(f"training model, #params = {num_params}")
def get_device(model, index):
if isinstance(model, DDP):
model = model.module
@@ -137,13 +62,13 @@ def get_fake_dataloader(lm_dataloader_len, args):
def train(model_config, model, benchmark_config, model_specs, args):
lm_dataloader, _, _ = model_config["data"]
lm_dataloader, _, _ = utils.get_data_loader(model_config["dataset_info"], args, benchmark_config, model_specs)
criterion = benchmark_config["criterion"]
vocab_size = model_specs["vocab_size"]
optimizer = model_config["optimizer"]
model.train()
log_number_of_parameters(model)
utils.log_number_of_parameters(model)
total_loss = 0.0
word_counter = 0
@@ -155,7 +80,7 @@ def train(model_config, model, benchmark_config, model_specs, args):
# TODO(anj-s): Avoid sending fake data to all replicas except the first and last one.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
if pipe_group and pipe_group.rank() != 0 and pipe_group.rank() != (pipe_group.size() - 1):
lm_dataloader, _, _ = get_synthetic_dataloaders(args, device, benchmark_config, model_specs)
lm_dataloader, _, _ = get_synthetic_dataloaders(args, benchmark_config, model_specs)
total_tokens = 0
total_tokens_per_log_interval = 0
@@ -292,8 +217,8 @@ def verify_lm_run(wps, golden_config, args):
verify_peak_memory(i, golden_config, 1.1)
def benchmark_language_model(model_config, model, benchmark_config, model_specs, args):
golden_config = get_golden_config(args.model_name, args)
def benchmark_language_model(model_config, model, benchmark_config, model_specs, config_class, args):
golden_config = get_golden_config(args.model_name, config_class, args)
epoch = benchmark_config["epochs"]
start_time = time.time()
if dist.get_rank() == dist.get_world_size() - 1:
@@ -334,74 +259,16 @@ def generate_balance(num_devices, num_layers):
return balance
def get_synthetic_dataloaders(args, device, benchmark_config, model_specs):
"""Returns dataloader for synthetic data."""
if args.model_name == "lm":
return get_synthetic_wikitext2_dataloaders(args, benchmark_config, model_specs)
else:
raise RuntimeError("Unrecognized args.model_mame " % args.model_name)
def get_real_dataloaders(args, device, benchmark_config, model_specs):
"""Returns dataloaders for real data."""
if args.model_name == "lm":
data = get_real_wikitext2_dataloaders(args, benchmark_config, model_specs)
ntokens, train_dataloader, valid_dataloader, test_dataloader = data
model_specs["vocab_size"] = ntokens
return train_dataloader, valid_dataloader, test_dataloader
else:
raise RuntimeError("Unrecognized args.model_mame " % args.model_name)
def create_model_config(args, benchmark_config=None, model_specs=None):
"""Return a dict with the given model, dataset and optimizer."""
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
if args.use_synthetic_data:
dataloader_fn = get_synthetic_dataloaders
else:
dataloader_fn = get_real_dataloaders
data = dataloader_fn(args, device, benchmark_config, model_specs)
model, optimizer = get_model_and_optimizer(args, device, benchmark_config, model_specs)
return {
"model": model,
"optimizer": optimizer,
"data": data,
}
def create_benchmark_config(model_name):
"""Return a dict with configurations required for benchmarking `model_name` model."""
if model_name == "lm":
return lm_wikitext2.get_benchmark_config()
else:
raise RuntimeError("Unrecognized args.model_mame " % args.model_name)
def get_model_specs(model_name):
"""Return a dict with configurations required for configuring `model_name` model."""
if model_name == "lm":
return lm_wikitext2.get_model_config()
else:
raise RuntimeError("Unrecognized args.model_mame " % args.model_name)
def get_golden_config(model_name, args):
def get_golden_config(model_name, config_class, args):
"""Return a dict with the golden data for throughput and memory usage."""
if model_name == "lm":
return lm_wikitext2.get_golden_real_stats()
return config_class.get_golden_real_stats()
else:
raise RuntimeError("Unrecognized args.model_mame " % args.model_name)
def benchmark_single_process(args):
def benchmark_single_process(config_class, args):
"""Benchmark a given model using a single process and multiple devices."""
init_method_pgroup = "tcp://localhost:{}".format(MPI_PORT)
@@ -409,11 +276,11 @@ def benchmark_single_process(args):
num_devices = torch.cuda.device_count() if torch.cuda.is_available() else 1
assert num_devices > 0
init_random_seed(0)
utils.init_random_seed(0)
benchmark_config = create_benchmark_config(args.model_name)
model_specs = get_model_specs(args.model_name)
model_config = create_model_config(args, benchmark_config=benchmark_config, model_specs=model_specs)
benchmark_config = utils.create_benchmark_config(args.model_name, config_class)
model_specs = utils.get_model_specs(args.model_name, config_class)
model_config = utils.create_model_config(args, benchmark_config=benchmark_config, model_specs=model_specs)
model = model_config["model"]
balance = generate_balance(min(num_devices, 4), len(model))
@@ -424,7 +291,7 @@ def benchmark_single_process(args):
if args.dry_run:
train(model_config, pipe_model, benchmark_config, model_specs, args)
else:
benchmark_language_model(model_config, pipe_model, benchmark_config, model_specs, args)
benchmark_language_model(model_config, pipe_model, benchmark_config, model_specs, config_class, args)
def run_worker(rank, world_size, args):
@@ -432,37 +299,16 @@ def run_worker(rank, world_size, args):
world_size = args.world_size
dist_init(rank + args.rank_base, world_size, hostname=args.host)
initialize_model_parallel(1, world_size)
init_random_seed(0)
utils.init_random_seed(0)
run_mp_worker(args, world_size)
rpc.shutdown()
torch.distributed.destroy_process_group()
parser = argparse.ArgumentParser(description="benchmark")
parser.add_argument("--host", "-o", type=str, default="localhost", help="hostname")
parser.add_argument("--chunks", type=int, default=1, help="number of microbatches per batch")
parser.add_argument("--batch-size", type=int, default=8, help="size of a batch")
parser.add_argument(
"--checkpoint", default="never", choices=["always", "except_last", "never"], help="Checkpointing strategy for pipe"
)
parser.add_argument(
"--lazy-construction", action="store_true", default=False, help="Number of decoder layers in the model"
)
parser.add_argument("--max-batch", type=int, default=4, help="Max number of batches")
parser.add_argument("--use_synthetic_data", action="store_true", help="Uses synthetic data for running benchmarks.")
parser.add_argument("--dry_run", action="store_true", help="Run a sample training run without regression testing.")
parser.add_argument(
# TODO(anj-s): In the process of adding more models and hence the requirement for a flag.
"--model_name",
default="lm",
help="Language Model(LM) used to benchmark nn.pipe.",
)
parser.add_argument("--debug", action="store_true", default=False, help="Display additional debug information")
if __name__ == "__main__":
args = parser.parse_args()
args = utils.init_args()
logging.basicConfig(level=logging.INFO if not args.debug else logging.DEBUG)
logging.info(f"Running single process benchmark with args: {args}")
benchmark_single_process(args)
benchmark_single_process(lm_wikitext2, args)
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the BSD license found in the
# LICENSE file in the root directory of this source tree.
import argparse
from functools import reduce
import logging
import operator
import datasets.wikitext2_data as wikitext2_data
from models import transformer_lm
import numpy as np
import torch
from torch.optim import Adam
def init_random_seed(seed: int):
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
np.random.seed(seed)
def init_args():
parser = argparse.ArgumentParser(description="benchmark")
parser.add_argument("--host", "-o", type=str, default="localhost", help="hostname")
parser.add_argument("--chunks", type=int, default=1, help="number of microbatches per batch")
parser.add_argument("--batch-size", type=int, default=8, help="size of a batch")
parser.add_argument(
"--checkpoint",
default="never",
choices=["always", "except_last", "never"],
help="Checkpointing strategy for pipe",
)
parser.add_argument(
"--lazy-construction", action="store_true", default=False, help="Number of decoder layers in the model"
)
parser.add_argument("--max-batch", type=int, default=4, help="Max number of batches")
parser.add_argument("--use_synthetic_data", action="store_true", help="Uses synthetic data for running benchmarks.")
parser.add_argument("--dry_run", action="store_true", help="Run a sample training run without regression testing.")
parser.add_argument(
# TODO(anj-s): In the process of adding more models and hence the requirement for a flag.
"--model_name",
default="lm",
help="Language Model(LM) used to benchmark nn.pipe.",
)
parser.add_argument("--debug", action="store_true", default=False, help="Display additional debug information")
args = parser.parse_args()
return args
def create_benchmark_config(model_name, config_class):
"""Return a dict with configurations required for benchmarking `model_name` model."""
if model_name == "lm":
return config_class.get_benchmark_config()
else:
raise RuntimeError("Unrecognized args.model_mame " % args.model_name)
def get_model_specs(model_name, config_class):
"""Return a dict with configurations required for configuring `model_name` model."""
if model_name == "lm":
return config_class.get_model_config()
else:
raise RuntimeError("Unrecognized args.model_mame " % model_name)
def create_model_config(args, benchmark_config=None, model_specs=None, device=None):
"""Return a dict with the given model, dataset and optimizer."""
if not device:
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
dataset_info = get_dataset_info(args)
assert model_specs is not None
model_specs["vocab_size"] = dataset_info.ntokens
model, optimizer = get_model_and_optimizer(args, device, benchmark_config, model_specs)
return {
"model": model,
"optimizer": optimizer,
"dataset_info": dataset_info,
}
def get_model_and_optimizer(args, device, benchmark_config, model_config):
"""Return instantiated model and optimizer function."""
if args.model_name == "lm":
model = get_lm_model(args, device, model_config)
lr = benchmark_config["lr"]
def make_adam(params):
return Adam(params, lr=lr)
optimizer = make_adam
return model, optimizer
def get_lm_model(args, device, config):
"""Get language model(based on GPT-2) used for sequence prediction."""
ninp = config["ninp"]
nhead = config["nhead"]
initrange = config["initrange"]
dropout = config["dropout"]
vocab_size = config["vocab_size"]
nhid = config["nhid"]
ndecoder = config["num_decoder_layers"]
is_moe = config.get("is_moe", False)
num_local_experts = config.get("num_local_experts", 1)
if args.lazy_construction:
layers = [
LazyModule(lambda: transformer_lm.EmbeddingLayer(vocab_size, ninp, initrange)),
LazyModule(lambda: transformer_lm.PositionalEncodingLayer(ninp, dropout)),
]
for _ in range(ndecoder):
layers.append(
LazyModule(
lambda: transformer_lm.TransformerDecoderLayer(
ninp, nhead, nhid, dropout, is_moe, num_local_experts
)
)
)
layers.append(LazyModule(lambda: transformer_lm.LinearLayer(ninp, vocab_size, initrange)))
model = layers
else:
model = transformer_lm.TransformerLM(
vocab_size, ninp, nhead, nhid, dropout, initrange, ndecoder, is_moe, num_local_experts
).to(device)
return model
def log_number_of_parameters(model, logger=None):
if not logger:
logger = logging
num_params = reduce(operator.add, (reduce(operator.mul, x.size()) for x in model.parameters()))
if hasattr(model, "group"):
total = torch.Tensor([num_params])
if torch.cuda.is_available():
total = total.cuda()
torch.distributed.all_reduce(total, group=model.group)
logger.debug(
f"training model, #params = {num_params}, group: {model.group.rank()}, grank:"
f" {torch.distributed.get_rank()}, sizes {model.group.size()}"
)
torch.distributed.barrier()
if model.group.rank() == 0:
logger.debug(f"total #prams = {total.item()}")
else:
logger.debug(f"training model, #params = {num_params}")
def get_dataset_info(args):
assert args.model_name == "lm"
if args.use_synthetic_data:
return wikitext2_data.get_synthetic_datasets()
else:
return wikitext2_data.get_real_datasets()
def get_data_loader(dataset_info, args, benchmark_config, model_specs, num_replicas=1, rank=0):
return wikitext2_data.get_dataloaders(dataset_info, benchmark_config, model_specs, num_replicas, rank)
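Put together, the utils helpers give each benchmark the same entry points. A sketch of the single-replica path, assuming it is run as a script so init_args() can parse the CLI (e.g. with --use_synthetic_data):

```python
import utils
from golden_configs.lm_wikitext2 import MOE as MOEConfig

args = utils.init_args()
benchmark_config = utils.create_benchmark_config(args.model_name, MOEConfig)
model_specs = utils.get_model_specs(args.model_name, MOEConfig)

dataset_info = utils.get_dataset_info(args)  # real or synthetic WikiText-2
model_specs["vocab_size"] = dataset_info.ntokens
train_dl, _, _ = utils.get_data_loader(
    dataset_info, args, benchmark_config, model_specs, num_replicas=1, rank=0
)
```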
@@ -27,4 +27,4 @@ use_parentheses = true
skip_glob = ["build/*", "stubs/*"]
# Don't split "import" and "from".
force_sort_within_sections = true
known_third_party = ["benchmark_dataset", "datasets", "golden_configs", "models", "numpy", "parameterized", "pytest", "recommonmark", "setuptools", "torch", "torchtext", "torchvision"]
known_third_party = ["benchmark_dataset", "datasets", "golden_configs", "models", "numpy", "parameterized", "pytest", "recommonmark", "setuptools", "torch", "torchtext", "torchvision", "utils"]