"...bert-pytorch.git" did not exist on "230156c425914d660104afccd6ba06c459affee9"
Commit 7143f128 authored by sunxx1

Merge branch 'hepj-test' into 'main'

Update transformer code

See merge request dcutoolkit/deeplearing/dlexamples_new!47
parents a30b77fe c0f05c10
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import itertools
import logging
import math
import operator
import os
import queue
import time
from threading import Thread
import numpy as np
import torch
from fairseq.data import data_utils
logger = logging.getLogger(__name__)
# Object used by _background_consumer to signal the source is exhausted
# to the main thread.
_sentinel = object()
class CountingIterator(object):
"""Wrapper around an iterable that maintains the iteration count.
Args:
iterable (iterable): iterable to wrap
start (int): starting iteration count. Note that this doesn't
actually advance the iterator.
total (int): override the iterator length returned by ``__len__``.
This can be used to truncate *iterator*.
Attributes:
n (int): number of elements consumed from this iterator
"""
def __init__(self, iterable, start=None, total=None):
self._itr = iter(iterable)
self.n = start or getattr(iterable, "n", 0)
self.total = total if total is not None else self.n + len(iterable)
def __len__(self):
return self.total
def __iter__(self):
return self
def __next__(self):
if not self.has_next():
raise StopIteration
try:
x = next(self._itr)
except StopIteration:
raise IndexError(
f"Iterator expected to have length {self.total}, "
f"but exhausted at position {self.n}."
)
self.n += 1
return x
def has_next(self):
"""Whether the iterator has been exhausted."""
return self.n < self.total
def skip(self, n):
"""Fast-forward the iterator by skipping n elements."""
for _ in range(n):
next(self)
return self
def take(self, n):
"""Truncate the iterator to n elements at most."""
self.total = min(self.total, n)
# Propagate this change to the underlying iterator
if hasattr(self._itr, "take"):
self._itr.take(max(n - self.n, 0))
return self
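# A minimal usage sketch of CountingIterator (illustrative only; the wrapped
# iterable here is a plain Python list rather than a DataLoader):
#
#   itr = CountingIterator([10, 20, 30, 40])
#   assert len(itr) == 4
#   itr.skip(1)              # fast-forward past the first element
#   assert next(itr) == 20 and itr.n == 2
#   itr.take(3)              # truncate to at most 3 elements overall
#   assert list(itr) == [30] and not itr.has_next()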
class EpochBatchIterating(object):
def __len__(self) -> int:
raise NotImplementedError
@property
def next_epoch_idx(self):
raise NotImplementedError
def next_epoch_itr(
self, shuffle=True, fix_batches_to_gpus=False, set_dataset_epoch=True
):
"""Return a new iterator over the dataset.
Args:
shuffle (bool, optional): shuffle batches before returning the
iterator (default: True).
fix_batches_to_gpus (bool, optional): ensure that batches are always
allocated to the same shards across epochs. Requires
that :attr:`dataset` supports prefetching (default: False).
set_dataset_epoch (bool, optional): update the wrapped Dataset with
the new epoch number (default: True).
"""
raise NotImplementedError
def end_of_epoch(self) -> bool:
"""Returns whether the most recent epoch iterator has been exhausted"""
raise NotImplementedError
@property
def iterations_in_epoch(self) -> int:
"""The number of consumed batches in the current epoch."""
raise NotImplementedError
def state_dict(self):
"""Returns a dictionary containing a whole state of the iterator."""
raise NotImplementedError
def load_state_dict(self, state_dict):
"""Copies the state of the iterator from the given *state_dict*."""
raise NotImplementedError
@property
def first_batch(self):
return "DUMMY"
class StreamingEpochBatchIterator(EpochBatchIterating):
"""A steaming-style iterator over a :class:`torch.utils.data.IterableDataset`.
Args:
dataset (~torch.utils.data.Dataset): dataset from which to load the data
max_sentences: batch size
collate_fn (callable): merges a list of samples to form a mini-batch
num_workers (int, optional): how many subprocesses to use for data
loading. 0 means the data will be loaded in the main process
(default: 0).
epoch (int, optional): the epoch to start the iterator from
(default: 1).
buffer_size (int, optional): the number of batches to keep ready in the
queue. Helps speed up data loading. When buffer_size is zero, the
default torch.utils.data.DataLoader preloading is used.
timeout (int, optional): if positive, the timeout value for collecting a batch
from workers. Should always be non-negative (default: ``0``).
"""
def __init__(
self,
dataset,
max_sentences=1,
collate_fn=None,
epoch=1,
num_workers=0,
buffer_size=0,
timeout=0,
):
assert isinstance(dataset, torch.utils.data.IterableDataset)
self.dataset = dataset
self.max_sentences = max_sentences
self.collate_fn = collate_fn
self.epoch = max(epoch, 1) # we use 1-based indexing for epochs
self.num_workers = num_workers
# This upper limit here is to prevent people from abusing this feature
# in a shared computing environment.
self.buffer_size = min(buffer_size, 20)
self.timeout = timeout
self._current_epoch_iterator = None
@property
def next_epoch_idx(self):
"""Return the epoch index after *next_epoch_itr* is called."""
if self._current_epoch_iterator is not None and self.end_of_epoch():
return self.epoch + 1
else:
return self.epoch
def next_epoch_itr(
self, shuffle=True, fix_batches_to_gpus=False, set_dataset_epoch=True
):
self.epoch = self.next_epoch_idx
if set_dataset_epoch and hasattr(self.dataset, "set_epoch"):
self.dataset.set_epoch(self.epoch)
self._current_epoch_iterator = self._get_iterator_for_epoch(self.epoch, shuffle)
return self._current_epoch_iterator
def end_of_epoch(self) -> bool:
return not self._current_epoch_iterator.has_next()
@property
def iterations_in_epoch(self) -> int:
if self._current_epoch_iterator is not None:
return self._current_epoch_iterator.n
return 0
def state_dict(self):
return {
"epoch": self.epoch,
}
def load_state_dict(self, state_dict):
self.epoch = state_dict["epoch"]
def _get_iterator_for_epoch(self, epoch, shuffle, offset=0):
if self.num_workers > 0:
os.environ["PYTHONWARNINGS"] = "ignore:semaphore_tracker:UserWarning"
# Create data loader
worker_init_fn = getattr(self.dataset, "worker_init_fn", None)
itr = torch.utils.data.DataLoader(
self.dataset,
batch_size=self.max_sentences,
collate_fn=self.collate_fn,
num_workers=self.num_workers,
timeout=self.timeout,
worker_init_fn=worker_init_fn,
pin_memory=True,
)
# Wrap with a BufferedIterator if needed
if self.buffer_size > 0:
itr = BufferedIterator(self.buffer_size, itr)
# Wrap with CountingIterator
itr = CountingIterator(itr, start=offset)
return itr
class EpochBatchIterator(EpochBatchIterating):
"""A multi-epoch iterator over a :class:`torch.utils.data.Dataset`.
Compared to :class:`torch.utils.data.DataLoader`, this iterator:
- can be reused across multiple epochs with the :func:`next_epoch_itr`
method (optionally shuffled between epochs)
- can be serialized/deserialized with the :func:`state_dict` and
:func:`load_state_dict` methods
- supports sharding with the *num_shards* and *shard_id* arguments
Args:
dataset (~torch.utils.data.Dataset): dataset from which to load the data
collate_fn (callable): merges a list of samples to form a mini-batch
batch_sampler (~torch.utils.data.Sampler or a callable): an iterator over batches of
indices, or a callable to create such an iterator (~torch.utils.data.Sampler).
A callable batch_sampler will be called for each epoch to enable per epoch dynamic
batch iterators defined by this callable batch_sampler.
seed (int, optional): seed for random number generator for
reproducibility (default: 1).
num_shards (int, optional): shard the data iterator into N
shards (default: 1).
shard_id (int, optional): which shard of the data iterator to
return (default: 0).
num_workers (int, optional): how many subprocesses to use for data
loading. 0 means the data will be loaded in the main process
(default: 0).
epoch (int, optional): the epoch to start the iterator from
(default: 1).
buffer_size (int, optional): the number of batches to keep ready in the
queue. Helps speed up data loading. When buffer_size is zero, the
default torch.utils.data.DataLoader preloading is used.
timeout (int, optional): if positive, the timeout value for collecting a batch
from workers. Should always be non-negative (default: ``0``).
disable_shuffling (bool, optional): force disable shuffling
(default: ``False``).
skip_remainder_batch (bool, optional): if set, discard the last batch in an epoch
for the sake of training stability, as the last batch is usually smaller than
local_batch_size * distributed_world_size (default: ``False``).
grouped_shuffling (bool, optional): enable shuffling batches in groups
of num_shards. Ensures that each GPU receives similar length sequences when
batches are sorted by length.
"""
def __init__(
self,
dataset,
collate_fn,
batch_sampler,
seed=1,
num_shards=1,
shard_id=0,
num_workers=0,
epoch=1,
buffer_size=0,
timeout=0,
disable_shuffling=False,
skip_remainder_batch=False,
grouped_shuffling=False,
):
assert isinstance(dataset, torch.utils.data.Dataset)
self.dataset = dataset
self.collate_fn = collate_fn
self.batch_sampler = batch_sampler
self._frozen_batches = (
tuple(batch_sampler) if not callable(batch_sampler) else None
)
self.seed = seed
self.num_shards = num_shards
self.shard_id = shard_id
self.num_workers = num_workers
# This upper limit here is to prevent people from abusing this feature
# in a shared computing environment.
self.buffer_size = min(buffer_size, 20)
self.timeout = timeout
self.disable_shuffling = disable_shuffling
self.skip_remainder_batch = skip_remainder_batch
self.grouped_shuffling = grouped_shuffling
self.epoch = max(epoch, 1) # we use 1-based indexing for epochs
self.shuffle = not disable_shuffling
self._cur_epoch_itr = None
self._next_epoch_itr = None
self._supports_prefetch = getattr(dataset, "supports_prefetch", False)
@property
def frozen_batches(self):
if self._frozen_batches is None:
self._frozen_batches = tuple(self.batch_sampler(self.dataset, self.epoch))
return self._frozen_batches
@property
def first_batch(self):
if len(self.frozen_batches) == 0:
raise Exception(
"The dataset is empty. This could indicate "
"that all elements in the dataset have been skipped. "
"Try increasing the max number of allowed tokens or using "
"a larger dataset."
)
if getattr(self.dataset, "supports_fetch_outside_dataloader", True):
return self.collate_fn([self.dataset[i] for i in self.frozen_batches[0]])
else:
return "DUMMY"
def __len__(self):
return int(math.ceil(len(self.frozen_batches) / float(self.num_shards)))
@property
def n(self):
return self.iterations_in_epoch
@property
def next_epoch_idx(self):
"""Return the epoch index after *next_epoch_itr* is called."""
if self._next_epoch_itr is not None:
return self.epoch
elif self._cur_epoch_itr is not None and self.end_of_epoch():
return self.epoch + 1
else:
return self.epoch
def next_epoch_itr(
self, shuffle=True, fix_batches_to_gpus=False, set_dataset_epoch=True
):
"""Return a new iterator over the dataset.
Args:
shuffle (bool, optional): shuffle batches before returning the
iterator (default: True).
fix_batches_to_gpus (bool, optional): ensure that batches are always
allocated to the same shards across epochs. Requires
that :attr:`dataset` supports prefetching (default: False).
set_dataset_epoch (bool, optional): update the wrapped Dataset with
the new epoch number (default: True).
"""
if self.disable_shuffling:
shuffle = False
prev_epoch = self.epoch
self.epoch = self.next_epoch_idx
if set_dataset_epoch and hasattr(self.dataset, "set_epoch"):
self.dataset.set_epoch(self.epoch)
if self._next_epoch_itr is not None:
self._cur_epoch_itr = self._next_epoch_itr
self._next_epoch_itr = None
else:
if callable(self.batch_sampler) and prev_epoch != self.epoch:
# reset _frozen_batches to refresh the next epoch
self._frozen_batches = None
self._cur_epoch_itr = self._get_iterator_for_epoch(
self.epoch,
shuffle,
fix_batches_to_gpus=fix_batches_to_gpus,
)
self.shuffle = shuffle
return self._cur_epoch_itr
def end_of_epoch(self) -> bool:
"""Returns whether the most recent epoch iterator has been exhausted"""
return not self._cur_epoch_itr.has_next()
@property
def iterations_in_epoch(self):
"""The number of consumed batches in the current epoch."""
if self._cur_epoch_itr is not None:
return self._cur_epoch_itr.n
elif self._next_epoch_itr is not None:
return self._next_epoch_itr.n
return 0
def state_dict(self):
"""Returns a dictionary containing a whole state of the iterator."""
if self.end_of_epoch():
epoch = self.epoch + 1
iter_in_epoch = 0
else:
epoch = self.epoch
iter_in_epoch = self.iterations_in_epoch
return {
"version": 2,
"epoch": epoch,
"iterations_in_epoch": iter_in_epoch,
"shuffle": self.shuffle,
}
def load_state_dict(self, state_dict):
"""Copies the state of the iterator from the given *state_dict*."""
self.epoch = state_dict["epoch"]
itr_pos = state_dict.get("iterations_in_epoch", 0)
version = state_dict.get("version", 1)
if itr_pos > 0:
# fast-forward epoch iterator
self._next_epoch_itr = self._get_iterator_for_epoch(
self.epoch,
shuffle=state_dict.get("shuffle", True),
offset=itr_pos,
)
if self._next_epoch_itr is None:
if version == 1:
# legacy behavior: we finished the epoch, increment epoch counter
self.epoch += 1
else:
raise RuntimeError(
"Cannot resume training due to dataloader mismatch, please "
"report this to the fairseq developers. You can relaunch "
"training with `--reset-dataloader` and it should work."
)
else:
self._next_epoch_itr = None
def _get_iterator_for_epoch(
self, epoch, shuffle, fix_batches_to_gpus=False, offset=0
):
def shuffle_batches(batches, seed):
with data_utils.numpy_seed(seed):
if self.grouped_shuffling:
grouped_batches = [
batches[(i * self.num_shards) : ((i + 1) * self.num_shards)]
for i in range((len(batches) // self.num_shards))
]
np.random.shuffle(grouped_batches)
batches = list(itertools.chain(*grouped_batches))
else:
np.random.shuffle(batches)
return batches
if self._supports_prefetch:
batches = self.frozen_batches
if shuffle and not fix_batches_to_gpus:
batches = shuffle_batches(list(batches), self.seed + epoch)
batches = list(
ShardedIterator(batches, self.num_shards, self.shard_id, fill_value=[])
)
self.dataset.prefetch([i for s in batches for i in s])
if shuffle and fix_batches_to_gpus:
batches = shuffle_batches(batches, self.seed + epoch + self.shard_id)
else:
if shuffle:
batches = shuffle_batches(list(self.frozen_batches), self.seed + epoch)
else:
batches = self.frozen_batches
batches = list(
ShardedIterator(batches, self.num_shards, self.shard_id, fill_value=[])
)
if offset > 0 and offset >= len(batches):
return None
if self.num_workers > 0:
os.environ["PYTHONWARNINGS"] = "ignore:semaphore_tracker:UserWarning"
# Create data loader
itr = torch.utils.data.DataLoader(
self.dataset,
collate_fn=self.collate_fn,
batch_sampler=batches[offset:],
num_workers=self.num_workers,
timeout=self.timeout,
pin_memory=True,
)
# Wrap with a BufferedIterator if needed
if self.buffer_size > 0:
itr = BufferedIterator(self.buffer_size, itr)
# Wrap with CountingIterator
itr = CountingIterator(itr, start=offset)
if self.skip_remainder_batch:
# TODO: Below is a lazy implementation which discards the final batch regardless
# of whether it is a full batch or not.
total_num_itrs = len(batches) - 1
itr.take(total_num_itrs)
logger.info(f"skip final residual batch, total_num_itrs = {total_num_itrs}")
return itr
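# A hedged usage sketch for EpochBatchIterator; `my_dataset`, `my_collate_fn`,
# `my_batches`, `world_size` and `rank` are placeholders for whatever the
# caller provides, not names defined in this file:
#
#   epoch_iter = EpochBatchIterator(
#       dataset=my_dataset,            # any torch.utils.data.Dataset
#       collate_fn=my_collate_fn,
#       batch_sampler=my_batches,      # e.g. a list of lists of sample indices
#       num_shards=world_size,
#       shard_id=rank,
#   )
#   for _ in range(3):
#       itr = epoch_iter.next_epoch_itr(shuffle=True)
#       for batch in itr:
#           pass                       # forward/backward/step
#   # The iterator can be checkpointed and resumed (possibly mid-epoch):
#   state = epoch_iter.state_dict()
#   epoch_iter.load_state_dict(state)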
class GroupedIterator(CountingIterator):
"""Wrapper around an iterable that returns groups (chunks) of items.
Args:
iterable (iterable): iterable to wrap
chunk_size (int): size of each chunk
skip_remainder_batch (bool, optional): if set, discard the last grouped batch in
each training epoch, as the last grouped batch is usually smaller than
local_batch_size * distributed_world_size * chunk_size (default: ``False``).
Attributes:
n (int): number of elements consumed from this iterator
"""
def __init__(self, iterable, chunk_size, skip_remainder_batch=False):
if skip_remainder_batch:
total_num_itrs = int(math.floor(len(iterable) / float(chunk_size)))
logger.info(
f"skip final residual batch, grouped total_num_itrs = {total_num_itrs}"
)
else:
total_num_itrs = int(math.ceil(len(iterable) / float(chunk_size)))
logger.info(f"grouped total_num_itrs = {total_num_itrs}")
itr = _chunk_iterator(iterable, chunk_size, skip_remainder_batch)
super().__init__(
itr,
start=int(math.ceil(getattr(iterable, "n", 0) / float(chunk_size))),
total=total_num_itrs,
)
self.chunk_size = chunk_size
if skip_remainder_batch:
self.take(total_num_itrs)
# TODO: [Hack] Here the grouped iterator modifies the base iterator size so that
# training can move into the next epoch once the grouped iterator is exhausted.
# Double-check this implementation in case unexpected behavior occurs.
iterable.take(total_num_itrs * chunk_size)
def _chunk_iterator(itr, chunk_size, skip_remainder_batch=False):
chunk = []
for x in itr:
chunk.append(x)
if len(chunk) == chunk_size:
yield chunk
chunk = []
if not skip_remainder_batch and len(chunk) > 0:
yield chunk
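# Illustrative sketch: GroupedIterator chunks an existing iterator so that each
# step yields `chunk_size` consecutive items (used for update_freq > 1 gradient
# accumulation). Plain integers stand in for batches here:
#
#   base = CountingIterator([0, 1, 2, 3, 4])
#   grouped = GroupedIterator(base, chunk_size=2)
#   assert list(grouped) == [[0, 1], [2, 3], [4]]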
class ShardedIterator(CountingIterator):
"""A sharded wrapper around an iterable, padded to length.
Args:
iterable (iterable): iterable to wrap
num_shards (int): number of shards to split the iterable into
shard_id (int): which shard to iterate over
fill_value (Any, optional): padding value when the iterable doesn't
evenly divide *num_shards* (default: None).
Attributes:
n (int): number of elements consumed from this iterator
"""
def __init__(
self, iterable, num_shards, shard_id, fill_value=None, skip_remainder_batch=None
):
"""
Args:
skip_remainder_batch: ignored"""
if shard_id < 0 or shard_id >= num_shards:
raise ValueError("shard_id must be between 0 and num_shards")
sharded_len = int(math.ceil(len(iterable) / float(num_shards)))
itr = map(
operator.itemgetter(1),
itertools.zip_longest(
range(sharded_len),
itertools.islice(iterable, shard_id, len(iterable), num_shards),
fillvalue=fill_value,
),
)
super().__init__(
itr,
start=int(math.ceil(getattr(iterable, "n", 0) / float(num_shards))),
total=sharded_len,
)
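# Illustrative sketch: with 2 shards, shard 0 sees elements 0, 2, 4, ... and
# shard 1 sees elements 1, 3, 5, ...; the shorter shard is padded with
# `fill_value` so that both shards report the same length:
#
#   assert list(ShardedIterator([0, 1, 2, 3, 4], 2, 0)) == [0, 2, 4]
#   assert list(ShardedIterator([0, 1, 2, 3, 4], 2, 1, fill_value=-1)) == [1, 3, -1]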
class BackgroundConsumer(Thread):
def __init__(self, queue, source, max_len, cuda_device):
Thread.__init__(self)
self._queue = queue
self._source = source
self._max_len = max_len
self.count = 0
self.cuda_device = cuda_device
def run(self):
# set_device to avoid creation of GPU0 context when using pin_memory
if self.cuda_device is not None:
torch.cuda.set_device(self.cuda_device)
try:
for item in self._source:
self._queue.put(item)
# Stop if we reached the maximum length
self.count += 1
if self._max_len is not None and self.count >= self._max_len:
break
# Signal the consumer we are done.
self._queue.put(_sentinel)
except Exception as e:
self._queue.put(e)
class BufferedIterator(object):
def __init__(self, size, iterable):
self._queue = queue.Queue(size)
self._iterable = iterable
self._consumer = None
self.start_time = time.time()
self.warning_time = None
self.total = len(iterable)
def _create_consumer(self):
self._consumer = BackgroundConsumer(
self._queue,
self._iterable,
self.total,
torch.cuda.current_device() if torch.cuda.is_available() else None,
)
self._consumer.daemon = True
self._consumer.start()
def __iter__(self):
return self
def __len__(self):
return self.total
def take(self, n):
self.total = min(self.total, n)
# Propagate this change to the underlying iterator
if hasattr(self._iterable, "take"):
self._iterable.take(n)
return self
def __next__(self):
# Create consumer if not created yet
if self._consumer is None:
self._create_consumer()
# Notify the user if there is a data loading bottleneck
if self._queue.qsize() < min(2, max(1, self._queue.maxsize // 2)):
if time.time() - self.start_time > 5 * 60:
if (
self.warning_time is None
or time.time() - self.warning_time > 15 * 60
):
logger.debug(
"Data loading buffer is empty or nearly empty. This may "
"indicate a data loading bottleneck, and increasing the "
"number of workers (--num-workers) may help."
)
self.warning_time = time.time()
# Get next example
item = self._queue.get(True)
if isinstance(item, Exception):
raise item
if item is _sentinel:
raise StopIteration()
return item
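# Hedged usage sketch: BufferedIterator prefetches items from a background
# thread into a bounded queue. It is normally wrapped around a DataLoader by
# the iterators above, but any sized iterable works for illustration:
#
#   buffered = BufferedIterator(size=4, iterable=[0, 1, 2, 3, 4])
#   assert list(buffered) == [0, 1, 2, 3, 4]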
class GroupedEpochBatchIterator(EpochBatchIterator):
"""Grouped version of EpochBatchIterator
It takes several samplers from different datasets.
Each epoch shuffle the dataset wise sampler individually with different
random seed. The those sub samplers are combined with into
one big samplers with deterministic permutation to mix batches from
different datasets. It will act like EpochBatchIterator but make sure
1) data from one data set each time
2) for different workers, they use the same order to fetch the data
so they will use data from the same dataset everytime
mult_rate is used for update_freq > 1 case where we want to make sure update_freq
mini-batches come from same source
"""
def __init__(
self,
dataset,
collate_fn,
batch_samplers,
seed=1,
num_shards=1,
shard_id=0,
num_workers=0,
epoch=0,
mult_rate=1,
buffer_size=0,
skip_remainder_batch=False,
):
super().__init__(
dataset,
collate_fn,
batch_samplers,
seed,
num_shards,
shard_id,
num_workers,
epoch,
buffer_size,
skip_remainder_batch=skip_remainder_batch,
)
# level 0: sub-samplers 1: batch_idx 2: batches
self._frozen_batches = tuple([tuple(sub_batch) for sub_batch in batch_samplers])
self.step_size = mult_rate * num_shards
self.lengths = [
(len(x) // self.step_size) * self.step_size for x in self.frozen_batches
]
def __len__(self):
return sum(self.lengths)
@property
def first_batch(self):
if len(self.frozen_batches) == 0:
raise Exception(
"The dataset is empty. This could indicate "
"that all elements in the dataset have been skipped. "
"Try increasing the max number of allowed tokens or using "
"a larger dataset."
)
if self.dataset.supports_fetch_outside_dataloader:
return self.collate_fn([self.dataset[i] for i in self.frozen_batches[0][0]])
else:
return "DUMMY"
def _get_iterator_for_epoch(
self, epoch, shuffle, fix_batches_to_gpus=False, offset=0
):
def shuffle_batches(batches, seed):
with data_utils.numpy_seed(seed):
np.random.shuffle(batches)
return batches
def return_full_batches(batch_sets, seed, shuffle):
if shuffle:
batch_sets = [shuffle_batches(list(x), seed) for x in batch_sets]
batch_sets = [
batch_sets[i][: self.lengths[i]] for i in range(len(batch_sets))
]
batches = list(itertools.chain.from_iterable(batch_sets))
if shuffle:
with data_utils.numpy_seed(seed):
idx = np.random.permutation(len(batches) // self.step_size)
if len(idx) * self.step_size != len(batches):
raise ValueError(
"ERROR: %d %d %d %d"
% (len(idx), self.step_size, len(batches), self.shard_id),
":".join(["%d" % x for x in self.lengths]),
)
mini_shards = [
batches[i * self.step_size : (i + 1) * self.step_size]
for i in idx
]
batches = list(itertools.chain.from_iterable(mini_shards))
return batches
if self._supports_prefetch:
raise NotImplementedError("To be implemented")
else:
batches = return_full_batches(
self.frozen_batches, self.seed + epoch, shuffle
)
batches = list(
ShardedIterator(batches, self.num_shards, self.shard_id, fill_value=[])
)
if offset > 0 and offset >= len(batches):
return None
if self.num_workers > 0:
os.environ["PYTHONWARNINGS"] = "ignore:semaphore_tracker:UserWarning"
itr = torch.utils.data.DataLoader(
self.dataset,
collate_fn=self.collate_fn,
batch_sampler=batches[offset:],
num_workers=self.num_workers,
)
if self.buffer_size > 0:
itr = BufferedIterator(self.buffer_size, itr)
return CountingIterator(itr, start=offset)
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import logging
import numpy as np
import torch
from fairseq.data import FairseqDataset, data_utils
logger = logging.getLogger(__name__)
def collate(
samples,
pad_idx,
eos_idx,
left_pad_source=True,
left_pad_target=False,
input_feeding=True,
pad_to_length=None,
pad_to_multiple=1,
):
if len(samples) == 0:
return {}
def merge(key, left_pad, move_eos_to_beginning=False, pad_to_length=None):
return data_utils.collate_tokens(
[s[key] for s in samples],
pad_idx,
eos_idx,
left_pad,
move_eos_to_beginning,
pad_to_length=pad_to_length,
pad_to_multiple=pad_to_multiple,
)
def check_alignment(alignment, src_len, tgt_len):
if alignment is None or len(alignment) == 0:
return False
if (
alignment[:, 0].max().item() >= src_len - 1
or alignment[:, 1].max().item() >= tgt_len - 1
):
logger.warning("alignment size mismatch found, skipping alignment!")
return False
return True
def compute_alignment_weights(alignments):
"""
Given a tensor of shape [:, 2] containing the source-target indices
corresponding to the alignments, a weight vector containing the
inverse frequency of each target index is computed.
E.g., if alignments = [[5, 7], [2, 3], [1, 3], [4, 2]], then
a tensor containing [1., 0.5, 0.5, 1] should be returned (since target
index 3 is repeated twice)
"""
align_tgt = alignments[:, 1]
_, align_tgt_i, align_tgt_c = torch.unique(
align_tgt, return_inverse=True, return_counts=True
)
align_weights = align_tgt_c[align_tgt_i[np.arange(len(align_tgt))]]
return 1.0 / align_weights.float()
id = torch.LongTensor([s["id"] for s in samples])
src_tokens = merge(
"source",
left_pad=left_pad_source,
pad_to_length=pad_to_length["source"] if pad_to_length is not None else None,
)
# sort by descending source length
src_lengths = torch.LongTensor(
[s["source"].ne(pad_idx).long().sum() for s in samples]
)
src_lengths, sort_order = src_lengths.sort(descending=True)
id = id.index_select(0, sort_order)
src_tokens = src_tokens.index_select(0, sort_order)
prev_output_tokens = None
target = None
if samples[0].get("target", None) is not None:
target = merge(
"target",
left_pad=left_pad_target,
pad_to_length=pad_to_length["target"]
if pad_to_length is not None
else None,
)
target = target.index_select(0, sort_order)
tgt_lengths = torch.LongTensor(
[s["target"].ne(pad_idx).long().sum() for s in samples]
).index_select(0, sort_order)
ntokens = tgt_lengths.sum().item()
if samples[0].get("prev_output_tokens", None) is not None:
prev_output_tokens = merge("prev_output_tokens", left_pad=left_pad_target)
elif input_feeding:
# we create a shifted version of targets for feeding the
# previous output token(s) into the next decoder step
prev_output_tokens = merge(
"target",
left_pad=left_pad_target,
move_eos_to_beginning=True,
pad_to_length=pad_to_length["target"]
if pad_to_length is not None
else None,
)
else:
ntokens = src_lengths.sum().item()
batch = {
"id": id,
"nsentences": len(samples),
"ntokens": ntokens,
"net_input": {
"src_tokens": src_tokens,
"src_lengths": src_lengths,
},
"target": target,
}
if prev_output_tokens is not None:
batch["net_input"]["prev_output_tokens"] = prev_output_tokens.index_select(
0, sort_order
)
if samples[0].get("alignment", None) is not None:
bsz, tgt_sz = batch["target"].shape
src_sz = batch["net_input"]["src_tokens"].shape[1]
offsets = torch.zeros((len(sort_order), 2), dtype=torch.long)
offsets[:, 1] += torch.arange(len(sort_order), dtype=torch.long) * tgt_sz
if left_pad_source:
offsets[:, 0] += src_sz - src_lengths
if left_pad_target:
offsets[:, 1] += tgt_sz - tgt_lengths
alignments = [
alignment + offset
for align_idx, offset, src_len, tgt_len in zip(
sort_order, offsets, src_lengths, tgt_lengths
)
for alignment in [samples[align_idx]["alignment"].view(-1, 2)]
if check_alignment(alignment, src_len, tgt_len)
]
if len(alignments) > 0:
alignments = torch.cat(alignments, dim=0)
align_weights = compute_alignment_weights(alignments)
batch["alignments"] = alignments
batch["align_weights"] = align_weights
if samples[0].get("constraints", None) is not None:
# Collate the packed constraints across the samples, padding to
# the length of the longest sample.
lens = [sample.get("constraints").size(0) for sample in samples]
max_len = max(lens)
constraints = torch.zeros((len(samples), max(lens))).long()
for i, sample in enumerate(samples):
constraints[i, 0 : lens[i]] = samples[i].get("constraints")
batch["constraints"] = constraints.index_select(0, sort_order)
return batch
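# Hedged sketch of the sample format expected by collate(); the pad/eos indices
# below are illustrative and would normally come from the task's Dictionary:
#
#   samples = [
#       {"id": 0, "source": torch.LongTensor([4, 5, 2]), "target": torch.LongTensor([6, 2])},
#       {"id": 1, "source": torch.LongTensor([7, 2]), "target": torch.LongTensor([8, 9, 2])},
#   ]
#   batch = collate(samples, pad_idx=1, eos_idx=2)
#   # batch["net_input"]["src_tokens"] is (2, 3) and left-padded,
#   # batch["target"] is (2, 3) and right-padded, and batch["ntokens"]
#   # counts the non-pad target tokens (5 here).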
class LanguagePairDataset(FairseqDataset):
"""
A pair of torch.utils.data.Datasets.
Args:
src (torch.utils.data.Dataset): source dataset to wrap
src_sizes (List[int]): source sentence lengths
src_dict (~fairseq.data.Dictionary): source vocabulary
tgt (torch.utils.data.Dataset, optional): target dataset to wrap
tgt_sizes (List[int], optional): target sentence lengths
tgt_dict (~fairseq.data.Dictionary, optional): target vocabulary
left_pad_source (bool, optional): pad source tensors on the left side
(default: True).
left_pad_target (bool, optional): pad target tensors on the left side
(default: False).
shuffle (bool, optional): shuffle dataset elements before batching
(default: True).
input_feeding (bool, optional): create a shifted version of the targets
to be passed into the model for teacher forcing (default: True).
remove_eos_from_source (bool, optional): if set, removes eos from end
of source if it's present (default: False).
append_eos_to_target (bool, optional): if set, appends eos to end of
target if it's absent (default: False).
align_dataset (torch.utils.data.Dataset, optional): dataset
containing alignments.
constraints (Tensor, optional): 2d tensor with a concatenated, zero-
delimited list of constraints for each sentence.
append_bos (bool, optional): if set, appends bos to the beginning of
source/target sentence.
num_buckets (int, optional): if set to a value greater than 0, then
batches will be bucketed into the given number of batch shapes.
src_lang_id (int, optional): source language ID, if set, the collated batch
will contain a field 'src_lang_id' in 'net_input' which indicates the
source language of the samples.
tgt_lang_id (int, optional): target language ID, if set, the collated batch
will contain a field 'tgt_lang_id' which indicates the target language
of the samples.
"""
def __init__(
self,
src,
src_sizes,
src_dict,
tgt=None,
tgt_sizes=None,
tgt_dict=None,
left_pad_source=True,
left_pad_target=False,
shuffle=True,
input_feeding=True,
remove_eos_from_source=False,
append_eos_to_target=False,
align_dataset=None,
constraints=None,
append_bos=False,
eos=None,
num_buckets=0,
src_lang_id=None,
tgt_lang_id=None,
pad_to_multiple=1,
):
if tgt_dict is not None:
assert src_dict.pad() == tgt_dict.pad()
assert src_dict.eos() == tgt_dict.eos()
assert src_dict.unk() == tgt_dict.unk()
if tgt is not None:
assert len(src) == len(
tgt
), "Source and target must contain the same number of examples"
self.src = src
self.tgt = tgt
self.src_sizes = np.array(src_sizes)
self.tgt_sizes = np.array(tgt_sizes) if tgt_sizes is not None else None
self.sizes = (
np.vstack((self.src_sizes, self.tgt_sizes)).T
if self.tgt_sizes is not None
else self.src_sizes
)
self.src_dict = src_dict
self.tgt_dict = tgt_dict
self.left_pad_source = left_pad_source
self.left_pad_target = left_pad_target
self.shuffle = shuffle
self.input_feeding = input_feeding
self.remove_eos_from_source = remove_eos_from_source
self.append_eos_to_target = append_eos_to_target
self.align_dataset = align_dataset
if self.align_dataset is not None:
assert (
self.tgt_sizes is not None
), "Both source and target needed when alignments are provided"
self.constraints = constraints
self.append_bos = append_bos
self.eos = eos if eos is not None else src_dict.eos()
self.src_lang_id = src_lang_id
self.tgt_lang_id = tgt_lang_id
if num_buckets > 0:
from fairseq.data import BucketPadLengthDataset
self.src = BucketPadLengthDataset(
self.src,
sizes=self.src_sizes,
num_buckets=num_buckets,
pad_idx=self.src_dict.pad(),
left_pad=self.left_pad_source,
)
self.src_sizes = self.src.sizes
logger.info("bucketing source lengths: {}".format(list(self.src.buckets)))
if self.tgt is not None:
self.tgt = BucketPadLengthDataset(
self.tgt,
sizes=self.tgt_sizes,
num_buckets=num_buckets,
pad_idx=self.tgt_dict.pad(),
left_pad=self.left_pad_target,
)
self.tgt_sizes = self.tgt.sizes
logger.info(
"bucketing target lengths: {}".format(list(self.tgt.buckets))
)
# determine bucket sizes using self.num_tokens, which will return
# the padded lengths (thanks to BucketPadLengthDataset)
num_tokens = np.vectorize(self.num_tokens, otypes=[np.compat.long])
self.bucketed_num_tokens = num_tokens(np.arange(len(self.src)))
self.buckets = [
(None, num_tokens) for num_tokens in np.unique(self.bucketed_num_tokens)
]
else:
self.buckets = None
self.pad_to_multiple = pad_to_multiple
def get_batch_shapes(self):
return self.buckets
def __getitem__(self, index):
tgt_item = self.tgt[index] if self.tgt is not None else None
src_item = self.src[index]
# Append EOS to end of tgt sentence if it does not have an EOS and remove
# EOS from end of src sentence if it exists. This is useful when we
# use existing datasets for opposite directions, i.e., when we want to
# use tgt_dataset as src_dataset and vice versa.
if self.append_eos_to_target:
eos = self.tgt_dict.eos() if self.tgt_dict else self.src_dict.eos()
if self.tgt and self.tgt[index][-1] != eos:
tgt_item = torch.cat([self.tgt[index], torch.LongTensor([eos])])
if self.append_bos:
bos = self.tgt_dict.bos() if self.tgt_dict else self.src_dict.bos()
if self.tgt and self.tgt[index][0] != bos:
tgt_item = torch.cat([torch.LongTensor([bos]), self.tgt[index]])
bos = self.src_dict.bos()
if self.src[index][0] != bos:
src_item = torch.cat([torch.LongTensor([bos]), self.src[index]])
if self.remove_eos_from_source:
eos = self.src_dict.eos()
if self.src[index][-1] == eos:
src_item = self.src[index][:-1]
example = {
"id": index,
"source": src_item,
"target": tgt_item,
}
if self.align_dataset is not None:
example["alignment"] = self.align_dataset[index]
if self.constraints is not None:
example["constraints"] = self.constraints[index]
return example
def __len__(self):
return len(self.src)
def collater(self, samples, pad_to_length=None):
"""Merge a list of samples to form a mini-batch.
Args:
samples (List[dict]): samples to collate
pad_to_length (dict, optional): a dictionary of
{'source': source_pad_to_length, 'target': target_pad_to_length}
to indicate the max length to pad to in source and target respectively.
Returns:
dict: a mini-batch with the following keys:
- `id` (LongTensor): example IDs in the original input order
- `ntokens` (int): total number of tokens in the batch
- `net_input` (dict): the input to the Model, containing keys:
- `src_tokens` (LongTensor): a padded 2D Tensor of tokens in
the source sentence of shape `(bsz, src_len)`. Padding will
appear on the left if *left_pad_source* is ``True``.
- `src_lengths` (LongTensor): 1D Tensor of the unpadded
lengths of each source sentence of shape `(bsz)`
- `prev_output_tokens` (LongTensor): a padded 2D Tensor of
tokens in the target sentence, shifted right by one
position for teacher forcing, of shape `(bsz, tgt_len)`.
This key will not be present if *input_feeding* is
``False``. Padding will appear on the left if
*left_pad_target* is ``True``.
- `src_lang_id` (LongTensor): a long Tensor which contains source
language IDs of each sample in the batch
- `target` (LongTensor): a padded 2D Tensor of tokens in the
target sentence of shape `(bsz, tgt_len)`. Padding will appear
on the left if *left_pad_target* is ``True``.
- `tgt_lang_id` (LongTensor): a long Tensor which contains target language
IDs of each sample in the batch
"""
res = collate(
samples,
pad_idx=self.src_dict.pad(),
eos_idx=self.eos,
left_pad_source=self.left_pad_source,
left_pad_target=self.left_pad_target,
input_feeding=self.input_feeding,
pad_to_length=pad_to_length,
pad_to_multiple=self.pad_to_multiple,
)
if self.src_lang_id is not None or self.tgt_lang_id is not None:
src_tokens = res["net_input"]["src_tokens"]
bsz = src_tokens.size(0)
if self.src_lang_id is not None:
res["net_input"]["src_lang_id"] = (
torch.LongTensor([[self.src_lang_id]]).expand(bsz, 1).to(src_tokens)
)
if self.tgt_lang_id is not None:
res["tgt_lang_id"] = (
torch.LongTensor([[self.tgt_lang_id]]).expand(bsz, 1).to(src_tokens)
)
return res
def num_tokens(self, index):
"""Return the number of tokens in a sample. This value is used to
enforce ``--max-tokens`` during batching."""
return max(
self.src_sizes[index],
self.tgt_sizes[index] if self.tgt_sizes is not None else 0,
)
def num_tokens_vec(self, indices):
"""Return the number of tokens for a set of positions defined by indices.
This value is used to enforce ``--max-tokens`` during batching."""
sizes = self.src_sizes[indices]
if self.tgt_sizes is not None:
sizes = np.maximum(sizes, self.tgt_sizes[indices])
return sizes
def size(self, index):
"""Return an example's size as a float or tuple. This value is used when
filtering a dataset with ``--max-positions``."""
return (
self.src_sizes[index],
self.tgt_sizes[index] if self.tgt_sizes is not None else 0,
)
def ordered_indices(self):
"""Return an ordered list of indices. Batches will be constructed based
on this order."""
if self.shuffle:
indices = np.random.permutation(len(self)).astype(np.int64)
else:
indices = np.arange(len(self), dtype=np.int64)
if self.buckets is None:
# sort by target length, then source length
if self.tgt_sizes is not None:
indices = indices[np.argsort(self.tgt_sizes[indices], kind="mergesort")]
return indices[np.argsort(self.src_sizes[indices], kind="mergesort")]
else:
# sort by bucketed_num_tokens, which is:
# max(padded_src_len, padded_tgt_len)
return indices[
np.argsort(self.bucketed_num_tokens[indices], kind="mergesort")
]
@property
def supports_prefetch(self):
return getattr(self.src, "supports_prefetch", False) and (
getattr(self.tgt, "supports_prefetch", False) or self.tgt is None
)
def prefetch(self, indices):
self.src.prefetch(indices)
if self.tgt is not None:
self.tgt.prefetch(indices)
if self.align_dataset is not None:
self.align_dataset.prefetch(indices)
def filter_indices_by_size(self, indices, max_sizes):
"""Filter a list of sample indices. Remove those that are longer
than specified in max_sizes.
Args:
indices (np.array): original array of sample indices
max_sizes (int or list[int] or tuple[int]): max sample size,
can be defined separately for src and tgt (then list or tuple)
Returns:
np.array: filtered sample array
list: list of removed indices
"""
return data_utils.filter_paired_dataset_indices_by_size(
self.src_sizes,
self.tgt_sizes,
indices,
max_sizes,
)
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from .block_pair_dataset import BlockPairDataset
from .masked_lm_dataset import MaskedLMDataset
from .masked_lm_dictionary import BertDictionary, MaskedLMDictionary
__all__ = [
"BertDictionary",
"BlockPairDataset",
"MaskedLMDataset",
"MaskedLMDictionary",
]
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import math
import numpy as np
import torch
from fairseq.data import FairseqDataset
class BlockPairDataset(FairseqDataset):
"""Break a Dataset of tokens into sentence pair blocks for next sentence
prediction as well as masked language modeling.
The high-level logic is:
1. break input tensor to tensor blocks
2. pair the blocks with 50% next sentence and 50% random sentence
3. return paired blocks as well as related segment labels
Args:
dataset (~torch.utils.data.Dataset): dataset to break into blocks
sizes: array of sentence lengths
dictionary: dictionary for the task
block_size: maximum block size
break_mode: mode for breaking the corpus into block pairs. Currently we
support 2 modes:
doc: respect document boundaries and each part of the pair should belong to one document
none: don't respect any boundary and cut tokens evenly
short_seq_prob: probability for generating shorter block pairs
doc_break_size: Size for empty line separating documents. Typically 1 if
the sentences have eos, 0 otherwise.
"""
def __init__(
self,
dataset,
dictionary,
sizes,
block_size,
break_mode="doc",
short_seq_prob=0.1,
doc_break_size=1,
):
super().__init__()
self.dataset = dataset
self.pad = dictionary.pad()
self.eos = dictionary.eos()
self.cls = dictionary.cls()
self.mask = dictionary.mask()
self.sep = dictionary.sep()
self.break_mode = break_mode
self.dictionary = dictionary
self.short_seq_prob = short_seq_prob
self.block_indices = []
assert len(dataset) == len(sizes)
if break_mode == "doc":
cur_doc = []
for sent_id, sz in enumerate(sizes):
assert doc_break_size == 0 or sz != 0, (
"when doc_break_size is non-zero, we expect documents to be"
"separated by a blank line with a single eos."
)
# empty line as document separator
if sz == doc_break_size:
if len(cur_doc) == 0:
continue
self.block_indices.append(cur_doc)
cur_doc = []
else:
cur_doc.append(sent_id)
max_num_tokens = block_size - 3 # Account for [CLS], [SEP], [SEP]
self.sent_pairs = []
self.sizes = []
for doc_id, doc in enumerate(self.block_indices):
self._generate_sentence_pair(doc, doc_id, max_num_tokens, sizes)
elif break_mode is None or break_mode == "none":
# each block should have half of the block size since we are constructing block pair
sent_length = (block_size - 3) // 2
total_len = sum(dataset.sizes)
length = math.ceil(total_len / sent_length)
def block_at(i):
start = i * sent_length
end = min(start + sent_length, total_len)
return (start, end)
sent_indices = np.array([block_at(i) for i in range(length)])
sent_sizes = np.array([e - s for s, e in sent_indices])
dataset_index = self._sent_to_dataset_index(sent_sizes)
# pair sentences
self._pair_sentences(dataset_index)
else:
raise ValueError("Invalid break_mode: " + break_mode)
def _pair_sentences(self, dataset_index):
"""
Given a list of evenly cut blocks/sentences, pair these sentences with 50%
consecutive sentences and 50% random sentences.
This is used for the "none" break mode.
"""
# pair sentences
for sent_id, sent in enumerate(dataset_index):
next_sent_label = (
1 if np.random.rand() > 0.5 and sent_id != len(dataset_index) - 1 else 0
)
if next_sent_label:
next_sent = dataset_index[sent_id + 1]
else:
next_sent = dataset_index[
self._skip_sampling(len(dataset_index), [sent_id, sent_id + 1])
]
self.sent_pairs.append((sent, next_sent, next_sent_label))
# The current blocks don't include the special tokens but the
# sizes already account for this
self.sizes.append(3 + sent[3] + next_sent[3])
def _sent_to_dataset_index(self, sent_sizes):
"""
Build index mapping block indices to the underlying dataset indices
"""
dataset_index = []
ds_idx, ds_remaining = -1, 0
for to_consume in sent_sizes:
sent_size = to_consume
if ds_remaining == 0:
ds_idx += 1
ds_remaining = sent_sizes[ds_idx]
start_ds_idx = ds_idx
start_offset = sent_sizes[ds_idx] - ds_remaining
while to_consume > ds_remaining:
to_consume -= ds_remaining
ds_idx += 1
ds_remaining = sent_sizes[ds_idx]
ds_remaining -= to_consume
dataset_index.append(
(
start_ds_idx, # starting index in dataset
start_offset, # starting offset within starting index
ds_idx, # ending index in dataset
sent_size, # sentence length
)
)
assert ds_remaining == 0
assert ds_idx == len(self.dataset) - 1
return dataset_index
def _generate_sentence_pair(self, doc, doc_id, max_num_tokens, sizes):
"""
Go through a single document and generate sentence pairs from it
"""
current_chunk = []
current_length = 0
curr = 0
# To provide more randomness, we decrease target seq length for parts of
# samples (10% by default). Note that max_num_tokens is the hard threshold
# for batching and will never be changed.
target_seq_length = max_num_tokens
if np.random.random() < self.short_seq_prob:
target_seq_length = np.random.randint(2, max_num_tokens)
# loop through all sentences in document
while curr < len(doc):
sent_id = doc[curr]
current_chunk.append(sent_id)
current_length = sum(sizes[current_chunk])
# split chunk and generate pair when exceed target_seq_length or
# finish the loop
if curr == len(doc) - 1 or current_length >= target_seq_length:
# split the chunk into 2 parts
a_end = 1
if len(current_chunk) > 2:
a_end = np.random.randint(1, len(current_chunk) - 1)
sent_a = current_chunk[:a_end]
len_a = sum(sizes[sent_a])
# generate next sentence label, note that if there is only 1 sentence
# in current chunk, label is always 0
next_sent_label = (
1 if np.random.rand() > 0.5 and len(current_chunk) != 1 else 0
)
if not next_sent_label:
# if next sentence label is 0, sample sent_b from a random doc
target_b_length = target_seq_length - len_a
rand_doc_id = self._skip_sampling(len(self.block_indices), [doc_id])
random_doc = self.block_indices[rand_doc_id]
random_start = np.random.randint(0, len(random_doc))
sent_b = []
len_b = 0
for j in range(random_start, len(random_doc)):
sent_b.append(random_doc[j])
len_b = sum(sizes[sent_b])
if len_b >= target_b_length:
break
# put back the second part of the chunk since it's not used
num_unused_segments = len(current_chunk) - a_end
curr -= num_unused_segments
else:
# if next sentence label is 1, use the second part of chunk as sent_B
sent_b = current_chunk[a_end:]
len_b = sum(sizes[sent_b])
# currently sent_a and sent_B may be longer than max_num_tokens,
# truncate them and return block idx and offsets for them
sent_a, sent_b = self._truncate_sentences(
sent_a, sent_b, max_num_tokens
)
self.sent_pairs.append((sent_a, sent_b, next_sent_label))
self.sizes.append(3 + sent_a[3] + sent_b[3])
current_chunk = []
curr += 1
def _skip_sampling(self, total, skip_ids):
"""
Generate a random integer which is not in skip_ids. Sample range is [0, total)
TODO: ids in skip_ids should be consecutive, we can extend it to more generic version later
"""
rand_id = np.random.randint(total - len(skip_ids))
return rand_id if rand_id < min(skip_ids) else rand_id + len(skip_ids)
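# Worked example (illustrative): with total=10 and skip_ids=[3, 4], rand_id is
# drawn uniformly from [0, 8); a draw of 2 maps to 2 (it is below min(skip_ids)),
# while a draw of 3 maps to 3 + 2 = 5, so ids 3 and 4 are never returned.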
def _truncate_sentences(self, sent_a, sent_b, max_num_tokens):
"""
Truncate a pair of sentences to limit the total length to at most max_num_tokens
Logic:
1. Truncate the longer sentence
2. Tokens to be truncated could be at the beginning or the end of the sentence
Returns:
Truncated sentences represented by dataset idx
"""
len_a, len_b = sum(self.dataset.sizes[sent_a]), sum(self.dataset.sizes[sent_b])
front_cut_a = front_cut_b = end_cut_a = end_cut_b = 0
while True:
total_length = (
len_a + len_b - front_cut_a - front_cut_b - end_cut_a - end_cut_b
)
if total_length <= max_num_tokens:
break
if len_a - front_cut_a - end_cut_a > len_b - front_cut_b - end_cut_b:
if np.random.rand() < 0.5:
front_cut_a += 1
else:
end_cut_a += 1
else:
if np.random.rand() < 0.5:
front_cut_b += 1
else:
end_cut_b += 1
# calculate ds indices as well as offsets and return
truncated_sent_a = self._cut_sentence(sent_a, front_cut_a, end_cut_a)
truncated_sent_b = self._cut_sentence(sent_b, front_cut_b, end_cut_b)
return truncated_sent_a, truncated_sent_b
def _cut_sentence(self, sent, front_cut, end_cut):
"""
Cut a sentence based on the numbers of tokens to be cut from beginning and end
Represent the sentence as dataset idx and return
"""
start_ds_idx, end_ds_idx, offset = sent[0], sent[-1], 0
target_len = sum(self.dataset.sizes[sent]) - front_cut - end_cut
while front_cut > 0:
if self.dataset.sizes[start_ds_idx] > front_cut:
offset += front_cut
break
else:
front_cut -= self.dataset.sizes[start_ds_idx]
start_ds_idx += 1
while end_cut > 0:
if self.dataset.sizes[end_ds_idx] > end_cut:
break
else:
end_cut -= self.dataset.sizes[end_ds_idx]
end_ds_idx -= 1
return start_ds_idx, offset, end_ds_idx, target_len
def _fetch_block(self, start_ds_idx, offset, end_ds_idx, length):
"""
Fetch a block of tokens based on its dataset idx
"""
buffer = torch.cat(
[self.dataset[idx] for idx in range(start_ds_idx, end_ds_idx + 1)]
)
s, e = offset, offset + length
return buffer[s:e]
def __getitem__(self, index):
block1, block2, next_sent_label = self.sent_pairs[index]
block1 = self._fetch_block(*block1)
block2 = self._fetch_block(*block2)
return block1, block2, next_sent_label
def __len__(self):
return len(self.sizes)
@property
def supports_prefetch(self):
return getattr(self.dataset, "supports_prefetch", False)
def prefetch(self, indices):
prefetch_idx = set()
for index in indices:
for block1, block2, _ in [self.sent_pairs[index]]:
for ds_idx in range(block1[0], block1[2] + 1):
prefetch_idx.add(ds_idx)
for ds_idx in range(block2[0], block2[2] + 1):
prefetch_idx.add(ds_idx)
self.dataset.prefetch(prefetch_idx)
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import math
from typing import Dict, List, Tuple
import numpy as np
import torch
from fairseq.data import Dictionary, FairseqDataset, data_utils
from fairseq.data.concat_dataset import ConcatDataset
from fairseq.data.legacy.block_pair_dataset import BlockPairDataset
from fairseq.data.token_block_dataset import TokenBlockDataset
class MaskedLMDataset(FairseqDataset):
"""
A wrapper Dataset for masked language modelling. The dataset
wraps around TokenBlockDataset or BlockPairDataset and creates a batch
where the input blocks are masked according to the specified masking
probability. Additionally the batch can also contain sentence level targets
if this is specified.
Args:
dataset: Dataset which generates blocks of data. Only BlockPairDataset
and TokenBlockDataset are supported.
sizes: Sentence lengths
vocab: Dictionary with the vocabulary and special tokens.
pad_idx: Id of padding token in dictionary
mask_idx: Id of mask token in dictionary
classif_token_idx: Id of classification token in dictionary. This is the
token associated with the sentence embedding (Eg: CLS for BERT)
sep_token_idx: Id of separator token in dictionary
(Eg: SEP in BERT)
seed: Seed for random number generator for reproducibility.
shuffle: Shuffle the elements before batching.
has_pairs: Specifies whether the underlying dataset
generates a pair of blocks along with a sentence_target or not.
Setting it to True assumes that the underlying dataset generates a
label for the pair of sentences which is surfaced as
sentence_target. The default value assumes a single block with no
sentence target.
segment_id: An optional segment id for filling in the segment labels
when we are in the single block setting (Eg: XLM). Default is 0.
masking_ratio: specifies what percentage of the blocks should be masked.
masking_prob: specifies the probability of a given token being
replaced with the "MASK" token.
random_token_prob: specifies the probability of a given token being
replaced by a random token from the vocabulary.
"""
def __init__(
self,
dataset: FairseqDataset,
sizes: np.ndarray,
vocab: Dictionary,
pad_idx: int,
mask_idx: int,
classif_token_idx: int,
sep_token_idx: int,
seed: int = 1,
shuffle: bool = True,
has_pairs: bool = True,
segment_id: int = 0,
masking_ratio: float = 0.15,
masking_prob: float = 0.8,
random_token_prob: float = 0.1,
):
# Make sure the input datasets are the ones supported
assert (
isinstance(dataset, TokenBlockDataset)
or isinstance(dataset, BlockPairDataset)
or isinstance(dataset, ConcatDataset)
), (
"MaskedLMDataset only wraps TokenBlockDataset or BlockPairDataset or "
"ConcatDataset"
)
self.dataset = dataset
self.sizes = np.array(sizes)
self.vocab = vocab
self.pad_idx = pad_idx
self.mask_idx = mask_idx
self.classif_token_idx = classif_token_idx
self.sep_token_idx = sep_token_idx
self.shuffle = shuffle
self.seed = seed
self.has_pairs = has_pairs
self.segment_id = segment_id
self.masking_ratio = masking_ratio
self.masking_prob = masking_prob
self.random_token_prob = random_token_prob
# If we have only one block then sizes needs to be updated to include
# the classification token
if not has_pairs:
self.sizes = self.sizes + 1
def __getitem__(self, index: int):
# if has_pairs, then expect 2 blocks and a sentence target
if self.has_pairs:
(block_one, block_two, sentence_target) = self.dataset[index]
else:
block_one = self.dataset[index]
return {
"id": index,
"block_one": block_one,
"block_two": block_two if self.has_pairs else None,
"sentence_target": sentence_target if self.has_pairs else None,
}
def __len__(self):
return len(self.dataset)
def _mask_block(
self,
sentence: np.ndarray,
mask_idx: int,
pad_idx: int,
dictionary_token_range: Tuple,
):
"""
Mask tokens for Masked Language Model training
Samples a masking_ratio fraction of tokens that will be predicted by the LM.
Note: This function may not be efficient enough since we have multiple
conversions between np and torch; we can replace them with torch
operators later.
Args:
sentence: 1d tensor to be masked
mask_idx: index to use for masking the sentence
pad_idx: index to use for masking the target for tokens we aren't
predicting
dictionary_token_range: range of indices in dictionary which can
be used for random word replacement
(e.g. without special characters)
Return:
masked_sent: masked sentence
target: target with words which we are not predicting replaced
by pad_idx
"""
masked_sent = np.copy(sentence)
sent_length = len(sentence)
mask_num = math.ceil(sent_length * self.masking_ratio)
mask = np.random.choice(sent_length, mask_num, replace=False)
target = np.copy(sentence)
for i in range(sent_length):
if i in mask:
rand = np.random.random()
# replace with mask if probability is less than masking_prob
# (Eg: 0.8)
if rand < self.masking_prob:
masked_sent[i] = mask_idx
# replace with random token if probability is less than
# masking_prob + random_token_prob (Eg: 0.9)
elif rand < (self.masking_prob + self.random_token_prob):
# sample random token from dictionary
masked_sent[i] = np.random.randint(
dictionary_token_range[0], dictionary_token_range[1]
)
else:
target[i] = pad_idx
return masked_sent, target
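# Worked example (illustrative): with the defaults masking_ratio=0.15,
# masking_prob=0.8 and random_token_prob=0.1, a 20-token block has
# ceil(20 * 0.15) = 3 positions selected for prediction; each selected
# position becomes the mask token with probability 0.8, a random vocabulary
# token with probability 0.1, and is left unchanged with probability 0.1,
# while every unselected position has its target set to pad_idx.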
def _collate(self, samples: List[Dict], pad_idx: int, eos_idx: int):
"""
Does the heavy lifting for creating a batch from the input list of
examples. The logic is as follows:
1. Mask the input blocks. In case has_pair is True then we have 2
blocks to mask.
2. Prepend the first masked block tensor with the special token
used as sentence embedding. Eg: CLS in BERT. This happens
irrespective of the value of has_pair.
3. If has_pair is True, then append the first masked block with the
special separator token (eg: SEP for BERT) and compute segment
label accordingly. In this case, also append the second masked
block with this special separator token and compute its segment
label.
4. For the targets tensor, prepend and append with padding index
accordingly.
5. Concatenate all tensors.
"""
if len(samples) == 0:
return {}
# To ensure determinism, we reset the state of the PRNG after every
# batch based on the seed and the first id of the batch. This ensures
# that across epochs we get the same mask for the same example. This
# is needed for reproducibility and is how BERT does masking
# TODO: Can we add determinism without this constraint?
with data_utils.numpy_seed(self.seed + samples[0]["id"]):
for s in samples:
# token range is needed for replacing with random token during
# masking
token_range = (self.vocab.nspecial, len(self.vocab))
# mask according to specified probabilities.
masked_blk_one, masked_tgt_one = self._mask_block(
s["block_one"],
self.mask_idx,
self.pad_idx,
token_range,
)
tokens = np.concatenate([[self.classif_token_idx], masked_blk_one])
targets = np.concatenate([[self.pad_idx], masked_tgt_one])
segments = np.ones(len(tokens)) * self.segment_id
# if has_pairs is True then we need to add the SEP token to both
# the blocks after masking and re-compute segments based on the new
# lengths.
if self.has_pairs:
tokens_one = np.concatenate([tokens, [self.sep_token_idx]])
targets_one = np.concatenate([targets, [self.pad_idx]])
masked_blk_two, masked_tgt_two = self._mask_block(
s["block_two"], self.mask_idx, self.pad_idx, token_range
)
tokens_two = np.concatenate([masked_blk_two, [self.sep_token_idx]])
targets_two = np.concatenate([masked_tgt_two, [self.pad_idx]])
# block + 1 sep + 1 special (CLS)
segments_one = np.zeros(len(tokens_one))
# block + 1 sep
segments_two = np.ones(len(tokens_two))
tokens = np.concatenate([tokens_one, tokens_two])
targets = np.concatenate([targets_one, targets_two])
segments = np.concatenate([segments_one, segments_two])
s["source"] = torch.LongTensor(tokens)
s["segment_labels"] = torch.LongTensor(segments)
s["lm_target"] = torch.LongTensor(targets)
def merge(key):
return data_utils.collate_tokens(
[s[key] for s in samples], pad_idx, eos_idx, left_pad=False
)
return {
"id": torch.LongTensor([s["id"] for s in samples]),
"ntokens": sum(len(s["source"]) for s in samples),
"net_input": {
"src_tokens": merge("source"),
"segment_labels": merge("segment_labels"),
},
"lm_target": merge("lm_target"),
"sentence_target": torch.LongTensor([s["sentence_target"] for s in samples])
if self.has_pairs
else None,
"nsentences": len(samples),
}
def collater(self, samples: List[Dict]):
"""Merge a list of samples to form a mini-batch.
Args:
samples (List[dict]): samples to collate
Returns:
dict: a mini-batch of data
"""
return self._collate(samples, self.vocab.pad(), self.vocab.eos())
def num_tokens(self, index: int):
"""
Return the number of tokens in a sample. This value is used to
enforce max-tokens during batching.
"""
return self.sizes[index]
def size(self, index: int):
"""
Return an example's size as a float or tuple. This value is used when
filtering a dataset with max-positions.
"""
return self.sizes[index]
def ordered_indices(self):
"""
Return an ordered list of indices. Batches will be constructed based
on this order.
"""
if self.shuffle:
return np.random.permutation(len(self))
else:
order = [np.arange(len(self))]
order.append(self.sizes)
return np.lexsort(order)
@property
def supports_prefetch(self):
return getattr(self.dataset, "supports_prefetch", False)
def prefetch(self, indices):
self.dataset.prefetch(indices)
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from fairseq.data import Dictionary
class MaskedLMDictionary(Dictionary):
"""
Dictionary for Masked Language Modelling tasks. This extends Dictionary by
adding the mask symbol.
"""
def __init__(
self,
pad="<pad>",
eos="</s>",
unk="<unk>",
mask="<mask>",
):
super().__init__(pad=pad, eos=eos, unk=unk)
self.mask_word = mask
self.mask_index = self.add_symbol(mask)
self.nspecial = len(self.symbols)
def mask(self):
"""Helper to get index of mask symbol"""
return self.mask_index
class BertDictionary(MaskedLMDictionary):
"""
Dictionary for BERT task. This extends MaskedLMDictionary by adding support
for cls and sep symbols.
"""
def __init__(
self,
pad="<pad>",
eos="</s>",
unk="<unk>",
mask="<mask>",
cls="<cls>",
sep="<sep>",
):
super().__init__(pad=pad, eos=eos, unk=unk, mask=mask)
self.cls_word = cls
self.sep_word = sep
self.cls_index = self.add_symbol(cls)
self.sep_index = self.add_symbol(sep)
self.nspecial = len(self.symbols)
def cls(self):
"""Helper to get index of cls symbol"""
return self.cls_index
def sep(self):
"""Helper to get index of sep symbol"""
return self.sep_index
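# --- Illustrative usage sketch (not part of the original file) ---
# A minimal, hypothetical smoke test assuming the stock fairseq ``Dictionary``
# API (add_symbol/pad/eos/unk/__len__). It shows that BertDictionary registers
# <mask>, <cls> and <sep> on top of the base special symbols and exposes their
# indices via mask()/cls()/sep().
def _demo_bert_dictionary():  # hypothetical helper, for illustration only
    d = BertDictionary()
    d.add_symbol("hello")
    d.add_symbol("world")
    # mask/cls/sep are added during __init__, so they are counted among the
    # special symbols (d.nspecial) and never produced by regular tokenization.
    return {
        "mask": d.mask(),
        "cls": d.cls(),
        "sep": d.sep(),
        "nspecial": d.nspecial,
        "size": len(d),
    }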
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from . import BaseWrapperDataset
class ListDataset(BaseWrapperDataset):
def __init__(self, dataset, sizes=None):
super().__init__(dataset)
self._sizes = sizes
def __iter__(self):
for x in self.dataset:
yield x
def collater(self, samples):
return samples
@property
def sizes(self):
return self._sizes
def num_tokens(self, index):
return self.sizes[index]
def size(self, index):
return self.sizes[index]
def set_epoch(self, epoch):
pass
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import numpy as np
import torch
from typing import Dict
from fairseq.data.monolingual_dataset import MonolingualDataset
from . import FairseqDataset
class LMContextWindowDataset(FairseqDataset):
"""
Wraps a MonolingualDataset and provides more context for evaluation.
Each item in the new dataset will have a maximum size of
``tokens_per_sample + context_window``.
Args:
dataset: dataset to wrap
tokens_per_sample (int): the max number of tokens in each dataset item
context_window (int): the number of accumulated tokens to add to each
dataset item
pad_idx (int): padding symbol
"""
def __init__(
self,
dataset: MonolingualDataset,
tokens_per_sample: int,
context_window: int,
pad_idx: int,
):
assert context_window > 0
self.dataset = dataset
self.tokens_per_sample = tokens_per_sample
self.context_window = context_window
self.pad_idx = pad_idx
self.prev_tokens = np.empty([0])
def __getitem__(self, index):
return self.dataset[index]
def __len__(self):
return len(self.dataset)
def collater(self, samples) -> Dict:
sample = self.dataset.collater(samples)
pad = self.pad_idx
max_sample_len = self.tokens_per_sample + self.context_window
bsz, tsz = sample["net_input"]["src_tokens"].shape
start_idxs = [0] * bsz
toks = sample["net_input"]["src_tokens"]
lengths = sample["net_input"]["src_lengths"]
tgt = sample["target"]
new_toks = np.empty([bsz, tsz + self.context_window], dtype=np.int64)
new_tgt = np.full([bsz, tsz + self.context_window], pad, dtype=np.int64)
sample_lens = toks.ne(pad).long().sum(dim=1).cpu()
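        # For each row in the batch: prepend up to ``context_window`` tokens
        # carried over from the previous row (dropping the oldest carried-over
        # tokens if the total would exceed ``max_sample_len``), right-pad the
        # result, shift the targets by the same offset, and remember the last
        # ``context_window`` non-pad tokens as context for the next row.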
for i in range(bsz):
sample_len = sample_lens[i]
extra = len(self.prev_tokens) + sample_len - max_sample_len
if extra > 0:
self.prev_tokens = self.prev_tokens[extra:]
pads = np.full(self.context_window - len(self.prev_tokens), pad)
new_toks[i] = np.concatenate([self.prev_tokens, toks[i].numpy(), pads])
new_tgt[
i, len(self.prev_tokens) : len(self.prev_tokens) + len(tgt[i])
] = tgt[i]
start_idxs[i] = len(self.prev_tokens)
lengths[i] += len(self.prev_tokens)
self.prev_tokens = new_toks[i][new_toks[i] != pad][-self.context_window :]
sample["net_input"]["src_tokens"] = torch.from_numpy(new_toks)
sample["target"] = torch.from_numpy(new_tgt)
sample["start_indices"] = start_idxs
return sample
def num_tokens(self, index):
return self.dataset.num_tokens(index)
def size(self, index):
return self.dataset.size(index)
def ordered_indices(self):
# NOTE we don't shuffle the data to retain access to the previous dataset elements
return np.arange(len(self.dataset))
@property
def supports_prefetch(self):
return getattr(self.dataset, "supports_prefetch", False)
def prefetch(self, indices):
return self.dataset.prefetch(indices)
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from functools import lru_cache
from . import BaseWrapperDataset
class LRUCacheDataset(BaseWrapperDataset):
def __init__(self, dataset, token=None):
super().__init__(dataset)
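        # NOTE: the ``token`` argument is currently unused.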
@lru_cache(maxsize=8)
def __getitem__(self, index):
return self.dataset[index]
@lru_cache(maxsize=8)
def collater(self, samples):
return self.dataset.collater(samples)
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from functools import lru_cache
import numpy as np
import torch
from fairseq.data import Dictionary, data_utils
from . import BaseWrapperDataset, LRUCacheDataset
class MaskTokensDataset(BaseWrapperDataset):
"""
A wrapper Dataset for masked language modeling.
Input items are masked according to the specified masking probability.
Args:
dataset: Dataset to wrap.
sizes: Sentence lengths
vocab: Dictionary with the vocabulary and special tokens.
pad_idx: Id of pad token in vocab
mask_idx: Id of mask token in vocab
return_masked_tokens: controls whether to return the non-masked tokens
(the default) or to return a tensor with the original masked token
IDs (and *pad_idx* elsewhere). The latter is useful as targets for
masked LM training.
seed: Seed for random number generator for reproducibility.
mask_prob: probability of replacing a token with *mask_idx*.
leave_unmasked_prob: probability that a masked token is unmasked.
random_token_prob: probability of replacing a masked token with a
random token from the vocabulary.
freq_weighted_replacement: sample random replacement words based on
word frequencies in the vocab.
mask_whole_words: only mask whole words. This should be a byte mask
over vocab indices, indicating whether it is the beginning of a
word. We will extend any mask to encompass the whole word.
bpe: BPE to use for whole-word masking.
        mask_multiple_length: repeat each mask index multiple times. Default
            value is 1.
        mask_stdev: standard deviation of the mask length distribution when
            masking multiple tokens. Default value is 0.
"""
@classmethod
def apply_mask(cls, dataset: torch.utils.data.Dataset, *args, **kwargs):
"""Return the source and target datasets for masked LM training."""
dataset = LRUCacheDataset(dataset)
return (
LRUCacheDataset(cls(dataset, *args, **kwargs, return_masked_tokens=False)),
LRUCacheDataset(cls(dataset, *args, **kwargs, return_masked_tokens=True)),
)
def __init__(
self,
dataset: torch.utils.data.Dataset,
vocab: Dictionary,
pad_idx: int,
mask_idx: int,
return_masked_tokens: bool = False,
seed: int = 1,
mask_prob: float = 0.15,
leave_unmasked_prob: float = 0.1,
random_token_prob: float = 0.1,
freq_weighted_replacement: bool = False,
mask_whole_words: torch.Tensor = None,
mask_multiple_length: int = 1,
mask_stdev: float = 0.0,
):
assert 0.0 < mask_prob < 1.0
assert 0.0 <= random_token_prob <= 1.0
assert 0.0 <= leave_unmasked_prob <= 1.0
assert random_token_prob + leave_unmasked_prob <= 1.0
assert mask_multiple_length >= 1
assert mask_stdev >= 0.0
self.dataset = dataset
self.vocab = vocab
self.pad_idx = pad_idx
self.mask_idx = mask_idx
self.return_masked_tokens = return_masked_tokens
self.seed = seed
self.mask_prob = mask_prob
self.leave_unmasked_prob = leave_unmasked_prob
self.random_token_prob = random_token_prob
self.mask_whole_words = mask_whole_words
self.mask_multiple_length = mask_multiple_length
self.mask_stdev = mask_stdev
if random_token_prob > 0.0:
if freq_weighted_replacement:
weights = np.array(self.vocab.count)
else:
weights = np.ones(len(self.vocab))
weights[: self.vocab.nspecial] = 0
self.weights = weights / weights.sum()
self.epoch = 0
@property
def can_reuse_epoch_itr_across_epochs(self):
return True # only the noise changes, not item sizes
def set_epoch(self, epoch, **unused):
super().set_epoch(epoch)
self.epoch = epoch
def __getitem__(self, index: int):
return self.__getitem_cached__(self.seed, self.epoch, index)
@lru_cache(maxsize=8)
def __getitem_cached__(self, seed: int, epoch: int, index: int):
with data_utils.numpy_seed(self.seed, self.epoch, index):
item = self.dataset[index]
sz = len(item)
assert (
self.mask_idx not in item
), "Dataset contains mask_idx (={}), this is not expected!".format(
self.mask_idx,
)
if self.mask_whole_words is not None:
word_begins_mask = self.mask_whole_words.gather(0, item)
word_begins_idx = word_begins_mask.nonzero().view(-1)
sz = len(word_begins_idx)
words = np.split(word_begins_mask, word_begins_idx)[1:]
assert len(words) == sz
word_lens = list(map(len, words))
# decide elements to mask
mask = np.full(sz, False)
num_mask = int(
# add a random number for probabilistic rounding
self.mask_prob * sz / float(self.mask_multiple_length)
+ np.random.rand()
)
# multiple masking as described in the vq-wav2vec paper (https://arxiv.org/abs/1910.05453)
mask_idc = np.random.choice(sz, num_mask, replace=False)
if self.mask_stdev > 0.0:
lengths = np.random.normal(
self.mask_multiple_length, self.mask_stdev, size=num_mask
)
lengths = [max(0, int(round(x))) for x in lengths]
mask_idc = np.asarray(
[
mask_idc[j] + offset
for j in range(len(mask_idc))
for offset in range(lengths[j])
],
dtype=np.int64,
)
else:
mask_idc = np.concatenate(
[mask_idc + i for i in range(self.mask_multiple_length)]
)
mask_idc = mask_idc[mask_idc < len(mask)]
try:
mask[mask_idc] = True
except: # something wrong
print(
"Assigning mask indexes {} to mask {} failed!".format(
mask_idc, mask
)
)
raise
if self.return_masked_tokens:
# exit early if we're just returning the masked tokens
# (i.e., the targets for masked LM training)
if self.mask_whole_words is not None:
mask = np.repeat(mask, word_lens)
new_item = np.full(len(mask), self.pad_idx)
new_item[mask] = item[torch.from_numpy(mask.astype(np.uint8)) == 1]
return torch.from_numpy(new_item)
# decide unmasking and random replacement
rand_or_unmask_prob = self.random_token_prob + self.leave_unmasked_prob
if rand_or_unmask_prob > 0.0:
rand_or_unmask = mask & (np.random.rand(sz) < rand_or_unmask_prob)
if self.random_token_prob == 0.0:
unmask = rand_or_unmask
rand_mask = None
elif self.leave_unmasked_prob == 0.0:
unmask = None
rand_mask = rand_or_unmask
else:
unmask_prob = self.leave_unmasked_prob / rand_or_unmask_prob
decision = np.random.rand(sz) < unmask_prob
unmask = rand_or_unmask & decision
rand_mask = rand_or_unmask & (~decision)
else:
unmask = rand_mask = None
if unmask is not None:
mask = mask ^ unmask
if self.mask_whole_words is not None:
mask = np.repeat(mask, word_lens)
new_item = np.copy(item)
new_item[mask] = self.mask_idx
if rand_mask is not None:
num_rand = rand_mask.sum()
if num_rand > 0:
if self.mask_whole_words is not None:
rand_mask = np.repeat(rand_mask, word_lens)
num_rand = rand_mask.sum()
new_item[rand_mask] = np.random.choice(
len(self.vocab),
num_rand,
p=self.weights,
)
return torch.from_numpy(new_item)
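# --- Illustrative usage sketch (not part of the original file) ---
# A minimal, hypothetical example of MaskTokensDataset.apply_mask. The toy
# vocabulary, token list and seed below are placeholders chosen for the sketch;
# any indexable dataset of 1-D LongTensors (not containing the mask index)
# would work the same way.
def _demo_mask_tokens_dataset():  # hypothetical helper, for illustration only
    vocab = Dictionary()
    for w in ["a", "b", "c", "d", "e"]:
        vocab.add_symbol(w)
    mask_idx = vocab.add_symbol("<mask>")
    sentences = [
        torch.LongTensor([vocab.index(w) for w in ["a", "b", "c", "d", "e"]])
    ]
    src_ds, tgt_ds = MaskTokensDataset.apply_mask(
        sentences,
        vocab=vocab,
        pad_idx=vocab.pad(),
        mask_idx=mask_idx,
        seed=7,
        mask_prob=0.4,
    )
    # src_ds[0] is the corrupted input (some positions replaced by <mask>, kept
    # unchanged, or swapped for a random token); tgt_ds[0] holds the original
    # ids at the masked positions and pad_idx everywhere else.
    return src_ds[0], tgt_ds[0]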
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import numpy as np
import torch
from . import FairseqDataset, data_utils
def collate(samples, pad_idx, eos_idx, fixed_pad_length=None, pad_to_bsz=None):
if len(samples) == 0:
return {}
def merge(key, is_list=False):
if is_list:
res = []
for i in range(len(samples[0][key])):
res.append(
data_utils.collate_tokens(
[s[key][i] for s in samples],
pad_idx,
eos_idx,
left_pad=False,
pad_to_length=fixed_pad_length,
pad_to_bsz=pad_to_bsz,
)
)
return res
else:
return data_utils.collate_tokens(
[s[key] for s in samples],
pad_idx,
eos_idx,
left_pad=False,
pad_to_length=fixed_pad_length,
pad_to_bsz=pad_to_bsz,
)
src_tokens = merge("source")
if samples[0]["target"] is not None:
is_target_list = isinstance(samples[0]["target"], list)
target = merge("target", is_target_list)
else:
target = src_tokens
return {
"id": torch.LongTensor([s["id"] for s in samples]),
"nsentences": len(samples),
"ntokens": sum(len(s["source"]) for s in samples),
"net_input": {
"src_tokens": src_tokens,
"src_lengths": torch.LongTensor([s["source"].numel() for s in samples]),
},
"target": target,
}
class MonolingualDataset(FairseqDataset):
"""
A wrapper around torch.utils.data.Dataset for monolingual data.
Args:
dataset (torch.utils.data.Dataset): dataset to wrap
sizes (List[int]): sentence lengths
vocab (~fairseq.data.Dictionary): vocabulary
shuffle (bool, optional): shuffle the elements before batching
(default: True).
"""
def __init__(
self,
dataset,
sizes,
src_vocab,
tgt_vocab=None,
add_eos_for_other_targets=False,
shuffle=False,
targets=None,
add_bos_token=False,
fixed_pad_length=None,
pad_to_bsz=None,
src_lang_idx=None,
tgt_lang_idx=None,
):
self.dataset = dataset
self.sizes = np.array(sizes)
self.vocab = src_vocab
self.tgt_vocab = tgt_vocab or src_vocab
self.add_eos_for_other_targets = add_eos_for_other_targets
self.shuffle = shuffle
self.add_bos_token = add_bos_token
self.fixed_pad_length = fixed_pad_length
self.pad_to_bsz = pad_to_bsz
self.src_lang_idx = src_lang_idx
self.tgt_lang_idx = tgt_lang_idx
assert targets is None or all(
t in {"self", "future", "past"} for t in targets
), "targets must be none or one of 'self', 'future', 'past'"
if targets is not None and len(targets) == 0:
targets = None
self.targets = targets
def __getitem__(self, index):
if self.targets is not None:
# *future_target* is the original sentence
# *source* is shifted right by 1 (maybe left-padded with eos)
# *past_target* is shifted right by 2 (left-padded as needed)
#
# Left-to-right language models should condition on *source* and
# predict *future_target*.
# Right-to-left language models should condition on *source* and
# predict *past_target*.
source, future_target, past_target = self.dataset[index]
source, target = self._make_source_target(
source, future_target, past_target
)
else:
source = self.dataset[index]
target = None
source, target = self._maybe_add_bos(source, target)
return {"id": index, "source": source, "target": target}
def __len__(self):
return len(self.dataset)
def _make_source_target(self, source, future_target, past_target):
if self.targets is not None:
target = []
if (
self.add_eos_for_other_targets
and (("self" in self.targets) or ("past" in self.targets))
and source[-1] != self.vocab.eos()
):
# append eos at the end of source
source = torch.cat([source, source.new([self.vocab.eos()])])
if "future" in self.targets:
future_target = torch.cat(
[future_target, future_target.new([self.vocab.pad()])]
)
if "past" in self.targets:
                # the first token comes from before the start of the sentence; it is
                # only used in "none" break mode when add_eos_for_other_targets is False
past_target = torch.cat(
[
past_target.new([self.vocab.pad()]),
past_target[1:],
source[-2, None],
]
)
for t in self.targets:
if t == "self":
target.append(source)
elif t == "future":
target.append(future_target)
elif t == "past":
target.append(past_target)
else:
raise Exception("invalid target " + t)
if len(target) == 1:
target = target[0]
else:
target = future_target
return source, self._filter_vocab(target)
def _maybe_add_bos(self, source, target):
if self.add_bos_token:
source = torch.cat([source.new([self.vocab.bos()]), source])
if target is not None:
target = torch.cat([target.new([self.tgt_vocab.bos()]), target])
return source, target
def num_tokens_vec(self, indices):
"""Return the number of tokens for a set of positions defined by indices.
This value is used to enforce ``--max-tokens`` during batching."""
return self.sizes[indices]
def _filter_vocab(self, target):
if len(self.tgt_vocab) != len(self.vocab):
def _filter(target):
mask = target.ge(len(self.tgt_vocab))
if mask.any():
target[mask] = self.tgt_vocab.unk()
return target
if isinstance(target, list):
return [_filter(t) for t in target]
return _filter(target)
return target
def collater(self, samples):
"""Merge a list of samples to form a mini-batch.
Args:
samples (List[dict]): samples to collate
Returns:
dict: a mini-batch with the following keys:
- `id` (LongTensor): example IDs in the original input order
- `ntokens` (int): total number of tokens in the batch
- `net_input` (dict): the input to the Model, containing keys:
- `src_tokens` (LongTensor): a padded 2D Tensor of tokens in
the source sentence of shape `(bsz, src_len)`. Padding will
appear on the right.
- `target` (LongTensor): a padded 2D Tensor of tokens in the
target sentence of shape `(bsz, tgt_len)`. Padding will appear
on the right.
"""
return collate(
samples,
self.vocab.pad(),
self.vocab.eos(),
self.fixed_pad_length,
self.pad_to_bsz,
)
def num_tokens(self, index):
"""Return the number of tokens in a sample. This value is used to
enforce ``--max-tokens`` during batching."""
return self.sizes[index]
def size(self, index):
"""Return an example's size as a float or tuple. This value is used when
filtering a dataset with ``--max-positions``."""
return self.sizes[index]
def ordered_indices(self):
"""Return an ordered list of indices. Batches will be constructed based
on this order."""
if self.shuffle:
order = [np.random.permutation(len(self))]
else:
order = [np.arange(len(self))]
order.append(self.sizes)
return np.lexsort(order)
@property
def supports_prefetch(self):
return getattr(self.dataset, "supports_prefetch", False)
def prefetch(self, indices):
self.dataset.prefetch(indices)
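# --- Illustrative usage sketch (not part of the original file) ---
# A minimal, hypothetical example of the module-level collate() helper. The toy
# vocabulary and samples are placeholders; it only demonstrates the keys of the
# returned mini-batch and the right-padding behaviour.
def _demo_monolingual_collate():  # hypothetical helper, for illustration only
    from fairseq.data import Dictionary

    vocab = Dictionary()
    for w in ["x", "y", "z"]:
        vocab.add_symbol(w)
    samples = [
        {
            "id": 0,
            "source": torch.LongTensor([vocab.index("x"), vocab.index("y"), vocab.eos()]),
            "target": torch.LongTensor([vocab.index("y"), vocab.index("z"), vocab.eos()]),
        },
        {
            "id": 1,
            "source": torch.LongTensor([vocab.index("z"), vocab.eos()]),
            "target": torch.LongTensor([vocab.index("x"), vocab.eos()]),
        },
    ]
    batch = collate(samples, vocab.pad(), vocab.eos())
    # batch["net_input"]["src_tokens"] is right-padded to the longest source;
    # batch["target"] is padded the same way, and batch["ntokens"] counts the
    # source tokens before padding.
    return batch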
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import logging
import time
from collections import OrderedDict
from typing import Dict, List, Optional
import numpy as np
from fairseq.data import data_utils
from . import FairseqDataset
logger = logging.getLogger(__name__)
class MultiCorpusDataset(FairseqDataset):
"""
Stores multiple instances of FairseqDataset together.
Unless batch_sample=True, requires each instance
    to be of the same dataset type, as the collate method needs to work on
samples from each dataset.
Allows specifying a distribution over the datasets to use. Note that unlike
MultiCorpusSampledDataset, this distribution allows sampling for each item,
    rather than on a batch level. Note that datasets with a sampling probability
of 0 will be skipped.
Each time ordered_indices() is called, a new sample is generated with
the specified distribution.
Args:
        datasets: an OrderedDict of FairseqDataset instances.
distribution: a List containing the probability of getting an utterance from
corresponding dataset
        seed: random seed for sampling the datasets
sort_indices: if true, will sort the ordered indices by size
batch_sample: if true, will ensure each batch is from a single dataset
"""
def __init__(
self,
datasets: Dict[str, FairseqDataset],
distribution: List[float],
seed: int,
sort_indices: bool = False,
batch_sample: bool = False,
distributed_rank: Optional[int] = None,
):
super().__init__()
assert isinstance(datasets, OrderedDict)
assert len(datasets) == len(distribution)
assert sum(distribution) == 1
self.datasets = datasets
self.distribution = distribution
self.seed = seed
self.sort_indices = sort_indices
self.batch_sample = batch_sample
self.distributed_rank = distributed_rank
# Avoid repeated conversions to list later
self.dataset_list = list(datasets.values())
self.total_num_instances = 0
first_dataset = self.dataset_list[0]
self.num_instances_per_dataset = []
self.dataset_offsets = []
for i, dataset in enumerate(self.dataset_list):
assert isinstance(dataset, FairseqDataset)
assert type(dataset) is type(first_dataset)
self.num_instances_per_dataset.append(
0 if self.distribution[i] == 0 else len(dataset)
)
self.dataset_offsets.append(self.total_num_instances)
self.total_num_instances += self.num_instances_per_dataset[i]
def ordered_indices(self):
start = time.time()
with data_utils.numpy_seed(self.seed, self.epoch):
logger.info(
f"sampling new dataset with seed {self.seed} epoch {self.epoch}"
)
sampled_indices = []
num_selected_instances = 0
# For each dataset i, sample self.distribution[i] * self.total_num_instances
for i, key in enumerate(self.datasets):
if self.distribution[i] == 0:
# skip dataset if sampling probability is 0
continue
if i < len(self.datasets) - 1:
num_instances = int(self.distribution[i] * self.total_num_instances)
high = self.dataset_offsets[i + 1]
else:
num_instances = self.total_num_instances - num_selected_instances
high = self.total_num_instances
logger.info(f"sampling {num_instances} from {key} dataset")
num_selected_instances += num_instances
# First, add k copies of the dataset where k = num_instances // len(dataset).
# This ensures an equal distribution of the data points as much as possible.
                # For the remaining entries, sample them randomly (without replacement)
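                # Worked example (illustrative): with distribution [0.25, 0.75]
                # over datasets A (len 100) and B (len 20) and a total of 120
                # instances, A contributes int(0.25 * 120) = 30 indices and B
                # the remaining 90; since 90 // 20 = 4, four full copies of B
                # are added and the last 10 indices are drawn without replacement.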
dataset_size = len(self.datasets[key])
num_copies = num_instances // dataset_size
dataset_indices = (
np.random.permutation(high - self.dataset_offsets[i])
+ self.dataset_offsets[i]
)[: num_instances - num_copies * dataset_size]
if num_copies > 0:
sampled_indices += list(
np.concatenate(
(
np.repeat(
np.arange(self.dataset_offsets[i], high), num_copies
),
dataset_indices,
)
)
)
else:
sampled_indices += list(dataset_indices)
assert (
len(sampled_indices) == self.total_num_instances
), f"{len(sampled_indices)} vs {self.total_num_instances}"
np.random.shuffle(sampled_indices)
if self.sort_indices:
sampled_indices.sort(key=lambda i: self.num_tokens(i))
logger.info(
"multi_corpus_dataset ordered_indices took {}s".format(
time.time() - start
)
)
return np.array(sampled_indices, dtype=np.int64)
def _map_index(self, index: int):
"""
If dataset A has length N and dataset B has length M
then index 1 maps to index 1 of dataset A, and index N + 1
maps to index 1 of B.
"""
counter = 0
for num_instances, key in zip(self.num_instances_per_dataset, self.datasets):
if index < counter + num_instances:
return index - counter, key
counter += num_instances
raise ValueError(
"Invalid index: {}, max: {}".format(index, self.total_num_instances)
)
def __len__(self):
"""
Length of this dataset is the sum of individual datasets
"""
return self.total_num_instances
def __getitem__(self, index):
new_index, key = self._map_index(index)
try:
item = self.datasets[key][new_index]
item["full_id"] = index
return item
except Exception as e:
e.args = (f"Error from {key} dataset", *e.args)
raise
def collater(self, samples):
"""
If we are doing batch sampling, then pick the right collater to use.
Otherwise we assume all collaters are the same.
"""
if len(samples) == 0:
return None
if "full_id" in samples[0]:
_, key = self._map_index(samples[0]["full_id"])
try:
batch = self.datasets[key].collater(samples)
except Exception:
print(f"Collating failed for key {key}", flush=True)
raise
return batch
else:
# Subclasses may override __getitem__ to not specify full_id
return list(self.datasets.values())[0].collater(samples)
def num_tokens(self, index: int):
index, key = self._map_index(index)
return self.datasets[key].num_tokens(index)
def size(self, index: int):
index, key = self._map_index(index)
return self.datasets[key].size(index)
@property
def can_reuse_epoch_itr_across_epochs(self):
return False
def set_epoch(self, epoch, **unused):
super().set_epoch(epoch)
logger.info(f"setting epoch of multi_corpus_dataset to {epoch}")
self.epoch = epoch
@property
def supports_prefetch(self):
return False
@property
def supports_fetch_outside_dataloader(self):
return all(
self.datasets[key].supports_fetch_outside_dataloader
for key in self.datasets
)
def batch_by_size(
self,
indices,
max_tokens=None,
max_sentences=None,
required_batch_size_multiple=1,
):
if not self.batch_sample:
return super().batch_by_size(
indices, max_tokens, max_sentences, required_batch_size_multiple
)
dataset_indices = {key: [] for key in self.datasets}
for i in indices:
_, key = self._map_index(i)
dataset_indices[key].append(i)
batches = []
for key in dataset_indices:
cur_batches = super().batch_by_size(
np.array(dataset_indices[key], dtype=np.int64),
max_tokens,
max_sentences,
required_batch_size_multiple,
)
logger.info(f"Created {len(cur_batches)} batches for dataset {key}")
batches += cur_batches
# If this dataset is used in a distributed training setup,
# then shuffle such that the order is seeded by the distributed rank
# as well
if self.distributed_rank is not None:
with data_utils.numpy_seed(self.seed, self.epoch, self.distributed_rank):
np.random.shuffle(batches)
return batches
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from collections import OrderedDict
from typing import Callable, Dict, List
import numpy as np
from . import FairseqDataset
def uniform_sampler(x):
# Sample from uniform distribution
return np.random.choice(x, 1).item()
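# Illustrative (hypothetical) alternative sampler: for two datasets, pick the
# first key twice as often as the second. Any callable mapping the list of
# dataset keys to a single selected key can be passed as ``sampling_func``.
#
#     weighted_sampler = lambda keys: np.random.choice(keys, p=[2 / 3, 1 / 3]).item()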
class MultiCorpusSampledDataset(FairseqDataset):
"""
Stores multiple instances of FairseqDataset together and in every iteration
creates a batch by first sampling a dataset according to a specified
probability distribution and then getting instances from that dataset.
Args:
datasets: an OrderedDict of FairseqDataset instances.
sampling_func: A function for sampling over list of dataset keys.
The default strategy is to sample uniformly.
"""
def __init__(
self,
datasets: Dict[str, FairseqDataset],
sampling_func: Callable[[List], int] = None,
):
super().__init__()
assert isinstance(datasets, OrderedDict)
self.datasets = datasets
if sampling_func is None:
sampling_func = uniform_sampler
self.sampling_func = sampling_func
self.total_num_instances = 0
for _, dataset in datasets.items():
assert isinstance(dataset, FairseqDataset)
self.total_num_instances += len(dataset)
self._ordered_indices = None
def __len__(self):
"""
Length of this dataset is the sum of individual datasets
"""
return self.total_num_instances
def ordered_indices(self):
"""
Ordered indices for batching. Here we call the underlying
dataset's ordered_indices() so that we get the same random ordering
as we would have from using the underlying dataset directly.
"""
if self._ordered_indices is None:
self._ordered_indices = OrderedDict(
[
(key, dataset.ordered_indices())
for key, dataset in self.datasets.items()
]
)
return np.arange(len(self))
def _map_index_to_dataset(self, key: int, index: int):
"""
Different underlying datasets have different lengths. In order to ensure
we are not accessing an index outside the range of the current dataset
size, we wrap around. This function should be called after we have
created an ordering for this and all underlying datasets.
"""
assert (
self._ordered_indices is not None
), "Must call MultiCorpusSampledDataset.ordered_indices() first"
mapped_index = index % len(self.datasets[key])
return self._ordered_indices[key][mapped_index]
def __getitem__(self, index: int):
"""
Get the item associated with index from each underlying dataset.
Since index is in the range of [0, TotalNumInstances], we need to
map the index to the dataset before retrieving the item.
"""
return OrderedDict(
[
(key, dataset[self._map_index_to_dataset(key, index)])
for key, dataset in self.datasets.items()
]
)
def collater(self, samples: List[Dict]):
"""
Generate a mini-batch for this dataset.
To convert this into a regular mini-batch we use the following
logic:
1. Select a dataset using the specified probability distribution.
2. Call the collater function of the selected dataset.
"""
if len(samples) == 0:
return None
selected_key = self.sampling_func(list(self.datasets.keys()))
selected_samples = [sample[selected_key] for sample in samples]
return self.datasets[selected_key].collater(selected_samples)
def num_tokens(self, index: int):
"""
Return an example's length (number of tokens), used for batching. Here
we return the max across all examples at index across all underlying
datasets.
"""
return max(
dataset.num_tokens(self._map_index_to_dataset(key, index))
for key, dataset in self.datasets.items()
)
def size(self, index: int):
"""
Return an example's size as a float or tuple. Here we return the max
across all underlying datasets. This value is used when filtering a
dataset with max-positions.
"""
return max(
dataset.size(self._map_index_to_dataset(key, index))
for key, dataset in self.datasets.items()
)
@property
def supports_prefetch(self):
return all(
getattr(dataset, "supports_prefetch", False)
for dataset in self.datasets.values()
)
def prefetch(self, indices):
for key, dataset in self.datasets.items():
dataset.prefetch(
[self._map_index_to_dataset(key, index) for index in indices]
)
@property
def supports_fetch_outside_dataloader(self):
return all(
self.datasets[key].supports_fetch_outside_dataloader
for key in self.datasets
)
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import itertools
import json
import logging
import math
import os
from collections import OrderedDict, defaultdict
from argparse import ArgumentError
from fairseq import utils
from fairseq.data import (
AppendTokenDataset,
ConcatDataset,
Dictionary,
LanguagePairDataset,
PrependTokenDataset,
SampledMultiDataset,
SampledMultiEpochDataset,
StripTokenDataset,
TransformEosLangPairDataset,
TruncateDataset,
data_utils,
indexed_dataset,
)
from fairseq.data.multilingual.multilingual_utils import (
EncoderLangtok,
LangTokSpec,
LangTokStyle,
augment_dictionary,
get_lang_tok,
)
from fairseq.data.multilingual.sampled_multi_dataset import CollateFormat
from fairseq.file_io import PathManager
from fairseq.utils import FileContentsAction, csv_str_list, eval_str_dict
logger = logging.getLogger(__name__)
SRC_DICT_NAME = "src"
TGT_DICT_NAME = "tgt"
def _lang_id(dic: Dictionary, lang: str):
"""Return language ID index."""
idx = dic.index(lang)
assert idx != dic.unk_index, "cannot find language ID for lang {}".format(lang)
return idx
def load_sampling_weights(from_file):
with open(from_file) as f:
weights = json.load(f)
return weights
class MultilingualDatasetManager(object):
def __init__(self, args, lang_pairs, langs, dicts, sampling_method):
super().__init__()
self.args = args
self.seed = args.seed
self.lang_pairs = lang_pairs
self.extra_lang_pairs = (
list({p for _, v in args.extra_lang_pairs.items() for p in v.split(",")})
if args.extra_lang_pairs
else []
)
self.src_langs = {
p.split("-")[0] for p in args.lang_pairs + self.extra_lang_pairs
}
self.tgt_langs = {
p.split("-")[1] for p in args.lang_pairs + self.extra_lang_pairs
}
self.langs = langs
self.dicts = dicts
self.lang_dict = self.create_lang_dictionary(self.langs)
self.sampling_method = sampling_method
self.sampling_scheduler = None
self._has_sharded_data = False
self._num_shards_dict = {}
self._training_data_sizes = defaultdict(lambda: {})
@classmethod
def setup_data_manager(cls, args, lang_pairs, langs, dicts, sampling_method):
return MultilingualDatasetManager(
args, lang_pairs, langs, dicts, sampling_method
)
@staticmethod
def add_args(parser):
parser.add_argument(
"data",
help="colon separated path to data directories list, \
will be iterated upon during epochs in round-robin manner",
action=FileContentsAction,
)
parser.add_argument(
"--langs",
default=None,
type=csv_str_list,
help="a list of languages comma sperated languages which can appear in lang-pairs; "
"note that the ordering determines language token IDs",
)
parser.add_argument(
"--lang-dict",
default=None,
type=str,
help="an external file which contains a list of "
"languages which can appear in lang-pairs; "
"note that the ordering determines language token IDs; "
"--langs and --lang-dict are two exclusive options",
)
parser.add_argument(
"--source-dict",
default=None,
type=str,
help="path to source dictionary; if specified it will override per language dictionary loading",
)
parser.add_argument(
"--target-dict",
default=None,
type=str,
help="path to target dictionary; if specified it will override per language dictionary loading",
)
parser.add_argument(
"--lang-tok-style",
default=LangTokStyle.multilingual.value,
type=str,
choices=[LangTokStyle.multilingual.value, LangTokStyle.mbart.value],
help="language token styles",
)
parser.add_argument(
"--load-alignments",
action="store_true",
help="load the binarized alignments",
)
parser.add_argument(
"--left-pad-source",
default="True",
type=str,
metavar="BOOL",
help="pad the source on the left",
)
parser.add_argument(
"--left-pad-target",
default="False",
type=str,
metavar="BOOL",
help="pad the target on the left",
)
try:
parser.add_argument(
"--max-source-positions",
default=1024,
type=int,
metavar="N",
help="max number of tokens in the source sequence",
)
parser.add_argument(
"--max-target-positions",
default=1024,
type=int,
metavar="N",
help="max number of tokens in the target sequence",
)
except ArgumentError:
# this might have already been defined. Once we transition this to hydra it should be fine to add it here.
pass
parser.add_argument(
"--upsample-primary",
default=1,
type=int,
help="amount to upsample primary dataset",
)
parser.add_argument(
"--truncate-source",
action="store_true",
default=False,
help="truncate source to max-source-positions",
)
parser.add_argument(
"--encoder-langtok",
default=None,
type=str,
choices=[EncoderLangtok.src.value, EncoderLangtok.tgt.value],
metavar="SRCTGT",
help="prepend to the beginning of source sentence the source or target "
"language token. (src/tgt)",
)
parser.add_argument(
"--decoder-langtok",
action="store_true",
help="prepend to the beginning of target sentence the target language token",
)
parser.add_argument(
"--lang-tok-replacing-bos-eos", action="store_true", default=False
)
parser.add_argument(
"--enable-lang-ids",
default=False,
action="store_true",
help="whether to include language IDs in samples",
)
parser.add_argument(
"--enable-reservsed-directions-shared-datasets",
default=False,
action="store_true",
help="whether to allow datasets be used in reversed directions",
)
parser.add_argument(
"--extra-data",
            help='a dictionary mapping data name to its path, \
                e.g. {"mined": path_to_mined_data, "denoised": path_to_denoised_data}',
type=lambda uf: eval_str_dict(uf, type=str),
default=None,
)
parser.add_argument(
"--extra-lang-pairs",
help='a dictionary of data name to the language pairs they serve, \
e.g. {"mined": comma-separated-lang-pairs, "denoised": comma-separated-lang-pairs}',
type=lambda uf: eval_str_dict(uf, type=str),
default=None,
)
parser.add_argument(
"--fixed-dictionary",
help="Fixed dictionary to use with model path",
default=None,
type=str,
)
parser.add_argument(
"--langtoks-specs",
            help='a comma-separated list of data types for which specialized sets of language tokens \
                should be added, e.g. "main,dae,mined". A set of language tokens will be added to the \
                vocab to distinguish languages in different training data types. If not specified, a \
                default language token per language will be added',
default=LangTokSpec.main.value,
type=csv_str_list,
)
parser.add_argument(
"--langtoks",
help='a dictionary of how to add language tokens, \
e.g. {"mined": (None, "tgt"), "mono_dae": ("src.dae", "tgt"), "main": \
("src", "tgt")}, or {"mined": ("src.mined", "tgt")}',
default=None,
type=lambda uf: eval_str_dict(uf, type=str),
)
parser.add_argument(
"--sampling-weights-from-file",
            help='a file containing a python dictionary of how to sample data sets, \
                e.g. { "main:en_XX-es_XX": 0.2, "mined:en_XX-pt_XX": 0.5, \
                    "mono_dae:es_XX-es_XX": 0.3, "main:en_XX-fr_XX": 0.8 }',
default=None,
type=str,
)
parser.add_argument(
"--sampling-weights",
help='a dictionary of how to sample data sets, \
e.g. { "main:en_XX-es_XX": 0.2, "mined:en_XX-pt_XX": 0.5, \
"mono_dae:es_XX-es_XX: 0.3, "main:en_xx-fr_XX": 0.8 }',
default=None,
type=lambda uf: eval_str_dict(uf, type=str),
)
parser.add_argument(
"--virtual-epoch-size",
default=None,
type=int,
help="virtual epoch size to speed up data loading",
)
parser.add_argument(
"--virtual-data-size",
default=None,
type=int,
help="virtual data size of the whole joint dataset to speed"
"up data loading and have specific dynamic sampling strategy interval",
)
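    # Illustrative (hypothetical) flag combination for a bidirectional setup;
    # the language names below are placeholders, not taken from the original
    # source:
    #   --langs en,de --encoder-langtok src --decoder-langtok
    # Note that --lang-pairs is consumed by this manager (see prepare() below)
    # but is expected to be added by the task that uses it.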
@classmethod
def load_langs(cls, args, **kwargs):
if args.lang_dict and args.langs:
raise ValueError("--langs and --lang-dict can not both be specified")
if args.lang_dict is None and args.langs is None:
logger.warning(
"External language dictionary is not provided; "
"use lang-pairs to infer the set of supported languages. "
"The language ordering is not stable which might cause "
"misalignment in pretraining and finetuning."
)
# infer from lang_pairs as it is
langs = list(
{x for lang_pair in args.lang_pairs for x in lang_pair.split("-")}
)
langs = sorted(langs)
logger.info(f"inferred language list: {langs}")
elif args.lang_dict:
with open(
PathManager.get_local_path(args.lang_dict), "r", encoding="utf-8"
) as f:
langs = [lang.strip() for lang in f.readlines() if lang.strip()]
logger.info(
f"loaded language list from {args.lang_dict} as they are ordered in file"
)
elif args.langs:
langs = args.langs
logger.info(
f"parsed the language list as they are ordered in the option: {langs}"
)
return langs
def has_sharded_data(self, split):
return self._has_sharded_data and split == getattr(
self.args, "train_subset", None
)
def _shared_collater(self):
return not (self.args.extra_data and "mono_dae" in self.args.extra_data) and (
not self.args.lang_tok_replacing_bos_eos
)
def estimate_global_pass_epoch(self, epoch):
if self.args.virtual_epoch_size is None or self.args.virtual_data_size is None:
return None
# one epoch more for remaining data in each shard
virtual_epochs_per_shard = math.ceil(
self.args.virtual_data_size / self.args.virtual_epoch_size
)
# note that fairseq epoch / shard_epoch starts from 1
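        # e.g. with virtual_data_size=1000 and virtual_epoch_size=250 there are
        # 4 virtual epochs per shard, so epochs 1-4 map to shard_epoch 1 and
        # epoch 5 starts shard_epoch 2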
shard_epoch = (epoch - 1) // virtual_epochs_per_shard + 1
return shard_epoch
@classmethod
def prepare(cls, load_dictionary, args, **kargs):
args.left_pad_source = utils.eval_bool(args.left_pad_source)
args.left_pad_target = utils.eval_bool(args.left_pad_target)
if not hasattr(args, "shuffle_instance"):
args.shuffle_instance = False
if args.langtoks is None:
args.langtoks = {}
if "main" not in args.langtoks:
src_langtok_spec = args.encoder_langtok if args.encoder_langtok else None
tgt_langtok_spec = "tgt" if args.decoder_langtok else None
args.langtoks["main"] = (src_langtok_spec, tgt_langtok_spec)
def check_langs(langs, pairs):
messages = []
for src, tgt in pairs:
if src not in langs or tgt not in langs:
messages.append(
f"language pair {src}-{tgt} contains languages "
"that are not in the language dictionary"
)
if len(messages) > 0:
raise ValueError(" ".join(messages) + f"; langs: {langs}")
if args.lang_pairs is None:
raise ValueError(
"--lang-pairs is required. List all the language pairs in the training objective."
)
if isinstance(args.lang_pairs, str):
args.lang_pairs = args.lang_pairs.split(",")
if args.source_lang is not None or args.target_lang is not None:
training = False
else:
training = True
language_list = cls.load_langs(args, **kargs)
check_langs(
language_list,
(
[p.split("-") for p in args.lang_pairs]
if training
else [(args.source_lang, args.target_lang)]
),
)
def load_dictionary_and_postproc(path):
d = load_dictionary(path)
augment_dictionary(
dictionary=d,
language_list=language_list,
lang_tok_style=args.lang_tok_style,
langtoks_specs=args.langtoks_specs,
extra_data=args.extra_data,
)
return d
dicts = cls.load_all_dictionaries(
args, language_list, load_dictionary_and_postproc, training
)
return language_list, dicts, training
@classmethod
def load_all_dictionaries(cls, args, language_list, load_dictionary, training):
dicts = OrderedDict()
if args.source_dict is not None:
dicts[SRC_DICT_NAME] = load_dictionary(args.source_dict)
if args.target_dict is not None:
dicts[TGT_DICT_NAME] = load_dictionary(args.target_dict)
if training:
extra_lang_pairs = (
list(
{p for _, v in args.extra_lang_pairs.items() for p in v.split(",")}
)
if args.extra_lang_pairs
else []
)
src_langs_to_load_dicts = sorted(
{p.split("-")[0] for p in (args.lang_pairs + extra_lang_pairs)}
)
tgt_langs_to_load_dicts = sorted(
{p.split("-")[1] for p in (args.lang_pairs + extra_lang_pairs)}
)
else:
src_langs_to_load_dicts = [args.source_lang]
tgt_langs_to_load_dicts = [args.target_lang]
paths = utils.split_paths(args.data)
assert len(paths) > 0
def load_dicts(langs_to_load_dicts):
for lang in langs_to_load_dicts:
dicts[lang] = load_dictionary(
os.path.join(paths[0], "dict.{}.txt".format(lang))
)
if len(dicts) > 0:
dict0 = next(iter(dicts.values()))
assert dicts[lang].pad() == dict0.pad()
assert dicts[lang].eos() == dict0.eos()
assert dicts[lang].unk() == dict0.unk()
logger.info("[{}] dictionary: {} types".format(lang, len(dicts[lang])))
if args.fixed_dictionary is not None:
fixed_dict = load_dictionary(args.fixed_dictionary)
dicts = {
lang: fixed_dict
for lang in src_langs_to_load_dicts + tgt_langs_to_load_dicts
}
else:
if args.source_dict is None:
load_dicts(src_langs_to_load_dicts)
if args.target_dict is None:
load_dicts(tgt_langs_to_load_dicts)
return dicts
def get_source_dictionary(self, lang):
if self.args.source_dict is not None:
return self.dicts[SRC_DICT_NAME]
else:
return self.dicts[lang]
def get_target_dictionary(self, lang):
if self.args.target_dict is not None:
return self.dicts[TGT_DICT_NAME]
else:
return self.dicts[lang]
@classmethod
def create_lang_dictionary(cls, langs):
unk = "<unk>"
# hack to remove symbols other than unk as they are not needed by lang dict
lang_dict = Dictionary(pad=unk, eos=unk, unk=unk, bos=unk)
for lang in langs:
lang_dict.add_symbol(lang)
return lang_dict
@classmethod
def get_langtok_index(cls, lang_tok, dic):
idx = dic.index(lang_tok)
assert (
idx != dic.unk_index
), "cannot find language token {} in the dictionary".format(lang_tok)
return idx
def get_encoder_langtok(self, src_lang, tgt_lang, spec=None):
if spec is None:
return None
if spec and spec.startswith("src"):
if src_lang is None:
return None
langtok = get_lang_tok(
lang=src_lang, lang_tok_style=self.args.lang_tok_style, spec=spec
)
else:
if tgt_lang is None:
return None
langtok = get_lang_tok(
lang=tgt_lang, lang_tok_style=self.args.lang_tok_style, spec=spec
)
return self.get_langtok_index(
langtok,
self.get_source_dictionary(src_lang)
if src_lang
else self.get_target_dictionary(tgt_lang),
)
def get_decoder_langtok(self, tgt_lang, spec=None):
if spec is None:
return None
langtok = get_lang_tok(
lang=tgt_lang, lang_tok_style=self.args.lang_tok_style, spec=spec
)
return self.get_langtok_index(langtok, self.get_target_dictionary(tgt_lang))
@classmethod
def load_data(cls, path, vdict, impl):
dataset = data_utils.load_indexed_dataset(path, vdict, impl)
return dataset
@classmethod
def split_exists(cls, split, src, tgt, lang, data_path, dataset_impl):
filename = os.path.join(data_path, "{}.{}-{}.{}".format(split, src, tgt, lang))
return indexed_dataset.dataset_exists(filename, impl=dataset_impl)
def load_lang_dataset(
self,
data_path,
split,
src,
src_dict,
tgt,
tgt_dict,
combine,
dataset_impl,
upsample_primary,
max_source_positions,
prepend_bos=False,
load_alignments=False,
truncate_source=False,
):
src_datasets = []
tgt_datasets = []
for k in itertools.count():
split_k = split + (str(k) if k > 0 else "")
# infer langcode
if self.split_exists(split_k, src, tgt, src, data_path, dataset_impl):
prefix = os.path.join(data_path, "{}.{}-{}.".format(split_k, src, tgt))
elif self.split_exists(split_k, tgt, src, src, data_path, dataset_impl):
prefix = os.path.join(data_path, "{}.{}-{}.".format(split_k, tgt, src))
else:
if k > 0:
break
else:
logger.error(
f"Dataset not found: {data_path}, {split_k}, {src}, {tgt}"
)
raise FileNotFoundError(
"Dataset not found: {} ({})".format(split, data_path)
)
src_dataset = self.load_data(prefix + src, src_dict, dataset_impl)
if truncate_source:
src_dataset = AppendTokenDataset(
TruncateDataset(
StripTokenDataset(src_dataset, src_dict.eos()),
max_source_positions - 1,
),
src_dict.eos(),
)
src_datasets.append(src_dataset)
tgt_datasets.append(self.load_data(prefix + tgt, tgt_dict, dataset_impl))
logger.info(
"{} {} {}-{} {} examples".format(
data_path, split_k, src, tgt, len(src_datasets[-1])
)
)
if not combine:
break
assert len(src_datasets) == len(tgt_datasets)
if len(src_datasets) == 1:
src_dataset, tgt_dataset = src_datasets[0], tgt_datasets[0]
else:
sample_ratios = [1] * len(src_datasets)
sample_ratios[0] = upsample_primary
src_dataset = ConcatDataset(src_datasets, sample_ratios)
tgt_dataset = ConcatDataset(tgt_datasets, sample_ratios)
if prepend_bos:
assert hasattr(src_dict, "bos_index") and hasattr(tgt_dict, "bos_index")
src_dataset = PrependTokenDataset(src_dataset, src_dict.bos())
tgt_dataset = PrependTokenDataset(tgt_dataset, tgt_dict.bos())
align_dataset = None
if load_alignments:
align_path = os.path.join(
data_path, "{}.align.{}-{}".format(split, src, tgt)
)
if indexed_dataset.dataset_exists(align_path, impl=dataset_impl):
align_dataset = data_utils.load_indexed_dataset(
align_path, None, dataset_impl
)
return src_dataset, tgt_dataset, align_dataset
def load_langpair_dataset(
self,
data_path,
split,
src,
src_dict,
tgt,
tgt_dict,
combine,
dataset_impl,
upsample_primary,
left_pad_source,
left_pad_target,
max_source_positions,
max_target_positions,
prepend_bos=False,
load_alignments=False,
truncate_source=False,
src_dataset_transform_func=lambda dataset: dataset,
tgt_dataset_transform_func=lambda dataset: dataset,
src_lang_id=None,
tgt_lang_id=None,
langpairs_sharing_datasets=None,
):
norm_direction = "-".join(sorted([src, tgt]))
if langpairs_sharing_datasets is not None:
src_dataset = langpairs_sharing_datasets.get(
(data_path, split, norm_direction, src), "NotInCache"
)
tgt_dataset = langpairs_sharing_datasets.get(
(data_path, split, norm_direction, tgt), "NotInCache"
)
align_dataset = langpairs_sharing_datasets.get(
(data_path, split, norm_direction, src, tgt), "NotInCache"
)
        # a hack: if any one of them is not in the cache, we need to reload them all
if (
langpairs_sharing_datasets is None
or src_dataset == "NotInCache"
or tgt_dataset == "NotInCache"
or align_dataset == "NotInCache"
or split != getattr(self.args, "train_subset", None)
):
# source and target datasets can be reused in reversed directions to save memory
# reversed directions of valid and test data will not share source and target datasets
src_dataset, tgt_dataset, align_dataset = self.load_lang_dataset(
data_path,
split,
src,
src_dict,
tgt,
tgt_dict,
combine,
dataset_impl,
upsample_primary,
max_source_positions=max_source_positions,
prepend_bos=prepend_bos,
load_alignments=load_alignments,
truncate_source=truncate_source,
)
src_dataset = src_dataset_transform_func(src_dataset)
tgt_dataset = tgt_dataset_transform_func(tgt_dataset)
if langpairs_sharing_datasets is not None:
langpairs_sharing_datasets[
(data_path, split, norm_direction, src)
] = src_dataset
langpairs_sharing_datasets[
(data_path, split, norm_direction, tgt)
] = tgt_dataset
langpairs_sharing_datasets[
(data_path, split, norm_direction, src, tgt)
] = align_dataset
if align_dataset is None:
# no align data so flag the reverse direction as well in sharing
langpairs_sharing_datasets[
(data_path, split, norm_direction, tgt, src)
] = align_dataset
else:
logger.info(
f"Reusing source and target datasets of [{split}] {tgt}-{src} for reversed direction: "
f"[{split}] {src}-{tgt}: src length={len(src_dataset)}; tgt length={len(tgt_dataset)}"
)
return LanguagePairDataset(
src_dataset,
src_dataset.sizes,
src_dict,
tgt_dataset,
tgt_dataset.sizes if tgt_dataset is not None else None,
tgt_dict,
left_pad_source=left_pad_source,
left_pad_target=left_pad_target,
align_dataset=align_dataset,
src_lang_id=src_lang_id,
tgt_lang_id=tgt_lang_id,
)
def src_dataset_tranform_func(self, src_lang, tgt_lang, dataset, spec=None):
if self.args.lang_tok_replacing_bos_eos:
# it is handled by self.alter_dataset_langtok
            # TODO: Unify with alter_dataset_langtok
return dataset
if spec is None:
return dataset
tok = self.get_encoder_langtok(src_lang, tgt_lang, spec)
if tok:
return PrependTokenDataset(dataset, tok)
return dataset
def tgt_dataset_tranform_func(self, source_lang, target_lang, dataset, spec=None):
if dataset is None:
# note that target dataset can be None during inference time
return None
if self.args.lang_tok_replacing_bos_eos:
            # TODO: Unify with alter_dataset_langtok
# It is handled by self.alter_dataset_langtok.
# The complication in self.alter_dataset_langtok
# makes a unified framework difficult.
return dataset
# if not self.args.decoder_langtok:
if not spec:
return dataset
tok = self.get_decoder_langtok(target_lang, spec)
if tok:
return PrependTokenDataset(dataset, tok)
return dataset
def alter_dataset_langtok(
self,
lang_pair_dataset,
src_eos=None,
src_lang=None,
tgt_eos=None,
tgt_lang=None,
src_langtok_spec=None,
tgt_langtok_spec=None,
):
if src_langtok_spec is None and tgt_langtok_spec is None:
return lang_pair_dataset
new_src_eos = None
if (
src_langtok_spec is not None
and src_eos is not None
and (src_lang is not None or tgt_lang is not None)
):
new_src_eos = self.get_encoder_langtok(src_lang, tgt_lang, src_langtok_spec)
else:
src_eos = None
new_tgt_bos = None
if tgt_langtok_spec and tgt_eos is not None and tgt_lang is not None:
new_tgt_bos = self.get_decoder_langtok(tgt_lang, tgt_langtok_spec)
else:
tgt_eos = None
return TransformEosLangPairDataset(
lang_pair_dataset,
src_eos=src_eos,
new_src_eos=new_src_eos,
tgt_bos=tgt_eos,
new_tgt_bos=new_tgt_bos,
)
def load_a_dataset(
self,
split,
data_path,
src,
src_dict,
tgt,
tgt_dict,
combine,
prepend_bos=False,
langpairs_sharing_datasets=None,
data_category=None,
**extra_kwargs,
):
dataset_impl = self.args.dataset_impl
upsample_primary = self.args.upsample_primary
left_pad_source = self.args.left_pad_source
left_pad_target = self.args.left_pad_target
max_source_positions = self.args.max_source_positions
max_target_positions = self.args.max_target_positions
load_alignments = self.args.load_alignments
truncate_source = self.args.truncate_source
src_dataset_transform_func = self.src_dataset_tranform_func
tgt_dataset_transform_func = self.tgt_dataset_tranform_func
enable_lang_ids = self.args.enable_lang_ids
lang_dictionary = self.lang_dict
src_langtok_spec, tgt_langtok_spec = extra_kwargs["langtok_spec"]
src_langtok = self.get_encoder_langtok(src, tgt, src_langtok_spec)
tgt_langtok = self.get_decoder_langtok(tgt, tgt_langtok_spec)
logger.info(
f"{data_category}:{src}-{tgt} src_langtok: {src_langtok}; tgt_langtok: {tgt_langtok}"
)
langpair_ds = self.load_langpair_dataset(
data_path,
split,
src,
src_dict,
tgt,
tgt_dict,
combine,
dataset_impl,
upsample_primary,
left_pad_source,
left_pad_target,
max_source_positions,
max_target_positions,
prepend_bos,
load_alignments,
truncate_source,
src_dataset_transform_func=lambda dataset: src_dataset_transform_func(
src, tgt, dataset, src_langtok_spec
),
tgt_dataset_transform_func=lambda dataset: tgt_dataset_transform_func(
src, tgt, dataset, tgt_langtok_spec
),
src_lang_id=_lang_id(lang_dictionary, src)
if enable_lang_ids and lang_dictionary is not None
else None,
tgt_lang_id=_lang_id(lang_dictionary, tgt)
if enable_lang_ids and lang_dictionary is not None
else None,
langpairs_sharing_datasets=langpairs_sharing_datasets,
)
# TODO: handle modified lang toks for mined data and dae data
if self.args.lang_tok_replacing_bos_eos:
ds = self.alter_dataset_langtok(
langpair_ds,
src_eos=self.get_source_dictionary(src).eos()
if src
else self.get_target_dictionary(tgt).eos(),
src_lang=src,
tgt_eos=self.get_target_dictionary(tgt).eos(),
tgt_lang=tgt,
src_langtok_spec=src_langtok_spec,
tgt_langtok_spec=tgt_langtok_spec,
)
else:
ds = langpair_ds
return ds
def load_split_langpair_datasets(self, split, data_param_list):
datasets = []
langpairs_sharing_datasets = (
{} if self.args.enable_reservsed_directions_shared_datasets else None
)
for param in data_param_list:
ds = self.load_a_dataset(
split=split,
langpairs_sharing_datasets=langpairs_sharing_datasets,
**param,
)
datasets.append(ds)
return datasets
def get_data_paths_and_lang_pairs(self, split):
datapaths = {"main": self.args.data}
lang_pairs = {"main": self.lang_pairs}
if split == getattr(self.args, "train_subset", None):
# only training data can have extra data and extra language pairs
if self.args.extra_data:
extra_datapaths = self.args.extra_data
datapaths.update(extra_datapaths)
if self.args.extra_lang_pairs:
extra_lang_pairs = {
k: v.split(",") for k, v in self.args.extra_lang_pairs.items()
}
lang_pairs.update(extra_lang_pairs)
return datapaths, lang_pairs
@classmethod
def get_dataset_key(cls, data_category, src, tgt):
return f"{data_category}:{src}-{tgt}"
@classmethod
def _get_shard_num_dict(cls, split, paths):
shards = defaultdict(int)
for path in paths:
files = PathManager.ls(path)
directions = set()
for f in files:
if f.startswith(split) and f.endswith(".idx"):
# idx files of the form "{split}.{src}-{tgt}.{lang}.idx"
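                    # e.g. "train.en-de.en.idx".split(".")[-3] == "en-de"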
direction = f.split(".")[-3]
directions.add(direction)
for direction in directions:
shards[direction] += 1
return shards
def get_split_num_data_shards(self, split):
if split in self._num_shards_dict:
return self._num_shards_dict[split]
num_shards_dict = {}
data_paths, lang_pairs = self.get_data_paths_and_lang_pairs(split)
for data_category, paths in data_paths.items():
if data_category not in lang_pairs:
continue
paths = utils.split_paths(paths)
shards_dict = self._get_shard_num_dict(split, paths)
lang_dirs = [
lang_pair.split("-") for lang_pair in lang_pairs[data_category]
]
lang_dirs = [x if len(x) > 1 else (x[0], x[0]) for x in lang_dirs]
for src, tgt in lang_dirs:
key = self.get_dataset_key(data_category, src, tgt)
if "mono_" in data_category:
# monolingual data requires tgt only
assert src is None or src == tgt, (
f"error: src={src}, "
f"tgt={tgt} for data_category={data_category}"
)
num_shards_dict[key] = shards_dict[tgt]
else:
if f"{src}-{tgt}" in shards_dict:
num_shards_dict[key] = shards_dict[f"{src}-{tgt}"]
elif f"{tgt}-{src}" in shards_dict:
                        # follow the fairseq convention of using reversed-direction data when this direction is not available
num_shards_dict[key] = shards_dict[f"{tgt}-{src}"]
self._num_shards_dict[split] = num_shards_dict
logger.info(f"[{split}] num of shards: {num_shards_dict}")
return num_shards_dict
@classmethod
def get_shard_id(cls, num_shards, epoch, shard_epoch=None):
shard = epoch if shard_epoch is None else shard_epoch
shard = (shard - 1) % num_shards
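        # e.g. with num_shards=3, epochs 1, 2, 3, 4 map to shards 0, 1, 2, 0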
return shard
def get_split_data_path(self, paths, epoch, shard_epoch, num_shards):
path = paths[self.get_shard_id(num_shards, epoch, shard_epoch)]
return path
def get_split_data_param_list(self, split, epoch, shard_epoch=None):
# TODO: to extend with extra datasets and keys and loop over different shard data paths
param_list = []
data_paths, lang_pairs = self.get_data_paths_and_lang_pairs(split)
logger.info(f"langtoks settings: {self.args.langtoks}")
split_num_shards_dict = self.get_split_num_data_shards(split)
for data_category, paths in data_paths.items():
if data_category not in lang_pairs:
continue
paths = utils.split_paths(paths)
assert len(paths) > 0
if len(paths) > 1:
self._has_sharded_data = True
if split != getattr(self.args, "train_subset", None):
# if not training data set, use the first shard for valid and test
paths = paths[:1]
if data_category in self.args.langtoks:
lang_tok_spec = self.args.langtoks[data_category]
else:
# default to None
lang_tok_spec = (None, None)
# infer langcode
lang_dirs = [
lang_pair.split("-") for lang_pair in lang_pairs[data_category]
]
lang_dirs = [x if len(x) > 1 else (x[0], x[0]) for x in lang_dirs]
for src, tgt in lang_dirs:
assert src is not None or data_category == "mono_dae", (
f"error: src={src}, " f"tgt={tgt} for data_category={data_category}"
)
# logger.info(f"preparing param for {data_category}: {src} - {tgt}")
key = self.get_dataset_key(data_category, src, tgt)
data_path = self.get_split_data_path(
paths, epoch, shard_epoch, split_num_shards_dict[key]
)
param_list.append(
{
"key": key,
"data_path": data_path,
"split": split,
"src": src,
"src_dict": self.get_source_dictionary(src)
if src and data_category != "mono_dae"
else None,
"tgt": tgt,
"tgt_dict": self.get_target_dictionary(tgt),
"data_category": data_category,
"langtok_spec": lang_tok_spec,
}
)
return param_list
def get_train_dataset_sizes(
self, data_param_list, datasets, epoch, shard_epoch=None
):
num_shards = [
self.get_split_num_data_shards(param["split"])[param["key"]]
for param in data_param_list
]
data_sizes = []
for (key, d), num_shard in zip(datasets, num_shards):
my_data_sizes = self._training_data_sizes[key]
shard_ind = self.get_shard_id(num_shard, epoch, shard_epoch)
if shard_ind not in my_data_sizes:
my_data_sizes[shard_ind] = len(d)
known_size = max(my_data_sizes.values())
data_sizes.append(
# If we don't know the data size of the shard yet,
# use the max known data size to approximate.
# Note that shards are preprocessed with a designated shard size
# and any remaining data is put into the last shard, so
# the max-shard-size approximation is almost exact before the
# last shard is loaded; once the last shard is loaded, the
# total data size becomes exact.
(key, sum(my_data_sizes.get(i, known_size) for i in range(num_shard)))
)
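# Illustrative example (not from the original code): if shard 0 holds 1000
# examples, shard 2 holds 1200, and shard 1 has not been loaded yet, the
# estimate over 3 shards is 1000 + 1200 + 1200 = 3400 (the missing shard is
# approximated by the max known size, here 1200).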
logger.info(
f"estimated total data sizes of all shards used in sampling ratios: {data_sizes}. "
"Note that if the data a shard has not been loaded yet, use the max known data size to approximate"
)
return [s for _, s in data_sizes]
def get_train_sampling_ratios(
self, data_param_list, datasets, epoch=1, shard_epoch=None
):
data_sizes = self.get_train_dataset_sizes(
data_param_list, datasets, epoch, shard_epoch
)
sampling_func = self.sampling_method.sampling_method_selector()
sample_ratios = sampling_func(data_sizes) if sampling_func is not None else None
return sample_ratios
def get_sampling_ratios(self, data_param_list, datasets, epoch, shard_epoch=None):
if self.args.sampling_weights_from_file:
weights = load_sampling_weights(self.args.sampling_weights_from_file)
sample_ratios = [weights[k] for k, _ in datasets]
logger.info(
"| ignoring --sampling-weights when loadding sampling weights "
f"from file {self.args.sampling_weights_from_file}"
)
elif self.args.sampling_weights:
sample_ratios = [self.args.sampling_weights[k] for k, _ in datasets]
else:
sample_ratios = self.get_train_sampling_ratios(
data_param_list, datasets, epoch, shard_epoch
)
if sample_ratios is not None:
logger.info(
"| Upsample ratios: {}".format(
list(zip(map(lambda x: x["key"], data_param_list), sample_ratios))
)
)
assert len(sample_ratios) == len(datasets)
return sample_ratios
def load_split_datasets(
self, split, training, epoch=1, combine=False, shard_epoch=None, **kwargs
):
data_param_list = self.get_split_data_param_list(
split, epoch, shard_epoch=shard_epoch
)
langpairs_sharing_datasets = (
{} if self.args.enable_reservsed_directions_shared_datasets else None
)
datasets = [
(
param["key"],
self.load_a_dataset(
combine=combine,
langpairs_sharing_datasets=langpairs_sharing_datasets,
**param,
),
)
for param in data_param_list
]
return datasets, data_param_list
def load_into_concat_dataset(self, split, datasets, data_param_list):
if self.args.lang_tok_replacing_bos_eos:
# TODO: to investigate why TransformEosLangPairDataset doesn't work with ConcatDataset
return SampledMultiDataset(
OrderedDict(datasets),
sampling_ratios=None,
eval_key=None,
collate_format=CollateFormat.single,
virtual_size=None,
split=split,
)
return ConcatDataset([d for _, d in datasets])
def load_sampled_multi_epoch_dataset(
self, split, training, epoch=0, combine=False, shard_epoch=None, **kwargs
):
datasets, data_param_list = self.load_split_datasets(
split, training, epoch, combine, shard_epoch=shard_epoch, **kwargs
)
if training and split == getattr(self.args, "train_subset", None):
sample_ratios = self.get_sampling_ratios(data_param_list, datasets, epoch)
return SampledMultiEpochDataset(
OrderedDict(datasets),
epoch=epoch,
shard_epoch=shard_epoch,
# valid and test datasets will degenerate to concatenated datasets:
sampling_ratios=sample_ratios,
eval_key=None,
collate_format=CollateFormat.single,
virtual_size=self.args.virtual_data_size,
split=split,
virtual_epoch_size=self.args.virtual_epoch_size,
# if lang_tok replacement is not used, we can simply share the same collater
shared_collater=self._shared_collater(),
)
else:
return self.load_into_concat_dataset(split, datasets, data_param_list)
def load_sampled_multi_dataset(
self, split, training, epoch=0, combine=False, shard_epoch=None, **kwargs
):
datasets, data_param_list = self.load_split_datasets(
split, training, epoch, combine, shard_epoch=shard_epoch, **kwargs
)
if training and split == getattr(self.args, "train_subset", None):
sample_ratios = self.get_sampling_ratios(data_param_list, datasets, epoch)
return SampledMultiDataset(
OrderedDict(datasets),
epoch=epoch,
# valid and test datasets will degenerate to concatenated datasets:
sampling_ratios=sample_ratios,
eval_key=None,
collate_format=CollateFormat.single,
virtual_size=self.args.virtual_data_size,
split=split,
# if lang_tok replacement is not used, we can simply share the same collater
shared_collater=self._shared_collater(),
)
else:
return self.load_into_concat_dataset(split, datasets, data_param_list)
def load_dataset(
self, split, training, epoch=0, combine=False, shard_epoch=None, **kwargs
):
if self.args.virtual_epoch_size is None:
return self.load_sampled_multi_dataset(
split, training, epoch, combine, shard_epoch, **kwargs
)
else:
return self.load_sampled_multi_epoch_dataset(
split, training, epoch, combine, shard_epoch, **kwargs
)
from enum import Enum
from typing import Dict, List, Optional, Sequence
import torch
from fairseq.data import Dictionary
class EncoderLangtok(Enum):
"""
Prepend either the source or the target language token (src/tgt)
to the beginning of the source sentence.
"""
src = "src"
tgt = "tgt"
class LangTokSpec(Enum):
main = "main"
mono_dae = "mono_dae"
class LangTokStyle(Enum):
multilingual = "multilingual"
mbart = "mbart"
@torch.jit.export
def get_lang_tok(
lang: str, lang_tok_style: str, spec: str = LangTokSpec.main.value
) -> str:
# TOKEN_STYLES can't be defined outside this fn since it needs to be
# TorchScriptable.
TOKEN_STYLES: Dict[str, str] = {
LangTokStyle.mbart.value: "[{}]",
LangTokStyle.multilingual.value: "__{}__",
}
if spec.endswith("dae"):
lang = f"{lang}_dae"
elif spec.endswith("mined"):
lang = f"{lang}_mined"
style = TOKEN_STYLES[lang_tok_style]
return style.format(lang)
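# Illustrative examples (not from the original code):
#   get_lang_tok("en", LangTokStyle.multilingual.value) -> "__en__"
#   get_lang_tok("en", LangTokStyle.mbart.value) -> "[en]"
#   get_lang_tok("en", LangTokStyle.multilingual.value, LangTokSpec.mono_dae.value) -> "__en_dae__"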
def augment_dictionary(
dictionary: Dictionary,
language_list: List[str],
lang_tok_style: str,
langtoks_specs: Sequence[str] = (LangTokSpec.main.value,),
extra_data: Optional[Dict[str, str]] = None,
) -> None:
for spec in langtoks_specs:
for language in language_list:
dictionary.add_symbol(
get_lang_tok(lang=language, lang_tok_style=lang_tok_style, spec=spec)
)
if lang_tok_style == LangTokStyle.mbart.value or (
extra_data is not None and LangTokSpec.mono_dae.value in extra_data
):
dictionary.add_symbol("<mask>")
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import datetime
import hashlib
import logging
import time
from bisect import bisect_right
from collections import OrderedDict, defaultdict
from enum import Enum
from typing import List
import numpy as np
import torch
from fairseq.data import FairseqDataset, data_utils
from fairseq.distributed import utils as distributed_utils
def get_time_gap(s, e):
return (
datetime.datetime.fromtimestamp(e) - datetime.datetime.fromtimestamp(s)
).__str__()
logger = logging.getLogger(__name__)
def default_virtual_size_func(datasets, ratios, max_scale_up=1.5):
sizes = [len(d) for d in datasets]
if ratios is None:
return sum(sizes)
largest_idx = np.argmax(sizes)
largest_r = ratios[largest_idx]
largest_s = sizes[largest_idx]
# set virtual sizes relative to the largest dataset
virtual_sizes = [(r / largest_r) * largest_s for r in ratios]
vsize = sum(virtual_sizes)
max_size = sum(sizes) * max_scale_up
return int(vsize if vsize < max_size else max_size)
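# Illustrative example (not from the original code): with dataset sizes
# [100, 10] and ratios [1.0, 1.0], the per-dataset virtual sizes are
# [100, 100] (vsize = 200), but the result is capped at
# sum(sizes) * max_scale_up = 110 * 1.5 = 165.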
class CollateFormat(Enum):
single = 1
ordered_dict = 2
class SampledMultiDataset(FairseqDataset):
"""Samples from multiple sub-datasets according to given sampling ratios.
Args:
datasets (
List[~torch.utils.data.Dataset]
or OrderedDict[str, ~torch.utils.data.Dataset]
): datasets
sampling_ratios (List[float]): list of probabilities with which each dataset is sampled
(default: None, which corresponds to concatenating all datasets together).
seed (int): RNG seed to use (default: 2).
epoch (int): starting epoch number (default: 1).
eval_key (str, optional): a key used at evaluation time that causes
this instance to pass-through batches from *datasets[eval_key]*.
collate_format (CollateFormat): collater output format, either CollateFormat.ordered_dict or
CollateFormat.single (default: CollateFormat.single) where CollateFormat.single configures
the collater to output batches of data mixed from all sub-datasets,
and CollateFormat.ordered_dict configures the collater to output a dictionary of batches indexed by keys
of sub-datasets.
Note that not all sub-datasets will necessarily be present in a single batch in either format.
virtual_size (int, or callable): the expected virtual size of the dataset (default: default_virtual_size_func).
split (str): the split of the data, e.g. 'train', 'valid' or 'test'.
shared_collater (bool): whether or not all sub-datasets share the same collater.
shuffle (bool): whether or not to shuffle data (default: True).
"""
def __init__(
self,
datasets,
sampling_ratios=None,
seed=2,
epoch=1,
eval_key=None,
collate_format=CollateFormat.single,
virtual_size=default_virtual_size_func,
split="",
shared_collater=False,
shuffle=True,
):
super().__init__()
self.shared_collater = shared_collater
self.shuffle = shuffle
if isinstance(datasets, OrderedDict):
self.keys = list(datasets.keys())
datasets = list(datasets.values())
elif isinstance(datasets, List):
self.keys = list(range(len(datasets)))
else:
raise AssertionError()
self.datasets = datasets
self.split = split
self.eval_key = eval_key
if self.eval_key is not None:
self.collate_format = CollateFormat.single
else:
self.collate_format = collate_format
self.seed = seed
self._cur_epoch = None
self.cumulated_sizes = None
# self.datasets[k][self._cur_indices[i]] is the data item i in this sampled dataset
# namely, data item i is sampled from the kth sub-dataset self.datasets[k]
# where self.cumulated_sizes[k-1] <= i < self.cumulated_sizes[k]
self._cur_indices = None
self._sizes = None
self.virtual_size_per_dataset = None
# caching properties
self._reset_cached_properties()
self.setup_sampling(sampling_ratios, virtual_size)
self.set_epoch(epoch)
def _clean_if_not_none(self, var_list):
for v in var_list:
if v is not None:
del v
def _reset_cached_properties(self):
self._clean_if_not_none([self._sizes, self._cur_indices])
self._sizes = None
self._cur_indices = None
def setup_sampling(self, sample_ratios, virtual_size):
sizes = [len(d) for d in self.datasets]
if sample_ratios is None:
# default back to concatenating datasets
self.sample_ratios = None
self.virtual_size = sum(sizes)
else:
if not isinstance(sample_ratios, np.ndarray):
sample_ratios = np.array(sample_ratios)
self.sample_ratios = sample_ratios
virtual_size = (
default_virtual_size_func if virtual_size is None else virtual_size
)
self.virtual_size = (
virtual_size(self.datasets, self.sample_ratios)
if callable(virtual_size)
else virtual_size
)
def adjust_sampling(self, epoch, sampling_ratios, virtual_size):
if sampling_ratios is not None:
sampling_ratios = self._sync_sample_ratios(sampling_ratios)
self.setup_sampling(sampling_ratios, virtual_size)
def _sync_sample_ratios(self, ratios):
# in case the ratios are not precisely the same across processes
# and to ensure every process updates the ratios at the same pace
ratios = torch.DoubleTensor(ratios)
if torch.distributed.is_initialized():
if torch.cuda.is_available():
distributed_utils.all_reduce(
ratios.cuda(), group=distributed_utils.get_data_parallel_group()
)
else:
distributed_utils.all_reduce(
ratios, group=distributed_utils.get_data_parallel_group()
)
ret = ratios.cpu()
ret = ret.numpy()
return ret
def random_choice_in_dataset(self, rng, dataset, choice_size):
if hasattr(dataset, "random_choice_in_dataset"):
return dataset.random_choice_in_dataset(rng, choice_size)
dataset_size = len(dataset)
return rng.choice(
dataset_size, choice_size, replace=(choice_size > dataset_size)
)
def get_virtual_indices(self, rng, datasets, sample_ratios, virtual_size):
def get_counts(sample_ratios):
counts = np.array([virtual_size * r for r in sample_ratios], dtype=np.int64)
diff = virtual_size - counts.sum()
assert diff >= 0
# due to rounding, the counts might not sum to the desired virtual size
if diff > 0:
dataset_indices = rng.choice(
len(sample_ratios), size=diff, p=sample_ratios
)
for i in dataset_indices:
counts[i] += 1
return counts
def get_in_dataset_indices(datasets, sizes, sample_ratios):
counts = get_counts(sample_ratios)
# uniformly sample the desired number of indices from each dataset;
# if the desired count exceeds the dataset size, sample with replacement:
indices = [
self.random_choice_in_dataset(rng, d, c)
for c, d in zip(counts, datasets)
]
return indices
sizes = [len(d) for d in datasets]
if sample_ratios is None:
# default back to concatenating datasets
in_dataset_indices = [list(range(s)) for s in sizes]
virtual_sizes_per_dataset = sizes
else:
ratios = sample_ratios / sample_ratios.sum()
in_dataset_indices = get_in_dataset_indices(datasets, sizes, ratios)
virtual_sizes_per_dataset = [len(d) for d in in_dataset_indices]
virtual_sizes_per_dataset = np.array(virtual_sizes_per_dataset, np.int64)
cumulative_sizes = np.cumsum(virtual_sizes_per_dataset)
assert sum(virtual_sizes_per_dataset) == virtual_size
assert cumulative_sizes[-1] == virtual_size
if virtual_size < sum(sizes):
logger.warning(
f"virtual data size ({virtual_size}) is less than real data size ({sum(sizes)})."
" If virtual size << real data size, there could be data coverage issue."
)
in_dataset_indices = np.hstack(in_dataset_indices)
return in_dataset_indices, cumulative_sizes, virtual_sizes_per_dataset
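# Illustrative example (not from the original code): with sizes [100, 10],
# normalized ratios [0.5, 0.5] and virtual_size 165, roughly 82-83 indices are
# drawn from each dataset; the smaller dataset (size 10) is therefore sampled
# with replacement.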
def _get_dataset_and_index(self, index):
i = bisect_right(self.cumulated_sizes, index)
return i, self._cur_indices[index]
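# Illustrative example (not from the original code): with
# cumulated_sizes = [100, 165], virtual indices 0-99 map to dataset 0 and
# indices 100-164 map to dataset 1 (bisect_right returns the position of the
# first cumulative size strictly greater than the index).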
def __getitem__(self, index):
# self.__getitem__(index) returns self.datasets[k][self._cur_indices[index]]
# where k satisfies self.cumulated_sizes[k - 1] <= index < self.cumulated_sizes[k]
ds_idx, ds_sample_idx = self._get_dataset_and_index(index)
ret = (ds_idx, self.datasets[ds_idx][ds_sample_idx])
return ret
def num_tokens(self, index):
return self.sizes[index].max()
def num_tokens_vec(self, indices):
sizes_vec = self.sizes[np.array(indices)]
# max across all dimensions but the first one
return np.amax(sizes_vec, axis=tuple(range(1, len(sizes_vec.shape))))
def size(self, index):
return self.sizes[index]
def __len__(self):
return self.virtual_size
def collater(self, samples, **extra_args):
"""Merge a list of samples to form a mini-batch."""
if len(samples) == 0:
return None
if self.collate_format == "ordered_dict":
collect_samples = [[] for _ in range(len(self.datasets))]
for (i, sample) in samples:
collect_samples[i].append(sample)
batch = OrderedDict(
[
(self.keys[i], dataset.collater(collect_samples[i]))
for i, (key, dataset) in enumerate(zip(self.keys, self.datasets))
if len(collect_samples[i]) > 0
]
)
elif self.shared_collater:
batch = self.datasets[0].collater([s for _, s in samples])
else:
samples_dict = defaultdict(list)
pad_to_length = (
defaultdict(int)
if "pad_to_length" not in extra_args
else extra_args["pad_to_length"]
)
for ds_idx, s in samples:
pad_to_length["source"] = max(
pad_to_length["source"], s["source"].size(0)
)
if s["target"] is not None:
pad_to_length["target"] = max(
pad_to_length["target"], s["target"].size(0)
)
samples_dict[ds_idx].append(s)
batches = [
self.datasets[i].collater(samples_dict[i], pad_to_length=pad_to_length)
for i in range(len(self.datasets))
if len(samples_dict[i]) > 0
]
def straight_data(tensors):
batch = torch.cat(tensors, dim=0)
return batch
src_lengths = straight_data(
[b["net_input"]["src_lengths"] for b in batches]
)
src_lengths, sort_order = src_lengths.sort(descending=True)
def straight_order(tensors):
batch = straight_data(tensors)
return batch.index_select(0, sort_order)
batch = {
"id": straight_order([b["id"] for b in batches]),
"nsentences": sum(b["nsentences"] for b in batches),
"ntokens": sum(b["ntokens"] for b in batches),
"net_input": {
"src_tokens": straight_order(
[b["net_input"]["src_tokens"] for b in batches]
),
"src_lengths": src_lengths,
},
"target": straight_order([b["target"] for b in batches])
if batches[0]["target"] is not None
else None,
}
if "prev_output_tokens" in batches[0]["net_input"]:
batch["net_input"]["prev_output_tokens"] = straight_order(
[b["net_input"]["prev_output_tokens"] for b in batches]
)
if "src_lang_id" in batches[0]["net_input"]:
batch["net_input"]["src_lang_id"] = straight_order(
[b["net_input"]["src_lang_id"] for b in batches]
)
if "tgt_lang_id" in batches[0]:
batch["tgt_lang_id"] = straight_order(
[b["tgt_lang_id"] for b in batches]
)
return batch
@property
def sizes(self):
if self._sizes is not None:
return self._sizes
start_time = time.time()
in_sub_dataset_indices = [
self._cur_indices[
0 if i == 0 else self.cumulated_sizes[i - 1] : self.cumulated_sizes[i]
]
for i in range(len(self.datasets))
]
sub_dataset_sizes = [
d.sizes[indices]
for d, indices in zip(self.datasets, in_sub_dataset_indices)
]
self._sizes = np.vstack(sub_dataset_sizes)
logger.info(f"sizes() calling time: {get_time_gap(start_time, time.time())}")
return self._sizes
def ordered_indices(self):
if self.shuffle:
indices = np.random.permutation(len(self))
else:
indices = np.arange(len(self))
sizes = self.sizes
tgt_sizes = sizes[:, 1] if len(sizes.shape) > 0 and sizes.shape[1] > 1 else None
src_sizes = (
sizes[:, 0] if len(sizes.shape) > 0 and sizes.shape[1] > 1 else sizes
)
# sort by target length, then source length
if tgt_sizes is not None:
indices = indices[np.argsort(tgt_sizes[indices], kind="mergesort")]
sort_indices = indices[np.argsort(src_sizes[indices], kind="mergesort")]
return sort_indices
def prefetch(self, indices):
prefetch_indices = [[] for _ in range(len(self.datasets))]
for i in indices:
ds_idx, ds_sample_idx = self._get_dataset_and_index(i)
prefetch_indices[ds_idx].append(ds_sample_idx)
for i in range(len(prefetch_indices)):
self.datasets[i].prefetch(prefetch_indices[i])
@property
def can_reuse_epoch_itr_across_epochs(self):
return False
def set_epoch(self, epoch):
super().set_epoch(epoch)
if epoch == self._cur_epoch:
# re-enter so return
return
for d in self.datasets:
if hasattr(d, "set_epoch"):
d.set_epoch(epoch)
self._cur_epoch = epoch
self._establish_virtual_datasets()
def _establish_virtual_datasets(self):
if self.sample_ratios is None and self._cur_indices is not None:
# not a sampling dataset; no need to resample if indices are already established
return
self._reset_cached_properties()
start_time = time.time()
# Generate a weighted sample of indices as a function of the
# random seed and the current epoch.
rng = np.random.RandomState(
[
int(
hashlib.sha1(
str(self.__class__.__name__).encode("utf-8")
).hexdigest(),
16,
)
% (2**32),
self.seed % (2**32), # global seed
self._cur_epoch, # epoch index,
]
)
self._clean_if_not_none(
[self.cumulated_sizes, self.virtual_size_per_dataset, self._sizes]
)
self._sizes = None
indices, cumulated_sizes, virtual_size_per_dataset = self.get_virtual_indices(
rng, self.datasets, self.sample_ratios, self.virtual_size
)
self._cur_indices = indices
self.cumulated_sizes = cumulated_sizes
self.virtual_size_per_dataset = virtual_size_per_dataset
raw_sizes = [len(d) for d in self.datasets]
sampled_sizes = self.virtual_size_per_dataset
logger.info(
f"[{self.split}] Raw sizes: {str(dict(zip(self.keys, raw_sizes)))}; "
f"raw total size: {sum(raw_sizes)}"
)
logger.info(
f"[{self.split}] Resampled sizes: {str(dict(zip(self.keys, sampled_sizes)))}; "
f"resampled total size: {sum(sampled_sizes)}"
)
if self.sample_ratios is not None:
logger.info(
f"[{self.split}] Upsampling ratios: {str(dict(zip(self.keys, self.sample_ratios)))}"
)
else:
logger.info(f"[{self.split}] A concat dataset")
logger.info(
f"[{self.split}] virtual dataset established time: {get_time_gap(start_time, time.time())}"
)
def filter_indices_by_size(self, indices, max_sizes):
"""Filter a list of sample indices. Remove those that are longer
than specified in max_sizes.
Args:
indices (np.array): original array of sample indices
max_sizes (int or list[int] or tuple[int]): max sample size,
can be defined separately for src and tgt (then list or tuple)
Returns:
np.array: filtered sample array
list: list of removed indices
"""
sizes = self.sizes
tgt_sizes = sizes[:, 1] if len(sizes.shape) > 0 and sizes.shape[1] > 1 else None
src_sizes = (
sizes[:, 0] if len(sizes.shape) > 0 and sizes.shape[1] > 1 else sizes
)
return data_utils.filter_paired_dataset_indices_by_size(
src_sizes, tgt_sizes, indices, max_sizes
)
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import hashlib
import logging
import math
import numpy as np
from fairseq.data import SampledMultiDataset
from .sampled_multi_dataset import CollateFormat, default_virtual_size_func
logger = logging.getLogger(__name__)
class SampledMultiEpochDataset(SampledMultiDataset):
"""Samples from multiple sub-datasets according to sampling ratios
using virtual epoch sizes to speed up dataloading.
Args:
datasets (
List[~torch.utils.data.Dataset]
or OrderedDict[str, ~torch.utils.data.Dataset]
): datasets
sampling_ratios (List[float]): list of probabilities with which each dataset is sampled
(default: None, which corresponds to concatenating all datasets together).
seed (int): RNG seed to use (default: 2).
epoch (int): starting epoch number (default: 1).
eval_key (str, optional): a key used at evaluation time that causes
this instance to pass-through batches from *datasets[eval_key]*.
collate_format (CollateFormat): collater output format, either CollateFormat.ordered_dict or
CollateFormat.single (default: CollateFormat.single) where CollateFormat.single configures
the collater to output batches of data mixed from all sub-datasets,
and CollateFormat.ordered_dict configures the collater to output a dictionary of batches indexed by keys
of sub-datasets.
Note that not all sub-datasets will necessarily be present in a single batch in either format.
virtual_size (int, or callable): the expected virtual size of the dataset (default: default_virtual_size_func).
split (str): the split of the data, e.g. 'train', 'valid' or 'test'.
virtual_epoch_size (int): virtual epoch size; the dataset iterates through the data in
chunks of this size to speed up data loading, e.g. indexing and filtering
can be performed as each virtual epoch is loaded, without waiting for the whole dataset to be loaded.
shared_collater (bool): whether or not all sub-datasets share the same collater.
shard_epoch (int): the real epoch number for shard selection.
shuffle (bool): whether or not to shuffle data (default: True).
"""
def __init__(
self,
datasets,
sampling_ratios=None,
seed=2,
epoch=1,
eval_key=None,
collate_format=CollateFormat.single,
virtual_size=default_virtual_size_func,
split="",
virtual_epoch_size=None,
shared_collater=False,
shard_epoch=1,
shuffle=True,
):
self.virtual_epoch_size = virtual_epoch_size
self._current_epoch_start_index = None
self._random_global_indices = None
self.shard_epoch = shard_epoch if shard_epoch is not None else 1
self.load_next_shard = None
self._epoch_sizes = None
super().__init__(
datasets=datasets,
sampling_ratios=sampling_ratios,
seed=seed,
epoch=epoch,
eval_key=eval_key,
collate_format=collate_format,
virtual_size=virtual_size,
split=split,
shared_collater=shared_collater,
shuffle=shuffle,
)
def _setup(self, epoch):
self.virtual_epoch_size = (
self.virtual_epoch_size
if self.virtual_epoch_size is not None
else self.virtual_size
)
if self.virtual_epoch_size > self.virtual_size:
logger.warning(
f"virtual epoch size {self.virtual_epoch_size} "
f"is greater than virtual dataset size {self.virtual_size}"
)
self.virtual_epoch_size = self.virtual_size
self.num_virtual_epochs = math.ceil(self.virtual_size / self.virtual_epoch_size)
self._current_epoch_start_index = self._get_epoch_start_index(epoch)
logger.info(
f"virtual epoch size {self.virtual_epoch_size}; virtual dataset size {self.virtual_size}"
)
def _map_epoch_index_to_global(self, index):
index = self._current_epoch_start_index + index
# add randomness
return self._random_global_indices[index]
@property
def sizes(self):
if self._epoch_sizes is not None:
return self._epoch_sizes
_sizes = super().sizes
indices = self._random_global_indices[
self._current_epoch_start_index : self._current_epoch_start_index
+ len(self)
]
self._epoch_sizes = _sizes[indices]
# delete the parent class's cached _sizes to save memory
del self._sizes
self._sizes = None
return self._epoch_sizes
def _get_dataset_and_index(self, index):
i = self._map_epoch_index_to_global(index)
return super()._get_dataset_and_index(i)
def __len__(self):
return (
self.virtual_epoch_size
if self._current_epoch_start_index + self.virtual_epoch_size
< self.virtual_size
else self.virtual_size - self._current_epoch_start_index
)
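# Illustrative example (not from the original code): with virtual_size 1000 and
# virtual_epoch_size 300, the first three virtual epochs have length 300 and the
# last one has length 100.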
def set_epoch(self, epoch):
if self._current_epoch_start_index is None:
# initializing the epoch indices of a virtual dataset
self._setup(epoch)
self._next_virtual_epoch(epoch)
else:
# working on already initialized epoch indices
if epoch == self._cur_epoch:
# re-enter so return
return
self._next_virtual_epoch(epoch)
def _get_epoch_start_index(self, epoch):
assert epoch >= 1 # fairseq is using 1-based epoch everywhere
return ((epoch - 1) % self.num_virtual_epochs) * self.virtual_epoch_size
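# Illustrative example (not from the original code): with virtual_size 1000 and
# virtual_epoch_size 300, num_virtual_epochs = ceil(1000 / 300) = 4, so epochs
# 1, 2, 3, 4, 5 start at virtual indices 0, 300, 600, 900, 0 respectively.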
def _next_global_indices(self, epoch):
rng = np.random.RandomState(
[
int(
hashlib.sha1(
str(self.__class__.__name__).encode("utf-8")
).hexdigest(),
16,
)
% (2**32),
self.seed % (2**32), # global seed
epoch, # epoch index,
]
)
del self._random_global_indices
self._random_global_indices = rng.choice(
self.virtual_size, self.virtual_size, replace=False
)
if self.load_next_shard is None:
self.load_next_shard = False
else:
# increase shard epoch for next loading
self.shard_epoch += 1
self.load_next_shard = True
logger.info(
"to load next epoch/shard in next load_dataset: "
f"epoch={epoch}/shard_epoch={self.shard_epoch}"
)
def _next_virtual_epoch(self, epoch):
index = self._get_epoch_start_index(epoch)
if index == 0 or self._random_global_indices is None:
# need to start from the beginning,
# so call super().set_epoch(epoch) to establish the global virtual indices
logger.info(
"establishing a new set of global virtual indices for "
f"epoch={epoch}/shard_epoch={self.shard_epoch}"
)
super().set_epoch(epoch)
self._next_global_indices(epoch)
else:
self._cur_epoch = epoch
# reset cached sizes and ordered_indices after moving to a new epoch
self._clean_if_not_none(
[
self._epoch_sizes,
]
)
self._epoch_sizes = None
self._current_epoch_start_index = index
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import logging
from typing import List
logger = logging.getLogger(__name__)
def uniform(dataset_sizes: List[int]):
return [1.0] * len(dataset_sizes)
def temperature_sampling(dataset_sizes, temp):
total_size = sum(dataset_sizes)
return [(size / total_size) ** (1.0 / temp) for size in dataset_sizes]
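# Illustrative example (not from the original code): with dataset sizes
# [900, 100], temp=1 yields ratios [0.9, 0.1], while temp=5 yields roughly
# [0.979, 0.631], i.e. higher temperatures flatten the distribution towards
# uniform sampling.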
def make_temperature_sampling(temp=1.0):
def sampling_func(dataset_sizes):
return temperature_sampling(dataset_sizes, temp)
return sampling_func
def make_ratio_sampling(ratios):
def sampling_func(dataset_sizes):
return ratios
return sampling_func
class SamplingMethod:
@staticmethod
def add_arguments(parser):
parser.add_argument(
"--sampling-method",
choices=[
"uniform",
"temperature",
"concat",
"RoundRobin",
],
type=str,
default="concat",
help="The method to sample data per language pairs",
)
parser.add_argument(
"--sampling-temperature",
default=1.5,
type=float,
help="only work with --sampling-method temperature",
)
@staticmethod
def build_sampler(args, task):
return SamplingMethod(args, task)
def __init__(self, args, task):
self.args = args
self.task = task
def is_adaptive(self):
return False
def sampling_method_selector(self):
args = self.args
logger.info(f"selected sampler: {args.sampling_method}")
if args.sampling_method == "uniform":
return uniform
elif args.sampling_method == "temperature" or self.is_adaptive():
return make_temperature_sampling(float(args.sampling_temperature))
else:
# default to concatenating all datasets together
return None
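# A minimal usage sketch (illustrative only; not part of the original module).
# It assumes nothing beyond the class defined above and the standard library;
# the task argument is unused by this base sampler, so None is passed.
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    SamplingMethod.add_arguments(parser)
    args = parser.parse_args(
        ["--sampling-method", "temperature", "--sampling-temperature", "5.0"]
    )
    sampler = SamplingMethod.build_sampler(args, task=None)  # task is not used here
    sampling_func = sampler.sampling_method_selector()
    # prints temperature-flattened ratios, approximately [0.979, 0.631]
    print(sampling_func([900, 100]))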
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from collections import OrderedDict
import torch
from torch.utils.data.dataloader import default_collate
from . import FairseqDataset
def _flatten(dico, prefix=None):
"""Flatten a nested dictionary."""
new_dico = OrderedDict()
if isinstance(dico, dict):
prefix = prefix + "." if prefix is not None else ""
for k, v in dico.items():
if v is None:
continue
new_dico.update(_flatten(v, prefix + k))
elif isinstance(dico, list):
for i, v in enumerate(dico):
new_dico.update(_flatten(v, prefix + ".[" + str(i) + "]"))
else:
new_dico = OrderedDict({prefix: dico})
return new_dico
def _unflatten(dico):
"""Unflatten a flattened dictionary into a nested dictionary."""
new_dico = OrderedDict()
for full_k, v in dico.items():
full_k = full_k.split(".")
node = new_dico
for k in full_k[:-1]:
if k.startswith("[") and k.endswith("]"):
k = int(k[1:-1])
if k not in node:
node[k] = OrderedDict()
node = node[k]
node[full_k[-1]] = v
return new_dico
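# Illustrative example (not from the original code):
#   _flatten({"net_input": {"src_tokens": t}}) returns
#   OrderedDict([("net_input.src_tokens", t)]), and _unflatten() restores the
#   original nesting.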
class NestedDictionaryDataset(FairseqDataset):
def __init__(self, defn, sizes=None):
super().__init__()
self.defn = _flatten(defn)
self.sizes = [sizes] if not isinstance(sizes, (list, tuple)) else sizes
first = None
for v in self.defn.values():
if not isinstance(
v,
(
FairseqDataset,
torch.utils.data.Dataset,
),
):
raise ValueError("Expected Dataset but found: {}".format(v.__class__))
first = first or v
if len(v) > 0:
assert len(v) == len(first), "dataset lengths must match"
self._len = len(first)
def __getitem__(self, index):
return OrderedDict((k, ds[index]) for k, ds in self.defn.items())
def __len__(self):
return self._len
def collater(self, samples):
"""Merge a list of samples to form a mini-batch.
Args:
samples (List[dict]): samples to collate
Returns:
dict: a mini-batch suitable for forwarding with a Model
"""
if len(samples) == 0:
return {}
sample = OrderedDict()
for k, ds in self.defn.items():
try:
sample[k] = ds.collater([s[k] for s in samples])
except NotImplementedError:
sample[k] = default_collate([s[k] for s in samples])
return _unflatten(sample)
def num_tokens(self, index):
"""Return the number of tokens in a sample. This value is used to
enforce ``--max-tokens`` during batching."""
return max(s[index] for s in self.sizes)
def size(self, index):
"""Return an example's size as a float or tuple. This value is used when
filtering a dataset with ``--max-positions``."""
if len(self.sizes) == 1:
return self.sizes[0][index]
else:
return (s[index] for s in self.sizes)
@property
def supports_prefetch(self):
"""Whether this dataset supports prefetching."""
return any(ds.supports_prefetch for ds in self.defn.values())
def prefetch(self, indices):
"""Prefetch the data required for this epoch."""
for ds in self.defn.values():
if getattr(ds, "supports_prefetch", False):
ds.prefetch(indices)
@property
def can_reuse_epoch_itr_across_epochs(self):
return all(ds.can_reuse_epoch_itr_across_epochs for ds in self.defn.values())
def set_epoch(self, epoch):
super().set_epoch(epoch)
for ds in self.defn.values():
ds.set_epoch(epoch)