Commit 27dab946 authored by huchen
Browse files

Merge branch 'GNMT-v2' into 'main'

更新了GNMT v2

See merge request dcutoolkit/deeplearing/dlexamples_new!11
parents 20291e9d 07c30a15
# Copyright (c) 2018-2020, NVIDIA CORPORATION. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import logging import logging
import math import math
import torch import torch
from torch.nn.utils import clip_grad_norm_ from torch.nn.utils import clip_grad_norm_
import seq2seq.utils as utils
from apex.contrib.optimizers import FusedAdam
from apex.multi_tensor_apply import multi_tensor_applier
from amp_C import multi_tensor_l2norm
import apex.amp._amp_state import apex.amp._amp_state
from apex import amp from apex import amp
class Fp16Optimizer: class FP16Optimizer:
""" """
Mixed precision optimizer with dynamic loss scaling and backoff. Mixed precision optimizer with dynamic loss scaling and backoff.
https://docs.nvidia.com/deeplearning/sdk/mixed-precision-training/index.html#scalefactor https://docs.nvidia.com/deeplearning/sdk/mixed-precision-training/index.html#scalefactor
""" """
# Flattening master weight
def initialize_flat_fp32_weight(self, model):
    """
    Builds flat fp32 and fp16 copies of all model parameters.

    Stores the model as ``self.fp16_model``, clears the gradients of its
    parameters and packs every parameter, in ``model.parameters()`` order,
    into two contiguous 1-D CUDA buffers (one float, one half). Both
    buffers are then wrapped in ``torch.nn.Parameter`` and given an
    uninitialized gradient tensor of matching size.

    :param model: model whose parameters are flattened; presumably already
        cast to half precision by the caller (not checked here)
    """
    logging.info('Initializing fp32 clone weights')
    self.fp16_model = model
    for weight in self.fp16_model.parameters():
        weight.grad = None

    # Total element count decides the size of both flat buffers.
    total = sum(weight.numel() for weight in model.parameters())
    flat32 = torch.cuda.FloatTensor(total)
    flat16 = torch.cuda.HalfTensor(total)

    # Copy each parameter into its slice of the flat buffers.
    offset = 0
    for weight in model.parameters():
        end = offset + weight.numel()
        flat32[offset:end].copy_(weight.data.view(-1))
        flat16[offset:end].copy_(weight.data.view(-1))
        offset = end

    # Gradient buffers are allocated but intentionally left uninitialized.
    self.fp32_params = torch.nn.Parameter(flat32)
    self.fp32_params.grad = torch.autograd.Variable(
        self.fp32_params.data.new(*self.fp32_params.size()))
    self.fp16_params = torch.nn.Parameter(flat16)
    self.fp16_params.grad = torch.autograd.Variable(
        self.fp16_params.data.new(*self.fp16_params.size()))
@staticmethod @staticmethod
def fp16_to_fp32_flat_grad(fp32_params, fp16_model): def set_grads(params, params_with_grad):
pointer = 0 """
for p in fp16_model.parameters(): Copies gradients from param_with_grad to params
nelem = p.numel()
fp32_params.grad.data[pointer:pointer+nelem].copy_(p.grad.data.view(-1))
pointer += nelem
@staticmethod :param params: dst parameters
def fp16_to_fp16_flat_grad(fp16_params, fp16_model): :param params_with_grad: src parameters
fp16_params.grad.data = torch.cat( """
[p.grad.data.view(-1) for p in fp16_model.parameters()]) for param, param_w_grad in zip(params, params_with_grad):
if param.grad is None:
param.grad = torch.nn.Parameter(torch.empty_like(param))
param.grad.data.copy_(param_w_grad.grad.data)
@staticmethod @staticmethod
def fp32_to_fp16_params(fp16_model, fp32_params): def set_weights(params, new_params):
#Copy master weights onto model weights """
pointer = 0 Copies parameters from new_params to params
for p in fp16_model.parameters():
nelem = p.numel() :param params: dst parameters
p.data.view(-1).copy_(fp32_params.data[pointer:pointer+nelem]) :param new_params: src parameters
pointer += nelem """
for param, new_param in zip(params, new_params):
param.data.copy_(new_param.data)
def __init__(self, fp16_model, grad_clip=float('inf'), loss_scale=1024,
dls_downscale=2, dls_upscale=2, dls_upscale_interval=128, def __init__(self, model, grad_clip=float('inf'), loss_scale=8192,
use_mt=False): dls_downscale=2, dls_upscale=2, dls_upscale_interval=128):
""" """
Constructor for the Fp16Optimizer. Constructor for the Fp16Optimizer.
:param fp16_model: model (previously casted to half) :param model: model
:param grad_clip: coefficient for gradient clipping, max L2 norm of the :param grad_clip: coefficient for gradient clipping, max L2 norm of the
gradients gradients
:param loss_scale: initial loss scale :param loss_scale: initial loss scale
...@@ -84,25 +71,16 @@ class Fp16Optimizer: ...@@ -84,25 +71,16 @@ class Fp16Optimizer:
this factor if previous dls_upscale_interval batches finished this factor if previous dls_upscale_interval batches finished
successfully successfully
:param dls_upscale_interval: interval for loss scale upscaling :param dls_upscale_interval: interval for loss scale upscaling
:param use_mt: with multi-tensor apply we don't need to flatten parameters
""" """
logging.info('Initializing fp16 optimizer with {}'.format( logging.info('Initializing fp16 optimizer')
'multi-tenosr apply' if use_mt else 'flattening')) self.initialize_model(model)
if use_mt:
self.initialize_model(fp16_model)
else:
self.initialize_flat_fp32_weight(fp16_model)
self.use_mt = use_mt
self.since_last_invalid = 0 self.since_last_invalid = 0
self.loss_scale = loss_scale self.loss_scale = loss_scale
self.dls_downscale = dls_downscale self.dls_downscale = dls_downscale
self.dls_upscale = dls_upscale self.dls_upscale = dls_upscale
self.dls_upscale_interval = dls_upscale_interval self.dls_upscale_interval = dls_upscale_interval
self.grad_clip = grad_clip self.grad_clip = grad_clip
self.world_size = utils.get_world_size()
self.dummy_overflow_buf = torch.cuda.IntTensor([0])
def initialize_model(self, model): def initialize_model(self, model):
""" """
...@@ -110,13 +88,13 @@ class Fp16Optimizer: ...@@ -110,13 +88,13 @@ class Fp16Optimizer:
:param model: fp16 model :param model: fp16 model
""" """
logging.info('Converting model to half precision')
model.half()
logging.info('Initializing fp32 clone weights') logging.info('Initializing fp32 clone weights')
self.fp16_model = model self.model = model
for p in self.fp16_model.parameters(): self.model.zero_grad()
p.grad = None
self.fp32_params = [param.to(torch.float32).detach() self.fp32_params = [param.to(torch.float32).detach()
for param in model.parameters()] for param in model.parameters()]
self.fp16_params = [p for p in model.parameters()]
for param in self.fp32_params: for param in self.fp32_params:
param.requires_grad = True param.requires_grad = True
...@@ -138,170 +116,100 @@ class Fp16Optimizer: ...@@ -138,170 +116,100 @@ class Fp16Optimizer:
loss *= self.loss_scale loss *= self.loss_scale
loss.backward() loss.backward()
if not update: return if update:
self.set_grads(self.fp32_params, self.model.parameters())
# Average the all-reduced gradients by world size if APEX if self.loss_scale != 1.0:
# doesn't do that for param in self.fp32_params:
scaling_factor = self.loss_scale param.grad.data /= self.loss_scale
if hasattr(self.fp16_model, 'gradient_average') and \
not self.fp16_model.gradient_average:
scaling_factor *= self.world_size
# APEX DDP reset the gradients to be views into allreduce_buffers
# So downstream code should simply be able to use the .grad
# attributes as usual
if isinstance(optimizer, FusedAdam):
if self.world_size != 1 and self.fp16_model.retain_allreduce_buffers:
grads = [p.grad for p in self.fp16_params]
norm, _ = multi_tensor_applier(
multi_tensor_l2norm,
self.dummy_overflow_buf,
[grads],
False)
norm = norm.item() / scaling_factor
else:
self.fp16_to_fp16_flat_grad(self.fp16_params, self.fp16_model)
grads = [self.fp16_params.grad]
norm = self.fp16_params.grad.data.norm(p=2,
dtype=torch.float).item() / scaling_factor
else:
self.fp16_to_fp32_flat_grad(self.fp32_params, self.fp16_model)
if scaling_factor != 1.0:
self.fp32_params.grad.data /= scaling_factor
norm = clip_grad_norm_([self.fp32_params], self.grad_clip)
if math.isfinite(norm):
if scheduler is not None:
scheduler.step()
if isinstance(optimizer, FusedAdam): norm = clip_grad_norm_(self.fp32_params, self.grad_clip)
clip_coef = self.grad_clip / (norm + 1e-6)
clip_coef = scaling_factor / min(1, clip_coef)
if self.use_mt:
optimizer.step(grads=grads, output_params=self.fp16_params, scale=clip_coef)
else:
optimizer.step(grads=grads, scale=clip_coef)
else:
optimizer.step()
# Unflatten params if not multi-tensor apply if math.isfinite(norm):
if not self.use_mt: scheduler.step()
self.fp32_to_fp16_params(self.fp16_model, self.fp32_params) optimizer.step()
self.since_last_invalid += 1 self.set_weights(self.model.parameters(),
else: self.fp32_params)
self.loss_scale /= self.dls_downscale self.since_last_invalid += 1
self.since_last_invalid = 0 else:
logging.info(f'Gradient norm: {norm}') self.loss_scale /= self.dls_downscale
logging.info(f'Skipped batch, new scale: {self.loss_scale}') self.since_last_invalid = 0
logging.info(f'Gradient norm: {norm}')
logging.info(f'Skipped batch, new scale: {self.loss_scale}')
if self.since_last_invalid >= self.dls_upscale_interval: if self.since_last_invalid >= self.dls_upscale_interval:
self.loss_scale *= self.dls_upscale self.loss_scale *= self.dls_upscale
self.loss_scale = min(self.loss_scale, 8192.0) self.loss_scale = min(self.loss_scale, 8192.0)
logging.info(f'Upscaling, new scale: {self.loss_scale}') logging.info(f'Upscaling, new scale: {self.loss_scale}')
self.since_last_invalid = 0 self.since_last_invalid = 0
for p in self.fp16_model.parameters(): self.model.zero_grad()
p.grad = None
class DwuFp16Optimizer: class FP32Optimizer:
""" """
Distributed weight update mixed precision optimizer with dynamic Standard optimizer, computes backward and applies weight update.
loss scaling and backoff.
https://docs.nvidia.com/deeplearning/sdk/mixed-precision-training/index.html#scalefactor
""" """
def __init__(self, fp16_model, loss_scale=1024, def __init__(self, model, grad_clip=None):
dls_downscale=2, dls_upscale=2, dls_upscale_interval=128):
""" """
Constructor for the DwuFp16Optimizer. Constructor for the Fp32Optimizer
:param fp16_model: model (previously casted to half) :param model: model
:param loss_scale: initial loss scale :param grad_clip: coefficient for gradient clipping, max L2 norm of the
:param dls_downscale: loss downscale factor, loss scale is divided by gradients
this factor when NaN/INF occurs in the gradients
:param dls_upscale: loss upscale factor, loss scale is multiplied by
this factor if previous dls_upscale_interval batches finished
successfully
:param dls_upscale_interval: interval for loss scale upscaling
""" """
logging.info('Initializing dwu fp16 optimizer') logging.info('Initializing fp32 optimizer')
self.initialize_model(model)
self.grad_clip = grad_clip
self.since_last_invalid = 0 def initialize_model(self, model):
self.loss_scale = loss_scale """
self.dls_downscale = dls_downscale Initializes state of the model.
self.dls_upscale = dls_upscale
self.dls_upscale_interval = dls_upscale_interval :param model: model
self.world_size = utils.get_world_size() """
self.fp16_model = fp16_model self.model = model
self.model.zero_grad()
def step(self, loss, optimizer, scheduler, update=True): def step(self, loss, optimizer, scheduler, update=True):
""" """
Performs one step of the optimizer. Performs one step of the optimizer.
Applies loss scaling, computes gradients in fp16, converts gradients to
fp32, inverts scaling and applies optional gradient norm clipping.
If gradients are finite, it applies update to fp32 master weights and
copies updated parameters to fp16 model for the next iteration. If
gradients are not finite, it skips the batch and adjusts scaling factor
for the next iteration.
:param loss: value of loss function :param loss: value of loss function
:param optimizer: optimizer :param optimizer: optimizer
:param update: if True executes weight update :param update: if True executes weight update
""" """
scaling_factor = self.loss_scale * self.world_size
optimizer.set_global_scale(scaling_factor)
loss *= self.loss_scale
loss.backward() loss.backward()
optimizer.complete_reductions() if update:
if self.grad_clip != float('inf'):
if not update: clip_grad_norm_(self.model.parameters(), self.grad_clip)
torch.cuda.synchronize() scheduler.step()
return optimizer.step()
self.model.zero_grad()
# Gradient division by world_size is fused with FusedAdam
norm = optimizer.L2_grad_norm / scaling_factor
should_update = math.isfinite(norm)
if should_update:
if scheduler is not None:
scheduler.step()
optimizer.step(skip_overflow_check=True)
if should_update:
self.since_last_invalid += 1
else:
self.loss_scale /= self.dls_downscale
self.since_last_invalid = 0
logging.info(f'Gradient norm: {norm}')
logging.info(f'Skipped batch, new scale: {self.loss_scale}')
if self.since_last_invalid >= self.dls_upscale_interval:
self.loss_scale *= self.dls_upscale
self.loss_scale = min(self.loss_scale, 8192.0)
logging.info(f'Upscaling, new scale: {self.loss_scale}')
self.since_last_invalid = 0
for p in self.fp16_model.parameters():
p.grad = None
class Fp32Optimizer: class AMPOptimizer:
""" """
Standard optimizer, computes backward and applies weight update. Optimizer compatible with AMP.
Uses AMP to apply loss scaling, computes backward and applies weight
update.
""" """
def __init__(self, model, grad_clip=None): def __init__(self, model, grad_clip=None, loss_scale=8192,
dls_upscale_interval=128):
""" """
Constructor for the Fp32Optimizer Constructor for the AMPOptimizer
:param model: model :param model: model
:param grad_clip: coefficient for gradient clipping, max L2 norm of the :param grad_clip: coefficient for gradient clipping, max L2 norm of the
gradients gradients
""" """
logging.info('Initializing fp32 optimizer') logging.info('Initializing amp optimizer')
self.initialize_model(model) self.initialize_model(model)
self.grad_clip = grad_clip self.grad_clip = grad_clip
loss_scaler = apex.amp._amp_state.loss_scalers[0]
loss_scaler._loss_scale = loss_scale
loss_scaler._scale_seq_len = dls_upscale_interval
def initialize_model(self, model): def initialize_model(self, model):
""" """
Initializes state of the model. Initializes state of the model.
...@@ -319,13 +227,12 @@ class Fp32Optimizer: ...@@ -319,13 +227,12 @@ class Fp32Optimizer:
:param optimizer: optimizer :param optimizer: optimizer
:param update: if True executes weight update :param update: if True executes weight update
""" """
loss.backward() with amp.scale_loss(loss, optimizer) as scaled_loss:
scaled_loss.backward()
if update: if update:
if self.grad_clip != float('inf'): if self.grad_clip != float('inf'):
clip_grad_norm_(self.model.parameters(), self.grad_clip) clip_grad_norm_(amp.master_params(optimizer), self.grad_clip)
if scheduler is not None: scheduler.step()
scheduler.step()
optimizer.step() optimizer.step()
self.model.zero_grad() self.model.zero_grad()
# Copyright (c) 2018-2020, NVIDIA CORPORATION. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import logging import logging
import math import math
import torch import torch
from mlperf_logging.mllog import constants
from seq2seq.utils import log_event
def perhaps_convert_float(param, total): def perhaps_convert_float(param, total):
...@@ -74,22 +91,6 @@ class WarmupMultiStepLR(torch.optim.lr_scheduler._LRScheduler): ...@@ -74,22 +91,6 @@ class WarmupMultiStepLR(torch.optim.lr_scheduler._LRScheduler):
f'remain_steps, setting warmup_steps=remain_steps') f'remain_steps, setting warmup_steps=remain_steps')
self.warmup_steps = self.remain_steps self.warmup_steps = self.remain_steps
log_event(key=constants.OPT_LR_ALT_DECAY_FUNC,
value=True)
log_event(key=constants.OPT_LR_ALT_WARMUP_FUNC,
value=True)
log_event(key=constants.OPT_LR_DECAY_INTERVAL,
value=self.decay_interval)
log_event(key=constants.OPT_LR_DECAY_FACTOR,
value=self.decay_factor)
log_event(key=constants.OPT_LR_DECAY_STEPS,
value=self.decay_steps)
log_event(key=constants.OPT_LR_REMAIN_STEPS,
value=self.remain_steps)
log_event(key=constants.OPT_LR_WARMUP_STEPS,
value=self.warmup_steps)
super(WarmupMultiStepLR, self).__init__(optimizer, last_epoch) super(WarmupMultiStepLR, self).__init__(optimizer, last_epoch)
def get_lr(self): def get_lr(self):
......
import logging # Copyright (c) 2018-2020, NVIDIA CORPORATION. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import torch import torch
import torch.nn as nn import torch.nn as nn
from apex.contrib import xentropy
loss_func = xentropy.SoftmaxCrossEntropyLoss.apply
class LabelSmoothing(nn.Module): class LabelSmoothing(nn.Module):
""" """
NLL loss with label smoothing. NLL loss with label smoothing.
""" """
def __init__(self, padding_idx, smoothing=0.0, fusion=True): def __init__(self, padding_idx, smoothing=0.0):
""" """
Constructor for the LabelSmoothing module. Constructor for the LabelSmoothing module.
...@@ -22,20 +37,14 @@ class LabelSmoothing(nn.Module): ...@@ -22,20 +37,14 @@ class LabelSmoothing(nn.Module):
self.padding_idx = padding_idx self.padding_idx = padding_idx
self.confidence = 1.0 - smoothing self.confidence = 1.0 - smoothing
self.smoothing = smoothing self.smoothing = smoothing
self.fusion = fusion
logging.info(f'Fused xentropy flag set to {fusion}')
def forward(self, x, target): def forward(self, x, target):
if self.fusion: logprobs = torch.nn.functional.log_softmax(x, dim=-1,
loss = loss_func(x, target, self.smoothing, self.padding_idx, True) dtype=torch.float32)
else:
logprobs = torch.nn.functional.log_softmax(x, dim=-1, non_pad_mask = (target != self.padding_idx)
dtype=torch.float32) nll_loss = -logprobs.gather(dim=-1, index=target.unsqueeze(1))
nll_loss = nll_loss.squeeze(1)[non_pad_mask]
non_pad_mask = (target != self.padding_idx) smooth_loss = -logprobs.mean(dim=-1)[non_pad_mask]
nll_loss = -logprobs.gather(dim=-1, index=target.unsqueeze(1)) loss = self.confidence * nll_loss + self.smoothing * smooth_loss
nll_loss = nll_loss.squeeze(1)[non_pad_mask]
smooth_loss = -logprobs.mean(dim=-1)[non_pad_mask]
loss = self.confidence * nll_loss + self.smoothing * smooth_loss
return loss.sum() return loss.sum()
# Copyright (c) 2018-2020, NVIDIA CORPORATION. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from pytablewriter import MarkdownTableWriter
class TrainingTable:
    """
    Collects training results row by row and renders them as a Markdown
    table through ``MarkdownTableWriter``.
    """

    def __init__(self, acc_unit='BLEU', time_unit='min', perf_unit='tok/s'):
        """
        Constructor for the TrainingTable.

        :param acc_unit: unit label shown in the accuracy column header
        :param time_unit: unit label shown in the time column header; also
            the key used in :meth:`add` to pick the conversion factor, so
            it must be one of ``'s'``, ``'min'`` or ``'h'``
        :param perf_unit: unit label shown in the throughput column header
        """
        self.data = []
        self.acc_unit = acc_unit
        self.time_unit = time_unit
        self.perf_unit = perf_unit
        # Multipliers applied to time_to_train in add(), keyed by time_unit
        # (the 's' -> 1 entry suggests inputs are in seconds - confirm with
        # callers).
        self.time_unit_convert = {'s': 1, 'min': 1/60, 'h': 1/3600}

    def add(self, gpus, batch_size, accuracy, perf, time_to_train):
        """
        Appends one result row to the table.

        :param gpus: value for the GPUs column
        :param batch_size: value for the batch size column
        :param accuracy: accuracy value, rounded to 2 decimal places; any
            falsy value (e.g. ``None``) is recorded as ``0.0``
        :param perf: value for the throughput column
        :param time_to_train: training time, multiplied by the conversion
            factor selected by ``self.time_unit``
        :raises KeyError: if ``self.time_unit`` has no conversion factor
        """
        time_to_train *= self.time_unit_convert[self.time_unit]
        if not accuracy:
            accuracy = 0.0
        accuracy = round(accuracy, 2)
        self.data.append([gpus, batch_size, accuracy, perf, time_to_train])

    def write(self, title, math):
        """
        Renders all collected rows with ``MarkdownTableWriter``.

        :param title: table name (formatted to a string)
        :param math: arithmetic type string; upper-cased and embedded in the
            accuracy, throughput and time column headers
        """
        precision = math.upper()
        writer = MarkdownTableWriter()
        writer.table_name = f'{title}'
        writer.headers = [
            '**GPUs**',
            '**Batch Size / GPU**',
            f'**Accuracy - {precision} ({self.acc_unit})**',
            f'**Throughput - {precision} ({self.perf_unit})**',
            f'**Time to Train - {precision} ({self.time_unit})**',
        ]
        writer.value_matrix = self.data
        writer.write_table()
# Copyright (c) 2017 Elad Hoffer
# Copyright (c) 2018-2020, NVIDIA CORPORATION. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import logging import logging
import os import os
import time import time
...@@ -7,19 +28,15 @@ import numpy as np ...@@ -7,19 +28,15 @@ import numpy as np
import torch import torch
import torch.optim import torch.optim
import torch.utils.data import torch.utils.data
from apex.parallel import DistributedDataParallel as DDP from apex.parallel import DistributedDataParallel
from apex.contrib.optimizers import FusedAdam
from apex.contrib.optimizers.distributed_fused_adam import DistributedFusedAdam
from apex import amp from apex import amp
from mlperf_logging.mllog import constants
from seq2seq.train.fp_optimizers import Fp16Optimizer from seq2seq.train.fp_optimizers import FP16Optimizer
from seq2seq.train.fp_optimizers import DwuFp16Optimizer from seq2seq.train.fp_optimizers import FP32Optimizer
from seq2seq.train.fp_optimizers import Fp32Optimizer from seq2seq.train.fp_optimizers import AMPOptimizer
from seq2seq.train.lr_scheduler import WarmupMultiStepLR
from seq2seq.utils import AverageMeter from seq2seq.utils import AverageMeter
from seq2seq.utils import log_event
from seq2seq.utils import sync_workers from seq2seq.utils import sync_workers
from seq2seq.utils import get_world_size
class Seq2SeqTrainer: class Seq2SeqTrainer:
...@@ -30,53 +47,48 @@ class Seq2SeqTrainer: ...@@ -30,53 +47,48 @@ class Seq2SeqTrainer:
model, model,
criterion, criterion,
opt_config, opt_config,
scheduler_config,
print_freq=10, print_freq=10,
save_freq=1000, save_freq=1000,
grad_clip=float('inf'), grad_clip=float('inf'),
batch_first=False,
save_info={}, save_info={},
save_path='.', save_dir='.',
train_iterations=0, train_iterations=0,
checkpoint_filename='checkpoint%s.pth', checkpoint_filename='checkpoint%s.pth',
keep_checkpoints=5, keep_checkpoints=5,
math='fp32', math='fp32',
loss_scaling={}, loss_scaling={},
cuda=True,
distributed=False,
distributed_overlap_allreduce=False,
distributed_overlap_num_allreduce_streams=1,
distributed_overlap_allreduce_messagesize=1e7,
distributed_overlap_allreduce_communicators=None,
intra_epoch_eval=0, intra_epoch_eval=0,
prealloc_mode='always', prealloc_mode='always',
warmup=0,
iter_size=1, iter_size=1,
verbose=False, translator=None,
args=None): verbose=False):
""" """
Constructor for the Seq2SeqTrainer. Constructor for the Seq2SeqTrainer.
:param model: model to train :param model: model to train
:param criterion: criterion (loss function) :param criterion: criterion (loss function)
:param opt_config: dictionary with options for the optimizer :param opt_config: dictionary with options for the optimizer
:param scheduler_config: dictionary with options for the learning rate
scheduler
:param print_freq: prints short summary every 'print_freq' iterations :param print_freq: prints short summary every 'print_freq' iterations
:param save_freq: saves checkpoint every 'save_freq' iterations :param save_freq: saves checkpoint every 'save_freq' iterations
:param grad_clip: coefficient for gradient clipping :param grad_clip: coefficient for gradient clipping
:param batch_first: if True the model uses (batch,seq,feature) tensors,
if false the model uses (seq, batch, feature)
:param save_info: dict with additional state stored in each checkpoint :param save_info: dict with additional state stored in each checkpoint
:param save_path: path to the directory for checkpoints :param save_dir: path to the directory for checkpoints
:param train_iterations: total number of training iterations to execute :param train_iterations: total number of training iterations to execute
:param checkpoint_filename: name of files with checkpoints :param checkpoint_filename: name of files with checkpoints
:param keep_checkpoints: max number of checkpoints to keep :param keep_checkpoints: max number of checkpoints to keep
:param math: arithmetic type :param math: arithmetic type
:param loss_scaling: options for dynamic loss scaling :param loss_scaling: options for dynamic loss scaling
:param cuda: if True use cuda, if False train on cpu
:param distributed: if True run distributed training
:param intra_epoch_eval: number of additional eval runs within each :param intra_epoch_eval: number of additional eval runs within each
training epoch training epoch
:param prealloc_mode: controls preallocation, :param prealloc_mode: controls preallocation,
choices=['off', 'once', 'always'] choices=['off', 'once', 'always']
:param warmup: number of warmup iterations for performance counters
:param iter_size: number of iterations between weight updates :param iter_size: number of iterations between weight updates
:param translator: instance of Translator, runs inference on test set
:param verbose: enables verbose logging :param verbose: enables verbose logging
""" """
super(Seq2SeqTrainer, self).__init__() super(Seq2SeqTrainer, self).__init__()
...@@ -84,125 +96,62 @@ class Seq2SeqTrainer: ...@@ -84,125 +96,62 @@ class Seq2SeqTrainer:
self.criterion = criterion self.criterion = criterion
self.epoch = 0 self.epoch = 0
self.save_info = save_info self.save_info = save_info
self.save_path = save_path self.save_dir = save_dir
self.save_freq = save_freq self.save_freq = save_freq
self.save_counter = 0 self.save_counter = 0
self.checkpoint_filename = checkpoint_filename self.checkpoint_filename = checkpoint_filename
self.checkpoint_counter = cycle(range(keep_checkpoints)) self.checkpoint_counter = cycle(range(keep_checkpoints))
self.opt_config = opt_config self.opt_config = opt_config
self.cuda = cuda self.device = next(model.parameters()).device
self.distributed = distributed
self.print_freq = print_freq self.print_freq = print_freq
self.batch_first = batch_first
self.verbose = verbose self.verbose = verbose
self.loss = None self.loss = None
self.translator = None self.translator = translator
self.scheduler = None
self.intra_epoch_eval = intra_epoch_eval self.intra_epoch_eval = intra_epoch_eval
self.warmup = warmup
self.iter_size = iter_size self.iter_size = iter_size
self.prealloc_mode = prealloc_mode self.prealloc_mode = prealloc_mode
self.preallocated = False self.preallocated = False
# Assume multi-tensor apply if with APEX DDP
self.args = args
self.use_mt = (distributed and iter_size == 1 and \
opt_config['optimizer'] == 'FusedAdam')
# Use APEX gradient average if gradient accumulation option enabled
self.retain_allreduce_buffers = True if iter_size == 1 else False
self.gradient_average = False if iter_size == 1 else True
if cuda: self.distributed = torch.distributed.is_initialized()
self.model = self.model.cuda() self.batch_first = model.batch_first
self.criterion = self.criterion.cuda()
params = self.model.parameters() params = self.model.parameters()
if math == 'fp16':
self.model = self.model.half() if math == 'manual_fp16':
if distributed and self.args.distributed_weight_update != 2: self.fp_optimizer = FP16Optimizer(
self.model = DDP(self.model, self.model, grad_clip,
message_size=distributed_overlap_allreduce_messagesize, loss_scale=loss_scaling['init_scale'],
delay_allreduce=(not distributed_overlap_allreduce), dls_upscale_interval=loss_scaling['upscale_interval']
num_allreduce_streams=distributed_overlap_num_allreduce_streams, )
allreduce_communicators=distributed_overlap_allreduce_communicators, params = self.fp_optimizer.fp32_params
retain_allreduce_buffers=self.retain_allreduce_buffers, elif math == 'fp32' or math == 'tf32':
gradient_average=self.gradient_average) self.fp_optimizer = FP32Optimizer(self.model, grad_clip)
if self.args.distributed_weight_update == 2:
# gradient clipping maintained by DistributedFusedAdam
self.fp_optimizer = DwuFp16Optimizer(
self.model,
loss_scale=loss_scaling['init_scale'],
dls_upscale_interval=loss_scaling['upscale_interval']
)
params = list(self.model.parameters())
else:
self.fp_optimizer = Fp16Optimizer(
self.model, grad_clip,
use_mt=self.use_mt,
loss_scale=loss_scaling['init_scale'],
dls_upscale_interval=loss_scaling['upscale_interval']
)
params = self.fp_optimizer.fp32_params if isinstance(self.fp_optimizer.fp32_params, list) \
else [self.fp_optimizer.fp32_params]
elif math == 'fp32':
if distributed:
self.model = DDP(self.model,
message_size=distributed_overlap_allreduce_messagesize,
delay_allreduce=(not distributed_overlap_allreduce))
self.fp_optimizer = Fp32Optimizer(self.model, grad_clip)
# params = self.model.parameters()
opt_name = opt_config.pop('optimizer') opt_name = opt_config.pop('optimizer')
if opt_name == 'FusedAdam': self.optimizer = torch.optim.__dict__[opt_name](params, **opt_config)
if math == 'fp16' or math == 'fp32':
if self.args.distributed_weight_update == 2:
dwu_args = self.distributed_weight_update_config
self.optimizer = DistributedFusedAdam(params, max_grad_norm=grad_clip,
**dwu_args, **opt_config)
self.optimizer.set_global_scale(1.0) # used for grad norm clipping in step function
else:
# Maintain grad norm and scaling by ourselves
self.optimizer = FusedAdam(params, use_mt=self.use_mt, **opt_config)
else:
self.optimizer = FusedAdam(params, use_mt=self.use_mt, max_grad_norm=grad_clip,
amp_scale_adjustment=get_world_size(), **opt_config)
else:
self.optimizer = torch.optim.__dict__[opt_name](params,
**opt_config)
logging.info(f'Using optimizer: {self.optimizer}') logging.info(f'Using optimizer: {self.optimizer}')
log_event(key=constants.OPT_NAME, self.scheduler = WarmupMultiStepLR(self.optimizer, train_iterations,
value=constants.ADAM, sync=False) **scheduler_config)
log_event(key=constants.OPT_BASE_LR,
value=opt_config['lr'], sync=False) if math == 'fp16':
log_event(key=constants.OPT_ADAM_BETA_1, self.model, self.optimizer = amp.initialize(
value=self.optimizer.defaults['betas'][0], sync=False) self.model,
log_event(key=constants.OPT_ADAM_BETA_2, self.optimizer,
value=self.optimizer.defaults['betas'][1], sync=False) cast_model_outputs=torch.float16,
log_event(key=constants.OPT_ADAM_EPSILON, keep_batchnorm_fp32=False,
value=self.optimizer.defaults['eps'], sync=False) opt_level='O2')
@property self.fp_optimizer = AMPOptimizer(
def distributed_weight_update_config(self): self.model,
""" grad_clip,
Return a kwarg dictionary that provides arguments for the distributed loss_scale=loss_scaling['init_scale'],
weight update feature. dls_upscale_interval=loss_scaling['upscale_interval']
""" )
return {
'dwu_group_size': self.args.dwu_group_size, if self.distributed:
'dwu_num_blocks': self.args.dwu_num_blocks, self.model = DistributedDataParallel(self.model)
'dwu_num_chunks': self.args.dwu_num_chunks,
'dwu_num_rs_pg': self.args.dwu_num_rs_pg,
'dwu_num_ar_pg': self.args.dwu_num_ar_pg,
'dwu_num_ag_pg': self.args.dwu_num_ag_pg,
'overlap_reductions': self.args.dwu_overlap_reductions,
'full_pipeline': self.args.dwu_full_pipeline,
'compute_L2_grad_norm': self.args.dwu_grad_norm,
'e5m2_allgather': self.args.dwu_e5m2_allgather,
'predivide': False,
'flat_mt': True,
}
def iterate(self, src, tgt, update=True, training=True): def iterate(self, src, tgt, update=True, training=True):
""" """
...@@ -213,20 +162,16 @@ class Seq2SeqTrainer: ...@@ -213,20 +162,16 @@ class Seq2SeqTrainer:
:param update: if True: optimizer does update of the weights :param update: if True: optimizer does update of the weights
:param training: if True: executes optimizer :param training: if True: executes optimizer
""" """
#print("############################iterate##########################################")
src, src_length = src src, src_length = src
tgt, tgt_length = tgt tgt, tgt_length = tgt
src_length = torch.LongTensor(src_length) src = src.to(self.device)
tgt_length = torch.LongTensor(tgt_length) tgt = tgt.to(self.device)
src_length = src_length.to(self.device)
num_toks = {} num_toks = {}
num_toks['tgt'] = int(sum(tgt_length - 1)) num_toks['tgt'] = int(sum(tgt_length - 1))
num_toks['src'] = int(sum(src_length)) num_toks['src'] = int(sum(src_length))
if self.cuda:
src = src.cuda(non_blocking=True)
tgt = tgt.cuda(non_blocking=True)
if self.batch_first: if self.batch_first:
output = self.model(src, src_length, tgt[:, :-1]) output = self.model(src, src_length, tgt[:, :-1])
tgt_labels = tgt[:, 1:] tgt_labels = tgt[:, 1:]
...@@ -239,16 +184,13 @@ class Seq2SeqTrainer: ...@@ -239,16 +184,13 @@ class Seq2SeqTrainer:
loss = self.criterion(output.view(T * B, -1), loss = self.criterion(output.view(T * B, -1),
tgt_labels.contiguous().view(-1)) tgt_labels.contiguous().view(-1))
loss_per_batch = torch.empty((1), dtype=torch.float, device='cpu', loss_per_batch = loss.item()
requires_grad=False, pin_memory=True)
loss_per_batch.copy_(loss, non_blocking=True)
loss /= (B * self.iter_size) loss /= (B * self.iter_size)
if training: if training:
self.fp_optimizer.step(loss, self.optimizer, self.scheduler, self.fp_optimizer.step(loss, self.optimizer, self.scheduler,
update) update)
loss_per_batch = loss_per_batch.item()
loss_per_token = loss_per_batch / num_toks['tgt'] loss_per_token = loss_per_batch / num_toks['tgt']
loss_per_sentence = loss_per_batch / B loss_per_sentence = loss_per_batch / B
...@@ -269,14 +211,14 @@ class Seq2SeqTrainer: ...@@ -269,14 +211,14 @@ class Seq2SeqTrainer:
eval_iters = eval_iters * self.iter_size eval_iters = eval_iters * self.iter_size
eval_iters = set(eval_iters) eval_iters = set(eval_iters)
batch_time = AverageMeter(skip_first=False) batch_time = AverageMeter(self.warmup)
data_time = AverageMeter(skip_first=False) data_time = AverageMeter(self.warmup)
losses_per_token = AverageMeter(skip_first=False) losses_per_token = AverageMeter()
losses_per_sentence = AverageMeter(skip_first=False) losses_per_sentence = AverageMeter()
tot_tok_time = AverageMeter(skip_first=False) tot_tok_time = AverageMeter(self.warmup)
src_tok_time = AverageMeter(skip_first=False) src_tok_time = AverageMeter(self.warmup)
tgt_tok_time = AverageMeter(skip_first=False) tgt_tok_time = AverageMeter(self.warmup)
batch_size = data_loader.batch_size batch_size = data_loader.batch_size
...@@ -289,9 +231,7 @@ class Seq2SeqTrainer: ...@@ -289,9 +231,7 @@ class Seq2SeqTrainer:
update = False update = False
if i % self.iter_size == self.iter_size - 1: if i % self.iter_size == self.iter_size - 1:
update = True update = True
#####aiss prof
#if i == 500:
# break
# do a train/evaluate iteration # do a train/evaluate iteration
stats = self.iterate(src, tgt, update, training=training) stats = self.iterate(src, tgt, update, training=training)
loss_per_token, loss_per_sentence, num_toks = stats loss_per_token, loss_per_sentence, num_toks = stats
...@@ -308,11 +248,17 @@ class Seq2SeqTrainer: ...@@ -308,11 +248,17 @@ class Seq2SeqTrainer:
tot_num_toks = num_toks['tgt'] + num_toks['src'] tot_num_toks = num_toks['tgt'] + num_toks['src']
tot_tok_time.update(tot_num_toks / elapsed) tot_tok_time.update(tot_num_toks / elapsed)
self.loss = losses_per_token.avg self.loss = losses_per_token.avg
if training and i in eval_iters: if training and i in eval_iters:
assert self.translator is not None eval_fname = f'eval_epoch_{self.epoch}_iter_{i}'
test_bleu, _ = self.translator.run(calc_bleu=True, eval_path = os.path.join(self.save_dir, eval_fname)
epoch=self.epoch, _, eval_stats = self.translator.run(
iteration=i) calc_bleu=True,
epoch=self.epoch,
iteration=i,
eval_path=eval_path,
)
test_bleu = eval_stats['bleu']
log = [] log = []
log += [f'TRAIN [{self.epoch}][{i}/{len(data_loader)}]'] log += [f'TRAIN [{self.epoch}][{i}/{len(data_loader)}]']
...@@ -373,8 +319,10 @@ class Seq2SeqTrainer: ...@@ -373,8 +319,10 @@ class Seq2SeqTrainer:
logging.info('Executing preallocation') logging.info('Executing preallocation')
torch.cuda.empty_cache() torch.cuda.empty_cache()
src_length = [max_length] * batch_size src_length = torch.full((batch_size,), max_length,
tgt_length = [max_length] * batch_size dtype=torch.int64)
tgt_length = torch.full((batch_size,), max_length,
dtype=torch.int64)
if self.batch_first: if self.batch_first:
shape = (batch_size, max_length) shape = (batch_size, max_length)
...@@ -437,7 +385,6 @@ class Seq2SeqTrainer: ...@@ -437,7 +385,6 @@ class Seq2SeqTrainer:
self.model.load_state_dict(checkpoint['state_dict']) self.model.load_state_dict(checkpoint['state_dict'])
self.fp_optimizer.initialize_model(self.model) self.fp_optimizer.initialize_model(self.model)
self.optimizer.load_state_dict(checkpoint['optimizer']) self.optimizer.load_state_dict(checkpoint['optimizer'])
assert self.scheduler is not None
self.scheduler.load_state_dict(checkpoint['scheduler']) self.scheduler.load_state_dict(checkpoint['scheduler'])
self.epoch = checkpoint['epoch'] self.epoch = checkpoint['epoch']
self.loss = checkpoint['loss'] self.loss = checkpoint['loss']
...@@ -456,7 +403,7 @@ class Seq2SeqTrainer: ...@@ -456,7 +403,7 @@ class Seq2SeqTrainer:
""" """
def write_checkpoint(state, filename): def write_checkpoint(state, filename):
filename = os.path.join(self.save_path, filename) filename = os.path.join(self.save_dir, filename)
logging.info(f'Saving model to {filename}') logging.info(f'Saving model to {filename}')
torch.save(state, filename) torch.save(state, filename)
...@@ -465,7 +412,6 @@ class Seq2SeqTrainer: ...@@ -465,7 +412,6 @@ class Seq2SeqTrainer:
else: else:
model_state = self.model.state_dict() model_state = self.model.state_dict()
assert self.scheduler is not None
state = { state = {
'epoch': self.epoch, 'epoch': self.epoch,
'state_dict': model_state, 'state_dict': model_state,
......
import logging
import os
import time
from itertools import cycle
import numpy as np
import torch
import torch.optim
import torch.utils.data
from apex.parallel import DistributedDataParallel as DDP
from apex.contrib.optimizers import FusedAdam
from apex.contrib.optimizers.distributed_fused_adam import DistributedFusedAdam
from apex import amp
from mlperf_logging.mllog import constants
from seq2seq.train.fp_optimizers import Fp16Optimizer
from seq2seq.train.fp_optimizers import DwuFp16Optimizer
from seq2seq.train.fp_optimizers import Fp32Optimizer
from seq2seq.utils import AverageMeter
from seq2seq.utils import log_event
from seq2seq.utils import sync_workers
from seq2seq.utils import get_world_size
class Seq2SeqTrainer:
"""
Seq2SeqTrainer
"""
def __init__(self,
             model,
             criterion,
             opt_config,
             print_freq=10,
             save_freq=1000,
             grad_clip=float('inf'),
             batch_first=False,
             save_info=None,
             save_path='.',
             train_iterations=0,
             checkpoint_filename='checkpoint%s.pth',
             keep_checkpoints=5,
             math='fp32',
             loss_scaling=None,
             cuda=True,
             distributed=False,
             distributed_overlap_allreduce=False,
             distributed_overlap_num_allreduce_streams=1,
             distributed_overlap_allreduce_messagesize=1e7,
             distributed_overlap_allreduce_communicators=None,
             intra_epoch_eval=0,
             prealloc_mode='always',
             iter_size=1,
             verbose=False,
             args=None):
    """
    Constructor for the Seq2SeqTrainer.

    :param model: model to train
    :param criterion: criterion (loss function)
    :param opt_config: dictionary with options for the optimizer; the
        'optimizer' key is popped from it (the caller's dict is modified)
    :param print_freq: prints short summary every 'print_freq' iterations
    :param save_freq: saves checkpoint every 'save_freq' iterations
    :param grad_clip: coefficient for gradient clipping
    :param batch_first: if True the model uses (batch,seq,feature) tensors,
        if false the model uses (seq, batch, feature)
    :param save_info: dict with additional state stored in each checkpoint;
        None (the default) creates a fresh dict for this trainer
    :param save_path: path to the directory for checkpoints
    :param train_iterations: total number of training iterations to execute
        (not read by this constructor)
    :param checkpoint_filename: name of files with checkpoints
    :param keep_checkpoints: max number of checkpoints to keep
    :param math: arithmetic type, 'fp16' or 'fp32'
    :param loss_scaling: options for dynamic loss scaling; the keys
        'init_scale' and 'upscale_interval' are read when math == 'fp16'
    :param cuda: if True use cuda, if False train on cpu
    :param distributed: if True run distributed training
    :param distributed_overlap_allreduce: if False the DDP wrapper is
        created with delay_allreduce=True
    :param distributed_overlap_num_allreduce_streams: forwarded to DDP as
        num_allreduce_streams (fp16 path only)
    :param distributed_overlap_allreduce_messagesize: forwarded to DDP as
        message_size
    :param distributed_overlap_allreduce_communicators: forwarded to DDP as
        allreduce_communicators (fp16 path only)
    :param intra_epoch_eval: number of additional eval runs within each
        training epoch
    :param prealloc_mode: controls preallocation,
        choices=['off', 'once', 'always']
    :param iter_size: number of iterations between weight updates
    :param verbose: enables verbose logging
    :param args: namespace with the distributed weight update options
        (distributed_weight_update, dwu_*); it is dereferenced whenever
        math == 'fp16' or the optimizer is 'FusedAdam'
    """
    super(Seq2SeqTrainer, self).__init__()

    # A literal {} default would be one dict shared by every trainer, and
    # feed_data() writes save_info['iteration'] into it.
    if save_info is None:
        save_info = {}
    if loss_scaling is None:
        loss_scaling = {}

    self.model = model
    self.criterion = criterion
    self.epoch = 0
    self.save_info = save_info
    self.save_path = save_path
    self.save_freq = save_freq
    self.save_counter = 0
    self.checkpoint_filename = checkpoint_filename
    self.checkpoint_counter = cycle(range(keep_checkpoints))
    self.opt_config = opt_config
    self.cuda = cuda
    self.distributed = distributed
    self.print_freq = print_freq
    self.batch_first = batch_first
    self.verbose = verbose
    self.loss = None
    self.translator = None
    # The scheduler is not built here; load()/save() assert that it has been
    # attached before they run.
    self.scheduler = None
    self.intra_epoch_eval = intra_epoch_eval
    self.iter_size = iter_size
    self.prealloc_mode = prealloc_mode
    self.preallocated = False

    # Assume multi-tensor apply if with APEX DDP
    self.args = args
    self.use_mt = (distributed and iter_size == 1 and
                   opt_config['optimizer'] == 'FusedAdam')

    # Use APEX gradient average if gradient accumulation option enabled
    self.retain_allreduce_buffers = (iter_size == 1)
    self.gradient_average = (iter_size != 1)

    if cuda:
        self.model = self.model.cuda()
        self.criterion = self.criterion.cuda()

    params = self.model.parameters()
    if math == 'fp16':
        self.model = self.model.half()
        if distributed and self.args.distributed_weight_update != 2:
            self.model = DDP(self.model,
                             message_size=distributed_overlap_allreduce_messagesize,
                             delay_allreduce=(not distributed_overlap_allreduce),
                             num_allreduce_streams=distributed_overlap_num_allreduce_streams,
                             allreduce_communicators=distributed_overlap_allreduce_communicators,
                             retain_allreduce_buffers=self.retain_allreduce_buffers,
                             gradient_average=self.gradient_average)

        if self.args.distributed_weight_update == 2:
            # gradient clipping maintained by DistributedFusedAdam
            self.fp_optimizer = DwuFp16Optimizer(
                self.model,
                loss_scale=loss_scaling['init_scale'],
                dls_upscale_interval=loss_scaling['upscale_interval']
                )
            params = list(self.model.parameters())
        else:
            self.fp_optimizer = Fp16Optimizer(
                self.model, grad_clip,
                use_mt=self.use_mt,
                loss_scale=loss_scaling['init_scale'],
                dls_upscale_interval=loss_scaling['upscale_interval']
                )
            # The optimizer is built over the wrapper's fp32 parameters,
            # normalised to a list.
            fp32_params = self.fp_optimizer.fp32_params
            params = fp32_params if isinstance(fp32_params, list) \
                else [fp32_params]
    elif math == 'fp32':
        if distributed:
            self.model = DDP(self.model,
                             message_size=distributed_overlap_allreduce_messagesize,
                             delay_allreduce=(not distributed_overlap_allreduce))
        self.fp_optimizer = Fp32Optimizer(self.model, grad_clip)
    # NOTE(review): any other value of `math` leaves self.fp_optimizer unset.

    opt_name = opt_config.pop('optimizer')
    if opt_name == 'FusedAdam':
        if math == 'fp16' or math == 'fp32':
            if self.args.distributed_weight_update == 2:
                dwu_args = self.distributed_weight_update_config
                self.optimizer = DistributedFusedAdam(params, max_grad_norm=grad_clip,
                                                      **dwu_args, **opt_config)
                self.optimizer.set_global_scale(1.0)  # used for grad norm clipping in step function
            else:
                # Maintain grad norm and scaling by ourselves
                self.optimizer = FusedAdam(params, use_mt=self.use_mt, **opt_config)
        else:
            self.optimizer = FusedAdam(params, use_mt=self.use_mt, max_grad_norm=grad_clip,
                                       amp_scale_adjustment=get_world_size(), **opt_config)
    else:
        self.optimizer = torch.optim.__dict__[opt_name](params,
                                                        **opt_config)

    logging.info(f'Using optimizer: {self.optimizer}')

    # NOTE(review): the compliance records below read Adam-specific defaults
    # ('betas', 'eps'); presumably only Adam-family optimizers are used here.
    log_event(key=constants.OPT_NAME,
              value=constants.ADAM, sync=False)
    log_event(key=constants.OPT_BASE_LR,
              value=opt_config['lr'], sync=False)
    log_event(key=constants.OPT_ADAM_BETA_1,
              value=self.optimizer.defaults['betas'][0], sync=False)
    log_event(key=constants.OPT_ADAM_BETA_2,
              value=self.optimizer.defaults['betas'][1], sync=False)
    log_event(key=constants.OPT_ADAM_EPSILON,
              value=self.optimizer.defaults['eps'], sync=False)
@property
def distributed_weight_update_config(self):
"""
Return a kwarg dictionary that provides arguments for the distributed
weight update feature.
"""
return {
'dwu_group_size': self.args.dwu_group_size,
'dwu_num_blocks': self.args.dwu_num_blocks,
'dwu_num_chunks': self.args.dwu_num_chunks,
'dwu_num_rs_pg': self.args.dwu_num_rs_pg,
'dwu_num_ar_pg': self.args.dwu_num_ar_pg,
'dwu_num_ag_pg': self.args.dwu_num_ag_pg,
'overlap_reductions': self.args.dwu_overlap_reductions,
'full_pipeline': self.args.dwu_full_pipeline,
'compute_L2_grad_norm': self.args.dwu_grad_norm,
'e5m2_allgather': self.args.dwu_e5m2_allgather,
'predivide': False,
'flat_mt': True,
}
def iterate(self, src, tgt, update=True, training=True):
"""
Performs one iteration of the training/validation.
:param src: batch of examples from the source language
:param tgt: batch of examples from the target language
:param update: if True: optimizer does update of the weights
:param training: if True: executes optimizer
"""
src, src_length = src
tgt, tgt_length = tgt
src_length = torch.LongTensor(src_length)
tgt_length = torch.LongTensor(tgt_length)
num_toks = {}
num_toks['tgt'] = int(sum(tgt_length - 1))
num_toks['src'] = int(sum(src_length))
if self.cuda:
src = src.cuda(non_blocking=True)
tgt = tgt.cuda(non_blocking=True)
#################aiss prof shape###########
print("aiss prof shape######################")
print("src.size():%s, src_length.size():%s, tgt[:-1].size():%s"%(src.size(), src_length.size(), tgt[:-1].size()))
if self.batch_first:
output = self.model(src, src_length, tgt[:, :-1])
tgt_labels = tgt[:, 1:]
T, B = output.size(1), output.size(0)
else:
output = self.model(src, src_length, tgt[:-1])
tgt_labels = tgt[1:]
T, B = output.size(0), output.size(1)
loss = self.criterion(output.view(T * B, -1),
tgt_labels.contiguous().view(-1))
loss_per_batch = torch.empty((1), dtype=torch.float, device='cpu',
requires_grad=False, pin_memory=True)
loss_per_batch.copy_(loss, non_blocking=True)
loss /= (B * self.iter_size)
if training:
self.fp_optimizer.step(loss, self.optimizer, self.scheduler,
update)
loss_per_batch = loss_per_batch.item()
loss_per_token = loss_per_batch / num_toks['tgt']
loss_per_sentence = loss_per_batch / B
return loss_per_token, loss_per_sentence, num_toks
def feed_data(self, data_loader, training=True):
    """
    Runs training or validation on batches from data_loader.

    :param data_loader: data loader
    :param training: if True runs training else runs validation
    :returns: (average loss per token, average total tokens per second),
        both reduced across workers
    """
    # Profiling hook; imported once here rather than on every iteration.
    import torch.autograd.profiler as profiler

    if training:
        assert self.optimizer is not None
        # Evenly spaced intra-epoch evaluation points, aligned to iterations
        # on which a weight update happens.
        eval_fractions = np.linspace(0, 1, self.intra_epoch_eval+2)[1:-1]
        iters_with_update = len(data_loader) // self.iter_size
        eval_iters = (eval_fractions * iters_with_update).astype(int)
        eval_iters = eval_iters * self.iter_size
        eval_iters = set(eval_iters)

    batch_time = AverageMeter(skip_first=False)
    data_time = AverageMeter(skip_first=False)
    losses_per_token = AverageMeter(skip_first=False)
    losses_per_sentence = AverageMeter(skip_first=False)

    tot_tok_time = AverageMeter(skip_first=False)
    src_tok_time = AverageMeter(skip_first=False)
    tgt_tok_time = AverageMeter(skip_first=False)

    batch_size = data_loader.batch_size

    end = time.time()
    for i, (src, tgt) in enumerate(data_loader):
        self.save_counter += 1
        # measure data loading time
        data_time.update(time.time() - end)

        # Weights are updated on the last of every iter_size iterations.
        update = False
        if i % self.iter_size == self.iter_size - 1:
            update = True

        ############################aiss add for prof############################
        # NOTE(review): profiling leftover - the loop stops at the second
        # batch, so only batch 0 is processed and the i == 10 branch below
        # can never run. Kept unchanged; confirm whether this file is meant
        # to be a profiling-only variant.
        if i == 1:
            break
        #########################################################################

        # do a train/evaluate iteration
        if i == 10:
            # Profile this iteration. It is the only execution of the batch:
            # previously the batch was run again right after profiling, so it
            # was applied twice.
            with profiler.profile(use_cuda=True) as prof:
                with profiler.record_function("dcu_prof"):
                    stats = self.iterate(src, tgt, update, training=training)
            print(prof.key_averages(group_by_input_shape=True).table(sort_by="cuda_time_total"))
            prof.export_chrome_trace("trace_dcu.json")
        else:
            stats = self.iterate(src, tgt, update, training=training)
        loss_per_token, loss_per_sentence, num_toks = stats

        # measure accuracy and record loss
        losses_per_token.update(loss_per_token, num_toks['tgt'])
        losses_per_sentence.update(loss_per_sentence, batch_size)

        # measure elapsed time
        elapsed = time.time() - end
        batch_time.update(elapsed)
        src_tok_time.update(num_toks['src'] / elapsed)
        tgt_tok_time.update(num_toks['tgt'] / elapsed)
        tot_num_toks = num_toks['tgt'] + num_toks['src']
        tot_tok_time.update(tot_num_toks / elapsed)
        self.loss = losses_per_token.avg

        if training and i in eval_iters:
            assert self.translator is not None
            test_bleu, _ = self.translator.run(calc_bleu=True,
                                               epoch=self.epoch,
                                               iteration=i)

            log = []
            log += [f'TRAIN [{self.epoch}][{i}/{len(data_loader)}]']
            log += [f'BLEU: {test_bleu:.2f}']
            log = '\t'.join(log)
            logging.info(log)

            # Back to training mode after the evaluation run, then
            # preallocate again for training.
            self.model.train()
            self.preallocate(data_loader.batch_size,
                             data_loader.dataset.max_len, training=True)

        if i % self.print_freq == 0:
            phase = 'TRAIN' if training else 'VALIDATION'
            log = []
            log += [f'{phase} [{self.epoch}][{i}/{len(data_loader)}]']
            log += [f'Time {batch_time.val:.3f} ({batch_time.avg:.3f})']
            log += [f'Data {data_time.val:.2e} ({data_time.avg:.2e})']
            log += [f'Tok/s {tot_tok_time.val:.0f} ({tot_tok_time.avg:.0f})']
            if self.verbose:
                log += [f'Src tok/s {src_tok_time.val:.0f} ({src_tok_time.avg:.0f})']
                log += [f'Tgt tok/s {tgt_tok_time.val:.0f} ({tgt_tok_time.avg:.0f})']
                log += [f'Loss/sentence {losses_per_sentence.val:.1f} ({losses_per_sentence.avg:.1f})']
            log += [f'Loss/tok {losses_per_token.val:.4f} ({losses_per_token.avg:.4f})']
            if training:
                lr = self.optimizer.param_groups[0]['lr']
                log += [f'LR {lr:.3e}']
            log = '\t'.join(log)
            logging.info(log)

        save_chkpt = (self.save_counter % self.save_freq) == (self.save_freq - 1)
        if training and save_chkpt:
            self.save_counter = 0
            self.save_info['iteration'] = i
            identifier = next(self.checkpoint_counter, -1)
            if identifier != -1:
                # Only rank 0 writes; sync_workers() is entered by every rank.
                with sync_workers() as rank:
                    if rank == 0:
                        self.save(identifier=identifier)

        end = time.time()

    tot_tok_time.reduce('sum')
    losses_per_token.reduce('mean')

    return losses_per_token.avg, tot_tok_time.avg
def preallocate(self, batch_size, max_length, training):
"""
Generates maximum sequence length batch and runs forward and backward
pass without updating model parameters.
:param batch_size: batch size for preallocation
:param max_length: max sequence length for preallocation
:param training: if True preallocates memory for backward pass
"""
if self.prealloc_mode == 'always' or (self.prealloc_mode == 'once' and
not self.preallocated):
logging.info('Executing preallocation')
torch.cuda.empty_cache()
src_length = [max_length] * batch_size
tgt_length = [max_length] * batch_size
if self.batch_first:
shape = (batch_size, max_length)
else:
shape = (max_length, batch_size)
src = torch.full(shape, 4, dtype=torch.int64)
tgt = torch.full(shape, 4, dtype=torch.int64)
src = src, src_length
tgt = tgt, tgt_length
self.iterate(src, tgt, update=False, training=training)
self.model.zero_grad()
self.preallocated = True
def optimize(self, data_loader):
"""
Sets model in training mode, preallocates memory and runs training on
data provided by data_loader.
:param data_loader: data loader
"""
torch.set_grad_enabled(True)
self.model.train()
self.preallocate(data_loader.batch_size, data_loader.dataset.max_len,
training=True)
output = self.feed_data(data_loader, training=True)
self.model.zero_grad()
return output
def evaluate(self, data_loader):
"""
Sets model in eval mode, disables gradients, preallocates memory and
runs validation on data provided by data_loader.
:param data_loader: data loader
"""
torch.set_grad_enabled(False)
self.model.eval()
self.preallocate(data_loader.batch_size, data_loader.dataset.max_len,
training=False)
output = self.feed_data(data_loader, training=False)
self.model.zero_grad()
return output
def load(self, filename):
"""
Loads checkpoint from filename.
:param filename: path to the checkpoint file
"""
if os.path.isfile(filename):
checkpoint = torch.load(filename, map_location={'cuda:0': 'cpu'})
if self.distributed:
self.model.module.load_state_dict(checkpoint['state_dict'])
else:
self.model.load_state_dict(checkpoint['state_dict'])
self.fp_optimizer.initialize_model(self.model)
self.optimizer.load_state_dict(checkpoint['optimizer'])
assert self.scheduler is not None
self.scheduler.load_state_dict(checkpoint['scheduler'])
self.epoch = checkpoint['epoch']
self.loss = checkpoint['loss']
logging.info(f'Loaded checkpoint {filename} (epoch {self.epoch})')
else:
logging.error(f'Invalid checkpoint: {filename}')
def save(self, identifier=None, is_best=False, save_all=False):
"""
Stores checkpoint to a file.
:param identifier: identifier for periodic checkpoint
:param is_best: if True stores checkpoint to 'model_best.pth'
:param save_all: if True stores checkpoint after completed training
epoch
"""
def write_checkpoint(state, filename):
filename = os.path.join(self.save_path, filename)
logging.info(f'Saving model to {filename}')
torch.save(state, filename)
if self.distributed:
model_state = self.model.module.state_dict()
else:
model_state = self.model.state_dict()
assert self.scheduler is not None
state = {
'epoch': self.epoch,
'state_dict': model_state,
'optimizer': self.optimizer.state_dict(),
'scheduler': self.scheduler.state_dict(),
'loss': getattr(self, 'loss', None),
}
state = dict(list(state.items()) + list(self.save_info.items()))
if identifier is not None:
filename = self.checkpoint_filename % identifier
write_checkpoint(state, filename)
if is_best:
filename = 'model_best.pth'
write_checkpoint(state, filename)
if save_all:
filename = f'checkpoint_epoch_{self.epoch:03d}.pth'
write_checkpoint(state, filename)
import ctypes
import logging.config
import os
import random
import sys
import time
from contextlib import contextmanager
import numpy as np
import torch
import torch.distributed as dist
import torch.nn.init as init
import torch.utils.collect_env
from mlperf_logging import mllog
from mlperf_logging.mllog import constants
# torch.cuda.cudart().hipDeviceSetLimit() doesn't work due to
# https://github.com/pytorch/pytorch/pull/33678
_libcudart = ctypes.CDLL('libcudart.so')
mllogger = mllog.get_mllogger()
def log_start(*args, **kwargs):
    """Forwards the call to mllogger.start through the _log_print wrapper."""
    _log_print(mllogger.start, *args, **kwargs)
def log_end(*args, **kwargs):
    """Forwards the call to mllogger.end through the _log_print wrapper."""
    _log_print(mllogger.end, *args, **kwargs)
def log_event(*args, **kwargs):
    """Forwards the call to mllogger.event through the _log_print wrapper."""
    _log_print(mllogger.event, *args, **kwargs)
def _log_print(logger, *args, **kwargs):
    """
    Wrapper for MLPerf compliance logging calls.

    The keyword arguments 'sync' and 'log_all_ranks' are consumed here; all
    remaining arguments are passed on to the given logger callable.

    'sync' (default False): synchronize all distributed workers before
    logging. Intended for compliance tags that need accurate timing
    (RUN_START, RUN_STOP etc.).

    'log_all_ranks' (default False): if True every worker emits the record,
    otherwise only the worker with rank 0 does.

    'stack_offset' defaults to 3 and 'value' to None when not supplied.
    """
    if kwargs.pop('sync', False):
        barrier()

    kwargs.setdefault('stack_offset', 3)
    kwargs.setdefault('value', None)

    every_rank = kwargs.pop('log_all_ranks', False)
    if every_rank or get_rank() == 0:
        logger(*args, **kwargs)
def configure_logger(benchmark):
    """
    Points mllog at '<benchmark>.log' next to this module and stops the
    mllog logger from propagating records to its parent loggers.

    :param benchmark: benchmark name, used as the log file stem
    """
    module_dir = os.path.dirname(os.path.abspath(__file__))
    log_path = os.path.join(module_dir, f'{benchmark}.log')
    mllog.config(filename=log_path)
    mllog.get_mllogger().logger.propagate = False
def init_lstm_(lstm, init_weight=0.1):
    """
    Initializes the layer-0 parameters of an LSTM in place.

    Weights and the input-hidden bias are drawn from
    uniform(-init_weight, init_weight); the hidden-hidden bias is zeroed.
    For a bidirectional LSTM the reverse direction is initialized the same
    way, after the forward direction.

    :param lstm: instance of torch.nn.LSTM
    :param init_weight: range for the uniform initializer
    """

    def init_direction(suffix):
        # PyTorch LSTM has two biases, one for the input-hidden GEMM and one
        # for the hidden-hidden GEMM; only the former gets random values.
        init.uniform_(getattr(lstm, 'weight_hh_l0' + suffix).data,
                      -init_weight, init_weight)
        init.uniform_(getattr(lstm, 'weight_ih_l0' + suffix).data,
                      -init_weight, init_weight)
        init.uniform_(getattr(lstm, 'bias_ih_l0' + suffix).data,
                      -init_weight, init_weight)
        init.zeros_(getattr(lstm, 'bias_hh_l0' + suffix).data)

    init_direction('')
    if lstm.bidirectional:
        init_direction('_reverse')
def generate_seeds(rng, size):
    """
    Draws a list of random seeds from rng.

    :param rng: random number generator providing randint(a, b)
    :param size: length of the returned list
    :returns: list of integers in [0, 2**32 - 1]
    """
    upper_bound = 2 ** 32 - 1
    return [rng.randint(0, upper_bound) for _ in range(size)]
def broadcast_seeds(seeds, device):
    """
    Broadcasts random seeds from rank 0 to all distributed workers.

    When torch.distributed is unavailable or not initialized the input list
    is returned unchanged.

    :param seeds: list of seeds (integers)
    :param device: torch.device on which the broadcast tensor is placed
    :returns: list of seeds as seen by rank 0
    """
    dist_ready = (torch.distributed.is_available() and
                  torch.distributed.is_initialized())
    if not dist_ready:
        return seeds

    buffer = torch.LongTensor(seeds).to(device)
    torch.distributed.broadcast(buffer, 0)
    return buffer.tolist()
def setup_seeds(master_seed, epochs, device):
    """
    Derives all random seeds from a single master seed.

    Returns (worker_seeds, shuffling_seeds): worker_seeds holds one seed per
    distributed worker (later used for per-worker random number generators,
    mostly for dropouts), shuffling_seeds holds one seed per epoch for the
    RNGs responsible for reshuffling the dataset.

    Both lists are broadcast from the worker with rank 0 to all other
    workers.

    :param master_seed: master RNG seed used to initialize other generators;
        None picks a random one
    :param epochs: number of epochs
    :param device: torch.device (used for distributed.broadcast)
    """
    if master_seed is not None:
        # master seed was specified from command line
        logging.info(f'Using master seed from command line: {master_seed}')
    else:
        # random master seed, random.SystemRandom() uses /dev/urandom on Unix
        master_seed = random.SystemRandom().randint(0, 2**32 - 1)
        if get_rank() == 0:
            # Reported from rank 0 only to avoid confusion: the seeds of
            # rank 0 are the ones broadcast to the other workers below.
            logging.info(f'Using random master seed: {master_seed}')

    log_event(key=constants.SEED, value=master_seed)

    # A single seeding RNG feeds both lists; worker seeds are drawn first.
    seeding_rng = random.Random(master_seed)
    per_worker = generate_seeds(seeding_rng, get_world_size())
    per_epoch = generate_seeds(seeding_rng, epochs)

    # broadcast seeds from rank=0 to other workers
    per_worker = broadcast_seeds(per_worker, device)
    per_epoch = broadcast_seeds(per_epoch, device)
    return per_worker, per_epoch
def barrier():
    """
    Acts as a distributed barrier.

    Performs an all_reduce on a one-element CUDA tensor and then
    synchronizes with the GPU. Described in the original code as a temporary
    stand-in for a barrier missing from the NCCL backend. Does nothing when
    torch.distributed is unavailable or not initialized.
    """
    dist_mod = torch.distributed
    if not (dist_mod.is_available() and dist_mod.is_initialized()):
        return
    dist_mod.all_reduce(torch.cuda.FloatTensor(1))
    torch.cuda.synchronize()
def get_rank():
    """
    Returns the distributed rank of this worker, or zero when
    torch.distributed is unavailable or not initialized.
    """
    dist_mod = torch.distributed
    if not (dist_mod.is_available() and dist_mod.is_initialized()):
        return 0
    return dist_mod.get_rank()
def get_world_size():
    """
    Returns the total number of distributed workers, or one when
    torch.distributed is unavailable or not initialized.
    """
    dist_mod = torch.distributed
    if not (dist_mod.is_available() and dist_mod.is_initialized()):
        return 1
    return dist_mod.get_world_size()
@contextmanager
def sync_workers():
    """
    Context manager that yields the distributed rank of this worker and
    runs a barrier across all workers when the block exits normally.
    """
    yield get_rank()
    barrier()
@contextmanager
def timer(name, ndigits=2, sync_gpu=True):
    """
    Context manager that logs the wall-clock duration of the enclosed block
    as 'TIMER <name> <seconds>'.

    :param name: label written to the log line
    :param ndigits: number of decimal digits the duration is rounded to
    :param sync_gpu: if True, torch.cuda.synchronize() is called before both
        timestamps are taken
    """

    def timestamp():
        # Optionally waits for the GPU before reading the clock.
        if sync_gpu:
            torch.cuda.synchronize()
        return time.time()

    began = timestamp()
    yield
    finished = timestamp()
    elapsed = round(finished - began, ndigits)
    logging.info(f'TIMER {name} {elapsed}')
def setup_logging(log_all_ranks=True, log_file=os.devnull):
    """
    Configures logging.

    Console output (stdout) is limited to INFO and above; entries are
    prefixed with "N: " where N is the rank of the worker, and carry no
    timestamps. Full DEBUG-level logs with timestamps are written to
    log_file, which is truncated on open.

    :param log_all_ranks: if True (default) records from every worker are
        kept, otherwise only records from rank 0
    :param log_file: path of the full log file
    """

    class RankFilter(logging.Filter):
        # Stamps each record with the worker rank (used by the formatters)
        # and drops records from non-zero ranks unless all ranks may log.
        def __init__(self, rank, log_all_ranks):
            self.rank = rank
            self.log_all_ranks = log_all_ranks

        def filter(self, record):
            record.rank = self.rank
            return True if self.log_all_ranks else (self.rank == 0)

    rank_filter = RankFilter(get_rank(), log_all_ranks)

    logging.basicConfig(
        level=logging.DEBUG,
        format="%(asctime)s - %(levelname)s - %(rank)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        filename=log_file,
        filemode='w',
    )

    console = logging.StreamHandler(sys.stdout)
    console.setLevel(logging.INFO)
    console.setFormatter(logging.Formatter('%(rank)s: %(message)s'))

    root_logger = logging.getLogger('')
    root_logger.addHandler(console)
    root_logger.addFilter(rank_filter)
def set_device(cuda, local_rank):
    """
    Selects the device for this worker and returns it as a torch.device.

    :param cuda: if True: make GPU number local_rank current and use cuda,
        otherwise use the cpu
    :param local_rank: local rank of the worker
    """
    if not cuda:
        return torch.device('cpu')
    torch.cuda.set_device(local_rank)
    return torch.device('cuda')
def init_distributed(cuda):
    """
    Initializes distributed backend when WORLD_SIZE is greater than one.

    :param cuda: (bool) if True initializes nccl backend, if False initializes
        gloo backend
    :returns: True if distributed mode was initialized, False otherwise
    """
    distributed = int(os.environ.get('WORLD_SIZE', 1)) > 1
    if distributed:
        dist.init_process_group(backend='nccl' if cuda else 'gloo',
                                init_method='env://')
        assert dist.is_initialized()
        # NOTE(review): placed inside the branch; the original nesting of
        # this call could not be recovered - confirm.
        barrier()
    return distributed
def log_env_info():
    """
    Logs the report produced by torch.utils.collect_env about the execution
    environment.
    """
    logging.info('Collecting environment information...')
    report = torch.utils.collect_env.get_pretty_env_info()
    logging.info(f'{report}')
def pad_vocabulary(math):
    """
    Returns the multiple to which the vocabulary size is padded.

    :param math: arithmetic type, 'fp16' (padding of 8) or 'fp32'
        (padding of 1)
    :raises ValueError: for any other arithmetic type (previously this
        surfaced as an UnboundLocalError)
    """
    if math == 'fp16':
        return 8
    if math == 'fp32':
        return 1
    raise ValueError(f'Unsupported math mode: {math!r}')
class AverageMeter:
    """
    Computes and stores the average and current value
    """
    def __init__(self, skip_first=True):
        """
        :param skip_first: if True the first update() only records the
            current value and does not contribute to the average
        """
        self.reset()
        self.skip = skip_first

    def reset(self):
        """Clears the current value, the average, the sum and the count."""
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        """
        Records a new value.

        :param val: the new value
        :param n: weight of the value (e.g. number of samples it covers)
        """
        self.val = val

        if self.skip:
            self.skip = False
            return

        self.sum += val * n
        self.count += n
        # With zero accumulated weight (e.g. n=0) there is nothing to
        # average; keep the previous average instead of dividing by zero.
        if self.count:
            self.avg = self.sum / self.count

    def reduce(self, op):
        """
        Reduces average value over all workers.

        Does nothing besides validating op when there is a single worker.

        :param op: 'sum' or 'mean', reduction operator
        :raises NotImplementedError: for any other operator
        """
        if op not in ('sum', 'mean'):
            raise NotImplementedError

        world_size = get_world_size()
        if world_size <= 1:
            return

        # Backward/forward compatibility around
        # https://github.com/pytorch/pytorch/commit/540ef9b1fc5506369a48491af8a285a686689b36 and
        # https://github.com/pytorch/pytorch/commit/044d00516ccd6572c0d6ab6d54587155b02a3b86
        # To accommodate change in Pytorch's distributed API
        if hasattr(dist, "get_backend"):
            _backend = dist.get_backend()
            if hasattr(dist, "DistBackend"):
                backend_enum_holder = dist.DistBackend
            else:
                backend_enum_holder = dist.Backend
        else:
            _backend = dist._backend
            backend_enum_holder = dist.dist_backend

        # CUDA tensors are used with the NCCL backend, CPU tensors otherwise.
        cuda = _backend == backend_enum_holder.NCCL

        if cuda:
            avg = torch.cuda.FloatTensor([self.avg])
            _sum = torch.cuda.FloatTensor([self.sum])
        else:
            avg = torch.FloatTensor([self.avg])
            _sum = torch.FloatTensor([self.sum])

        dist.all_reduce(avg)
        dist.all_reduce(_sum)
        self.avg = avg.item()
        self.sum = _sum.item()

        if op == 'mean':
            self.avg /= world_size
            self.sum /= world_size
def debug_tensor(tensor, name):
    """
    Debugging aid that logs summary statistics of a tensor.

    Logs the name, then a single line with min, max, mean, standard
    deviation, number of NaNs and number of INFs. The statistics are computed
    on a detached float copy of the tensor converted to a numpy array on CPU.

    :param tensor: torch tensor
    :param name: name of the tensor (only for logging)
    """
    logging.info(name)
    values = tensor.detach().float().cpu().numpy()
    nan_count = np.isnan(values).sum()
    inf_count = np.isinf(values).sum()
    logging.info(f'MIN: {values.min()} MAX: {values.max()} '
                 f'AVG: {values.mean()} STD: {values.std()} '
                 f'NAN: {nan_count} INF: {inf_count}')
def l2_promote():
    """
    Raises device limit 0x05 to 128 on the current device and logs the value
    read back afterwards.

    Relies on a module-level ``_libcudart`` ctypes library handle. The return
    codes of the runtime calls are not inspected.
    """
    # cudaLimitMaxL2FetchGranularity = 0x05
    limit_id = 0x05
    limit_ptr = ctypes.cast((ctypes.c_int*1)(), ctypes.POINTER(ctypes.c_int))

    # Check what's the device limit for current device, should be 64 by default
    _libcudart.hipDeviceGetLimit(limit_ptr, ctypes.c_int(limit_id))

    # Set device limit on the current device
    _libcudart.hipDeviceSetLimit(ctypes.c_int(limit_id), ctypes.c_int(128))

    # Get the device limit again, should be 128
    _libcudart.hipDeviceGetLimit(limit_ptr, ctypes.c_int(limit_id))
    logging.info(f'L2 promotion: {limit_ptr[0]}B')
import ctypes # Copyright (c) 2017 Elad Hoffer
# Copyright (c) 2018-2020, NVIDIA CORPORATION. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import logging.config import logging.config
import os import os
import random import random
...@@ -6,59 +26,12 @@ import sys ...@@ -6,59 +26,12 @@ import sys
import time import time
from contextlib import contextmanager from contextlib import contextmanager
import dllogger
import numpy as np import numpy as np
import torch import torch
import torch.distributed as dist import torch.distributed as dist
import torch.nn.init as init import torch.nn.init as init
import torch.utils.collect_env import torch.utils.collect_env
from mlperf_logging import mllog
from mlperf_logging.mllog import constants
#### torch.cuda.cudart().cudaDeviceSetLimit() doesn't work due to
# https://github.com/pytorch/pytorch/pull/33678
#aiss
#_libcudart = ctypes.CDLL('libcudart.so')
mllogger = mllog.get_mllogger()
def log_start(*args, **kwargs):
    """
    Forwards all arguments to _log_print with mllogger.start as the logging
    callable.
    """
    _log_print(mllogger.start, *args, **kwargs)
def log_end(*args, **kwargs):
    """
    Forwards all arguments to _log_print with mllogger.end as the logging
    callable.
    """
    _log_print(mllogger.end, *args, **kwargs)
def log_event(*args, **kwargs):
    """
    Forwards all arguments to _log_print with mllogger.event as the logging
    callable.
    """
    _log_print(mllogger.event, *args, **kwargs)
def _log_print(logger, *args, **kwargs):
"""
Wrapper for MLPerf compliance logging calls.
All arguments but 'sync' and 'log_all_ranks' are passed to
mlperf_logging.mllog.
If 'sync' is set to True then the wrapper will synchronize all distributed
workers. 'sync' should be set to True for all compliance tags that require
accurate timing (RUN_START, RUN_STOP etc.)
If 'log_all_ranks' is set to True then all distributed workers will print
logging message, if set to False then only worker with rank=0 will print
the message.
"""
if kwargs.pop('sync', False):
barrier()
if 'stack_offset' not in kwargs:
kwargs['stack_offset'] = 3
if 'value' not in kwargs:
kwargs['value'] = None
if kwargs.pop('log_all_ranks', False):
log = True
else:
log = (get_rank() == 0)
if log:
logger(*args, **kwargs)
def configure_logger(benchmark):
    """
    Points mllog at '<benchmark>.log' located next to this module and turns
    off propagation on the mllogger's underlying logger.

    :param benchmark: name used as the stem of the log file
    """
    module_dir = os.path.dirname(os.path.abspath(__file__))
    mllog.config(filename=os.path.join(module_dir, f'{benchmark}.log'))
    mllog.get_mllogger().logger.propagate = False
def init_lstm_(lstm, init_weight=0.1): def init_lstm_(lstm, init_weight=0.1):
...@@ -110,7 +83,7 @@ def broadcast_seeds(seeds, device): ...@@ -110,7 +83,7 @@ def broadcast_seeds(seeds, device):
:param device: torch.device :param device: torch.device
""" """
if torch.distributed.is_available() and torch.distributed.is_initialized(): if torch.distributed.is_available() and torch.distributed.is_initialized():
seeds_tensor = torch.LongTensor(seeds).to(device) seeds_tensor = torch.tensor(seeds, dtype=torch.int64, device=device)
torch.distributed.broadcast(seeds_tensor, 0) torch.distributed.broadcast(seeds_tensor, 0)
seeds = seeds_tensor.tolist() seeds = seeds_tensor.tolist()
return seeds return seeds
...@@ -142,8 +115,6 @@ def setup_seeds(master_seed, epochs, device): ...@@ -142,8 +115,6 @@ def setup_seeds(master_seed, epochs, device):
# master seed was specified from command line # master seed was specified from command line
logging.info(f'Using master seed from command line: {master_seed}') logging.info(f'Using master seed from command line: {master_seed}')
log_event(key=constants.SEED, value=master_seed)
# initialize seeding RNG # initialize seeding RNG
seeding_rng = random.Random(master_seed) seeding_rng = random.Random(master_seed)
...@@ -161,13 +132,10 @@ def setup_seeds(master_seed, epochs, device): ...@@ -161,13 +132,10 @@ def setup_seeds(master_seed, epochs, device):
def barrier(): def barrier():
""" """
Works as a temporary distributed barrier, currently pytorch Call torch.distributed.barrier() if distritubed is in use
doesn't implement barrier for NCCL backend.
Calls all_reduce on dummy tensor and synchronizes with GPU.
""" """
if torch.distributed.is_available() and torch.distributed.is_initialized(): if torch.distributed.is_available() and torch.distributed.is_initialized():
torch.distributed.all_reduce(torch.cuda.FloatTensor(1)) torch.distributed.barrier()
torch.cuda.synchronize()
def get_rank(): def get_rank():
...@@ -176,7 +144,6 @@ def get_rank(): ...@@ -176,7 +144,6 @@ def get_rank():
""" """
if torch.distributed.is_available() and torch.distributed.is_initialized(): if torch.distributed.is_available() and torch.distributed.is_initialized():
rank = torch.distributed.get_rank() rank = torch.distributed.get_rank()
print("this rank: ",rank)
else: else:
rank = 0 rank = 0
return rank return rank
...@@ -240,6 +207,10 @@ def setup_logging(log_all_ranks=True, log_file=os.devnull): ...@@ -240,6 +207,10 @@ def setup_logging(log_all_ranks=True, log_file=os.devnull):
rank = get_rank() rank = get_rank()
rank_filter = RankFilter(rank, log_all_ranks) rank_filter = RankFilter(rank, log_all_ranks)
for handler in logging.root.handlers[:]:
logging.root.removeHandler(handler)
handler.close()
logging_format = "%(asctime)s - %(levelname)s - %(rank)s - %(message)s" logging_format = "%(asctime)s - %(levelname)s - %(rank)s - %(message)s"
logging.basicConfig(level=logging.DEBUG, logging.basicConfig(level=logging.DEBUG,
format=logging_format, format=logging_format,
...@@ -254,6 +225,21 @@ def setup_logging(log_all_ranks=True, log_file=os.devnull): ...@@ -254,6 +225,21 @@ def setup_logging(log_all_ranks=True, log_file=os.devnull):
logging.getLogger('').addFilter(rank_filter) logging.getLogger('').addFilter(rank_filter)
def setup_dllogger(enabled=True, filename=os.devnull):
    """
    Initializes dllogger.

    A verbose JSON stream backend writing to filename is installed only when
    logging is enabled and this worker has rank 0; in every other case
    dllogger is initialized with no backends.

    :param enabled: if True, rank 0 logs to filename
    :param filename: destination passed to the JSON stream backend
    """
    is_rank_zero = (get_rank() == 0)
    backends = []
    if enabled and is_rank_zero:
        backends.append(
            dllogger.JSONStreamBackend(dllogger.Verbosity.VERBOSE, filename)
        )
    dllogger.init(backends)
def set_device(cuda, local_rank): def set_device(cuda, local_rank):
""" """
Sets device based on local_rank and returns instance of torch.device. Sets device based on local_rank and returns instance of torch.device.
...@@ -283,7 +269,6 @@ def init_distributed(cuda): ...@@ -283,7 +269,6 @@ def init_distributed(cuda):
dist.init_process_group(backend=backend, dist.init_process_group(backend=backend,
init_method='env://') init_method='env://')
assert dist.is_initialized() assert dist.is_initialized()
barrier()
return distributed return distributed
...@@ -297,36 +282,75 @@ def log_env_info(): ...@@ -297,36 +282,75 @@ def log_env_info():
def pad_vocabulary(math): def pad_vocabulary(math):
if math == 'fp16': if math == 'tf32' or math == 'fp16' or math == 'manual_fp16':
pad_vocab = 8 pad_vocab = 8
elif math == 'fp32': elif math == 'fp32':
pad_vocab = 1 pad_vocab = 1
return pad_vocab return pad_vocab
def benchmark(test_acc, target_acc, test_perf, target_perf):
    """
    Compares achieved accuracy and performance against their targets.

    A metric is only checked (and logged) when both its achieved value and its
    target are not None; an unchecked metric counts as passed. Both metrics
    are always evaluated, so both results are logged.

    :param test_acc: achieved accuracy, or None
    :param target_acc: target accuracy, or None
    :param test_perf: achieved performance, or None
    :param target_perf: target performance, or None
    :return: True if every checked metric reached its target
    """
    def test(achieved, target, name):
        if target is None or achieved is None:
            return True
        logging.info(f'{name} achieved: {achieved:.2f} '
                     f'target: {target:.2f}')
        if achieved >= target:
            logging.info(f'{name} test passed')
            return True
        logging.info(f'{name} test failed')
        return False

    accuracy_ok = test(test_acc, target_acc, 'Accuracy')
    performance_ok = test(test_perf, target_perf, 'Performance')
    return accuracy_ok and performance_ok
def debug_tensor(tensor, name):
    """
    Debugging aid that logs summary statistics of a tensor.

    Logs the name, then a single line with min, max, mean, standard
    deviation, number of NaNs and number of INFs. The statistics are computed
    on a detached float copy of the tensor converted to a numpy array on CPU.

    :param tensor: torch tensor
    :param name: name of the tensor (only for logging)
    """
    logging.info(name)
    values = tensor.detach().float().cpu().numpy()
    nan_count = np.isnan(values).sum()
    inf_count = np.isinf(values).sum()
    logging.info(f'MIN: {values.min()} MAX: {values.max()} '
                 f'AVG: {values.mean()} STD: {values.std()} '
                 f'NAN: {nan_count} INF: {inf_count}')
class AverageMeter: class AverageMeter:
""" """
Computes and stores the average and current value Computes and stores the average and current value
""" """
def __init__(self, skip_first=True): def __init__(self, warmup=0, keep=False):
self.reset() self.reset()
self.skip = skip_first self.warmup = warmup
self.keep = keep
def reset(self): def reset(self):
self.val = 0 self.val = 0
self.avg = 0 self.avg = 0
self.sum = 0 self.sum = 0
self.count = 0 self.count = 0
self.iters = 0
self.vals = []
def update(self, val, n=1): def update(self, val, n=1):
self.iters += 1
self.val = val self.val = val
if self.skip: if self.iters > self.warmup:
self.skip = False
else:
self.sum += val * n self.sum += val * n
self.count += n self.count += n
self.avg = self.sum / self.count self.avg = self.sum / self.count
if self.keep:
self.vals.append(val)
def reduce(self, op): def reduce(self, op):
""" """
...@@ -339,21 +363,8 @@ class AverageMeter: ...@@ -339,21 +363,8 @@ class AverageMeter:
distributed = (get_world_size() > 1) distributed = (get_world_size() > 1)
if distributed: if distributed:
# Backward/forward compatibility around backend = dist.get_backend()
# https://github.com/pytorch/pytorch/commit/540ef9b1fc5506369a48491af8a285a686689b36 and cuda = (backend == dist.Backend.NCCL)
# https://github.com/pytorch/pytorch/commit/044d00516ccd6572c0d6ab6d54587155b02a3b86
# To accomodate change in Pytorch's distributed API
if hasattr(dist, "get_backend"):
_backend = dist.get_backend()
if hasattr(dist, "DistBackend"):
backend_enum_holder = dist.DistBackend
else:
backend_enum_holder = dist.Backend
else:
_backend = dist._backend
backend_enum_holder = dist.dist_backend
cuda = _backend == backend_enum_holder.NCCL
if cuda: if cuda:
avg = torch.cuda.FloatTensor([self.avg]) avg = torch.cuda.FloatTensor([self.avg])
...@@ -370,33 +381,3 @@ class AverageMeter: ...@@ -370,33 +381,3 @@ class AverageMeter:
if op == 'mean': if op == 'mean':
self.avg /= get_world_size() self.avg /= get_world_size()
self.sum /= get_world_size() self.sum /= get_world_size()
def debug_tensor(tensor, name):
    """
    Debugging aid that logs summary statistics of a tensor.

    Logs the name, then a single line with min, max, mean, standard
    deviation, number of NaNs and number of INFs. The statistics are computed
    on a detached float copy of the tensor converted to a numpy array on CPU.

    :param tensor: torch tensor
    :param name: name of the tensor (only for logging)
    """
    logging.info(name)
    values = tensor.detach().float().cpu().numpy()
    nan_count = np.isnan(values).sum()
    inf_count = np.isinf(values).sum()
    logging.info(f'MIN: {values.min()} MAX: {values.max()} '
                 f'AVG: {values.mean()} STD: {values.std()} '
                 f'NAN: {nan_count} INF: {inf_count}')
#
#def l2_promote():
# # Check what's the device limit for current device, should be 64 by default
# pValue = ctypes.cast((ctypes.c_int*1)(), ctypes.POINTER(ctypes.c_int))
# result = _libcudart.cudaDeviceGetLimit(pValue, ctypes.c_int(0x05))
#
# # Set device limit on the current device
# # cudaLimitMaxL2FetchGranularity = 0x05
# result = _libcudart.cudaDeviceSetLimit(ctypes.c_int(0x05), ctypes.c_int(128))
#
# # Get the device limit again, should be 128
# result = _libcudart.cudaDeviceGetLimit(pValue, ctypes.c_int(0x05))
# logging.info(f'L2 promotion: {pValue[0]}B')
from setuptools import setup, find_packages
from torch.utils.cpp_extension import BuildExtension, CUDAExtension
import sys
# NOTE(review): these two names are not referenced again in this script;
# presumably kept for the ROCm toolchain location - confirm before removing.
ROCM_HOME = '/opt/rocm-3.3.0/'
nvcc = '/opt/rocm-3.3.0/hip/bin/hipcc'

if sys.version_info < (3,):
    sys.exit('Sorry, Python3 is required for gnmt.')

# one requirement per line of requirements.txt
with open('requirements.txt') as f:
    reqs = f.read()

# Compiler flags shared by both extensions.
# (The CUDA build used 'nvcc': ['--gpu-architecture=sm_70'] instead.)
extra_cuda_compile_args = {
    'cxx': ['-O2', '-DHCC_ENABLE_ACCELERATOR_PRINTF'],
    'nvcc': ['-O3', '-fno-gpu-rdc', '--amdgpu-target=gfx906'],
}

# The .hip kernels replace the original .cu sources
# (pack_utils_kernel.cu and attn_score_cuda_kernel.cu).
cat_utils = CUDAExtension(
    name='seq2seq.pack_utils._C',
    sources=[
        'seq2seq/csrc/pack_utils.cpp',
        'seq2seq/csrc/pack_utils_kernel.hip',
    ],
    extra_compile_args=extra_cuda_compile_args,
)

attn_score = CUDAExtension(
    name='seq2seq.attn_score._C',
    sources=[
        'seq2seq/csrc/attn_score_cuda.cpp',
        'seq2seq/csrc/attn_score_hip_kernel.hip',
    ],
    extra_compile_args=extra_cuda_compile_args,
)

setup(
    name='gnmt',
    version='0.7.0',
    description='GNMT',
    install_requires=reqs.strip().split('\n'),
    packages=find_packages(),
    ext_modules=[cat_utils, attn_score],
    cmdclass={
        'build_ext': BuildExtension,
    },
    test_suite='tests',
)
name: Upload Python Package
on:
release:
types: [created]
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- name: Set up Python
uses: actions/setup-python@v1
with:
python-version: '3.x'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install setuptools wheel twine
- name: Build and publish
env:
TWINE_USERNAME: ${{ secrets.PYPI_USERNAME }}
TWINE_PASSWORD: ${{ secrets.PYPI_PASSWORD }}
run: |
python setup.py sdist bdist_wheel
twine upload dist/*
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
.static_storage/
.media/
local_settings.py
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
CHANGELOG
---------
v0.3.7:
- BPE dropout (Provilkov et al., 2019)
- more efficient glossaries (https://github.com/rsennrich/subword-nmt/pull/69)
v0.3.6:
- fix to subword-bpe command encoding
v0.3.5:
- fix to subword-bpe command under Python 2
- wider support of --total-symbols argument
v0.3.4:
- segment_tokens method to improve library usability (https://github.com/rsennrich/subword-nmt/pull/52)
- support regex glossaries (https://github.com/rsennrich/subword-nmt/pull/56)
- allow unicode separators (https://github.com/rsennrich/subword-nmt/pull/57)
- new option --total-symbols in learn-bpe (commit 61ad8)
- fix documentation (best practices) (https://github.com/rsennrich/subword-nmt/pull/60)
v0.3:
- library is now installable via pip
- fix occasional problems with UTF-8 whitespace and new lines in learn_bpe and apply_bpe.
- do not silently convert UTF-8 newline characters into "\n"
- do not silently convert UTF-8 whitespace characters into " "
- UTF-8 whitespace and newline characters are now considered part of a word, and segmented by BPE
v0.2:
- different, more consistent handling of end-of-word token (commit a749a7) (https://github.com/rsennrich/subword-nmt/issues/19)
- allow passing of vocabulary and frequency threshold to apply_bpe.py, preventing the production of OOV (or rare) subword units (commit a00db)
- made learn_bpe.py deterministic (commit 4c54e)
- various changes to make handling of UTF more consistent between Python versions
- new command line arguments for apply_bpe.py:
- '--glossaries' to prevent given strings from being affected by BPE
- '--merges' to apply a subset of learned BPE operations
- new command line arguments for learn_bpe.py:
- '--dict-input': rather than raw text file, interpret input as a frequency dictionary (as created by get_vocab.py).
v0.1:
- consistent cross-version unicode handling
- all scripts are now deterministic
The MIT License (MIT)
Copyright (c) 2015 University of Edinburgh
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
\ No newline at end of file
Subword Neural Machine Translation
==================================
This repository contains preprocessing scripts to segment text into subword
units. The primary purpose is to facilitate the reproduction of our experiments
on Neural Machine Translation with subword units (see below for reference).
INSTALLATION
------------
install via pip (from PyPI):
pip install subword-nmt
install via pip (from Github):
pip install https://github.com/rsennrich/subword-nmt/archive/master.zip
alternatively, clone this repository; the scripts are executable stand-alone.
USAGE INSTRUCTIONS
------------------
Check the individual files for usage instructions.
To apply byte pair encoding to word segmentation, invoke these commands:
subword-nmt learn-bpe -s {num_operations} < {train_file} > {codes_file}
subword-nmt apply-bpe -c {codes_file} < {test_file} > {out_file}
To segment rare words into character n-grams, do the following:
subword-nmt get-vocab --train_file {train_file} --vocab_file {vocab_file}
subword-nmt segment-char-ngrams --vocab {vocab_file} -n {order} --shortlist {size} < {test_file} > {out_file}
The original segmentation can be restored with a simple replacement:
sed -r 's/(@@ )|(@@ ?$)//g'
If you cloned the repository and did not install a package, you can also run the individual commands as scripts:
./subword_nmt/learn_bpe.py -s {num_operations} < {train_file} > {codes_file}
BEST PRACTICE ADVICE FOR BYTE PAIR ENCODING IN NMT
--------------------------------------------------
We found that for languages that share an alphabet, learning BPE on the
concatenation of the (two or more) involved languages increases the consistency
of segmentation, and reduces the problem of inserting/deleting characters when
copying/transliterating names.
However, this introduces undesirable edge cases in that a word may be segmented
in a way that has only been observed in the other language, and is thus unknown
at test time. To prevent this, `apply_bpe.py` accepts a `--vocabulary` and a
`--vocabulary-threshold` option so that the script will only produce symbols
which also appear in the vocabulary (with at least some frequency).
To use this functionality, we recommend the following recipe (assuming L1 and L2
are the two languages):
Learn byte pair encoding on the concatenation of the training text, and get resulting vocabulary for each:
cat {train_file}.L1 {train_file}.L2 | subword-nmt learn-bpe -s {num_operations} -o {codes_file}
subword-nmt apply-bpe -c {codes_file} < {train_file}.L1 | subword-nmt get-vocab > {vocab_file}.L1
subword-nmt apply-bpe -c {codes_file} < {train_file}.L2 | subword-nmt get-vocab > {vocab_file}.L2
more conveniently, you can do the same with this command:
subword-nmt learn-joint-bpe-and-vocab --input {train_file}.L1 {train_file}.L2 -s {num_operations} -o {codes_file} --write-vocabulary {vocab_file}.L1 {vocab_file}.L2
re-apply byte pair encoding with vocabulary filter:
subword-nmt apply-bpe -c {codes_file} --vocabulary {vocab_file}.L1 --vocabulary-threshold 50 < {train_file}.L1 > {train_file}.BPE.L1
subword-nmt apply-bpe -c {codes_file} --vocabulary {vocab_file}.L2 --vocabulary-threshold 50 < {train_file}.L2 > {train_file}.BPE.L2
as a last step, extract the vocabulary to be used by the neural network. Example with Nematus:
nematus/data/build_dictionary.py {train_file}.BPE.L1 {train_file}.BPE.L2
[you may want to take the union of all vocabularies to support multilingual systems]
for test/dev data, re-use the same options for consistency:
subword-nmt apply-bpe -c {codes_file} --vocabulary {vocab_file}.L1 --vocabulary-threshold 50 < {test_file}.L1 > {test_file}.BPE.L1
ADVANCED FEATURES
-----------------
On top of the basic BPE implementation, this repository supports:
- BPE dropout (Provilkov, Emelianenko and Voita, 2019): https://arxiv.org/abs/1910.13267
use the argument `--dropout 0.1` for `subword-nmt apply-bpe` to randomly drop out possible merges.
Doing this on the training corpus can improve quality of the final system; at test time, use BPE without dropout.
In order to obtain reproducible results, argument `--seed` can be used to set the random seed.
**Note:** In the original paper, the authors used BPE-Dropout on each new batch separately. You can copy the training corpus several times to get similar behavior to obtain multiple segmentations for the same sentence.
- support for glossaries:
use the argument `--glossaries` for `subword-nmt apply-bpe` to provide a list of words and/or regular expressions
that should always be passed to the output without subword segmentation
PUBLICATIONS
------------
The segmentation methods are described in:
Rico Sennrich, Barry Haddow and Alexandra Birch (2016):
Neural Machine Translation of Rare Words with Subword Units
Proceedings of the 54th Annual Meeting of the Association for Computational Linguistics (ACL 2016). Berlin, Germany.
HOW IMPLEMENTATION DIFFERS FROM Sennrich et al. (2016)
------------------------------------------------------
This repository implements the subword segmentation as described in Sennrich et al. (2016),
but since version 0.2, there is one core difference related to end-of-word tokens.
In Sennrich et al. (2016), the end-of-word token `</w>` is initially represented as a separate token, which can be merged with other subwords over time:
```
u n d </w>
f u n d </w>
```
Since 0.2, end-of-word tokens are initially concatenated with the word-final character:
```
u n d</w>
f u n d</w>
```
The new representation ensures that when BPE codes are learned from the above examples and then applied to new text, it is clear that a subword unit `und` is unambiguously word-final, and `un` is unambiguously word-internal, preventing the production of up to two different subword units from each BPE merge operation.
`apply_bpe.py` is backward-compatible and continues to accept old-style BPE files. New-style BPE files are identified by having the following first line: `#version: 0.2`
ACKNOWLEDGMENTS
---------------
This project has received funding from Samsung Electronics Polska sp. z o.o. - Samsung R&D Institute Poland, and from the European Union’s Horizon 2020 research and innovation programme under grant agreement 645452 (QT21).
from setuptools import setup, find_packages
import unittest
import codecs
def test_suite():
    """Discover and return the tests matching 'test_*.py' under 'subword_nmt/tests'."""
    return unittest.TestLoader().discover('subword_nmt/tests', pattern='test_*.py')
def _read_text(path):
    """Return the UTF-8 decoded contents of *path*, closing the file afterwards."""
    with codecs.open(path, encoding='utf-8') as handle:
        return handle.read()


# The long description is the README followed by the changelog; both files are
# read through a context manager so no file handle is left open.
setup(
    name='subword_nmt',
    version='0.3.7',
    description='Unsupervised Word Segmentation for Neural Machine Translation and Text Generation',
    long_description=(_read_text("README.md") +
                      "\n\n" + _read_text("CHANGELOG.md")),
    long_description_content_type="text/markdown",
    url='https://github.com/rsennrich/subword-nmt',
    author='Rico Sennrich',
    license='MIT',
    test_suite='setup.test_suite',
    classifiers=[
        'Intended Audience :: Developers',
        'Topic :: Text Processing',
        'Topic :: Scientific/Engineering :: Artificial Intelligence',
        'License :: OSI Approved :: MIT License',
        'Programming Language :: Python :: 2',
        'Programming Language :: Python :: 3',
    ],
    packages=find_packages(),
    entry_points={
        'console_scripts': ['subword-nmt=subword_nmt.subword_nmt:main'],
    },
    include_package_data=True
)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Rico Sennrich
"""Use operations learned with learn_bpe.py to encode a new text.
The text will not be smaller, but use only a fixed vocabulary, with rare words
encoded as variable-length sequences of subword units.
Reference:
Rico Sennrich, Barry Haddow and Alexandra Birch (2015). Neural Machine Translation of Rare Words with Subword Units.
Proceedings of the 54th Annual Meeting of the Association for Computational Linguistics (ACL 2016). Berlin, Germany.
"""
from __future__ import unicode_literals, division
import sys
import os
import inspect
import codecs
import io
import argparse
import re
import warnings
import random
import tempfile
from multiprocessing import Pool, cpu_count
# hack for python2/3 compatibility
from io import open
argparse.open = open
class BPE(object):
    """Applies learned byte pair encoding (BPE) merge operations to text.

    The merge operations are read from a BPE codes file. Non-final subword
    units in the output are suffixed with the separator.
    """

    def __init__(self, codes, merges=-1, separator='@@', vocab=None, glossaries=None):
        """Load merge operations from an open BPE codes file.

        :param codes: seekable text file object with one merge operation (two
            space-separated subword units) per line, optionally preceded by a
            '#version: X.Y' line; files without it are treated as version 0.1
        :param merges: number of merge operations to load, -1 for all
        :param separator: string appended to every non-final subword unit
        :param vocab: passed unchanged to ``encode``
            (NOTE(review): presumably the vocabulary used to revert merges
            that produce out-of-vocabulary units - confirm in ``encode``)
        :param glossaries: words and/or regular expressions that are isolated
            from the rest of a word before encoding

        Exits the process with status 1 if a line of the codes file does not
        consist of exactly two units.
        """
        codes.seek(0)
        offset = 1

        # check version information
        firstline = codes.readline()
        if firstline.startswith('#version:'):
            # strip trailing '.0' components so that e.g. '0.2.0' equals '0.2'
            self.version = tuple([int(x) for x in re.sub(r'(\.0+)*$', '', firstline.split()[-1]).split(".")])
            offset += 1
        else:
            self.version = (0, 1)
            codes.seek(0)

        self.bpe_codes = [tuple(item.strip('\r\n ').split(' '))
                          for (n, item) in enumerate(codes.read().rstrip('\n').split('\n'))
                          if (n < merges or merges == -1)]

        for i, item in enumerate(self.bpe_codes):
            if len(item) != 2:
                sys.stderr.write('Error: invalid line {0} in BPE codes file: {1}\n'.format(i+offset, ' '.join(item)))
                sys.stderr.write('The line should consist of exactly two subword units, separated by whitespace\n')
                sys.exit(1)

        # Map each merge pair to its priority. Iterating in reverse makes the
        # first occurrence of a duplicated pair win.
        self.bpe_codes = {code: i for (i, code) in reversed(list(enumerate(self.bpe_codes)))}

        self.bpe_codes_reverse = {pair[0] + pair[1]: pair for pair in self.bpe_codes}

        self.separator = separator

        self.vocab = vocab

        self.glossaries = glossaries if glossaries else []

        self.glossaries_regex = re.compile('^({})$'.format('|'.join(glossaries))) if glossaries else None

        self.cache = {}

    def process_lines(self, filename, outfile, dropout=0, num_workers=1):
        """Segment every line of a file and write the result to outfile.

        With ``num_workers > 1`` the input is split into byte ranges that
        start on line boundaries; each range is processed in its own worker
        process into a temporary file, and the temporary files are then
        concatenated into outfile in order.

        :param filename: path of the UTF-8 encoded input file
        :param outfile: writable text file object receiving the output
        :param dropout: dropout value forwarded to the segmentation
        :param num_workers: number of worker processes (positive integer)
        :raises ValueError: if num_workers is not positive
        :raises Exception: any exception raised in a worker process is
            re-raised here instead of being silently dropped
        """
        if sys.version_info < (3, 0):
            print("Parallel mode is only supported in Python3.")
            sys.exit(1)

        if num_workers == 1:
            _process_lines(self, filename, outfile, dropout, 0, 0)
        elif num_workers > 1:
            with open(filename, encoding="utf-8") as f:
                size = os.fstat(f.fileno()).st_size
                chunk_size = int(size / num_workers)
                # offsets[i] is where worker i starts; the final entry stays
                # 0, which _process_lines treats as "read to end of file"
                offsets = [0 for _ in range(num_workers + 1)]
                for i in range(1, num_workers):
                    f.seek(chunk_size * i)
                    pos = f.tell()
                    while True:
                        try:
                            # skip the remainder of the current line so the
                            # chunk starts on a line boundary
                            f.readline()
                            break
                        except UnicodeDecodeError:
                            # landed inside a multi-byte character: back up
                            # one byte and retry
                            pos -= 1
                            f.seek(pos)
                    offsets[i] = f.tell()
                    assert 0 <= offsets[i] < 1e20, "Bad new line separator, e.g. '\\r'"

            tmp_names = []
            pool = Pool(processes=num_workers)
            try:
                pending = []
                for i in range(num_workers):
                    tmp = tempfile.NamedTemporaryFile(delete=False)
                    tmp.close()
                    tmp_names.append(tmp.name)
                    pending.append(pool.apply_async(
                        _process_lines,
                        (self, filename, tmp.name, dropout, offsets[i], offsets[i + 1])))
                pool.close()
                pool.join()
                # surface worker failures rather than writing truncated output
                for result in pending:
                    result.get()
                for name in tmp_names:
                    with open(name, encoding="utf-8") as fi:
                        for line in fi:
                            outfile.write(line)
            finally:
                pool.terminate()
                for name in tmp_names:
                    if os.path.exists(name):
                        os.remove(name)
        else:
            raise ValueError('`num_workers` is expected to be a positive number, but got {}.'.format(num_workers))

    def process_line(self, line, dropout=0):
        """segment line, dealing with leading and trailing whitespace"""

        out = ""

        leading_whitespace = len(line)-len(line.lstrip('\r\n '))
        if leading_whitespace:
            out += line[:leading_whitespace]

        out += self.segment(line, dropout)

        trailing_whitespace = len(line)-len(line.rstrip('\r\n '))
        # a whitespace-only line was already copied in full as leading whitespace
        if trailing_whitespace and trailing_whitespace != len(line):
            out += line[-trailing_whitespace:]

        return out

    def segment(self, sentence, dropout=0):
        """segment single sentence (whitespace-tokenized string) with BPE encoding"""
        segments = self.segment_tokens(sentence.strip('\r\n ').split(' '), dropout)
        return ' '.join(segments)

    def segment_tokens(self, tokens, dropout=0):
        """segment a sequence of tokens with BPE encoding"""
        output = []
        for word in tokens:
            # eliminate double spaces
            if not word:
                continue
            new_word = [out for segment in self._isolate_glossaries(word)
                        for out in encode(segment,
                                          self.bpe_codes,
                                          self.bpe_codes_reverse,
                                          self.vocab,
                                          self.separator,
                                          self.version,
                                          self.cache,
                                          self.glossaries_regex,
                                          dropout)]

            # every unit but the last one of a word carries the separator
            for item in new_word[:-1]:
                output.append(item + self.separator)
            output.append(new_word[-1])

        return output

    def _isolate_glossaries(self, word):
        """Split word into segments by applying isolate_glossary for each glossary in turn."""
        word_segments = [word]
        for gloss in self.glossaries:
            word_segments = [out_segments for segment in word_segments
                             for out_segments in isolate_glossary(segment, gloss)]
        return word_segments
def _process_lines(bpe, filename, outfile, dropout, begin, end):
    """Segment the lines of ``filename`` that fall in one byte range.

    Args:
        bpe: object providing ``process_line(line, dropout)``.
        filename: path of the UTF-8 input file.
        outfile: either a path (opened for writing as UTF-8 and closed by
            this function) or an already open writable object, which is
            left open for the caller.
        dropout: dropout value forwarded to ``bpe.process_line``.
        begin: file position to seek to before reading.
        end: stop before writing a line whose end position exceeds this
            value; a value <= 0 disables the limit.
    """
    owns_output = isinstance(outfile, str)
    fo = open(outfile, "w", encoding="utf-8") if owns_output else outfile
    try:
        with open(filename, encoding="utf-8") as f:
            f.seek(begin)
            line = f.readline()
            while line:
                pos = f.tell()
                assert 0 <= pos < 1e20, "Bad new line separator, e.g. '\\r'"
                if end > 0 and pos > end:
                    break
                fo.write(bpe.process_line(line, dropout))
                line = f.readline()
    finally:
        # Close the file we opened even if reading or segmentation failed,
        # so the handle is not leaked.
        if owns_output:
            fo.close()
def create_parser(subparsers=None):
    """Build the command-line parser for applying BPE segmentation.

    Args:
        subparsers: optional sub-parser collection; when given, the parser
            is registered on it as the ``apply-bpe`` sub-command, otherwise
            a standalone ``ArgumentParser`` is created.

    Returns:
        The configured parser.
    """
    if subparsers:
        parser = subparsers.add_parser('apply-bpe',
            formatter_class=argparse.RawDescriptionHelpFormatter,
            description="apply BPE-based word segmentation")
    else:
        parser = argparse.ArgumentParser(
            formatter_class=argparse.RawDescriptionHelpFormatter,
            description="apply BPE-based word segmentation")

    parser.add_argument(
        '--input', '-i', type=argparse.FileType('r'), default=sys.stdin,
        metavar='PATH',
        help="Input file (default: standard input).")
    parser.add_argument(
        '--codes', '-c', type=argparse.FileType('r'), metavar='PATH',
        required=True,
        help="File with BPE codes (created by learn_bpe.py).")
    parser.add_argument(
        '--merges', '-m', type=int, default=-1,
        metavar='INT',
        help="Use this many BPE operations (<= number of learned symbols). " +
             "default: Apply all the learned merge operations")
    parser.add_argument(
        '--output', '-o', type=argparse.FileType('w'), default=sys.stdout,
        metavar='PATH',
        help="Output file (default: standard output)")
    parser.add_argument(
        '--separator', '-s', type=str, default='@@', metavar='STR',
        help="Separator between non-final subword units (default: '%(default)s')")
    parser.add_argument(
        '--vocabulary', type=argparse.FileType('r'), default=None,
        metavar="PATH",
        help="Vocabulary file (built with get_vocab.py). If provided, this script reverts any merge operations that produce an OOV.")
    parser.add_argument(
        '--vocabulary-threshold', type=int, default=None,
        metavar="INT",
        help="Vocabulary threshold. If vocabulary is provided, any word with frequency < threshold will be treated as OOV")
    parser.add_argument(
        '--dropout', type=float, default=0,
        metavar="P",
        help="Dropout BPE merge operations with probability P (Provilkov et al., 2019). Use this on training data only.")
    parser.add_argument(
        '--glossaries', type=str, nargs='+', default=None,
        metavar="STR",
        help="Glossaries. Words matching any of the words/regex provided in glossaries will not be affected " +
             "by the BPE (i.e. they will neither be broken into subwords, nor concatenated with other subwords). " +
             "Can be provided as a list of words/regex after the --glossaries argument. Enclose each regex in quotes.")
    parser.add_argument(
        '--seed', type=int, default=None,
        metavar="S",
        help="Random seed for the random number generators (e.g. for BPE dropout with --dropout).")
    parser.add_argument(
        '--num-workers', type=int, default=1,
        help="Number of processors to process texts, only supported in Python3. If -1, set `multiprocessing.cpu_count()`. (default: %(default)s)")

    return parser
def encode(orig, bpe_codes, bpe_codes_reverse, vocab, separator, version, cache, glossaries_regex=None, dropout=0):
    """Encode word based on list of BPE merge operations, which are applied consecutively.

    Args:
        orig: the word to segment.
        bpe_codes: mapping from a symbol pair to its rank; the pair with
            the smallest rank is merged first.
        bpe_codes_reverse: mapping handed to ``check_vocab_and_split`` to
            undo merges that produce out-of-vocabulary units.
        vocab: optional vocabulary; when truthy, the result is post-processed
            by ``check_vocab_and_split``.
        separator: separator string forwarded to ``check_vocab_and_split``.
        version: ``(0, 1)`` or ``(0, 2)``; selects how the end-of-word
            marker ``</w>`` is attached.
        cache: dict mapping words to their segmentation; read and updated
            only for deterministic (no-dropout) results and glossary hits.
        glossaries_regex: optional compiled regex; words it matches are
            returned unsegmented.
        dropout: probability of skipping each candidate merge.

    Returns:
        The segments of the word: ``orig`` itself for a one-character word,
        a tuple of strings otherwise, or whatever ``check_vocab_and_split``
        returns when ``vocab`` is given.

    Raises:
        NotImplementedError: for an unsupported ``version``.
    """
    if not dropout and orig in cache:
        return cache[orig]

    if glossaries_regex and glossaries_regex.match(orig):
        cache[orig] = (orig,)
        return (orig,)

    if len(orig) == 1:
        return orig

    if version == (0, 1):
        word = list(orig) + ['</w>']
    elif version == (0, 2):  # more consistent handling of word-final segments
        word = list(orig[:-1]) + [orig[-1] + '</w>']
    else:
        raise NotImplementedError

    while len(word) > 1:
        # get list of symbol pairs; optionally apply dropout
        pairs = [(bpe_codes[pair], i, pair)
                 for (i, pair) in enumerate(zip(word, word[1:]))
                 if (not dropout or random.random() > dropout) and pair in bpe_codes]

        if not pairs:
            break

        # get first merge operation in list of BPE codes
        bigram = min(pairs)[2]

        # find start position of all pairs that we want to merge
        positions = [i for (rank, i, pair) in pairs if pair == bigram]

        i = 0
        new_word = []
        bigram = ''.join(bigram)
        for j in positions:
            # merges are invalid if they start before current position.
            # This can happen if there are overlapping pairs: (x x x -> xx x)
            if j < i:
                continue
            new_word.extend(word[i:j])  # all symbols before merged pair
            new_word.append(bigram)  # merged pair
            i = j + 2  # continue after merged pair
        new_word.extend(word[i:])  # add all symbols until end of word
        word = new_word

    # don't print end-of-word symbols
    if word[-1] == '</w>':
        word = word[:-1]
    elif word[-1].endswith('</w>'):
        word[-1] = word[-1][:-4]

    word = tuple(word)
    if vocab:
        word = check_vocab_and_split(word, bpe_codes_reverse, vocab, separator)

    # A segmentation obtained under dropout is random; storing it would make
    # later deterministic calls (dropout == 0) return that random result.
    if not dropout:
        cache[orig] = word
    return word
def recursive_split(segment, bpe_codes, vocab, separator, final=False):
    """Recursively split segment into smaller units (by reversing BPE merges)
    until all units are either in-vocabulary, or cannot be split further.

    Args:
        segment: the unit to split.
        bpe_codes: mapping from a merged unit to its ``(left, right)`` parts;
            word-final units are looked up with ``</w>`` appended.
        vocab: container of known units; non-final units are looked up with
            ``separator`` appended.
        separator: marker appended to non-final units for the lookup.
        final: whether ``segment`` is the last unit of its word.

    Yields:
        The resulting units, left to right.
    """
    try:
        if final:
            left, right = bpe_codes[segment + '</w>']
            right = right[:-4]
        else:
            left, right = bpe_codes[segment]
    except KeyError:
        # No merge produced this unit, so it cannot be split any further.
        # Only the missing-key case is handled; other errors propagate.
        yield segment
        return

    if left + separator in vocab:
        yield left
    else:
        for item in recursive_split(left, bpe_codes, vocab, separator, False):
            yield item

    if (final and right in vocab) or (not final and right + separator in vocab):
        yield right
    else:
        for item in recursive_split(right, bpe_codes, vocab, separator, final):
            yield item
def check_vocab_and_split(orig, bpe_codes, vocab, separator):
    """Replace out-of-vocabulary segments of a word by smaller units.

    Non-final segments are looked up in ``vocab`` with ``separator``
    appended, the final segment is looked up as is. A segment that is not
    found is expanded through ``recursive_split``, which reverses BPE merges.
    Returns the resulting segments as a list.
    """
    result = []
    non_final, last = orig[:-1], orig[-1]

    for piece in non_final:
        if piece + separator in vocab:
            result.append(piece)
            continue
        result.extend(recursive_split(piece, bpe_codes, vocab, separator, False))

    if last in vocab:
        result.append(last)
    else:
        result.extend(recursive_split(last, bpe_codes, vocab, separator, True))

    return result
def read_vocabulary(vocab_file, threshold):
    """Read vocabulary file produced by get_vocab.py, and filter according to frequency threshold.

    Args:
        vocab_file: iterable of lines, each holding a word and its integer
            frequency separated by a single space.
        threshold: minimum frequency for a word to be kept, or ``None`` to
            keep every word.

    Returns:
        The set of retained words.
    """
    vocabulary = set()

    for line in vocab_file:
        entry = line.strip('\r\n ')
        # Blank lines (e.g. a trailing empty line) carry no entry; skipping
        # them avoids an unpacking error on otherwise valid files.
        if not entry:
            continue
        word, freq = entry.split(' ')
        freq = int(freq)
        if threshold is None or freq >= threshold:
            vocabulary.add(word)

    return vocabulary
def isolate_glossary(word, glossary):
    """
    Cut a word around every occurrence of a glossary pattern.

    Returns a list of subwords in which each match of 'glossary' stands alone.
    For example, with glossary 'USA' and word '1934USABUSA' the result is:
    ['1934', 'USA', 'B', 'USA']
    A word that equals the glossary, or does not contain it, comes back as
    a one-element list.
    """
    # regex equivalent of (if word == glossary or glossary not in word)
    if re.match('^'+glossary+'$', word) or not re.search(glossary, word):
        return [word]

    parts = re.split(r'({})'.format(glossary), word)
    ending = parts.pop()
    # Drop the empty strings the regex group split leaves behind.
    isolated = [part for part in parts if part]
    if ending != '':
        isolated.append(ending.strip('\r\n '))
    return isolated
if __name__ == '__main__':
    # Command-line entry point: parse the options, build a BPE instance and
    # segment the input, line by line or with several worker processes.

    # Warn when a 'subword_nmt' directory sits next to this script, i.e. the
    # script is being run from its old location.
    currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
    newdir = os.path.join(currentdir, 'subword_nmt')
    if os.path.isdir(newdir):
        warnings.simplefilter('default')
        warnings.warn(
            "this script's location has moved to {0}. This symbolic link will be removed in a future version. Please point to the new location, or install the package and use the command 'subword-nmt'".format(newdir),
            DeprecationWarning
        )

    # python 2/3 compatibility
    # Re-wrap the standard streams so they read/write UTF-8.
    if sys.version_info < (3, 0):
        sys.stderr = codecs.getwriter('UTF-8')(sys.stderr)
        sys.stdout = codecs.getwriter('UTF-8')(sys.stdout)
        sys.stdin = codecs.getreader('UTF-8')(sys.stdin)
    else:
        sys.stdin = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')
        sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
        sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', write_through=True, line_buffering=True)

    parser = create_parser()
    args = parser.parse_args()

    # Any non-positive worker count means "use every CPU".
    if args.num_workers <= 0:
        args.num_workers = cpu_count()

    # read/write files as UTF-8
    # The handles opened by argparse are replaced by handles reopened by name
    # with an explicit UTF-8 encoding; the standard streams are kept as is.
    args.codes = codecs.open(args.codes.name, encoding='utf-8')
    if args.input.name != '<stdin>':
        args.input = codecs.open(args.input.name, encoding='utf-8')
    if args.output.name != '<stdout>':
        args.output = codecs.open(args.output.name, 'w', encoding='utf-8')
    if args.vocabulary:
        args.vocabulary = codecs.open(args.vocabulary.name, encoding='utf-8')

    if args.vocabulary:
        vocabulary = read_vocabulary(args.vocabulary, args.vocabulary_threshold)
    else:
        vocabulary = None

    # Python 2: command-line values are byte strings and must be decoded;
    # parallel mode is not available there.
    if sys.version_info < (3, 0):
        args.separator = args.separator.decode('UTF-8')
        if args.glossaries:
            args.glossaries = [g.decode('UTF-8') for g in args.glossaries]
        if args.num_workers > 1:
            args.num_workers = 1
            warnings.warn("Parallel mode is only supported in Python3. Using 1 processor instead.")

    # Seed the random module when a seed was given on the command line.
    if args.seed is not None:
        random.seed(args.seed)

    bpe = BPE(args.codes, args.merges, args.separator, vocabulary, args.glossaries)

    # Sequential path: taken for a single worker and whenever the input is
    # STDIN (process_lines is given a file name, which STDIN does not have).
    if args.input.name == '<stdin>' or args.num_workers == 1:
        if args.num_workers > 1:
            warnings.warn("In parallel mode, the input cannot be STDIN. Using 1 processor instead.")
        for line in args.input:
            args.output.write(bpe.process_line(line, args.dropout))
    else:
        bpe.process_lines(args.input.name, args.output, args.dropout, args.num_workers)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Rico Sennrich
"""Use byte pair encoding (BPE) to learn a variable-length encoding of the vocabulary in a text.
Unlike the original BPE, it does not compress the plain text, but can be used to reduce the vocabulary
of a text to a configurable number of symbols, with only a small increase in the number of tokens.
This is an (inefficient) toy implementation that shows the algorithm. For processing large datasets,
indexing and incremental updates can be used to speed up the implementation (see learn_bpe.py).
Reference:
Rico Sennrich, Barry Haddow and Alexandra Birch (2016). Neural Machine Translation of Rare Words with Subword Units.
Proceedings of the 54th Annual Meeting of the Association for Computational Linguistics (ACL 2016). Berlin, Germany.
"""
import re
import sys
import collections
def get_stats(vocab):
    """Count adjacent symbol pairs, weighted by word frequency.

    ``vocab`` maps a word written as whitespace-separated symbols to its
    frequency. The result is a ``defaultdict(int)`` keyed by
    ``(left, right)`` symbol tuples.
    """
    pair_counts = collections.defaultdict(int)
    for word, freq in vocab.items():
        symbols = word.split()
        for left, right in zip(symbols, symbols[1:]):
            pair_counts[left, right] += freq
    return pair_counts
def merge_vocab(pair, v_in):
    """Merge every occurrence of a symbol pair in the vocabulary.

    Args:
        pair: the two symbols to join.
        v_in: mapping from a word (whitespace-separated symbols) to its
            frequency.

    Returns:
        A new dict in which each standalone occurrence of ``pair[0] pair[1]``
        inside a word is replaced by the concatenated symbol; frequencies
        are carried over unchanged.
    """
    merged = ''.join(pair)
    # The lookarounds keep the match aligned on whole symbols.
    pattern = re.compile(r'(?<!\S)' + re.escape(' '.join(pair)) + r'(?!\S)')

    def replacement(match):
        # A callable replacement is inserted literally; a plain string would
        # be escape-processed and corrupt symbols that contain a backslash.
        return merged

    v_out = {}
    for word in v_in:
        v_out[pattern.sub(replacement, word)] = v_in[word]
    return v_out
# Toy corpus: each key is a word written as space-separated symbols (the last
# symbol carries the end-of-word marker), each value is the word frequency.
vocab = {'l o w</w>' : 5, 'l o w e r</w>' : 2,
         'n e w e s t</w>' : 6, 'w i d e s t</w>' : 3}
# Upper bound on the number of merge operations to learn.
num_merges = 15
for i in range(num_merges):
    pairs = get_stats(vocab)
    try:
        # Pick the most frequent pair; max() raises ValueError when no pair
        # is left to merge.
        best = max(pairs, key=pairs.get)
    except ValueError:
        break
    # Stop once the best pair occurs fewer than two times.
    if pairs[best] < 2:
        sys.stderr.write('no pair has frequency > 1. Stopping\n')
        break
    vocab = merge_vocab(best, vocab)
    print(best)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Rico Sennrich
"""Compute chrF3 for machine translation evaluation
Reference:
Maja Popović (2015). chrF: character n-gram F-score for automatic MT evaluation. In Proceedings of the Tenth Workshop on Statistical Machine Translation, pages 392–395, Lisbon, Portugal.
"""
from __future__ import print_function, unicode_literals, division
import sys
import codecs
import io
import argparse
from collections import defaultdict
# hack for python2/3 compatibility
from io import open
argparse.open = open
def create_parser():
    """Build the command-line parser for the chrF evaluation script.

    Returns:
        An ``argparse.ArgumentParser`` with the reference/hypothesis files,
        the beta and n-gram order settings, and the reporting flags.
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="compute chrF score for machine translation evaluation")

    parser.add_argument(
        '--ref', '-r', type=argparse.FileType('r'), required=True,
        metavar='PATH',
        help="Reference file")
    parser.add_argument(
        '--hyp', type=argparse.FileType('r'), metavar='PATH',
        default=sys.stdin,
        help="Hypothesis file (default: stdin).")
    parser.add_argument(
        '--beta', '-b', type=float, default=3,
        metavar='FLOAT',
        help="beta parameter (default: '%(default)s')")
    parser.add_argument(
        '--ngram', '-n', type=int, default=6,
        metavar='INT',
        help="ngram order (default: '%(default)s')")
    parser.add_argument(
        '--space', '-s', action='store_true',
        help="take spaces into account (default: '%(default)s')")
    parser.add_argument(
        '--precision', action='store_true',
        help="report precision (default: '%(default)s')")
    parser.add_argument(
        '--recall', action='store_true',
        help="report recall (default: '%(default)s')")

    return parser
def extract_ngrams(words, max_length=4, spaces=False):
    """Count the character n-grams of a line for orders 1..max_length.

    With ``spaces`` false all whitespace is removed first; otherwise the
    line is only stripped at both ends. The result maps ``order - 1`` to a
    mapping from n-gram (a tuple of characters) to its count; an order with
    no n-gram gets no entry.
    """
    text = words.strip() if spaces else ''.join(words.split())

    counts = defaultdict(lambda: defaultdict(int))
    size = len(text)
    for order in range(max_length):
        width = order + 1
        # Only start positions that leave room for a full n-gram.
        for start in range(size - width + 1):
            counts[order][tuple(text[start:start + width])] += 1
    return counts
def get_correct(ngrams_ref, ngrams_test, correct, total):
    """Accumulate hypothesis n-gram totals and clipped matches per rank.

    For every n-gram of ``ngrams_test`` its count is added to
    ``total[rank]``; when the n-gram also occurs in ``ngrams_ref`` the
    smaller of the two counts is added to ``correct[rank]``. Both lists are
    updated in place and returned.
    """
    for rank, hyp_counts in ngrams_test.items():
        for chain, hyp_count in hyp_counts.items():
            total[rank] += hyp_count
            ref_counts = ngrams_ref[rank]
            if chain in ref_counts:
                correct[rank] += min(hyp_count, ref_counts[chain])
    return correct, total
def f1(correct, total_hyp, total_ref, max_length, beta=3, smooth=0):
    """Compute the F-beta score from per-order n-gram counts.

    Args:
        correct: matched n-gram counts, indexed by order - 1.
        total_hyp: hypothesis n-gram counts, indexed by order - 1.
        total_ref: reference n-gram counts, indexed by order - 1.
        max_length: number of n-gram orders to average over.
        beta: weight of recall relative to precision.
        smooth: constant added to every count.

    Returns:
        A ``(score, precision, recall)`` tuple, where precision and recall
        are averaged over ``max_length`` orders. The score is 0.0 when
        precision and recall are both zero.
    """
    precision = 0
    recall = 0
    for i in range(max_length):
        hyp_count = total_hyp[i] + smooth
        ref_count = total_ref[i] + smooth
        # Orders without any hypothesis or reference n-gram contribute 0.
        if hyp_count and ref_count:
            precision += (correct[i] + smooth) / hyp_count
            recall += (correct[i] + smooth) / ref_count

    precision /= max_length
    recall /= max_length

    denominator = (beta**2 * precision) + recall
    if not denominator:
        # No overlap at all: define the score as 0 instead of dividing by 0.
        return 0.0, precision, recall
    return (1 + beta**2) * (precision*recall) / denominator, precision, recall
def main(args):
    """Score a hypothesis file against a reference file and print the result.

    The reference is read line by line, with one hypothesis line consumed
    per reference line. N-gram statistics are accumulated over all line
    pairs, and the score (plus, on request, precision and recall) is printed.
    """
    order = args.ngram
    correct = [0] * order
    total = [0] * order
    total_ref = [0] * order

    for ref_line in args.ref:
        hyp_line = args.hyp.readline()

        ngrams_ref = extract_ngrams(ref_line, max_length=order, spaces=args.space)
        ngrams_test = extract_ngrams(hyp_line, max_length=order, spaces=args.space)

        get_correct(ngrams_ref, ngrams_test, correct, total)

        # Reference totals per rank: the sum of all n-gram counts.
        for rank, counts in ngrams_ref.items():
            total_ref[rank] += sum(counts.values())

    chrf, precision, recall = f1(correct, total, total_ref, order, args.beta)

    print('chrF3: {0:.4f}'.format(chrf))
    if args.precision:
        print('chrPrec: {0:.4f}'.format(precision))
    if args.recall:
        print('chrRec: {0:.4f}'.format(recall))
if __name__ == '__main__':
    # Command-line entry point: set up UTF-8 streams, parse the options and
    # run the evaluation.

    # python 2/3 compatibility
    # Re-wrap the standard streams so they read/write UTF-8.
    if sys.version_info < (3, 0):
        sys.stderr = codecs.getwriter('UTF-8')(sys.stderr)
        sys.stdout = codecs.getwriter('UTF-8')(sys.stdout)
        sys.stdin = codecs.getreader('UTF-8')(sys.stdin)
    else:
        sys.stdin = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')
        sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
        sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', write_through=True, line_buffering=True)

    parser = create_parser()
    args = parser.parse_args()

    main(args)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment