Commit 7df61696 authored by Sugon_ldc's avatar Sugon_ldc
Browse files

add fairseq0.10.2

parents
Pipeline #471 failed with stages
in 0 seconds
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from fairseq.data import Dictionary
class MaskedLMDictionary(Dictionary):
"""
Dictionary for Masked Language Modelling tasks. This extends Dictionary by
adding the mask symbol.
"""
def __init__(
self,
pad="<pad>",
eos="</s>",
unk="<unk>",
mask="<mask>",
):
super().__init__(pad=pad, eos=eos, unk=unk)
self.mask_word = mask
self.mask_index = self.add_symbol(mask)
self.nspecial = len(self.symbols)
def mask(self):
"""Helper to get index of mask symbol"""
return self.mask_index
class BertDictionary(MaskedLMDictionary):
"""
Dictionary for BERT task. This extends MaskedLMDictionary by adding support
for cls and sep symbols.
"""
def __init__(
self,
pad="<pad>",
eos="</s>",
unk="<unk>",
mask="<mask>",
cls="<cls>",
sep="<sep>",
):
super().__init__(pad=pad, eos=eos, unk=unk, mask=mask)
self.cls_word = cls
self.sep_word = sep
self.cls_index = self.add_symbol(cls)
self.sep_index = self.add_symbol(sep)
self.nspecial = len(self.symbols)
def cls(self):
"""Helper to get index of cls symbol"""
return self.cls_index
def sep(self):
"""Helper to get index of sep symbol"""
return self.sep_index
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from . import BaseWrapperDataset
class ListDataset(BaseWrapperDataset):
def __init__(self, dataset, sizes=None):
super().__init__(dataset)
self._sizes = sizes
def __iter__(self):
for x in self.dataset:
yield x
def collater(self, samples):
return samples
@property
def sizes(self):
return self._sizes
def num_tokens(self, index):
return self.sizes[index]
def size(self, index):
return self.sizes[index]
def set_epoch(self, epoch):
pass
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import numpy as np
import torch
from fairseq.data.monolingual_dataset import MonolingualDataset
from . import FairseqDataset
class LMContextWindowDataset(FairseqDataset):
"""Wraps a MonolingualDataset and provides more context for evaluation."""
def __init__(self, dataset, tokens_per_sample, context_window, pad_idx):
assert isinstance(dataset, MonolingualDataset)
assert context_window > 0
self.dataset = dataset
self.tokens_per_sample = tokens_per_sample
self.context_window = context_window
self.pad_idx = pad_idx
self.prev_tokens = np.empty([0])
def __getitem__(self, index):
return self.dataset[index]
def __len__(self):
return len(self.dataset)
def collater(self, samples):
sample = self.dataset.collater(samples)
pad = self.pad_idx
max_sample_len = self.tokens_per_sample + self.context_window
bsz, tsz = sample["net_input"]["src_tokens"].shape
start_idxs = [0] * bsz
toks = sample["net_input"]["src_tokens"]
lengths = sample["net_input"]["src_lengths"]
tgt = sample["target"]
new_toks = np.empty([bsz, tsz + self.context_window], dtype=np.int64)
new_tgt = np.full([bsz, tsz + self.context_window], pad, dtype=np.int64)
sample_lens = toks.ne(pad).long().sum(dim=1).cpu()
for i in range(bsz):
sample_len = sample_lens[i]
extra = len(self.prev_tokens) + sample_len - max_sample_len
if extra > 0:
self.prev_tokens = self.prev_tokens[extra:]
pads = np.full(self.context_window - len(self.prev_tokens), pad)
new_toks[i] = np.concatenate([self.prev_tokens, toks[i].numpy(), pads])
new_tgt[
i, len(self.prev_tokens) : len(self.prev_tokens) + len(tgt[i])
] = tgt[i]
start_idxs[i] = len(self.prev_tokens)
lengths[i] += len(self.prev_tokens)
self.prev_tokens = new_toks[i][new_toks[i] != pad][-self.context_window :]
sample["net_input"]["src_tokens"] = torch.from_numpy(new_toks)
sample["target"] = torch.from_numpy(new_tgt)
sample["start_indices"] = start_idxs
return sample
def num_tokens(self, index):
return self.dataset.num_tokens(index)
def size(self, index):
return self.dataset.size(index)
def ordered_indices(self):
# NOTE we don't shuffle the data to retain access to the previous dataset elements
return np.arange(len(self.dataset))
@property
def supports_prefetch(self):
return getattr(self.dataset, "supports_prefetch", False)
def prefetch(self, indices):
return self.dataset.prefetch(indices)
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from functools import lru_cache
from . import BaseWrapperDataset
class LRUCacheDataset(BaseWrapperDataset):
def __init__(self, dataset, token=None):
super().__init__(dataset)
@lru_cache(maxsize=8)
def __getitem__(self, index):
return self.dataset[index]
@lru_cache(maxsize=8)
def collater(self, samples):
return self.dataset.collater(samples)
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from functools import lru_cache
import numpy as np
import torch
from fairseq.data import Dictionary, data_utils
from . import BaseWrapperDataset, LRUCacheDataset
class MaskTokensDataset(BaseWrapperDataset):
"""
A wrapper Dataset for masked language modeling.
Input items are masked according to the specified masking probability.
Args:
dataset: Dataset to wrap.
sizes: Sentence lengths
vocab: Dictionary with the vocabulary and special tokens.
pad_idx: Id of pad token in vocab
mask_idx: Id of mask token in vocab
return_masked_tokens: controls whether to return the non-masked tokens
(the default) or to return a tensor with the original masked token
IDs (and *pad_idx* elsewhere). The latter is useful as targets for
masked LM training.
seed: Seed for random number generator for reproducibility.
mask_prob: probability of replacing a token with *mask_idx*.
leave_unmasked_prob: probability that a masked token is unmasked.
random_token_prob: probability of replacing a masked token with a
random token from the vocabulary.
freq_weighted_replacement: sample random replacement words based on
word frequencies in the vocab.
mask_whole_words: only mask whole words. This should be a byte mask
over vocab indices, indicating whether it is the beginning of a
word. We will extend any mask to encompass the whole word.
bpe: BPE to use for whole-word masking.
"""
@classmethod
def apply_mask(cls, dataset: torch.utils.data.Dataset, *args, **kwargs):
"""Return the source and target datasets for masked LM training."""
dataset = LRUCacheDataset(dataset)
return (
LRUCacheDataset(cls(dataset, *args, **kwargs, return_masked_tokens=False)),
LRUCacheDataset(cls(dataset, *args, **kwargs, return_masked_tokens=True)),
)
def __init__(
self,
dataset: torch.utils.data.Dataset,
vocab: Dictionary,
pad_idx: int,
mask_idx: int,
return_masked_tokens: bool = False,
seed: int = 1,
mask_prob: float = 0.15,
leave_unmasked_prob: float = 0.1,
random_token_prob: float = 0.1,
freq_weighted_replacement: bool = False,
mask_whole_words: torch.Tensor = None,
):
assert 0.0 < mask_prob < 1.0
assert 0.0 <= random_token_prob <= 1.0
assert 0.0 <= leave_unmasked_prob <= 1.0
assert random_token_prob + leave_unmasked_prob <= 1.0
self.dataset = dataset
self.vocab = vocab
self.pad_idx = pad_idx
self.mask_idx = mask_idx
self.return_masked_tokens = return_masked_tokens
self.seed = seed
self.mask_prob = mask_prob
self.leave_unmasked_prob = leave_unmasked_prob
self.random_token_prob = random_token_prob
self.mask_whole_words = mask_whole_words
if random_token_prob > 0.0:
if freq_weighted_replacement:
weights = np.array(self.vocab.count)
else:
weights = np.ones(len(self.vocab))
weights[: self.vocab.nspecial] = 0
self.weights = weights / weights.sum()
self.epoch = 0
@property
def can_reuse_epoch_itr_across_epochs(self):
return True # only the noise changes, not item sizes
def set_epoch(self, epoch, **unused):
super().set_epoch(epoch)
self.epoch = epoch
@lru_cache(maxsize=8)
def __getitem__(self, index: int):
with data_utils.numpy_seed(self.seed, self.epoch, index):
item = self.dataset[index]
sz = len(item)
assert (
self.mask_idx not in item
), "Dataset contains mask_idx (={}), this is not expected!".format(
self.mask_idx,
)
if self.mask_whole_words is not None:
word_begins_mask = self.mask_whole_words.gather(0, item)
word_begins_idx = word_begins_mask.nonzero().view(-1)
sz = len(word_begins_idx)
words = np.split(word_begins_mask, word_begins_idx)[1:]
assert len(words) == sz
word_lens = list(map(len, words))
# decide elements to mask
mask = np.full(sz, False)
num_mask = int(
# add a random number for probabilistic rounding
self.mask_prob * sz
+ np.random.rand()
)
mask[np.random.choice(sz, num_mask, replace=False)] = True
if self.return_masked_tokens:
# exit early if we're just returning the masked tokens
# (i.e., the targets for masked LM training)
if self.mask_whole_words is not None:
mask = np.repeat(mask, word_lens)
new_item = np.full(len(mask), self.pad_idx)
new_item[mask] = item[torch.from_numpy(mask.astype(np.uint8)) == 1]
return torch.from_numpy(new_item)
# decide unmasking and random replacement
rand_or_unmask_prob = self.random_token_prob + self.leave_unmasked_prob
if rand_or_unmask_prob > 0.0:
rand_or_unmask = mask & (np.random.rand(sz) < rand_or_unmask_prob)
if self.random_token_prob == 0.0:
unmask = rand_or_unmask
rand_mask = None
elif self.leave_unmasked_prob == 0.0:
unmask = None
rand_mask = rand_or_unmask
else:
unmask_prob = self.leave_unmasked_prob / rand_or_unmask_prob
decision = np.random.rand(sz) < unmask_prob
unmask = rand_or_unmask & decision
rand_mask = rand_or_unmask & (~decision)
else:
unmask = rand_mask = None
if unmask is not None:
mask = mask ^ unmask
if self.mask_whole_words is not None:
mask = np.repeat(mask, word_lens)
new_item = np.copy(item)
new_item[mask] = self.mask_idx
if rand_mask is not None:
num_rand = rand_mask.sum()
if num_rand > 0:
if self.mask_whole_words is not None:
rand_mask = np.repeat(rand_mask, word_lens)
num_rand = rand_mask.sum()
new_item[rand_mask] = np.random.choice(
len(self.vocab),
num_rand,
p=self.weights,
)
return torch.from_numpy(new_item)
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import numpy as np
import torch
from . import FairseqDataset, data_utils
def collate(samples, pad_idx, eos_idx):
if len(samples) == 0:
return {}
def merge(key, is_list=False):
if is_list:
res = []
for i in range(len(samples[0][key])):
res.append(
data_utils.collate_tokens(
[s[key][i] for s in samples],
pad_idx,
eos_idx,
left_pad=False,
)
)
return res
else:
return data_utils.collate_tokens(
[s[key] for s in samples],
pad_idx,
eos_idx,
left_pad=False,
)
src_tokens = merge("source")
if samples[0]["target"] is not None:
is_target_list = isinstance(samples[0]["target"], list)
target = merge("target", is_target_list)
else:
target = src_tokens
return {
"id": torch.LongTensor([s["id"] for s in samples]),
"nsentences": len(samples),
"ntokens": sum(len(s["source"]) for s in samples),
"net_input": {
"src_tokens": src_tokens,
"src_lengths": torch.LongTensor([s["source"].numel() for s in samples]),
},
"target": target,
}
class MonolingualDataset(FairseqDataset):
"""
A wrapper around torch.utils.data.Dataset for monolingual data.
Args:
dataset (torch.utils.data.Dataset): dataset to wrap
sizes (List[int]): sentence lengths
vocab (~fairseq.data.Dictionary): vocabulary
shuffle (bool, optional): shuffle the elements before batching
(default: True).
"""
def __init__(
self,
dataset,
sizes,
src_vocab,
tgt_vocab,
add_eos_for_other_targets,
shuffle,
targets=None,
add_bos_token=False,
):
self.dataset = dataset
self.sizes = np.array(sizes)
self.vocab = src_vocab
self.tgt_vocab = tgt_vocab
self.add_eos_for_other_targets = add_eos_for_other_targets
self.shuffle = shuffle
self.add_bos_token = add_bos_token
assert targets is None or all(
t in {"self", "future", "past"} for t in targets
), "targets must be none or one of 'self', 'future', 'past'"
if targets is not None and len(targets) == 0:
targets = None
self.targets = targets
def __getitem__(self, index):
if self.targets is not None:
# *future_target* is the original sentence
# *source* is shifted right by 1 (maybe left-padded with eos)
# *past_target* is shifted right by 2 (left-padded as needed)
#
# Left-to-right language models should condition on *source* and
# predict *future_target*.
# Right-to-left language models should condition on *source* and
# predict *past_target*.
source, future_target, past_target = self.dataset[index]
source, target = self._make_source_target(
source, future_target, past_target
)
else:
source = self.dataset[index]
target = None
source, target = self._maybe_add_bos(source, target)
return {"id": index, "source": source, "target": target}
def __len__(self):
return len(self.dataset)
def _make_source_target(self, source, future_target, past_target):
if self.targets is not None:
target = []
if (
self.add_eos_for_other_targets
and (("self" in self.targets) or ("past" in self.targets))
and source[-1] != self.vocab.eos()
):
# append eos at the end of source
source = torch.cat([source, source.new([self.vocab.eos()])])
if "future" in self.targets:
future_target = torch.cat(
[future_target, future_target.new([self.vocab.pad()])]
)
if "past" in self.targets:
# first token is before the start of sentence which is only used in "none" break mode when
# add_eos_for_other_targets is False
past_target = torch.cat(
[
past_target.new([self.vocab.pad()]),
past_target[1:],
source[-2, None],
]
)
for t in self.targets:
if t == "self":
target.append(source)
elif t == "future":
target.append(future_target)
elif t == "past":
target.append(past_target)
else:
raise Exception("invalid target " + t)
if len(target) == 1:
target = target[0]
else:
target = future_target
return source, self._filter_vocab(target)
def _maybe_add_bos(self, source, target):
if self.add_bos_token:
source = torch.cat([source.new([self.vocab.bos()]), source])
if target is not None:
target = torch.cat([target.new([self.tgt_vocab.bos()]), target])
return source, target
def _filter_vocab(self, target):
if len(self.tgt_vocab) != len(self.vocab):
def _filter(target):
mask = target.ge(len(self.tgt_vocab))
if mask.any():
target[mask] = self.tgt_vocab.unk()
return target
if isinstance(target, list):
return [_filter(t) for t in target]
return _filter(target)
return target
def collater(self, samples):
"""Merge a list of samples to form a mini-batch.
Args:
samples (List[dict]): samples to collate
Returns:
dict: a mini-batch with the following keys:
- `id` (LongTensor): example IDs in the original input order
- `ntokens` (int): total number of tokens in the batch
- `net_input` (dict): the input to the Model, containing keys:
- `src_tokens` (LongTensor): a padded 2D Tensor of tokens in
the source sentence of shape `(bsz, src_len)`. Padding will
appear on the right.
- `target` (LongTensor): a padded 2D Tensor of tokens in the
target sentence of shape `(bsz, tgt_len)`. Padding will appear
on the right.
"""
return collate(samples, self.vocab.pad(), self.vocab.eos())
def num_tokens(self, index):
"""Return the number of tokens in a sample. This value is used to
enforce ``--max-tokens`` during batching."""
return self.sizes[index]
def size(self, index):
"""Return an example's size as a float or tuple. This value is used when
filtering a dataset with ``--max-positions``."""
return self.sizes[index]
def ordered_indices(self):
"""Return an ordered list of indices. Batches will be constructed based
on this order."""
if self.shuffle:
order = [np.random.permutation(len(self))]
else:
order = [np.arange(len(self))]
order.append(self.sizes)
return np.lexsort(order)
@property
def supports_prefetch(self):
return getattr(self.dataset, "supports_prefetch", False)
def prefetch(self, indices):
self.dataset.prefetch(indices)
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import logging
from collections import OrderedDict
from typing import Dict, List
import numpy as np
from fairseq.data import data_utils
from . import FairseqDataset
logger = logging.getLogger(__name__)
class MultiCorpusDataset(FairseqDataset):
"""
Stores multiple instances of FairseqDataset together. Requires each instance
to be the same dataset, as the collate method needs to work on batches with
samples from each dataset.
Allows specifying a distribution over the datasets to use. Note that unlike
MultiCorpusSampledDataset, this distribution allows sampling for each item,
rather than on a batch level.
Each time ordered_indices() is called, a new sample is generated with
the specified distribution.
Args:
datasets: a OrderedDict of FairseqDataset instances.
distribution: a List containing the probability of getting an utterance from
corresponding dataset
seed: random seed for sampling the datsets
sort_indices: if true, will sort the ordered indices by size
"""
def __init__(
self,
datasets: Dict[str, FairseqDataset],
distribution: List[float],
seed: int,
sort_indices: bool = False,
):
super().__init__()
assert isinstance(datasets, OrderedDict)
assert len(datasets) == len(distribution)
self.datasets = datasets
self.distribution = distribution
self.seed = seed
self.sort_indices = sort_indices
# Avoid repeated conversions to list later
self.dataset_list = list(datasets.values())
self.total_num_instances = 0
first_dataset = list(self.datasets.values())[0]
self.dataset_offsets = []
for dataset in datasets.values():
assert isinstance(dataset, FairseqDataset)
assert type(dataset) is type(first_dataset)
self.dataset_offsets.append(self.total_num_instances)
self.total_num_instances += len(dataset)
def ordered_indices(self):
with data_utils.numpy_seed(self.seed, self.epoch):
# Used to store the order of indices of each dataset to use
indices = [
np.random.permutation(len(dataset))
for dataset in self.datasets.values()
]
# Keep track of which samples we've used for each dataset
counters = [0 for _ in self.datasets]
sampled_indices = [
self._sample(indices, counters) for _ in range(self.total_num_instances)
]
if self.sort_indices:
sampled_indices.sort(key=lambda i: self.num_tokens(i))
return np.array(sampled_indices, dtype=np.int64)
def _sample(self, indices, counters):
# First pick dataset
dataset_idx = np.random.choice(len(self.distribution), p=self.distribution)
# Then get dataset internal index
idx = indices[dataset_idx][counters[dataset_idx]]
# Convert to multi-datasets index
idx += self.dataset_offsets[dataset_idx]
counters[dataset_idx] += 1
# Reset if we reach end
if counters[dataset_idx] == len(self.dataset_list[dataset_idx]):
counters[dataset_idx] = 0
indices[dataset_idx] = np.random.permutation(
len(self.dataset_list[dataset_idx])
)
return idx
def _map_index(self, index: int):
"""
If dataset A has length N and dataset B has length M
then index 1 maps to index 1 of dataset A, and index N + 1
maps to index 1 of B.
"""
counter = 0
for key, dataset in self.datasets.items():
if index < counter + len(dataset):
return index - counter, key
counter += len(dataset)
raise ValueError(
"Invalid index: {}, max: {}".format(index, self.total_num_instances)
)
def __len__(self):
"""
Length of this dataset is the sum of individual datasets
"""
return self.total_num_instances
def __getitem__(self, index):
index, key = self._map_index(index)
return self.datasets[key][index]
def collater(self, samples):
"""
Since we enforce all datsets to be the same, collating is just
picking the first one and doing collate.
"""
if len(samples) == 0:
return None
return list(self.datasets.values())[0].collater(samples)
def num_tokens(self, index: int):
index, key = self._map_index(index)
return self.datasets[key].num_tokens(index)
def size(self, index: int):
index, key = self._map_index(index)
return self.datasets[key].size(index)
@property
def can_reuse_epoch_itr_across_epochs(self):
return False
def set_epoch(self, epoch, **unused):
super().set_epoch(epoch)
self.epoch = epoch
@property
def supports_prefetch(self):
return False
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from collections import OrderedDict
from typing import Callable, Dict, List
import numpy as np
from . import FairseqDataset
def uniform_sampler(x):
# Sample from uniform distribution
return np.random.choice(x, 1).item()
class MultiCorpusSampledDataset(FairseqDataset):
"""
Stores multiple instances of FairseqDataset together and in every iteration
creates a batch by first sampling a dataset according to a specified
probability distribution and then getting instances from that dataset.
Args:
datasets: an OrderedDict of FairseqDataset instances.
sampling_func: A function for sampling over list of dataset keys.
The default strategy is to sample uniformly.
"""
def __init__(
self,
datasets: Dict[str, FairseqDataset],
sampling_func: Callable[[List], int] = None,
):
super().__init__()
assert isinstance(datasets, OrderedDict)
self.datasets = datasets
if sampling_func is None:
sampling_func = uniform_sampler
self.sampling_func = sampling_func
self.total_num_instances = 0
for _, dataset in datasets.items():
assert isinstance(dataset, FairseqDataset)
self.total_num_instances += len(dataset)
self._ordered_indices = None
def __len__(self):
"""
Length of this dataset is the sum of individual datasets
"""
return self.total_num_instances
def ordered_indices(self):
"""
Ordered indices for batching. Here we call the underlying
dataset's ordered_indices() so that we get the same random ordering
as we would have from using the underlying dataset directly.
"""
if self._ordered_indices is None:
self._ordered_indices = OrderedDict(
[
(key, dataset.ordered_indices())
for key, dataset in self.datasets.items()
]
)
return np.arange(len(self))
def _map_index_to_dataset(self, key: int, index: int):
"""
Different underlying datasets have different lengths. In order to ensure
we are not accessing an index outside the range of the current dataset
size, we wrap around. This function should be called after we have
created an ordering for this and all underlying datasets.
"""
assert (
self._ordered_indices is not None
), "Must call MultiCorpusSampledDataset.ordered_indices() first"
mapped_index = index % len(self.datasets[key])
return self._ordered_indices[key][mapped_index]
def __getitem__(self, index: int):
"""
Get the item associated with index from each underlying dataset.
Since index is in the range of [0, TotalNumInstances], we need to
map the index to the dataset before retrieving the item.
"""
return OrderedDict(
[
(key, dataset[self._map_index_to_dataset(key, index)])
for key, dataset in self.datasets.items()
]
)
def collater(self, samples: List[Dict]):
"""
Generate a mini-batch for this dataset.
To convert this into a regular mini-batch we use the following
logic:
1. Select a dataset using the specified probability distribution.
2. Call the collater function of the selected dataset.
"""
if len(samples) == 0:
return None
selected_key = self.sampling_func(list(self.datasets.keys()))
selected_samples = [sample[selected_key] for sample in samples]
return self.datasets[selected_key].collater(selected_samples)
def num_tokens(self, index: int):
"""
Return an example's length (number of tokens), used for batching. Here
we return the max across all examples at index across all underlying
datasets.
"""
return max(
dataset.num_tokens(self._map_index_to_dataset(key, index))
for key, dataset in self.datasets.items()
)
def size(self, index: int):
"""
Return an example's size as a float or tuple. Here we return the max
across all underlying datasets. This value is used when filtering a
dataset with max-positions.
"""
return max(
dataset.size(self._map_index_to_dataset(key, index))
for key, dataset in self.datasets.items()
)
@property
def supports_prefetch(self):
return all(
getattr(dataset, "supports_prefetch", False)
for dataset in self.datasets.values()
)
def prefetch(self, indices):
for key, dataset in self.datasets.items():
dataset.prefetch(
[self._map_index_to_dataset(key, index) for index in indices]
)
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import itertools
import json
import logging
import math
import os
from collections import OrderedDict, defaultdict
from fairseq import utils
from fairseq.data import (
AppendTokenDataset,
ConcatDataset,
Dictionary,
LanguagePairDataset,
PrependTokenDataset,
SampledMultiDataset,
SampledMultiEpochDataset,
StripTokenDataset,
TransformEosLangPairDataset,
TruncateDataset,
data_utils,
indexed_dataset,
)
from fairseq.data.multilingual.multilingual_utils import (
EncoderLangtok,
LangTokSpec,
LangTokStyle,
augment_dictionary,
get_lang_tok,
)
from fairseq.data.multilingual.sampled_multi_dataset import CollateFormat
from fairseq.file_io import PathManager
from fairseq.utils import FileContentsAction, csv_str_list, eval_str_dict
logger = logging.getLogger(__name__)
def _lang_id(dic: Dictionary, lang: str):
"""Return language ID index."""
idx = dic.index(lang)
assert idx != dic.unk_index, "cannot find language ID for lang {}".format(lang)
return idx
def load_sampling_weights(from_file):
with open(from_file) as f:
weights = json.load(f)
return weights
class MultilingualDatasetManager(object):
def __init__(self, args, lang_pairs, langs, dicts, sampling_method):
super().__init__()
self.args = args
self.seed = args.seed
self.lang_pairs = lang_pairs
self.langs = langs
self.dicts = dicts
self.lang_dict = self.create_lang_dictionary(self.langs)
self.sampling_method = sampling_method
self.sampling_scheduler = None
self._has_sharded_data = False
self._num_shards_dict = {}
self._training_data_sizes = defaultdict(lambda: {})
@classmethod
def setup_data_manager(cls, args, lang_pairs, langs, dicts, sampling_method):
return MultilingualDatasetManager(
args, lang_pairs, langs, dicts, sampling_method
)
@staticmethod
def add_args(parser):
parser.add_argument(
"data",
help="colon separated path to data directories list, \
will be iterated upon during epochs in round-robin manner",
action=FileContentsAction,
)
parser.add_argument(
"--langs",
default=None,
type=csv_str_list,
help="a list of languages comma sperated languages which can appear in lang-pairs; "
"note that the ordering determines language token IDs",
)
parser.add_argument(
"--lang-dict",
default=None,
type=str,
help="an external file which contains a list of "
"languages which can appear in lang-pairs; "
"note that the ordering determines language token IDs; "
"--langs and --lang-dict are two exclusive options",
)
parser.add_argument(
"--lang-tok-style",
default=LangTokStyle.multilingual.value,
type=str,
choices=[LangTokStyle.multilingual.value, LangTokStyle.mbart.value],
help="language token styles",
)
parser.add_argument(
"--load-alignments",
action="store_true",
help="load the binarized alignments",
)
parser.add_argument(
"--left-pad-source",
default="True",
type=str,
metavar="BOOL",
help="pad the source on the left",
)
parser.add_argument(
"--left-pad-target",
default="False",
type=str,
metavar="BOOL",
help="pad the target on the left",
)
parser.add_argument(
"--max-source-positions",
default=1024,
type=int,
metavar="N",
help="max number of tokens in the source sequence",
)
parser.add_argument(
"--max-target-positions",
default=1024,
type=int,
metavar="N",
help="max number of tokens in the target sequence",
)
parser.add_argument(
"--upsample-primary",
default=1,
type=int,
help="amount to upsample primary dataset",
)
parser.add_argument(
"--truncate-source",
action="store_true",
default=False,
help="truncate source to max-source-positions",
)
parser.add_argument(
"--encoder-langtok",
default=None,
type=str,
choices=[EncoderLangtok.src.value, EncoderLangtok.tgt.value],
metavar="SRCTGT",
help="prepend to the beginning of source sentence the source or target "
"language token. (src/tgt)",
)
parser.add_argument(
"--decoder-langtok",
action="store_true",
help="prepend to the beginning of target sentence the target language token",
)
parser.add_argument(
"--lang-tok-replacing-bos-eos", action="store_true", default=False
)
parser.add_argument(
"--enable-lang-ids",
default=False,
action="store_true",
help="whether to include language IDs in samples",
)
parser.add_argument(
"--enable-reservsed-directions-shared-datasets",
default=False,
action="store_true",
help="whether to allow datasets be used in reversed directions",
)
parser.add_argument(
"--extra-data",
help='a dictionary of data name to this path, \
e.g. {"mined", path_to_mined_data, "denoised": path_to_denoised_data}',
type=lambda uf: eval_str_dict(uf, type=str),
default=None,
)
parser.add_argument(
"--extra-lang-pairs",
help='a dictionary of data name to the language pairs they serve, \
e.g. {"mined": comma-separated-lang-pairs, "denoised": comma-separated-lang-pairs}',
type=lambda uf: eval_str_dict(uf, type=str),
default=None,
)
parser.add_argument(
"--fixed-dictionary",
help="Fixed dictionary to use with model path",
default=None,
type=str,
)
parser.add_argument(
"--langtoks-specs",
help='a list of comma separated data types that a set of language tokens to be specialized for, \
e.g. "main,dae,mined". There will be a set of language tokens added to the vocab to \
distinguish languages in different training data types. If not specified, default language \
tokens per languages will be added',
default=LangTokSpec.main.value,
type=csv_str_list,
)
parser.add_argument(
"--langtoks",
help='a dictionary of how to add language tokens, \
e.g. {"mined": (None, "tgt"), "mono_dae": ("src.dae", "tgt"), "main": \
("src", "tgt")}, or {"mined": ("src.mined", "tgt")}',
default=None,
type=lambda uf: eval_str_dict(uf, type=str),
)
parser.add_argument(
"--sampling-weights-from-file",
help='a file contain a python dictionary of how to sample data sets, \
e.g. { "main:en_XX-es_XX": 0.2, "mined:en_XX-pt_XX": 0.5, \
"mono_dae:es_XX-es_XX: 0.3, "main:en_xx-fr_XX": 0.8 }',
default=None,
type=str,
)
parser.add_argument(
"--sampling-weights",
help='a dictionary of how to sample data sets, \
e.g. { "main:en_XX-es_XX": 0.2, "mined:en_XX-pt_XX": 0.5, \
"mono_dae:es_XX-es_XX: 0.3, "main:en_xx-fr_XX": 0.8 }',
default=None,
type=lambda uf: eval_str_dict(uf, type=str),
)
parser.add_argument(
"--virtual-epoch-size",
default=1000000,
type=int,
help="virtual epoch size to speed up data loading",
)
parser.add_argument(
"--virtual-data-size",
default=None,
type=int,
help="virtual data size of the whole joint dataset to speed"
"up data loading and have specific dynamic sampling strategy interval",
)
@classmethod
def load_langs(cls, args, **kwargs):
if args.lang_dict and args.langs:
raise ValueError("--langs and --lang-dict can not both be specified")
if args.lang_dict is None and args.langs is None:
logger.warning(
"External language dictionary is not provided; "
"use lang-pairs to infer the set of supported languages. "
"The language ordering is not stable which might cause "
"misalignment in pretraining and finetuning."
)
# infer from lang_pairs as it is
langs = list(
{x for lang_pair in args.lang_pairs for x in lang_pair.split("-")}
)
langs = sorted(langs)
logger.info(f"inferred language list: {langs}")
elif args.lang_dict:
with open(
PathManager.get_local_path(args.lang_dict), "r", encoding="utf-8"
) as f:
langs = [lang.strip() for lang in f.readlines() if lang.strip()]
logger.info(
f"loaded language list from {args.lang_dict} as they are ordered in file"
)
elif args.langs:
langs = args.langs
logger.info(
f"parsed the language list as they are ordered in the option: {langs}"
)
return langs
def has_sharded_data(self, split):
return self._has_sharded_data and split == getattr(
self.args, "train_subset", None
)
def _shared_collater(self):
return not (self.args.extra_data and "mono_dae" in self.args.extra_data) and (
not self.args.lang_tok_replacing_bos_eos
)
def estimate_global_pass_epoch(self, epoch):
if self.args.virtual_epoch_size is None or self.args.virtual_data_size is None:
return None
# one epoch more for remaining data in each shard
virtual_epochs_per_shard = math.ceil(
self.args.virtual_data_size / self.args.virtual_epoch_size
)
# note that fairseq epoch / shard_epoch starts from 1
shard_epoch = (epoch - 1) // virtual_epochs_per_shard + 1
return shard_epoch
@classmethod
def prepare(cls, load_dictionary, args, **kargs):
args.left_pad_source = utils.eval_bool(args.left_pad_source)
args.left_pad_target = utils.eval_bool(args.left_pad_target)
if not hasattr(args, "shuffle_instance"):
args.shuffle_instance = False
if args.langtoks is None:
args.langtoks = {}
if "main" not in args.langtoks:
src_langtok_spec = args.encoder_langtok if args.encoder_langtok else None
tgt_langtok_spec = "tgt" if args.decoder_langtok else None
args.langtoks["main"] = (src_langtok_spec, tgt_langtok_spec)
def check_langs(langs, pairs):
messages = []
for src, tgt in pairs:
if src not in langs or tgt not in langs:
messages.append(
f"language pair {src}-{tgt} contains languages "
"that are not in the language dictionary"
)
if len(messages) > 0:
raise ValueError(" ".join(messages) + f"; langs: {langs}")
if args.lang_pairs is None:
raise ValueError(
"--lang-pairs is required. List all the language pairs in the training objective."
)
if isinstance(args.lang_pairs, str):
args.lang_pairs = args.lang_pairs.split(",")
if args.source_lang is not None or args.target_lang is not None:
training = False
else:
training = True
language_list = cls.load_langs(args, **kargs)
check_langs(
language_list,
(
[p.split("-") for p in args.lang_pairs]
if training
else [(args.source_lang, args.target_lang)]
),
)
# load dictionaries
if training:
extra_lang_pairs = (
list(
{p for _, v in args.extra_lang_pairs.items() for p in v.split(",")}
)
if args.extra_lang_pairs
else []
)
langs_to_load_dicts = sorted(
{x for p in args.lang_pairs + extra_lang_pairs for x in p.split("-")}
)
else:
langs_to_load_dicts = sorted([args.source_lang, args.target_lang])
dicts = OrderedDict()
paths = utils.split_paths(args.data)
assert len(paths) > 0
for lang in langs_to_load_dicts:
if args.fixed_dictionary is not None:
dicts[lang] = load_dictionary(args.fixed_dictionary)
else:
dicts[lang] = load_dictionary(
os.path.join(paths[0], "dict.{}.txt".format(lang))
)
augment_dictionary(
dictionary=dicts[lang],
language_list=language_list,
lang_tok_style=args.lang_tok_style,
langtoks_specs=args.langtoks_specs,
extra_data=args.extra_data,
)
if len(dicts) > 0:
assert dicts[lang].pad() == dicts[langs_to_load_dicts[0]].pad()
assert dicts[lang].eos() == dicts[langs_to_load_dicts[0]].eos()
assert dicts[lang].unk() == dicts[langs_to_load_dicts[0]].unk()
logger.info("[{}] dictionary: {} types".format(lang, len(dicts[lang])))
return language_list, dicts, training
@classmethod
def create_lang_dictionary(cls, langs):
unk = "<unk>"
# hack to remove symbols other than unk as they are not needed by lang dict
lang_dict = Dictionary(pad=unk, eos=unk, unk=unk, bos=unk)
for lang in langs:
lang_dict.add_symbol(lang)
return lang_dict
@classmethod
def get_langtok_index(cls, lang_tok, dic):
idx = dic.index(lang_tok)
assert (
idx != dic.unk_index
), "cannot find language token {} in the dictionary".format(lang_tok)
return idx
def get_encoder_langtok(self, src_lang, tgt_lang, spec=None):
if spec is None:
return None
if spec and spec.startswith("src"):
if src_lang is None:
return None
langtok = get_lang_tok(
lang=src_lang, lang_tok_style=self.args.lang_tok_style, spec=spec
)
else:
if tgt_lang is None:
return None
langtok = get_lang_tok(
lang=tgt_lang, lang_tok_style=self.args.lang_tok_style, spec=spec
)
return self.get_langtok_index(
langtok, self.dicts[src_lang if src_lang else tgt_lang]
)
def get_decoder_langtok(self, tgt_lang, spec=None):
if spec is None:
return None
langtok = get_lang_tok(
lang=tgt_lang, lang_tok_style=self.args.lang_tok_style, spec=spec
)
return self.get_langtok_index(langtok, self.dicts[tgt_lang])
@classmethod
def load_data(cls, path, vdict, impl):
dataset = data_utils.load_indexed_dataset(path, vdict, impl)
return dataset
@classmethod
def split_exists(cls, split, src, tgt, lang, data_path, dataset_impl):
filename = os.path.join(data_path, "{}.{}-{}.{}".format(split, src, tgt, lang))
return indexed_dataset.dataset_exists(filename, impl=dataset_impl)
def load_lang_dataset(
self,
data_path,
split,
src,
src_dict,
tgt,
tgt_dict,
combine,
dataset_impl,
upsample_primary,
max_source_positions,
prepend_bos=False,
load_alignments=False,
truncate_source=False,
):
src_datasets = []
tgt_datasets = []
for k in itertools.count():
split_k = split + (str(k) if k > 0 else "")
# infer langcode
if self.split_exists(split_k, src, tgt, src, data_path, dataset_impl):
prefix = os.path.join(data_path, "{}.{}-{}.".format(split_k, src, tgt))
elif self.split_exists(split_k, tgt, src, src, data_path, dataset_impl):
prefix = os.path.join(data_path, "{}.{}-{}.".format(split_k, tgt, src))
else:
if k > 0:
break
else:
logger.error(
f"Dataset not found: {data_path}, {split_k}, {src}, {tgt}"
)
raise FileNotFoundError(
"Dataset not found: {} ({})".format(split, data_path)
)
src_dataset = self.load_data(prefix + src, src_dict, dataset_impl)
if truncate_source:
src_dataset = AppendTokenDataset(
TruncateDataset(
StripTokenDataset(src_dataset, src_dict.eos()),
max_source_positions - 1,
),
src_dict.eos(),
)
src_datasets.append(src_dataset)
tgt_datasets.append(self.load_data(prefix + tgt, tgt_dict, dataset_impl))
logger.info(
"{} {} {}-{} {} examples".format(
data_path, split_k, src, tgt, len(src_datasets[-1])
)
)
if not combine:
break
assert len(src_datasets) == len(tgt_datasets)
if len(src_datasets) == 1:
src_dataset, tgt_dataset = src_datasets[0], tgt_datasets[0]
else:
sample_ratios = [1] * len(src_datasets)
sample_ratios[0] = upsample_primary
src_dataset = ConcatDataset(src_datasets, sample_ratios)
tgt_dataset = ConcatDataset(tgt_datasets, sample_ratios)
if prepend_bos:
assert hasattr(src_dict, "bos_index") and hasattr(tgt_dict, "bos_index")
src_dataset = PrependTokenDataset(src_dataset, src_dict.bos())
tgt_dataset = PrependTokenDataset(tgt_dataset, tgt_dict.bos())
align_dataset = None
if load_alignments:
align_path = os.path.join(
data_path, "{}.align.{}-{}".format(split, src, tgt)
)
if indexed_dataset.dataset_exists(align_path, impl=dataset_impl):
align_dataset = data_utils.load_indexed_dataset(
align_path, None, dataset_impl
)
return src_dataset, tgt_dataset, align_dataset
def load_langpair_dataset(
self,
data_path,
split,
src,
src_dict,
tgt,
tgt_dict,
combine,
dataset_impl,
upsample_primary,
left_pad_source,
left_pad_target,
max_source_positions,
max_target_positions,
prepend_bos=False,
load_alignments=False,
truncate_source=False,
src_dataset_transform_func=lambda dataset: dataset,
tgt_dataset_transform_func=lambda dataset: dataset,
src_lang_id=None,
tgt_lang_id=None,
langpairs_sharing_datasets=None,
):
norm_direction = "-".join(sorted([src, tgt]))
if langpairs_sharing_datasets is not None:
src_dataset = langpairs_sharing_datasets.get(
(data_path, split, norm_direction, src), "NotInCache"
)
tgt_dataset = langpairs_sharing_datasets.get(
(data_path, split, norm_direction, tgt), "NotInCache"
)
align_dataset = langpairs_sharing_datasets.get(
(data_path, split, norm_direction, src, tgt), "NotInCache"
)
# a hack: any one is not in cache, we need to reload them
if (
langpairs_sharing_datasets is None
or src_dataset == "NotInCache"
or tgt_dataset == "NotInCache"
or align_dataset == "NotInCache"
or split != getattr(self.args, "train_subset", None)
):
# source and target datasets can be reused in reversed directions to save memory
# reversed directions of valid and test data will not share source and target datasets
src_dataset, tgt_dataset, align_dataset = self.load_lang_dataset(
data_path,
split,
src,
src_dict,
tgt,
tgt_dict,
combine,
dataset_impl,
upsample_primary,
max_source_positions=max_source_positions,
prepend_bos=prepend_bos,
load_alignments=load_alignments,
truncate_source=truncate_source,
)
src_dataset = src_dataset_transform_func(src_dataset)
tgt_dataset = tgt_dataset_transform_func(tgt_dataset)
if langpairs_sharing_datasets is not None:
langpairs_sharing_datasets[
(data_path, split, norm_direction, src)
] = src_dataset
langpairs_sharing_datasets[
(data_path, split, norm_direction, tgt)
] = tgt_dataset
langpairs_sharing_datasets[
(data_path, split, norm_direction, src, tgt)
] = align_dataset
if align_dataset is None:
# no align data so flag the reverse direction as well in sharing
langpairs_sharing_datasets[
(data_path, split, norm_direction, tgt, src)
] = align_dataset
else:
logger.info(
f"Reusing source and target datasets of [{split}] {tgt}-{src} for reversed direction: "
f"[{split}] {src}-{tgt}: src length={len(src_dataset)}; tgt length={len(tgt_dataset)}"
)
return LanguagePairDataset(
src_dataset,
src_dataset.sizes,
src_dict,
tgt_dataset,
tgt_dataset.sizes if tgt_dataset is not None else None,
tgt_dict,
left_pad_source=left_pad_source,
left_pad_target=left_pad_target,
align_dataset=align_dataset,
src_lang_id=src_lang_id,
tgt_lang_id=tgt_lang_id,
)
def src_dataset_tranform_func(self, src_lang, tgt_lang, dataset, spec=None):
if self.args.lang_tok_replacing_bos_eos:
# it is handled by self.alter_dataset_langtok
# TODO: Unifiy with alter_dataset_langtok
return dataset
if spec is None:
return dataset
tok = self.get_encoder_langtok(src_lang, tgt_lang, spec)
if tok:
return PrependTokenDataset(dataset, tok)
return dataset
def tgt_dataset_tranform_func(self, source_lang, target_lang, dataset, spec=None):
if dataset is None:
# note that target dataset can be None during inference time
return None
if self.args.lang_tok_replacing_bos_eos:
# TODO: Unifiy with alter_dataset_langtok
# It is handled by self.alter_dataset_langtok.
# The complication in self.alter_dataset_langtok
# makes a unified framework difficult.
return dataset
# if not self.args.decoder_langtok:
if not spec:
return dataset
tok = self.get_decoder_langtok(target_lang, spec)
if tok:
return PrependTokenDataset(dataset, tok)
return dataset
def alter_dataset_langtok(
self,
lang_pair_dataset,
src_eos=None,
src_lang=None,
tgt_eos=None,
tgt_lang=None,
src_langtok_spec=None,
tgt_langtok_spec=None,
):
if src_langtok_spec is None and tgt_langtok_spec is None:
return lang_pair_dataset
new_src_eos = None
if (
src_langtok_spec is not None
and src_eos is not None
and (src_lang is not None or tgt_lang is not None)
):
new_src_eos = self.get_encoder_langtok(src_lang, tgt_lang, src_langtok_spec)
else:
src_eos = None
new_tgt_bos = None
if tgt_langtok_spec and tgt_eos is not None and tgt_lang is not None:
new_tgt_bos = self.get_decoder_langtok(tgt_lang, tgt_langtok_spec)
else:
tgt_eos = None
return TransformEosLangPairDataset(
lang_pair_dataset,
src_eos=src_eos,
new_src_eos=new_src_eos,
tgt_bos=tgt_eos,
new_tgt_bos=new_tgt_bos,
)
def load_a_dataset(
self,
split,
data_path,
src,
src_dict,
tgt,
tgt_dict,
combine,
prepend_bos=False,
langpairs_sharing_datasets=None,
data_category=None,
**extra_kwargs,
):
dataset_impl = self.args.dataset_impl
upsample_primary = self.args.upsample_primary
left_pad_source = self.args.left_pad_source
left_pad_target = self.args.left_pad_target
max_source_positions = self.args.max_source_positions
max_target_positions = self.args.max_target_positions
load_alignments = self.args.load_alignments
truncate_source = self.args.truncate_source
src_dataset_transform_func = self.src_dataset_tranform_func
tgt_dataset_transform_func = self.tgt_dataset_tranform_func
enable_lang_ids = self.args.enable_lang_ids
lang_dictionary = self.lang_dict
src_langtok_spec, tgt_langtok_spec = extra_kwargs["langtok_spec"]
src_langtok = self.get_encoder_langtok(src, tgt, src_langtok_spec)
tgt_langtok = self.get_decoder_langtok(tgt, tgt_langtok_spec)
logger.info(
f"{data_category}:{src}-{tgt} src_langtok: {src_langtok}; tgt_langtok: {tgt_langtok}"
)
langpair_ds = self.load_langpair_dataset(
data_path,
split,
src,
src_dict,
tgt,
tgt_dict,
combine,
dataset_impl,
upsample_primary,
left_pad_source,
left_pad_target,
max_source_positions,
max_target_positions,
prepend_bos,
load_alignments,
truncate_source,
src_dataset_transform_func=lambda dataset: src_dataset_transform_func(
src, tgt, dataset, src_langtok_spec
),
tgt_dataset_transform_func=lambda dataset: tgt_dataset_transform_func(
src, tgt, dataset, tgt_langtok_spec
),
src_lang_id=_lang_id(lang_dictionary, src)
if enable_lang_ids and lang_dictionary is not None
else None,
tgt_lang_id=_lang_id(lang_dictionary, tgt)
if enable_lang_ids and lang_dictionary is not None
else None,
langpairs_sharing_datasets=langpairs_sharing_datasets,
)
# TODO: handle modified lang toks for mined data and dae data
if self.args.lang_tok_replacing_bos_eos:
ds = self.alter_dataset_langtok(
langpair_ds,
src_eos=self.dicts[src if src else tgt].eos(),
src_lang=src,
tgt_eos=self.dicts[tgt].eos(),
tgt_lang=tgt,
src_langtok_spec=src_langtok_spec,
tgt_langtok_spec=tgt_langtok_spec,
)
else:
ds = langpair_ds
return ds
def load_split_langpair_datasets(self, split, data_param_list):
datasets = []
langpairs_sharing_datasets = (
{} if self.args.enable_reservsed_directions_shared_datasets else None
)
for param in data_param_list:
ds = self.load_a_dataset(
split=split,
langpairs_sharing_datasets=langpairs_sharing_datasets,
**param,
)
datasets.append(ds)
return datasets
def get_data_paths_and_lang_pairs(self, split):
datapaths = {"main": self.args.data}
lang_pairs = {"main": self.lang_pairs}
if split == getattr(self.args, "train_subset", None):
# only training data can have extra data and extra language pairs
if self.args.extra_data:
extra_datapaths = self.args.extra_data
datapaths.update(extra_datapaths)
if self.args.extra_lang_pairs:
extra_lang_pairs = {
k: v.split(",") for k, v in self.args.extra_lang_pairs.items()
}
lang_pairs.update(extra_lang_pairs)
return datapaths, lang_pairs
@classmethod
def get_dataset_key(cls, data_category, src, tgt):
return f"{data_category}:{src}-{tgt}"
@classmethod
def _get_shard_num_dict(cls, split, paths):
shards = defaultdict(int)
for path in paths:
files = PathManager.ls(path)
directions = set()
for f in files:
if f.startswith(split) and f.endswith(".idx"):
# idx files of the form "{split}.{src}-{tgt}.{lang}.idx"
direction = f.split(".")[-3]
directions.add(direction)
for direction in directions:
shards[direction] += 1
return shards
def get_split_num_data_shards(self, split):
if split in self._num_shards_dict:
return self._num_shards_dict[split]
num_shards_dict = {}
data_paths, lang_pairs = self.get_data_paths_and_lang_pairs(split)
for data_category, paths in data_paths.items():
if data_category not in lang_pairs:
continue
paths = utils.split_paths(paths)
shards_dict = self._get_shard_num_dict(split, paths)
lang_dirs = [
lang_pair.split("-") for lang_pair in lang_pairs[data_category]
]
lang_dirs = [x if len(x) > 1 else (x[0], x[0]) for x in lang_dirs]
for src, tgt in lang_dirs:
key = self.get_dataset_key(data_category, src, tgt)
if "mono_" in data_category:
# monolingual data requires tgt only
assert src is None or src == tgt, (
f"error: src={src}, "
"tgt={tgt} for data_category={data_category}"
)
num_shards_dict[key] = shards_dict[tgt]
else:
if f"{src}-{tgt}" in shards_dict:
num_shards_dict[key] = shards_dict[f"{src}-{tgt}"]
elif f"{tgt}-{src}" in shards_dict:
# follow the fairseq tradition to use reversed direction data if it is not available
num_shards_dict[key] = shards_dict[f"{tgt}-{src}"]
self._num_shards_dict[split] = num_shards_dict
logger.info(f"[{split}] num of shards: {num_shards_dict}")
return num_shards_dict
@classmethod
def get_shard_id(cls, num_shards, epoch, shard_epoch=None):
shard = epoch if shard_epoch is None else shard_epoch
shard = (shard - 1) % num_shards
return shard
def get_split_data_path(self, paths, epoch, shard_epoch, num_shards):
path = paths[self.get_shard_id(num_shards, epoch, shard_epoch)]
return path
def get_split_data_param_list(self, split, epoch, shard_epoch=None):
# TODO: to extend with extra datasets and keys and loop over different shard data paths
param_list = []
data_paths, lang_pairs = self.get_data_paths_and_lang_pairs(split)
logger.info(f"langtoks settings: {self.args.langtoks}")
split_num_shards_dict = self.get_split_num_data_shards(split)
for data_category, paths in data_paths.items():
if data_category not in lang_pairs:
continue
paths = utils.split_paths(paths)
assert len(paths) > 0
if len(paths) > 1:
self._has_sharded_data = True
if split != getattr(self.args, "train_subset", None):
# if not training data set, use the first shard for valid and test
paths = paths[:1]
if data_category in self.args.langtoks:
lang_tok_spec = self.args.langtoks[data_category]
else:
# default to None
lang_tok_spec = (None, None)
# infer langcode
lang_dirs = [
lang_pair.split("-") for lang_pair in lang_pairs[data_category]
]
lang_dirs = [x if len(x) > 1 else (x[0], x[0]) for x in lang_dirs]
for src, tgt in lang_dirs:
assert src is not None or data_category == "mono_dae", (
f"error: src={src}, " "tgt={tgt} for data_category={data_category}"
)
# logger.info(f"preparing param for {data_category}: {src} - {tgt}")
key = self.get_dataset_key(data_category, src, tgt)
data_path = self.get_split_data_path(
paths, epoch, shard_epoch, split_num_shards_dict[key]
)
param_list.append(
{
"key": key,
"data_path": data_path,
"split": split,
"src": src,
"src_dict": self.dicts[src]
if src and data_category != "mono_dae"
else None,
"tgt": tgt,
"tgt_dict": self.dicts[tgt],
"data_category": data_category,
"langtok_spec": lang_tok_spec,
}
)
return param_list
def get_train_dataset_sizes(
self, data_param_list, datasets, epoch, shard_epoch=None
):
num_shards = [
self.get_split_num_data_shards(param["split"])[param["key"]]
for param in data_param_list
]
data_sizes = []
for (key, d), num_shard in zip(datasets, num_shards):
my_data_sizes = self._training_data_sizes[key]
shard_ind = self.get_shard_id(num_shard, epoch, shard_epoch)
if shard_ind not in my_data_sizes:
my_data_sizes[shard_ind] = len(d)
known_size = max(my_data_sizes.values())
data_sizes.append(
# If we don't know the data size of the shard yet,
# use the the max known data size to approximate.
# Note that we preprocess shards by a designated shard size
# and put any remaining data at the end into the last shard so
# the max shard size approximation is almost correct before loading
# the last shard; after loading the last shard, it will have the
# exact data sizes of the whole data size.
(key, sum(my_data_sizes.get(i, known_size) for i in range(num_shard)))
)
logger.info(
f"estimated total data sizes of all shards used in sampling ratios: {data_sizes}. "
"Note that if the data a shard has not been loaded yet, use the max known data size to approximate"
)
return [s for _, s in data_sizes]
def get_train_sampling_ratios(
self, data_param_list, datasets, epoch=1, shard_epoch=None
):
data_sizes = self.get_train_dataset_sizes(
data_param_list, datasets, epoch, shard_epoch
)
sampling_func = self.sampling_method.sampling_method_selector()
sample_ratios = sampling_func(data_sizes) if sampling_func is not None else None
return sample_ratios
def get_sampling_ratios(self, data_param_list, datasets, epoch, shard_epoch=None):
if self.args.sampling_weights_from_file:
weights = load_sampling_weights(self.args.sampling_weights_from_file)
sample_ratios = [weights[k] for k, _ in datasets]
logger.info(
"| ignoring --sampling-weights when loadding sampling weights "
f"from file {self.args.sampling_weights_from_file}"
)
elif self.args.sampling_weights:
sample_ratios = [self.args.sampling_weights[k] for k, _ in datasets]
else:
sample_ratios = self.get_train_sampling_ratios(
data_param_list, datasets, epoch, shard_epoch
)
if sample_ratios is not None:
logger.info(
"| Upsample ratios: {}".format(
list(zip(map(lambda x: x["key"], data_param_list), sample_ratios))
)
)
assert len(sample_ratios) == len(datasets)
return sample_ratios
def load_split_datasets(
self, split, training, epoch=1, combine=False, shard_epoch=None, **kwargs
):
data_param_list = self.get_split_data_param_list(
split, epoch, shard_epoch=shard_epoch
)
langpairs_sharing_datasets = (
{} if self.args.enable_reservsed_directions_shared_datasets else None
)
datasets = [
(
param["key"],
self.load_a_dataset(
combine=combine,
langpairs_sharing_datasets=langpairs_sharing_datasets,
**param,
),
)
for param in data_param_list
]
return datasets, data_param_list
def load_into_concat_dataset(self, split, datasets, data_param_list):
if self.args.lang_tok_replacing_bos_eos:
# TODO: to investigate why TransformEosLangPairDataset doesn't work with ConcatDataset
return SampledMultiDataset(
OrderedDict(datasets),
sampling_ratios=None,
eval_key=None,
collate_format=CollateFormat.single,
virtual_size=None,
split=split,
)
return ConcatDataset([d for _, d in datasets])
def load_sampled_multi_epoch_dataset(
self, split, training, epoch=0, combine=False, shard_epoch=None, **kwargs
):
datasets, data_param_list = self.load_split_datasets(
split, training, epoch, combine, shard_epoch=shard_epoch, **kwargs
)
if training and split == getattr(self.args, "train_subset", None):
sample_ratios = self.get_sampling_ratios(data_param_list, datasets, epoch)
return SampledMultiEpochDataset(
OrderedDict(datasets),
epoch=epoch,
shard_epoch=shard_epoch,
# valid and test datasets will be degenerate to concating datasets:
sampling_ratios=sample_ratios,
eval_key=None,
collate_format=CollateFormat.single,
virtual_size=self.args.virtual_data_size,
split=split,
virtual_epoch_size=self.args.virtual_epoch_size,
# if not using lang_tok altering, simplified to use the same collater
shared_collater=self._shared_collater(),
)
else:
return self.load_into_concat_dataset(split, datasets, data_param_list)
from enum import Enum
from typing import Dict, List, Optional, Sequence
import torch
from fairseq.data import Dictionary
class EncoderLangtok(Enum):
"""
Prepend to the beginning of source sentence either the
source or target language token. (src/tgt).
"""
src = "src"
tgt = "tgt"
class LangTokSpec(Enum):
main = "main"
mono_dae = "mono_dae"
class LangTokStyle(Enum):
multilingual = "multilingual"
mbart = "mbart"
@torch.jit.export
def get_lang_tok(
lang: str, lang_tok_style: str, spec: str = LangTokSpec.main.value
) -> str:
# TOKEN_STYLES can't be defined outside this fn since it needs to be
# TorchScriptable.
TOKEN_STYLES: Dict[str, str] = {
LangTokStyle.mbart.value: "[{}]",
LangTokStyle.multilingual.value: "__{}__",
}
if spec.endswith("dae"):
lang = f"{lang}_dae"
elif spec.endswith("mined"):
lang = f"{lang}_mined"
style = TOKEN_STYLES[lang_tok_style]
return style.format(lang)
def augment_dictionary(
dictionary: Dictionary,
language_list: List[str],
lang_tok_style: str,
langtoks_specs: Sequence[str] = (LangTokSpec.main.value,),
extra_data: Optional[Dict[str, str]] = None,
) -> None:
for spec in langtoks_specs:
for language in language_list:
dictionary.add_symbol(
get_lang_tok(lang=language, lang_tok_style=lang_tok_style, spec=spec)
)
if lang_tok_style == LangTokStyle.mbart.value or (
extra_data is not None and LangTokSpec.mono_dae.value in extra_data
):
dictionary.add_symbol("<mask>")
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import datetime
import hashlib
import logging
import time
from bisect import bisect_right
from collections import OrderedDict, defaultdict
from enum import Enum
from typing import List
import numpy as np
import torch
from fairseq import distributed_utils
from fairseq.data import FairseqDataset, data_utils
def get_time_gap(s, e):
return (
datetime.datetime.fromtimestamp(e) - datetime.datetime.fromtimestamp(s)
).__str__()
logger = logging.getLogger(__name__)
def default_virtual_size_func(datasets, ratios, max_scale_up=1.5):
sizes = [len(d) for d in datasets]
if ratios is None:
return sum(sizes)
largest_idx = np.argmax(sizes)
largest_r = ratios[largest_idx]
largest_s = sizes[largest_idx]
# set virtual sizes relative to the largest dataset
virtual_sizes = [(r / largest_r) * largest_s for r in ratios]
vsize = sum(virtual_sizes)
max_size = sum(sizes) * max_scale_up
return int(vsize if vsize < max_size else max_size)
class CollateFormat(Enum):
single = 1
ordered_dict = 2
class SampledMultiDataset(FairseqDataset):
"""Samples from multiple sub-datasets according to given sampling ratios.
Args:
datasets (
List[~torch.utils.data.Dataset]
or OrderedDict[str, ~torch.utils.data.Dataset]
): datasets
sampling_ratios (List[float]): list of probability of each dataset to be sampled
(default: None, which corresponds to concatenating all dataset together).
seed (int): RNG seed to use (default: 2).
epoch (int): starting epoch number (default: 1).
eval_key (str, optional): a key used at evaluation time that causes
this instance to pass-through batches from *datasets[eval_key]*.
collate_format (CollateFormat): collater output format, either CollateFormat.ordered_dict or
CollateFormat.single (default: CollateFormat.single) where CollateFormat.single configures
the collater to output batches of data mixed from all sub-datasets,
and CollateFormat.ordered_dict configures the collater to output a dictionary of batches indexed by keys
of sub-datasets.
Note that not all sub-datasets will present in a single batch in both formats.
virtual_size (int, or callable): the expected virtual size of the dataset (default: default_virtual_size_func).
split (str): the split of the data, e.g. 'train', 'valid' or 'test'.
shared_collater (bool): whether or not to all sub-datasets have the same collater.
shuffle (bool): whether or not to shuffle data (default: True).
"""
def __init__(
self,
datasets,
sampling_ratios=None,
seed=2,
epoch=1,
eval_key=None,
collate_format=CollateFormat.single,
virtual_size=default_virtual_size_func,
split="",
shared_collater=False,
shuffle=True,
):
super().__init__()
self.shared_collater = shared_collater
self.shuffle = shuffle
if isinstance(datasets, OrderedDict):
self.keys = list(datasets.keys())
datasets = list(datasets.values())
elif isinstance(datasets, List):
self.keys = list(range(len(datasets)))
else:
raise AssertionError()
self.datasets = datasets
self.split = split
self.eval_key = eval_key
if self.eval_key is not None:
self.collate_format = CollateFormat.single
else:
self.collate_format = collate_format
self.seed = seed
self._cur_epoch = None
self.cumulated_sizes = None
# self.datasets[k][self._cur_indices[i]] is the data item i in this sampled dataset
# namely, data item i is sampled from the kth sub-dataset self.datasets[k]
# where self.cumulated_sizes[k-1] <= i < self.cumulated_sizes[k]
self._cur_indices = None
self._sizes = None
self.virtual_size_per_dataset = None
# caching properties
self._reset_cached_properties()
self.setup_sampling(sampling_ratios, virtual_size)
self.set_epoch(epoch)
def _clean_if_not_none(self, var_list):
for v in var_list:
if v is not None:
del v
def _reset_cached_properties(self):
self._clean_if_not_none([self._sizes, self._cur_indices])
self._sizes = None
self._cur_indices = None
def setup_sampling(self, sample_ratios, virtual_size):
sizes = [len(d) for d in self.datasets]
if sample_ratios is None:
# default back to concating datasets
self.sample_ratios = None
self.virtual_size = sum(sizes)
else:
if not isinstance(sample_ratios, np.ndarray):
sample_ratios = np.array(sample_ratios)
self.sample_ratios = sample_ratios
virtual_size = (
default_virtual_size_func if virtual_size is None else virtual_size
)
self.virtual_size = (
virtual_size(self.datasets, self.sample_ratios)
if callable(virtual_size)
else virtual_size
)
def adjust_sampling(self, epoch, sampling_ratios, virtual_size):
if sampling_ratios is not None:
sampling_ratios = self._sync_sample_ratios(sampling_ratios)
self.setup_sampling(sampling_ratios, virtual_size)
def _sync_sample_ratios(self, ratios):
# in case the ratios are not precisely the same across processes
# also to ensure every procresses update the ratios in the same pace
ratios = torch.DoubleTensor(ratios)
if torch.distributed.is_initialized():
if torch.cuda.is_available():
distributed_utils.all_reduce(ratios.cuda())
else:
distributed_utils.all_reduce(ratios)
ret = ratios.cpu()
ret = ret.numpy()
return ret
def random_choice_in_dataset(self, rng, dataset, choice_size):
if hasattr(dataset, "random_choice_in_dataset"):
return dataset.random_choice_in_dataset(rng, choice_size)
dataset_size = len(dataset)
return rng.choice(
dataset_size, choice_size, replace=(choice_size > dataset_size)
)
def get_virtual_indices(self, rng, datasets, sample_ratios, virtual_size):
def get_counts(sample_ratios):
counts = np.array([virtual_size * r for r in sample_ratios], dtype=np.int64)
diff = virtual_size - counts.sum()
assert diff >= 0
# due to round-offs, the size might not match the desired sizes
if diff > 0:
dataset_indices = rng.choice(
len(sample_ratios), size=diff, p=sample_ratios
)
for i in dataset_indices:
counts[i] += 1
return counts
def get_in_dataset_indices(datasets, sizes, sample_ratios):
counts = get_counts(sample_ratios)
# uniformally sample desired counts for each dataset
# if the desired counts are large, sample with replacement:
indices = [
self.random_choice_in_dataset(rng, d, c)
for c, d in zip(counts, datasets)
]
return indices
sizes = [len(d) for d in datasets]
if sample_ratios is None:
# default back to concating datasets
in_dataset_indices = [list(range(s)) for s in sizes]
virtual_sizes_per_dataset = sizes
else:
ratios = sample_ratios / sample_ratios.sum()
in_dataset_indices = get_in_dataset_indices(datasets, sizes, ratios)
virtual_sizes_per_dataset = [len(d) for d in in_dataset_indices]
virtual_sizes_per_dataset = np.array(virtual_sizes_per_dataset, np.int64)
cumulative_sizes = np.cumsum(virtual_sizes_per_dataset)
assert sum(virtual_sizes_per_dataset) == virtual_size
assert cumulative_sizes[-1] == virtual_size
if virtual_size < sum(sizes):
logger.warning(
f"virtual data size ({virtual_size}) is less than real data size ({sum(sizes)})."
" If virtual size << real data size, there could be data coverage issue."
)
in_dataset_indices = np.hstack(in_dataset_indices)
return in_dataset_indices, cumulative_sizes, virtual_sizes_per_dataset
def _get_dataset_and_index(self, index):
i = bisect_right(self.cumulated_sizes, index)
return i, self._cur_indices[index]
def __getitem__(self, index):
# self.__getitem__(index) returns self.datasets[k][self._cur_indices[index]]
# where k satisfies self.cumulated_sizes[k - 1] <= k < self.cumulated_sizes[k]
ds_idx, ds_sample_idx = self._get_dataset_and_index(index)
ret = (ds_idx, self.datasets[ds_idx][ds_sample_idx])
return ret
def num_tokens(self, index):
return self.sizes[index].max()
def size(self, index):
return self.sizes[index]
def __len__(self):
return self.virtual_size
def collater(self, samples, **extra_args):
"""Merge a list of samples to form a mini-batch."""
if len(samples) == 0:
return None
if self.collate_format == "ordered_dict":
collect_samples = [[] for _ in range(len(self.datasets))]
for (i, sample) in samples:
collect_samples[i].append(sample)
batch = OrderedDict(
[
(self.keys[i], dataset.collater(collect_samples[i]))
for i, (key, dataset) in enumerate(zip(self.keys, self.datasets))
if len(collect_samples[i]) > 0
]
)
elif self.shared_collater:
batch = self.datasets[0].collater([s for _, s in samples])
else:
samples_dict = defaultdict(list)
pad_to_length = (
defaultdict(int)
if "pad_to_length" not in extra_args
else extra_args["pad_to_length"]
)
for ds_idx, s in samples:
pad_to_length["source"] = max(
pad_to_length["source"], s["source"].size(0)
)
if s["target"] is not None:
pad_to_length["target"] = max(
pad_to_length["target"], s["target"].size(0)
)
samples_dict[ds_idx].append(s)
batches = [
self.datasets[i].collater(samples_dict[i], pad_to_length=pad_to_length)
for i in range(len(self.datasets))
if len(samples_dict[i]) > 0
]
def straight_data(tensors):
batch = torch.cat(tensors, dim=0)
return batch
src_lengths = straight_data(
[b["net_input"]["src_lengths"] for b in batches]
)
src_lengths, sort_order = src_lengths.sort(descending=True)
def straight_order(tensors):
batch = straight_data(tensors)
return batch.index_select(0, sort_order)
batch = {
"id": straight_order([b["id"] for b in batches]),
"nsentences": sum(b["nsentences"] for b in batches),
"ntokens": sum(b["ntokens"] for b in batches),
"net_input": {
"src_tokens": straight_order(
[b["net_input"]["src_tokens"] for b in batches]
),
"src_lengths": src_lengths,
},
"target": straight_order([b["target"] for b in batches])
if batches[0]["target"] is not None
else None,
}
if "prev_output_tokens" in batches[0]["net_input"]:
batch["net_input"]["prev_output_tokens"] = straight_order(
[b["net_input"]["prev_output_tokens"] for b in batches]
)
if "src_lang_id" in batches[0]["net_input"]:
batch["net_input"]["src_lang_id"] = straight_order(
[b["net_input"]["src_lang_id"] for b in batches]
)
if "tgt_lang_id" in batches[0]:
batch["tgt_lang_id"] = straight_order(
[b["tgt_lang_id"] for b in batches]
)
return batch
@property
def sizes(self):
if self._sizes is not None:
return self._sizes
start_time = time.time()
in_sub_dataset_indices = [
self._cur_indices[
0 if i == 0 else self.cumulated_sizes[i - 1] : self.cumulated_sizes[i]
]
for i in range(len(self.datasets))
]
sub_dataset_sizes = [
d.sizes[indices]
for d, indices in zip(self.datasets, in_sub_dataset_indices)
]
self._sizes = np.vstack(sub_dataset_sizes)
logger.info(f"sizes() calling time: {get_time_gap(start_time, time.time())}")
return self._sizes
def ordered_indices(self):
if self.shuffle:
indices = np.random.permutation(len(self))
else:
indices = np.arange(len(self))
sizes = self.sizes
tgt_sizes = sizes[:, 1] if len(sizes.shape) > 0 and sizes.shape[1] > 1 else None
src_sizes = (
sizes[:, 0] if len(sizes.shape) > 0 and sizes.shape[1] > 1 else sizes
)
# sort by target length, then source length
if tgt_sizes is not None:
indices = indices[np.argsort(tgt_sizes[indices], kind="mergesort")]
sort_indices = indices[np.argsort(src_sizes[indices], kind="mergesort")]
return sort_indices
def prefetch(self, indices):
prefetch_indices = [[] for _ in range(len(self.datasets))]
for i in indices:
ds_idx, ds_sample_idx = self._get_dataset_and_index(i)
prefetch_indices[ds_idx].append(ds_sample_idx)
for i in range(len(prefetch_indices)):
self.datasets[i].prefetch(prefetch_indices[i])
@property
def can_reuse_epoch_itr_across_epochs(self):
return False
def set_epoch(self, epoch):
super().set_epoch(epoch)
if epoch == self._cur_epoch:
# re-enter so return
return
for d in self.datasets:
if hasattr(d, "set_epoch"):
d.set_epoch(epoch)
self._cur_epoch = epoch
self._establish_virtual_datasets()
def _establish_virtual_datasets(self):
if self.sample_ratios is None and self._cur_indices is not None:
# not a samping dataset, no need to resample if indices are already established
return
self._reset_cached_properties()
start_time = time.time()
# Generate a weighted sample of indices as a function of the
# random seed and the current epoch.
rng = np.random.RandomState(
[
int(
hashlib.sha1(
str(self.__class__.__name__).encode("utf-8")
).hexdigest(),
16,
)
% (2 ** 32),
self.seed % (2 ** 32), # global seed
self._cur_epoch, # epoch index,
]
)
self._clean_if_not_none(
[self.cumulated_sizes, self.virtual_size_per_dataset, self._sizes]
)
self._sizes = None
indices, cumulated_sizes, virtual_size_per_dataset = self.get_virtual_indices(
rng, self.datasets, self.sample_ratios, self.virtual_size
)
self._cur_indices = indices
self.cumulated_sizes = cumulated_sizes
self.virtual_size_per_dataset = virtual_size_per_dataset
raw_sizes = [len(d) for d in self.datasets]
sampled_sizes = self.virtual_size_per_dataset
logger.info(
f"[{self.split}] Raw sizes: {str(dict(zip(self.keys, raw_sizes)))}; "
f"raw total size: {sum(raw_sizes)}"
)
logger.info(
f"[{self.split}] Resampled sizes: {str(dict(zip(self.keys, sampled_sizes)))}; "
f"resampled total size: {sum(sampled_sizes)}"
)
if self.sample_ratios is not None:
logger.info(
f"[{self.split}] Upsampling ratios: {str(dict(zip(self.keys, self.sample_ratios)))}"
)
else:
logger.info(f"[{self.split}] A concat dataset")
logger.info(
f"[{self.split}] virtual dataset established time: {get_time_gap(start_time, time.time())}"
)
def filter_indices_by_size(self, indices, max_sizes):
"""Filter a list of sample indices. Remove those that are longer
than specified in max_sizes.
Args:
indices (np.array): original array of sample indices
max_sizes (int or list[int] or tuple[int]): max sample size,
can be defined separately for src and tgt (then list or tuple)
Returns:
np.array: filtered sample array
list: list of removed indices
"""
sizes = self.sizes
tgt_sizes = sizes[:, 1] if len(sizes.shape) > 0 and sizes.shape[1] > 1 else None
src_sizes = (
sizes[:, 0] if len(sizes.shape) > 0 and sizes.shape[1] > 1 else sizes
)
return data_utils.filter_paired_dataset_indices_by_size(
src_sizes, tgt_sizes, indices, max_sizes
)
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import hashlib
import logging
import math
import numpy as np
from fairseq.data import SampledMultiDataset
from .sampled_multi_dataset import CollateFormat, default_virtual_size_func
logger = logging.getLogger(__name__)
class SampledMultiEpochDataset(SampledMultiDataset):
"""Samples from multiple sub-datasets according to sampling ratios
using virtual epoch sizes to speed up dataloading.
Args:
datasets (
List[~torch.utils.data.Dataset]
or OrderedDict[str, ~torch.utils.data.Dataset]
): datasets
sampling_ratios (List[float]): list of probability of each dataset to be sampled
(default: None, which corresponds to concating all dataset together).
seed (int): RNG seed to use (default: 2).
epoch (int): starting epoch number (default: 1).
eval_key (str, optional): a key used at evaluation time that causes
this instance to pass-through batches from *datasets[eval_key]*.
collate_format (CollateFormat): collater output format, either CollateFormat.ordered_dict or
CollateFormat.single (default: CollateFormat.single) where CollateFormat.single configures
the collater to output batches of data mixed from all sub-datasets,
and CollateFormat.ordered_dict configures the collater to output a dictionary of batches indexed by keys
of sub-datasets.
Note that not all sub-datasets will present in a single batch in both formats.
virtual_size (int, or callable): the expected virtual size of the dataset (default: default_virtual_size_func).
split (str): the split of the data, e.g. 'train', 'valid' or 'test'.
virtual_epoch_size (int): virtual epoch size, the dataset will go through the data by
this virtual epoch size one by one to speed up data loading, e.g. indicing and filtering
can be performed whenever a virtual epoch is loaded without waiting for the whole dataset to be loaded.
shared_collater (bool): whether or not to all sub-datasets have the same collater.
shard_epoch (int): the real epoch number for shard selection.
shuffle (bool): whether or not to shuffle data (default: True).
"""
def __init__(
self,
datasets,
sampling_ratios=None,
seed=2,
epoch=1,
eval_key=None,
collate_format=CollateFormat.single,
virtual_size=default_virtual_size_func,
split="",
virtual_epoch_size=None,
shared_collater=False,
shard_epoch=1,
shuffle=True,
):
self.virtual_epoch_size = virtual_epoch_size
self._current_epoch_start_index = None
self._random_global_indices = None
self.shard_epoch = shard_epoch if shard_epoch is not None else 1
self.load_next_shard = None
self._epoch_sizes = None
super().__init__(
datasets=datasets,
sampling_ratios=sampling_ratios,
seed=seed,
epoch=epoch,
eval_key=eval_key,
collate_format=collate_format,
virtual_size=virtual_size,
split=split,
shared_collater=shared_collater,
shuffle=shuffle,
)
def _setup(self, epoch):
self.virtual_epoch_size = (
self.virtual_epoch_size
if self.virtual_epoch_size is not None
else self.virtual_size
)
if self.virtual_epoch_size > self.virtual_size:
logger.warning(
f"virtual epoch size {self.virtual_epoch_size} "
f"is greater than virtual dataset size {self.virtual_size}"
)
self.virtual_epoch_size = self.virtual_size
self.num_virtual_epochs = math.ceil(self.virtual_size / self.virtual_epoch_size)
self._current_epoch_start_index = self._get_epoch_start_index(epoch)
logger.info(
f"virtual epoch size {self.virtual_epoch_size}; virtual dataset size {self.virtual_size}"
)
def _map_epoch_index_to_global(self, index):
index = self._current_epoch_start_index + index
# add randomness
return self._random_global_indices[index]
@property
def sizes(self):
if self._epoch_sizes is not None:
return self._epoch_sizes
_sizes = super().sizes
indices = self._random_global_indices[
self._current_epoch_start_index : self._current_epoch_start_index
+ len(self)
]
self._epoch_sizes = _sizes[indices]
# del super()._sizes to save memory
del self._sizes
self._sizes = None
return self._epoch_sizes
def _get_dataset_and_index(self, index):
i = self._map_epoch_index_to_global(index)
return super()._get_dataset_and_index(i)
def __len__(self):
return (
self.virtual_epoch_size
if self._current_epoch_start_index + self.virtual_epoch_size
< self.virtual_size
else self.virtual_size - self._current_epoch_start_index
)
def set_epoch(self, epoch):
if self._current_epoch_start_index is None:
# initializing epoch idnices of a virtual dataset
self._setup(epoch)
self._next_virtual_epoch(epoch)
else:
# working on already intialized epoch indices
if epoch == self._cur_epoch:
# re-enter so return
return
self._next_virtual_epoch(epoch)
def _get_epoch_start_index(self, epoch):
assert epoch >= 1 # fairseq is using 1-based epoch everywhere
return ((epoch - 1) % self.num_virtual_epochs) * self.virtual_epoch_size
def _next_global_indices(self, epoch):
rng = np.random.RandomState(
[
int(
hashlib.sha1(
str(self.__class__.__name__).encode("utf-8")
).hexdigest(),
16,
)
% (2 ** 32),
self.seed % (2 ** 32), # global seed
epoch, # epoch index,
]
)
del self._random_global_indices
self._random_global_indices = rng.choice(
self.virtual_size, self.virtual_size, replace=False
)
if self.load_next_shard is None:
self.load_next_shard = False
else:
# increase shard epoch for next loading
self.shard_epoch += 1
self.load_next_shard = True
logger.info(
"to load next epoch/shard in next load_dataset: "
f"epoch={epoch}/shard_epoch={self.shard_epoch}"
)
def _next_virtual_epoch(self, epoch):
index = self._get_epoch_start_index(epoch)
if index == 0 or self._random_global_indices is None:
# need to start from the beginning,
# so call super().set_epoch(epoch) to establish the global virtual indices
logger.info(
"establishing a new set of global virtual indices for "
f"epoch={epoch}/shard_epoch={self.shard_epoch}"
)
super().set_epoch(epoch)
self._next_global_indices(epoch)
else:
self._cur_epoch = epoch
# reset cache sizes and ordered_indices for the epoch after moving to a new epoch
self._clean_if_not_none(
[
self._epoch_sizes,
]
)
self._epoch_sizes = None
self._current_epoch_start_index = index
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import logging
from typing import List
logger = logging.getLogger(__name__)
def uniform(dataset_sizes: List[int]):
return [1.0] * len(dataset_sizes)
def temperature_sampling(dataset_sizes, temp):
total_size = sum(dataset_sizes)
return [(size / total_size) ** (1.0 / temp) for size in dataset_sizes]
def make_temperature_sampling(temp=1.0):
def sampling_func(dataset_sizes):
return temperature_sampling(dataset_sizes, temp)
return sampling_func
def make_ratio_sampling(ratios):
def sampling_func(dataset_sizes):
return ratios
return sampling_func
class SamplingMethod:
@staticmethod
def add_arguments(parser):
parser.add_argument(
"--sampling-method",
choices=[
"uniform",
"temperature",
"concat",
"RoundRobin",
],
type=str,
default="concat",
help="The method to sample data per language pairs",
)
parser.add_argument(
"--sampling-temperature",
default=1.5,
type=float,
help="only work with --sampling-method temperature",
)
@staticmethod
def build_sampler(args, task):
return SamplingMethod(args, task)
def __init__(self, args, task):
self.args = args
self.task = task
def is_adaptive(self):
return False
def sampling_method_selector(self):
args = self.args
logger.info(f"selected sampler: {args.sampling_method}")
if args.sampling_method == "uniform":
return uniform
elif args.sampling_method == "temperature" or self.is_adaptive():
return make_temperature_sampling(float(args.sampling_temperature))
else:
# default to concating all data set together
return None
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment