Commit 18d27e00 authored by wangwei990215's avatar wangwei990215
Browse files

initial commit

parent 541f4c7a
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import itertools
import json
import logging
import math
import os
from collections import OrderedDict, defaultdict
from fairseq import utils
from fairseq.data import (
AppendTokenDataset,
ConcatDataset,
Dictionary,
LanguagePairDataset,
PrependTokenDataset,
SampledMultiDataset,
SampledMultiEpochDataset,
StripTokenDataset,
TransformEosLangPairDataset,
TruncateDataset,
data_utils,
indexed_dataset,
)
from fairseq.data.multilingual.multilingual_utils import (
EncoderLangtok,
LangTokSpec,
LangTokStyle,
augment_dictionary,
get_lang_tok,
)
from fairseq.data.multilingual.sampled_multi_dataset import CollateFormat
from fairseq.file_io import PathManager
from fairseq.utils import FileContentsAction, csv_str_list, eval_str_dict
logger = logging.getLogger(__name__)
def _lang_id(dic: Dictionary, lang: str):
"""Return language ID index."""
idx = dic.index(lang)
assert idx != dic.unk_index, "cannot find language ID for lang {}".format(lang)
return idx
def load_sampling_weights(from_file):
with open(from_file) as f:
weights = json.load(f)
return weights
class MultilingualDatasetManager(object):
def __init__(self, args, lang_pairs, langs, dicts, sampling_method):
super().__init__()
self.args = args
self.seed = args.seed
self.lang_pairs = lang_pairs
self.langs = langs
self.dicts = dicts
self.lang_dict = self.create_lang_dictionary(self.langs)
self.sampling_method = sampling_method
self.sampling_scheduler = None
self._has_sharded_data = False
self._num_shards_dict = {}
self._training_data_sizes = defaultdict(lambda: {})
@classmethod
def setup_data_manager(cls, args, lang_pairs, langs, dicts, sampling_method):
return MultilingualDatasetManager(
args, lang_pairs, langs, dicts, sampling_method
)
@staticmethod
def add_args(parser):
parser.add_argument(
"data",
help="colon separated path to data directories list, \
will be iterated upon during epochs in round-robin manner",
action=FileContentsAction,
)
parser.add_argument(
"--langs",
default=None,
type=csv_str_list,
help="a list of languages comma sperated languages which can appear in lang-pairs; "
"note that the ordering determines language token IDs",
)
parser.add_argument(
"--lang-dict",
default=None,
type=str,
help="an external file which contains a list of "
"languages which can appear in lang-pairs; "
"note that the ordering determines language token IDs; "
"--langs and --lang-dict are two exclusive options",
)
parser.add_argument(
"--lang-tok-style",
default=LangTokStyle.multilingual.value,
type=str,
choices=[LangTokStyle.multilingual.value, LangTokStyle.mbart.value],
help="language token styles",
)
parser.add_argument(
"--load-alignments",
action="store_true",
help="load the binarized alignments",
)
parser.add_argument(
"--left-pad-source",
default="True",
type=str,
metavar="BOOL",
help="pad the source on the left",
)
parser.add_argument(
"--left-pad-target",
default="False",
type=str,
metavar="BOOL",
help="pad the target on the left",
)
parser.add_argument(
"--max-source-positions",
default=1024,
type=int,
metavar="N",
help="max number of tokens in the source sequence",
)
parser.add_argument(
"--max-target-positions",
default=1024,
type=int,
metavar="N",
help="max number of tokens in the target sequence",
)
parser.add_argument(
"--upsample-primary",
default=1,
type=int,
help="amount to upsample primary dataset",
)
parser.add_argument(
"--truncate-source",
action="store_true",
default=False,
help="truncate source to max-source-positions",
)
parser.add_argument(
"--encoder-langtok",
default=None,
type=str,
choices=[EncoderLangtok.src.value, EncoderLangtok.tgt.value],
metavar="SRCTGT",
help="prepend to the beginning of source sentence the source or target "
"language token. (src/tgt)",
)
parser.add_argument(
"--decoder-langtok",
action="store_true",
help="prepend to the beginning of target sentence the target language token",
)
parser.add_argument(
"--lang-tok-replacing-bos-eos", action="store_true", default=False
)
parser.add_argument(
"--enable-lang-ids",
default=False,
action="store_true",
help="whether to include language IDs in samples",
)
parser.add_argument(
"--enable-reservsed-directions-shared-datasets",
default=False,
action="store_true",
help="whether to allow datasets be used in reversed directions",
)
parser.add_argument(
"--extra-data",
help='a dictionary of data name to this path, \
e.g. {"mined", path_to_mined_data, "denoised": path_to_denoised_data}',
type=lambda uf: eval_str_dict(uf, type=str),
default=None,
)
parser.add_argument(
"--extra-lang-pairs",
help='a dictionary of data name to the language pairs they serve, \
e.g. {"mined": comma-separated-lang-pairs, "denoised": comma-separated-lang-pairs}',
type=lambda uf: eval_str_dict(uf, type=str),
default=None,
)
parser.add_argument(
"--fixed-dictionary",
help="Fixed dictionary to use with model path",
default=None,
type=str,
)
parser.add_argument(
"--langtoks-specs",
help='a list of comma separated data types that a set of language tokens to be specialized for, \
e.g. "main,dae,mined". There will be a set of language tokens added to the vocab to \
distinguish languages in different training data types. If not specified, default language \
tokens per languages will be added',
default=LangTokSpec.main.value,
type=csv_str_list,
)
parser.add_argument(
"--langtoks",
help='a dictionary of how to add language tokens, \
e.g. {"mined": (None, "tgt"), "mono_dae": ("src.dae", "tgt"), "main": \
("src", "tgt")}, or {"mined": ("src.mined", "tgt")}',
default=None,
type=lambda uf: eval_str_dict(uf, type=str),
)
parser.add_argument(
"--sampling-weights-from-file",
help='a file contain a python dictionary of how to sample data sets, \
e.g. { "main:en_XX-es_XX": 0.2, "mined:en_XX-pt_XX": 0.5, \
"mono_dae:es_XX-es_XX: 0.3, "main:en_xx-fr_XX": 0.8 }',
default=None,
type=str,
)
parser.add_argument(
"--sampling-weights",
help='a dictionary of how to sample data sets, \
e.g. { "main:en_XX-es_XX": 0.2, "mined:en_XX-pt_XX": 0.5, \
"mono_dae:es_XX-es_XX: 0.3, "main:en_xx-fr_XX": 0.8 }',
default=None,
type=lambda uf: eval_str_dict(uf, type=str),
)
parser.add_argument(
"--virtual-epoch-size",
default=1000000,
type=int,
help="virtual epoch size to speed up data loading",
)
parser.add_argument(
"--virtual-data-size",
default=None,
type=int,
help="virtual data size of the whole joint dataset to speed"
"up data loading and have specific dynamic sampling strategy interval",
)
@classmethod
def load_langs(cls, args, **kwargs):
if args.lang_dict and args.langs:
raise ValueError("--langs and --lang-dict can not both be specified")
if args.lang_dict is None and args.langs is None:
logger.warning(
"External language dictionary is not provided; "
"use lang-pairs to infer the set of supported languages. "
"The language ordering is not stable which might cause "
"misalignment in pretraining and finetuning."
)
# infer from lang_pairs as it is
langs = list(
{x for lang_pair in args.lang_pairs for x in lang_pair.split("-")}
)
langs = sorted(langs)
logger.info(f"inferred language list: {langs}")
elif args.lang_dict:
with open(
PathManager.get_local_path(args.lang_dict), "r", encoding="utf-8"
) as f:
langs = [lang.strip() for lang in f.readlines() if lang.strip()]
logger.info(
f"loaded language list from {args.lang_dict} as they are ordered in file"
)
elif args.langs:
langs = args.langs
logger.info(
f"parsed the language list as they are ordered in the option: {langs}"
)
return langs
def has_sharded_data(self, split):
return self._has_sharded_data and split == getattr(
self.args, "train_subset", None
)
def _shared_collater(self):
return not (self.args.extra_data and "mono_dae" in self.args.extra_data) and (
not self.args.lang_tok_replacing_bos_eos
)
def estimate_global_pass_epoch(self, epoch):
if self.args.virtual_epoch_size is None or self.args.virtual_data_size is None:
return None
# one epoch more for remaining data in each shard
virtual_epochs_per_shard = math.ceil(
self.args.virtual_data_size / self.args.virtual_epoch_size
)
# note that fairseq epoch / shard_epoch starts from 1
shard_epoch = (epoch - 1) // virtual_epochs_per_shard + 1
return shard_epoch
@classmethod
def prepare(cls, load_dictionary, args, **kargs):
args.left_pad_source = utils.eval_bool(args.left_pad_source)
args.left_pad_target = utils.eval_bool(args.left_pad_target)
if not hasattr(args, "shuffle_instance"):
args.shuffle_instance = False
if args.langtoks is None:
args.langtoks = {}
if "main" not in args.langtoks:
src_langtok_spec = args.encoder_langtok if args.encoder_langtok else None
tgt_langtok_spec = "tgt" if args.decoder_langtok else None
args.langtoks["main"] = (src_langtok_spec, tgt_langtok_spec)
def check_langs(langs, pairs):
messages = []
for src, tgt in pairs:
if src not in langs or tgt not in langs:
messages.append(
f"language pair {src}-{tgt} contains languages "
"that are not in the language dictionary"
)
if len(messages) > 0:
raise ValueError(" ".join(messages) + f"; langs: {langs}")
if args.lang_pairs is None:
raise ValueError(
"--lang-pairs is required. List all the language pairs in the training objective."
)
if isinstance(args.lang_pairs, str):
args.lang_pairs = args.lang_pairs.split(",")
if args.source_lang is not None or args.target_lang is not None:
training = False
else:
training = True
language_list = cls.load_langs(args, **kargs)
check_langs(
language_list,
(
[p.split("-") for p in args.lang_pairs]
if training
else [(args.source_lang, args.target_lang)]
),
)
# load dictionaries
if training:
extra_lang_pairs = (
list(
{p for _, v in args.extra_lang_pairs.items() for p in v.split(",")}
)
if args.extra_lang_pairs
else []
)
langs_to_load_dicts = sorted(
{x for p in args.lang_pairs + extra_lang_pairs for x in p.split("-")}
)
else:
langs_to_load_dicts = sorted([args.source_lang, args.target_lang])
dicts = OrderedDict()
paths = utils.split_paths(args.data)
assert len(paths) > 0
for lang in langs_to_load_dicts:
if args.fixed_dictionary is not None:
dicts[lang] = load_dictionary(args.fixed_dictionary)
else:
dicts[lang] = load_dictionary(
os.path.join(paths[0], "dict.{}.txt".format(lang))
)
augment_dictionary(
dictionary=dicts[lang],
language_list=language_list,
lang_tok_style=args.lang_tok_style,
langtoks_specs=args.langtoks_specs,
extra_data=args.extra_data,
)
if len(dicts) > 0:
assert dicts[lang].pad() == dicts[langs_to_load_dicts[0]].pad()
assert dicts[lang].eos() == dicts[langs_to_load_dicts[0]].eos()
assert dicts[lang].unk() == dicts[langs_to_load_dicts[0]].unk()
logger.info("[{}] dictionary: {} types".format(lang, len(dicts[lang])))
return language_list, dicts, training
@classmethod
def create_lang_dictionary(cls, langs):
unk = "<unk>"
# hack to remove symbols other than unk as they are not needed by lang dict
lang_dict = Dictionary(pad=unk, eos=unk, unk=unk, bos=unk)
for lang in langs:
lang_dict.add_symbol(lang)
return lang_dict
@classmethod
def get_langtok_index(cls, lang_tok, dic):
idx = dic.index(lang_tok)
assert (
idx != dic.unk_index
), "cannot find language token {} in the dictionary".format(lang_tok)
return idx
def get_encoder_langtok(self, src_lang, tgt_lang, spec=None):
if spec is None:
return None
if spec and spec.startswith("src"):
if src_lang is None:
return None
langtok = get_lang_tok(
lang=src_lang, lang_tok_style=self.args.lang_tok_style, spec=spec
)
else:
if tgt_lang is None:
return None
langtok = get_lang_tok(
lang=tgt_lang, lang_tok_style=self.args.lang_tok_style, spec=spec
)
return self.get_langtok_index(
langtok, self.dicts[src_lang if src_lang else tgt_lang]
)
def get_decoder_langtok(self, tgt_lang, spec=None):
if spec is None:
return None
langtok = get_lang_tok(
lang=tgt_lang, lang_tok_style=self.args.lang_tok_style, spec=spec
)
return self.get_langtok_index(langtok, self.dicts[tgt_lang])
@classmethod
def load_data(cls, path, vdict, impl):
dataset = data_utils.load_indexed_dataset(path, vdict, impl)
return dataset
@classmethod
def split_exists(cls, split, src, tgt, lang, data_path, dataset_impl):
filename = os.path.join(data_path, "{}.{}-{}.{}".format(split, src, tgt, lang))
return indexed_dataset.dataset_exists(filename, impl=dataset_impl)
def load_lang_dataset(
self,
data_path,
split,
src,
src_dict,
tgt,
tgt_dict,
combine,
dataset_impl,
upsample_primary,
max_source_positions,
prepend_bos=False,
load_alignments=False,
truncate_source=False,
):
src_datasets = []
tgt_datasets = []
for k in itertools.count():
split_k = split + (str(k) if k > 0 else "")
# infer langcode
if self.split_exists(split_k, src, tgt, src, data_path, dataset_impl):
prefix = os.path.join(data_path, "{}.{}-{}.".format(split_k, src, tgt))
elif self.split_exists(split_k, tgt, src, src, data_path, dataset_impl):
prefix = os.path.join(data_path, "{}.{}-{}.".format(split_k, tgt, src))
else:
if k > 0:
break
else:
logger.error(
f"Dataset not found: {data_path}, {split_k}, {src}, {tgt}"
)
raise FileNotFoundError(
"Dataset not found: {} ({})".format(split, data_path)
)
src_dataset = self.load_data(prefix + src, src_dict, dataset_impl)
if truncate_source:
src_dataset = AppendTokenDataset(
TruncateDataset(
StripTokenDataset(src_dataset, src_dict.eos()),
max_source_positions - 1,
),
src_dict.eos(),
)
src_datasets.append(src_dataset)
tgt_datasets.append(self.load_data(prefix + tgt, tgt_dict, dataset_impl))
logger.info(
"{} {} {}-{} {} examples".format(
data_path, split_k, src, tgt, len(src_datasets[-1])
)
)
if not combine:
break
assert len(src_datasets) == len(tgt_datasets)
if len(src_datasets) == 1:
src_dataset, tgt_dataset = src_datasets[0], tgt_datasets[0]
else:
sample_ratios = [1] * len(src_datasets)
sample_ratios[0] = upsample_primary
src_dataset = ConcatDataset(src_datasets, sample_ratios)
tgt_dataset = ConcatDataset(tgt_datasets, sample_ratios)
if prepend_bos:
assert hasattr(src_dict, "bos_index") and hasattr(tgt_dict, "bos_index")
src_dataset = PrependTokenDataset(src_dataset, src_dict.bos())
tgt_dataset = PrependTokenDataset(tgt_dataset, tgt_dict.bos())
align_dataset = None
if load_alignments:
align_path = os.path.join(
data_path, "{}.align.{}-{}".format(split, src, tgt)
)
if indexed_dataset.dataset_exists(align_path, impl=dataset_impl):
align_dataset = data_utils.load_indexed_dataset(
align_path, None, dataset_impl
)
return src_dataset, tgt_dataset, align_dataset
def load_langpair_dataset(
self,
data_path,
split,
src,
src_dict,
tgt,
tgt_dict,
combine,
dataset_impl,
upsample_primary,
left_pad_source,
left_pad_target,
max_source_positions,
max_target_positions,
prepend_bos=False,
load_alignments=False,
truncate_source=False,
src_dataset_transform_func=lambda dataset: dataset,
tgt_dataset_transform_func=lambda dataset: dataset,
src_lang_id=None,
tgt_lang_id=None,
langpairs_sharing_datasets=None,
):
norm_direction = "-".join(sorted([src, tgt]))
if langpairs_sharing_datasets is not None:
src_dataset = langpairs_sharing_datasets.get(
(data_path, split, norm_direction, src), "NotInCache"
)
tgt_dataset = langpairs_sharing_datasets.get(
(data_path, split, norm_direction, tgt), "NotInCache"
)
align_dataset = langpairs_sharing_datasets.get(
(data_path, split, norm_direction, src, tgt), "NotInCache"
)
# a hack: any one is not in cache, we need to reload them
if (
langpairs_sharing_datasets is None
or src_dataset == "NotInCache"
or tgt_dataset == "NotInCache"
or align_dataset == "NotInCache"
or split != getattr(self.args, "train_subset", None)
):
# source and target datasets can be reused in reversed directions to save memory
# reversed directions of valid and test data will not share source and target datasets
src_dataset, tgt_dataset, align_dataset = self.load_lang_dataset(
data_path,
split,
src,
src_dict,
tgt,
tgt_dict,
combine,
dataset_impl,
upsample_primary,
max_source_positions=max_source_positions,
prepend_bos=prepend_bos,
load_alignments=load_alignments,
truncate_source=truncate_source,
)
src_dataset = src_dataset_transform_func(src_dataset)
tgt_dataset = tgt_dataset_transform_func(tgt_dataset)
if langpairs_sharing_datasets is not None:
langpairs_sharing_datasets[
(data_path, split, norm_direction, src)
] = src_dataset
langpairs_sharing_datasets[
(data_path, split, norm_direction, tgt)
] = tgt_dataset
langpairs_sharing_datasets[
(data_path, split, norm_direction, src, tgt)
] = align_dataset
if align_dataset is None:
# no align data so flag the reverse direction as well in sharing
langpairs_sharing_datasets[
(data_path, split, norm_direction, tgt, src)
] = align_dataset
else:
logger.info(
f"Reusing source and target datasets of [{split}] {tgt}-{src} for reversed direction: "
f"[{split}] {src}-{tgt}: src length={len(src_dataset)}; tgt length={len(tgt_dataset)}"
)
return LanguagePairDataset(
src_dataset,
src_dataset.sizes,
src_dict,
tgt_dataset,
tgt_dataset.sizes if tgt_dataset is not None else None,
tgt_dict,
left_pad_source=left_pad_source,
left_pad_target=left_pad_target,
align_dataset=align_dataset,
src_lang_id=src_lang_id,
tgt_lang_id=tgt_lang_id,
)
def src_dataset_tranform_func(self, src_lang, tgt_lang, dataset, spec=None):
if self.args.lang_tok_replacing_bos_eos:
# it is handled by self.alter_dataset_langtok
# TODO: Unifiy with alter_dataset_langtok
return dataset
if spec is None:
return dataset
tok = self.get_encoder_langtok(src_lang, tgt_lang, spec)
if tok:
return PrependTokenDataset(dataset, tok)
return dataset
def tgt_dataset_tranform_func(self, source_lang, target_lang, dataset, spec=None):
if dataset is None:
# note that target dataset can be None during inference time
return None
if self.args.lang_tok_replacing_bos_eos:
# TODO: Unifiy with alter_dataset_langtok
# It is handled by self.alter_dataset_langtok.
# The complication in self.alter_dataset_langtok
# makes a unified framework difficult.
return dataset
# if not self.args.decoder_langtok:
if not spec:
return dataset
tok = self.get_decoder_langtok(target_lang, spec)
if tok:
return PrependTokenDataset(dataset, tok)
return dataset
def alter_dataset_langtok(
self,
lang_pair_dataset,
src_eos=None,
src_lang=None,
tgt_eos=None,
tgt_lang=None,
src_langtok_spec=None,
tgt_langtok_spec=None,
):
if src_langtok_spec is None and tgt_langtok_spec is None:
return lang_pair_dataset
new_src_eos = None
if (
src_langtok_spec is not None
and src_eos is not None
and (src_lang is not None or tgt_lang is not None)
):
new_src_eos = self.get_encoder_langtok(src_lang, tgt_lang, src_langtok_spec)
else:
src_eos = None
new_tgt_bos = None
if tgt_langtok_spec and tgt_eos is not None and tgt_lang is not None:
new_tgt_bos = self.get_decoder_langtok(tgt_lang, tgt_langtok_spec)
else:
tgt_eos = None
return TransformEosLangPairDataset(
lang_pair_dataset,
src_eos=src_eos,
new_src_eos=new_src_eos,
tgt_bos=tgt_eos,
new_tgt_bos=new_tgt_bos,
)
def load_a_dataset(
self,
split,
data_path,
src,
src_dict,
tgt,
tgt_dict,
combine,
prepend_bos=False,
langpairs_sharing_datasets=None,
data_category=None,
**extra_kwargs,
):
dataset_impl = self.args.dataset_impl
upsample_primary = self.args.upsample_primary
left_pad_source = self.args.left_pad_source
left_pad_target = self.args.left_pad_target
max_source_positions = self.args.max_source_positions
max_target_positions = self.args.max_target_positions
load_alignments = self.args.load_alignments
truncate_source = self.args.truncate_source
src_dataset_transform_func = self.src_dataset_tranform_func
tgt_dataset_transform_func = self.tgt_dataset_tranform_func
enable_lang_ids = self.args.enable_lang_ids
lang_dictionary = self.lang_dict
src_langtok_spec, tgt_langtok_spec = extra_kwargs["langtok_spec"]
src_langtok = self.get_encoder_langtok(src, tgt, src_langtok_spec)
tgt_langtok = self.get_decoder_langtok(tgt, tgt_langtok_spec)
logger.info(
f"{data_category}:{src}-{tgt} src_langtok: {src_langtok}; tgt_langtok: {tgt_langtok}"
)
langpair_ds = self.load_langpair_dataset(
data_path,
split,
src,
src_dict,
tgt,
tgt_dict,
combine,
dataset_impl,
upsample_primary,
left_pad_source,
left_pad_target,
max_source_positions,
max_target_positions,
prepend_bos,
load_alignments,
truncate_source,
src_dataset_transform_func=lambda dataset: src_dataset_transform_func(
src, tgt, dataset, src_langtok_spec
),
tgt_dataset_transform_func=lambda dataset: tgt_dataset_transform_func(
src, tgt, dataset, tgt_langtok_spec
),
src_lang_id=_lang_id(lang_dictionary, src)
if enable_lang_ids and lang_dictionary is not None
else None,
tgt_lang_id=_lang_id(lang_dictionary, tgt)
if enable_lang_ids and lang_dictionary is not None
else None,
langpairs_sharing_datasets=langpairs_sharing_datasets,
)
# TODO: handle modified lang toks for mined data and dae data
if self.args.lang_tok_replacing_bos_eos:
ds = self.alter_dataset_langtok(
langpair_ds,
src_eos=self.dicts[src if src else tgt].eos(),
src_lang=src,
tgt_eos=self.dicts[tgt].eos(),
tgt_lang=tgt,
src_langtok_spec=src_langtok_spec,
tgt_langtok_spec=tgt_langtok_spec,
)
else:
ds = langpair_ds
return ds
def load_split_langpair_datasets(self, split, data_param_list):
datasets = []
langpairs_sharing_datasets = (
{} if self.args.enable_reservsed_directions_shared_datasets else None
)
for param in data_param_list:
ds = self.load_a_dataset(
split=split,
langpairs_sharing_datasets=langpairs_sharing_datasets,
**param,
)
datasets.append(ds)
return datasets
def get_data_paths_and_lang_pairs(self, split):
datapaths = {"main": self.args.data}
lang_pairs = {"main": self.lang_pairs}
if split == getattr(self.args, "train_subset", None):
# only training data can have extra data and extra language pairs
if self.args.extra_data:
extra_datapaths = self.args.extra_data
datapaths.update(extra_datapaths)
if self.args.extra_lang_pairs:
extra_lang_pairs = {
k: v.split(",") for k, v in self.args.extra_lang_pairs.items()
}
lang_pairs.update(extra_lang_pairs)
return datapaths, lang_pairs
@classmethod
def get_dataset_key(cls, data_category, src, tgt):
return f"{data_category}:{src}-{tgt}"
@classmethod
def _get_shard_num_dict(cls, split, paths):
shards = defaultdict(int)
for path in paths:
files = PathManager.ls(path)
directions = set()
for f in files:
if f.startswith(split) and f.endswith(".idx"):
# idx files of the form "{split}.{src}-{tgt}.{lang}.idx"
direction = f.split(".")[-3]
directions.add(direction)
for direction in directions:
shards[direction] += 1
return shards
def get_split_num_data_shards(self, split):
if split in self._num_shards_dict:
return self._num_shards_dict[split]
num_shards_dict = {}
data_paths, lang_pairs = self.get_data_paths_and_lang_pairs(split)
for data_category, paths in data_paths.items():
if data_category not in lang_pairs:
continue
paths = utils.split_paths(paths)
shards_dict = self._get_shard_num_dict(split, paths)
lang_dirs = [
lang_pair.split("-") for lang_pair in lang_pairs[data_category]
]
lang_dirs = [x if len(x) > 1 else (x[0], x[0]) for x in lang_dirs]
for src, tgt in lang_dirs:
key = self.get_dataset_key(data_category, src, tgt)
if "mono_" in data_category:
# monolingual data requires tgt only
assert src is None or src == tgt, (
f"error: src={src}, "
"tgt={tgt} for data_category={data_category}"
)
num_shards_dict[key] = shards_dict[tgt]
else:
if f"{src}-{tgt}" in shards_dict:
num_shards_dict[key] = shards_dict[f"{src}-{tgt}"]
elif f"{tgt}-{src}" in shards_dict:
# follow the fairseq tradition to use reversed direction data if it is not available
num_shards_dict[key] = shards_dict[f"{tgt}-{src}"]
self._num_shards_dict[split] = num_shards_dict
logger.info(f"[{split}] num of shards: {num_shards_dict}")
return num_shards_dict
@classmethod
def get_shard_id(cls, num_shards, epoch, shard_epoch=None):
shard = epoch if shard_epoch is None else shard_epoch
shard = (shard - 1) % num_shards
return shard
def get_split_data_path(self, paths, epoch, shard_epoch, num_shards):
path = paths[self.get_shard_id(num_shards, epoch, shard_epoch)]
return path
def get_split_data_param_list(self, split, epoch, shard_epoch=None):
# TODO: to extend with extra datasets and keys and loop over different shard data paths
param_list = []
data_paths, lang_pairs = self.get_data_paths_and_lang_pairs(split)
logger.info(f"langtoks settings: {self.args.langtoks}")
split_num_shards_dict = self.get_split_num_data_shards(split)
for data_category, paths in data_paths.items():
if data_category not in lang_pairs:
continue
paths = utils.split_paths(paths)
assert len(paths) > 0
if len(paths) > 1:
self._has_sharded_data = True
if split != getattr(self.args, "train_subset", None):
# if not training data set, use the first shard for valid and test
paths = paths[:1]
if data_category in self.args.langtoks:
lang_tok_spec = self.args.langtoks[data_category]
else:
# default to None
lang_tok_spec = (None, None)
# infer langcode
lang_dirs = [
lang_pair.split("-") for lang_pair in lang_pairs[data_category]
]
lang_dirs = [x if len(x) > 1 else (x[0], x[0]) for x in lang_dirs]
for src, tgt in lang_dirs:
assert src is not None or data_category == "mono_dae", (
f"error: src={src}, " "tgt={tgt} for data_category={data_category}"
)
# logger.info(f"preparing param for {data_category}: {src} - {tgt}")
key = self.get_dataset_key(data_category, src, tgt)
data_path = self.get_split_data_path(
paths, epoch, shard_epoch, split_num_shards_dict[key]
)
param_list.append(
{
"key": key,
"data_path": data_path,
"split": split,
"src": src,
"src_dict": self.dicts[src]
if src and data_category != "mono_dae"
else None,
"tgt": tgt,
"tgt_dict": self.dicts[tgt],
"data_category": data_category,
"langtok_spec": lang_tok_spec,
}
)
return param_list
def get_train_dataset_sizes(
self, data_param_list, datasets, epoch, shard_epoch=None
):
num_shards = [
self.get_split_num_data_shards(param["split"])[param["key"]]
for param in data_param_list
]
data_sizes = []
for (key, d), num_shard in zip(datasets, num_shards):
my_data_sizes = self._training_data_sizes[key]
shard_ind = self.get_shard_id(num_shard, epoch, shard_epoch)
if shard_ind not in my_data_sizes:
my_data_sizes[shard_ind] = len(d)
known_size = max(my_data_sizes.values())
data_sizes.append(
# If we don't know the data size of the shard yet,
# use the the max known data size to approximate.
# Note that we preprocess shards by a designated shard size
# and put any remaining data at the end into the last shard so
# the max shard size approximation is almost correct before loading
# the last shard; after loading the last shard, it will have the
# exact data sizes of the whole data size.
(key, sum(my_data_sizes.get(i, known_size) for i in range(num_shard)))
)
logger.info(
f"estimated total data sizes of all shards used in sampling ratios: {data_sizes}. "
"Note that if the data a shard has not been loaded yet, use the max known data size to approximate"
)
return [s for _, s in data_sizes]
def get_train_sampling_ratios(
self, data_param_list, datasets, epoch=1, shard_epoch=None
):
data_sizes = self.get_train_dataset_sizes(
data_param_list, datasets, epoch, shard_epoch
)
sampling_func = self.sampling_method.sampling_method_selector()
sample_ratios = sampling_func(data_sizes) if sampling_func is not None else None
return sample_ratios
def get_sampling_ratios(self, data_param_list, datasets, epoch, shard_epoch=None):
if self.args.sampling_weights_from_file:
weights = load_sampling_weights(self.args.sampling_weights_from_file)
sample_ratios = [weights[k] for k, _ in datasets]
logger.info(
"| ignoring --sampling-weights when loadding sampling weights "
f"from file {self.args.sampling_weights_from_file}"
)
elif self.args.sampling_weights:
sample_ratios = [self.args.sampling_weights[k] for k, _ in datasets]
else:
sample_ratios = self.get_train_sampling_ratios(
data_param_list, datasets, epoch, shard_epoch
)
if sample_ratios is not None:
logger.info(
"| Upsample ratios: {}".format(
list(zip(map(lambda x: x["key"], data_param_list), sample_ratios))
)
)
assert len(sample_ratios) == len(datasets)
return sample_ratios
def load_split_datasets(
self, split, training, epoch=1, combine=False, shard_epoch=None, **kwargs
):
data_param_list = self.get_split_data_param_list(
split, epoch, shard_epoch=shard_epoch
)
langpairs_sharing_datasets = (
{} if self.args.enable_reservsed_directions_shared_datasets else None
)
datasets = [
(
param["key"],
self.load_a_dataset(
combine=combine,
langpairs_sharing_datasets=langpairs_sharing_datasets,
**param,
),
)
for param in data_param_list
]
return datasets, data_param_list
def load_into_concat_dataset(self, split, datasets, data_param_list):
if self.args.lang_tok_replacing_bos_eos:
# TODO: to investigate why TransformEosLangPairDataset doesn't work with ConcatDataset
return SampledMultiDataset(
OrderedDict(datasets),
sampling_ratios=None,
eval_key=None,
collate_format=CollateFormat.single,
virtual_size=None,
split=split,
)
return ConcatDataset([d for _, d in datasets])
def load_sampled_multi_epoch_dataset(
self, split, training, epoch=0, combine=False, shard_epoch=None, **kwargs
):
datasets, data_param_list = self.load_split_datasets(
split, training, epoch, combine, shard_epoch=shard_epoch, **kwargs
)
if training and split == getattr(self.args, "train_subset", None):
sample_ratios = self.get_sampling_ratios(data_param_list, datasets, epoch)
return SampledMultiEpochDataset(
OrderedDict(datasets),
epoch=epoch,
shard_epoch=shard_epoch,
# valid and test datasets will be degenerate to concating datasets:
sampling_ratios=sample_ratios,
eval_key=None,
collate_format=CollateFormat.single,
virtual_size=self.args.virtual_data_size,
split=split,
virtual_epoch_size=self.args.virtual_epoch_size,
# if not using lang_tok altering, simplified to use the same collater
shared_collater=self._shared_collater(),
)
else:
return self.load_into_concat_dataset(split, datasets, data_param_list)
from enum import Enum
from typing import Dict, List, Optional, Sequence
import torch
from fairseq.data import Dictionary
class EncoderLangtok(Enum):
"""
Prepend to the beginning of source sentence either the
source or target language token. (src/tgt).
"""
src = "src"
tgt = "tgt"
class LangTokSpec(Enum):
main = "main"
mono_dae = "mono_dae"
class LangTokStyle(Enum):
multilingual = "multilingual"
mbart = "mbart"
@torch.jit.export
def get_lang_tok(
lang: str, lang_tok_style: str, spec: str = LangTokSpec.main.value
) -> str:
# TOKEN_STYLES can't be defined outside this fn since it needs to be
# TorchScriptable.
TOKEN_STYLES: Dict[str, str] = {
LangTokStyle.mbart.value: "[{}]",
LangTokStyle.multilingual.value: "__{}__",
}
if spec.endswith("dae"):
lang = f"{lang}_dae"
elif spec.endswith("mined"):
lang = f"{lang}_mined"
style = TOKEN_STYLES[lang_tok_style]
return style.format(lang)
def augment_dictionary(
dictionary: Dictionary,
language_list: List[str],
lang_tok_style: str,
langtoks_specs: Sequence[str] = (LangTokSpec.main.value,),
extra_data: Optional[Dict[str, str]] = None,
) -> None:
for spec in langtoks_specs:
for language in language_list:
dictionary.add_symbol(
get_lang_tok(lang=language, lang_tok_style=lang_tok_style, spec=spec)
)
if lang_tok_style == LangTokStyle.mbart.value or (
extra_data is not None and LangTokSpec.mono_dae.value in extra_data
):
dictionary.add_symbol("<mask>")
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import datetime
import hashlib
import logging
import time
from bisect import bisect_right
from collections import OrderedDict, defaultdict
from enum import Enum
from typing import List
import numpy as np
import torch
from fairseq import distributed_utils
from fairseq.data import FairseqDataset, data_utils
def get_time_gap(s, e):
return (
datetime.datetime.fromtimestamp(e) - datetime.datetime.fromtimestamp(s)
).__str__()
logger = logging.getLogger(__name__)
def default_virtual_size_func(datasets, ratios, max_scale_up=1.5):
sizes = [len(d) for d in datasets]
if ratios is None:
return sum(sizes)
largest_idx = np.argmax(sizes)
largest_r = ratios[largest_idx]
largest_s = sizes[largest_idx]
# set virtual sizes relative to the largest dataset
virtual_sizes = [(r / largest_r) * largest_s for r in ratios]
vsize = sum(virtual_sizes)
max_size = sum(sizes) * max_scale_up
return int(vsize if vsize < max_size else max_size)
class CollateFormat(Enum):
single = 1
ordered_dict = 2
class SampledMultiDataset(FairseqDataset):
"""Samples from multiple sub-datasets according to given sampling ratios.
Args:
datasets (
List[~torch.utils.data.Dataset]
or OrderedDict[str, ~torch.utils.data.Dataset]
): datasets
sampling_ratios (List[float]): list of probability of each dataset to be sampled
(default: None, which corresponds to concatenating all dataset together).
seed (int): RNG seed to use (default: 2).
epoch (int): starting epoch number (default: 1).
eval_key (str, optional): a key used at evaluation time that causes
this instance to pass-through batches from *datasets[eval_key]*.
collate_format (CollateFormat): collater output format, either CollateFormat.ordered_dict or
CollateFormat.single (default: CollateFormat.single) where CollateFormat.single configures
the collater to output batches of data mixed from all sub-datasets,
and CollateFormat.ordered_dict configures the collater to output a dictionary of batches indexed by keys
of sub-datasets.
Note that not all sub-datasets will present in a single batch in both formats.
virtual_size (int, or callable): the expected virtual size of the dataset (default: default_virtual_size_func).
split (str): the split of the data, e.g. 'train', 'valid' or 'test'.
shared_collater (bool): whether or not to all sub-datasets have the same collater.
shuffle (bool): whether or not to shuffle data (default: True).
"""
def __init__(
self,
datasets,
sampling_ratios=None,
seed=2,
epoch=1,
eval_key=None,
collate_format=CollateFormat.single,
virtual_size=default_virtual_size_func,
split="",
shared_collater=False,
shuffle=True,
):
super().__init__()
self.shared_collater = shared_collater
self.shuffle = shuffle
if isinstance(datasets, OrderedDict):
self.keys = list(datasets.keys())
datasets = list(datasets.values())
elif isinstance(datasets, List):
self.keys = list(range(len(datasets)))
else:
raise AssertionError()
self.datasets = datasets
self.split = split
self.eval_key = eval_key
if self.eval_key is not None:
self.collate_format = CollateFormat.single
else:
self.collate_format = collate_format
self.seed = seed
self._cur_epoch = None
self.cumulated_sizes = None
# self.datasets[k][self._cur_indices[i]] is the data item i in this sampled dataset
# namely, data item i is sampled from the kth sub-dataset self.datasets[k]
# where self.cumulated_sizes[k-1] <= i < self.cumulated_sizes[k]
self._cur_indices = None
self._sizes = None
self.virtual_size_per_dataset = None
# caching properties
self._reset_cached_properties()
self.setup_sampling(sampling_ratios, virtual_size)
self.set_epoch(epoch)
def _clean_if_not_none(self, var_list):
for v in var_list:
if v is not None:
del v
def _reset_cached_properties(self):
self._clean_if_not_none([self._sizes, self._cur_indices])
self._sizes = None
self._cur_indices = None
def setup_sampling(self, sample_ratios, virtual_size):
sizes = [len(d) for d in self.datasets]
if sample_ratios is None:
# default back to concating datasets
self.sample_ratios = None
self.virtual_size = sum(sizes)
else:
if not isinstance(sample_ratios, np.ndarray):
sample_ratios = np.array(sample_ratios)
self.sample_ratios = sample_ratios
virtual_size = (
default_virtual_size_func if virtual_size is None else virtual_size
)
self.virtual_size = (
virtual_size(self.datasets, self.sample_ratios)
if callable(virtual_size)
else virtual_size
)
def adjust_sampling(self, epoch, sampling_ratios, virtual_size):
if sampling_ratios is not None:
sampling_ratios = self._sync_sample_ratios(sampling_ratios)
self.setup_sampling(sampling_ratios, virtual_size)
def _sync_sample_ratios(self, ratios):
# in case the ratios are not precisely the same across processes
# also to ensure every procresses update the ratios in the same pace
ratios = torch.DoubleTensor(ratios)
if torch.distributed.is_initialized():
if torch.cuda.is_available():
distributed_utils.all_reduce(ratios.cuda())
else:
distributed_utils.all_reduce(ratios)
ret = ratios.cpu()
ret = ret.numpy()
return ret
def random_choice_in_dataset(self, rng, dataset, choice_size):
if hasattr(dataset, "random_choice_in_dataset"):
return dataset.random_choice_in_dataset(rng, choice_size)
dataset_size = len(dataset)
return rng.choice(
dataset_size, choice_size, replace=(choice_size > dataset_size)
)
def get_virtual_indices(self, rng, datasets, sample_ratios, virtual_size):
def get_counts(sample_ratios):
counts = np.array([virtual_size * r for r in sample_ratios], dtype=np.int64)
diff = virtual_size - counts.sum()
assert diff >= 0
# due to round-offs, the size might not match the desired sizes
if diff > 0:
dataset_indices = rng.choice(
len(sample_ratios), size=diff, p=sample_ratios
)
for i in dataset_indices:
counts[i] += 1
return counts
def get_in_dataset_indices(datasets, sizes, sample_ratios):
counts = get_counts(sample_ratios)
# uniformally sample desired counts for each dataset
# if the desired counts are large, sample with replacement:
indices = [
self.random_choice_in_dataset(rng, d, c)
for c, d in zip(counts, datasets)
]
return indices
sizes = [len(d) for d in datasets]
if sample_ratios is None:
# default back to concating datasets
in_dataset_indices = [list(range(s)) for s in sizes]
virtual_sizes_per_dataset = sizes
else:
ratios = sample_ratios / sample_ratios.sum()
in_dataset_indices = get_in_dataset_indices(datasets, sizes, ratios)
virtual_sizes_per_dataset = [len(d) for d in in_dataset_indices]
virtual_sizes_per_dataset = np.array(virtual_sizes_per_dataset, np.int64)
cumulative_sizes = np.cumsum(virtual_sizes_per_dataset)
assert sum(virtual_sizes_per_dataset) == virtual_size
assert cumulative_sizes[-1] == virtual_size
if virtual_size < sum(sizes):
logger.warning(
f"virtual data size ({virtual_size}) is less than real data size ({sum(sizes)})."
" If virtual size << real data size, there could be data coverage issue."
)
in_dataset_indices = np.hstack(in_dataset_indices)
return in_dataset_indices, cumulative_sizes, virtual_sizes_per_dataset
def _get_dataset_and_index(self, index):
i = bisect_right(self.cumulated_sizes, index)
return i, self._cur_indices[index]
def __getitem__(self, index):
# self.__getitem__(index) returns self.datasets[k][self._cur_indices[index]]
# where k satisfies self.cumulated_sizes[k - 1] <= k < self.cumulated_sizes[k]
ds_idx, ds_sample_idx = self._get_dataset_and_index(index)
ret = (ds_idx, self.datasets[ds_idx][ds_sample_idx])
return ret
def num_tokens(self, index):
return self.sizes[index].max()
def size(self, index):
return self.sizes[index]
def __len__(self):
return self.virtual_size
def collater(self, samples, **extra_args):
"""Merge a list of samples to form a mini-batch."""
if len(samples) == 0:
return None
if self.collate_format == "ordered_dict":
collect_samples = [[] for _ in range(len(self.datasets))]
for (i, sample) in samples:
collect_samples[i].append(sample)
batch = OrderedDict(
[
(self.keys[i], dataset.collater(collect_samples[i]))
for i, (key, dataset) in enumerate(zip(self.keys, self.datasets))
if len(collect_samples[i]) > 0
]
)
elif self.shared_collater:
batch = self.datasets[0].collater([s for _, s in samples])
else:
samples_dict = defaultdict(list)
pad_to_length = (
defaultdict(int)
if "pad_to_length" not in extra_args
else extra_args["pad_to_length"]
)
for ds_idx, s in samples:
pad_to_length["source"] = max(
pad_to_length["source"], s["source"].size(0)
)
if s["target"] is not None:
pad_to_length["target"] = max(
pad_to_length["target"], s["target"].size(0)
)
samples_dict[ds_idx].append(s)
batches = [
self.datasets[i].collater(samples_dict[i], pad_to_length=pad_to_length)
for i in range(len(self.datasets))
if len(samples_dict[i]) > 0
]
def straight_data(tensors):
batch = torch.cat(tensors, dim=0)
return batch
src_lengths = straight_data(
[b["net_input"]["src_lengths"] for b in batches]
)
src_lengths, sort_order = src_lengths.sort(descending=True)
def straight_order(tensors):
batch = straight_data(tensors)
return batch.index_select(0, sort_order)
batch = {
"id": straight_order([b["id"] for b in batches]),
"nsentences": sum(b["nsentences"] for b in batches),
"ntokens": sum(b["ntokens"] for b in batches),
"net_input": {
"src_tokens": straight_order(
[b["net_input"]["src_tokens"] for b in batches]
),
"src_lengths": src_lengths,
},
"target": straight_order([b["target"] for b in batches])
if batches[0]["target"] is not None
else None,
}
if "prev_output_tokens" in batches[0]["net_input"]:
batch["net_input"]["prev_output_tokens"] = straight_order(
[b["net_input"]["prev_output_tokens"] for b in batches]
)
if "src_lang_id" in batches[0]["net_input"]:
batch["net_input"]["src_lang_id"] = straight_order(
[b["net_input"]["src_lang_id"] for b in batches]
)
if "tgt_lang_id" in batches[0]:
batch["tgt_lang_id"] = straight_order(
[b["tgt_lang_id"] for b in batches]
)
return batch
@property
def sizes(self):
if self._sizes is not None:
return self._sizes
start_time = time.time()
in_sub_dataset_indices = [
self._cur_indices[
0 if i == 0 else self.cumulated_sizes[i - 1] : self.cumulated_sizes[i]
]
for i in range(len(self.datasets))
]
sub_dataset_sizes = [
d.sizes[indices]
for d, indices in zip(self.datasets, in_sub_dataset_indices)
]
self._sizes = np.vstack(sub_dataset_sizes)
logger.info(f"sizes() calling time: {get_time_gap(start_time, time.time())}")
return self._sizes
def ordered_indices(self):
if self.shuffle:
indices = np.random.permutation(len(self))
else:
indices = np.arange(len(self))
sizes = self.sizes
tgt_sizes = sizes[:, 1] if len(sizes.shape) > 0 and sizes.shape[1] > 1 else None
src_sizes = (
sizes[:, 0] if len(sizes.shape) > 0 and sizes.shape[1] > 1 else sizes
)
# sort by target length, then source length
if tgt_sizes is not None:
indices = indices[np.argsort(tgt_sizes[indices], kind="mergesort")]
sort_indices = indices[np.argsort(src_sizes[indices], kind="mergesort")]
return sort_indices
def prefetch(self, indices):
prefetch_indices = [[] for _ in range(len(self.datasets))]
for i in indices:
ds_idx, ds_sample_idx = self._get_dataset_and_index(i)
prefetch_indices[ds_idx].append(ds_sample_idx)
for i in range(len(prefetch_indices)):
self.datasets[i].prefetch(prefetch_indices[i])
@property
def can_reuse_epoch_itr_across_epochs(self):
return False
def set_epoch(self, epoch):
super().set_epoch(epoch)
if epoch == self._cur_epoch:
# re-enter so return
return
for d in self.datasets:
if hasattr(d, "set_epoch"):
d.set_epoch(epoch)
self._cur_epoch = epoch
self._establish_virtual_datasets()
def _establish_virtual_datasets(self):
if self.sample_ratios is None and self._cur_indices is not None:
# not a samping dataset, no need to resample if indices are already established
return
self._reset_cached_properties()
start_time = time.time()
# Generate a weighted sample of indices as a function of the
# random seed and the current epoch.
rng = np.random.RandomState(
[
int(
hashlib.sha1(
str(self.__class__.__name__).encode("utf-8")
).hexdigest(),
16,
)
% (2 ** 32),
self.seed % (2 ** 32), # global seed
self._cur_epoch, # epoch index,
]
)
self._clean_if_not_none(
[self.cumulated_sizes, self.virtual_size_per_dataset, self._sizes]
)
self._sizes = None
indices, cumulated_sizes, virtual_size_per_dataset = self.get_virtual_indices(
rng, self.datasets, self.sample_ratios, self.virtual_size
)
self._cur_indices = indices
self.cumulated_sizes = cumulated_sizes
self.virtual_size_per_dataset = virtual_size_per_dataset
raw_sizes = [len(d) for d in self.datasets]
sampled_sizes = self.virtual_size_per_dataset
logger.info(
f"[{self.split}] Raw sizes: {str(dict(zip(self.keys, raw_sizes)))}; "
f"raw total size: {sum(raw_sizes)}"
)
logger.info(
f"[{self.split}] Resampled sizes: {str(dict(zip(self.keys, sampled_sizes)))}; "
f"resampled total size: {sum(sampled_sizes)}"
)
if self.sample_ratios is not None:
logger.info(
f"[{self.split}] Upsampling ratios: {str(dict(zip(self.keys, self.sample_ratios)))}"
)
else:
logger.info(f"[{self.split}] A concat dataset")
logger.info(
f"[{self.split}] virtual dataset established time: {get_time_gap(start_time, time.time())}"
)
def filter_indices_by_size(self, indices, max_sizes):
"""Filter a list of sample indices. Remove those that are longer
than specified in max_sizes.
Args:
indices (np.array): original array of sample indices
max_sizes (int or list[int] or tuple[int]): max sample size,
can be defined separately for src and tgt (then list or tuple)
Returns:
np.array: filtered sample array
list: list of removed indices
"""
sizes = self.sizes
tgt_sizes = sizes[:, 1] if len(sizes.shape) > 0 and sizes.shape[1] > 1 else None
src_sizes = (
sizes[:, 0] if len(sizes.shape) > 0 and sizes.shape[1] > 1 else sizes
)
return data_utils.filter_paired_dataset_indices_by_size(
src_sizes, tgt_sizes, indices, max_sizes
)
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import hashlib
import logging
import math
import numpy as np
from fairseq.data import SampledMultiDataset
from .sampled_multi_dataset import CollateFormat, default_virtual_size_func
logger = logging.getLogger(__name__)
class SampledMultiEpochDataset(SampledMultiDataset):
"""Samples from multiple sub-datasets according to sampling ratios
using virtual epoch sizes to speed up dataloading.
Args:
datasets (
List[~torch.utils.data.Dataset]
or OrderedDict[str, ~torch.utils.data.Dataset]
): datasets
sampling_ratios (List[float]): list of probability of each dataset to be sampled
(default: None, which corresponds to concating all dataset together).
seed (int): RNG seed to use (default: 2).
epoch (int): starting epoch number (default: 1).
eval_key (str, optional): a key used at evaluation time that causes
this instance to pass-through batches from *datasets[eval_key]*.
collate_format (CollateFormat): collater output format, either CollateFormat.ordered_dict or
CollateFormat.single (default: CollateFormat.single) where CollateFormat.single configures
the collater to output batches of data mixed from all sub-datasets,
and CollateFormat.ordered_dict configures the collater to output a dictionary of batches indexed by keys
of sub-datasets.
Note that not all sub-datasets will present in a single batch in both formats.
virtual_size (int, or callable): the expected virtual size of the dataset (default: default_virtual_size_func).
split (str): the split of the data, e.g. 'train', 'valid' or 'test'.
virtual_epoch_size (int): virtual epoch size, the dataset will go through the data by
this virtual epoch size one by one to speed up data loading, e.g. indicing and filtering
can be performed whenever a virtual epoch is loaded without waiting for the whole dataset to be loaded.
shared_collater (bool): whether or not to all sub-datasets have the same collater.
shard_epoch (int): the real epoch number for shard selection.
shuffle (bool): whether or not to shuffle data (default: True).
"""
def __init__(
self,
datasets,
sampling_ratios=None,
seed=2,
epoch=1,
eval_key=None,
collate_format=CollateFormat.single,
virtual_size=default_virtual_size_func,
split="",
virtual_epoch_size=None,
shared_collater=False,
shard_epoch=1,
shuffle=True,
):
self.virtual_epoch_size = virtual_epoch_size
self._current_epoch_start_index = None
self._random_global_indices = None
self.shard_epoch = shard_epoch if shard_epoch is not None else 1
self.load_next_shard = None
self._epoch_sizes = None
super().__init__(
datasets=datasets,
sampling_ratios=sampling_ratios,
seed=seed,
epoch=epoch,
eval_key=eval_key,
collate_format=collate_format,
virtual_size=virtual_size,
split=split,
shared_collater=shared_collater,
shuffle=shuffle,
)
def _setup(self, epoch):
self.virtual_epoch_size = (
self.virtual_epoch_size
if self.virtual_epoch_size is not None
else self.virtual_size
)
if self.virtual_epoch_size > self.virtual_size:
logger.warning(
f"virtual epoch size {self.virtual_epoch_size} "
f"is greater than virtual dataset size {self.virtual_size}"
)
self.virtual_epoch_size = self.virtual_size
self.num_virtual_epochs = math.ceil(self.virtual_size / self.virtual_epoch_size)
self._current_epoch_start_index = self._get_epoch_start_index(epoch)
logger.info(
f"virtual epoch size {self.virtual_epoch_size}; virtual dataset size {self.virtual_size}"
)
def _map_epoch_index_to_global(self, index):
index = self._current_epoch_start_index + index
# add randomness
return self._random_global_indices[index]
@property
def sizes(self):
if self._epoch_sizes is not None:
return self._epoch_sizes
_sizes = super().sizes
indices = self._random_global_indices[
self._current_epoch_start_index : self._current_epoch_start_index
+ len(self)
]
self._epoch_sizes = _sizes[indices]
# del super()._sizes to save memory
del self._sizes
self._sizes = None
return self._epoch_sizes
def _get_dataset_and_index(self, index):
i = self._map_epoch_index_to_global(index)
return super()._get_dataset_and_index(i)
def __len__(self):
return (
self.virtual_epoch_size
if self._current_epoch_start_index + self.virtual_epoch_size
< self.virtual_size
else self.virtual_size - self._current_epoch_start_index
)
def set_epoch(self, epoch):
if self._current_epoch_start_index is None:
# initializing epoch idnices of a virtual dataset
self._setup(epoch)
self._next_virtual_epoch(epoch)
else:
# working on already intialized epoch indices
if epoch == self._cur_epoch:
# re-enter so return
return
self._next_virtual_epoch(epoch)
def _get_epoch_start_index(self, epoch):
assert epoch >= 1 # fairseq is using 1-based epoch everywhere
return ((epoch - 1) % self.num_virtual_epochs) * self.virtual_epoch_size
def _next_global_indices(self, epoch):
rng = np.random.RandomState(
[
int(
hashlib.sha1(
str(self.__class__.__name__).encode("utf-8")
).hexdigest(),
16,
)
% (2 ** 32),
self.seed % (2 ** 32), # global seed
epoch, # epoch index,
]
)
del self._random_global_indices
self._random_global_indices = rng.choice(
self.virtual_size, self.virtual_size, replace=False
)
if self.load_next_shard is None:
self.load_next_shard = False
else:
# increase shard epoch for next loading
self.shard_epoch += 1
self.load_next_shard = True
logger.info(
"to load next epoch/shard in next load_dataset: "
f"epoch={epoch}/shard_epoch={self.shard_epoch}"
)
def _next_virtual_epoch(self, epoch):
index = self._get_epoch_start_index(epoch)
if index == 0 or self._random_global_indices is None:
# need to start from the beginning,
# so call super().set_epoch(epoch) to establish the global virtual indices
logger.info(
"establishing a new set of global virtual indices for "
f"epoch={epoch}/shard_epoch={self.shard_epoch}"
)
super().set_epoch(epoch)
self._next_global_indices(epoch)
else:
self._cur_epoch = epoch
# reset cache sizes and ordered_indices for the epoch after moving to a new epoch
self._clean_if_not_none(
[
self._epoch_sizes,
]
)
self._epoch_sizes = None
self._current_epoch_start_index = index
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import logging
from typing import List
logger = logging.getLogger(__name__)
def uniform(dataset_sizes: List[int]):
return [1.0] * len(dataset_sizes)
def temperature_sampling(dataset_sizes, temp):
total_size = sum(dataset_sizes)
return [(size / total_size) ** (1.0 / temp) for size in dataset_sizes]
def make_temperature_sampling(temp=1.0):
def sampling_func(dataset_sizes):
return temperature_sampling(dataset_sizes, temp)
return sampling_func
def make_ratio_sampling(ratios):
def sampling_func(dataset_sizes):
return ratios
return sampling_func
class SamplingMethod:
@staticmethod
def add_arguments(parser):
parser.add_argument(
"--sampling-method",
choices=[
"uniform",
"temperature",
"concat",
"RoundRobin",
],
type=str,
default="concat",
help="The method to sample data per language pairs",
)
parser.add_argument(
"--sampling-temperature",
default=1.5,
type=float,
help="only work with --sampling-method temperature",
)
@staticmethod
def build_sampler(args, task):
return SamplingMethod(args, task)
def __init__(self, args, task):
self.args = args
self.task = task
def is_adaptive(self):
return False
def sampling_method_selector(self):
args = self.args
logger.info(f"selected sampler: {args.sampling_method}")
if args.sampling_method == "uniform":
return uniform
elif args.sampling_method == "temperature" or self.is_adaptive():
return make_temperature_sampling(float(args.sampling_temperature))
else:
# default to concating all data set together
return None
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from collections import OrderedDict
import torch
from torch.utils.data.dataloader import default_collate
from . import FairseqDataset
def _flatten(dico, prefix=None):
"""Flatten a nested dictionary."""
new_dico = OrderedDict()
if isinstance(dico, dict):
prefix = prefix + "." if prefix is not None else ""
for k, v in dico.items():
if v is None:
continue
new_dico.update(_flatten(v, prefix + k))
elif isinstance(dico, list):
for i, v in enumerate(dico):
new_dico.update(_flatten(v, prefix + ".[" + str(i) + "]"))
else:
new_dico = OrderedDict({prefix: dico})
return new_dico
def _unflatten(dico):
"""Unflatten a flattened dictionary into a nested dictionary."""
new_dico = OrderedDict()
for full_k, v in dico.items():
full_k = full_k.split(".")
node = new_dico
for k in full_k[:-1]:
if k.startswith("[") and k.endswith("]"):
k = int(k[1:-1])
if k not in node:
node[k] = OrderedDict()
node = node[k]
node[full_k[-1]] = v
return new_dico
class NestedDictionaryDataset(FairseqDataset):
def __init__(self, defn, sizes=None):
super().__init__()
self.defn = _flatten(defn)
self.sizes = [sizes] if not isinstance(sizes, (list, tuple)) else sizes
first = None
for v in self.defn.values():
if not isinstance(
v,
(
FairseqDataset,
torch.utils.data.Dataset,
),
):
raise ValueError("Expected Dataset but found: {}".format(v.__class__))
first = first or v
if len(v) > 0:
assert len(v) == len(first), "dataset lengths must match"
self._len = len(first)
def __getitem__(self, index):
return OrderedDict((k, ds[index]) for k, ds in self.defn.items())
def __len__(self):
return self._len
def collater(self, samples):
"""Merge a list of samples to form a mini-batch.
Args:
samples (List[dict]): samples to collate
Returns:
dict: a mini-batch suitable for forwarding with a Model
"""
if len(samples) == 0:
return {}
sample = OrderedDict()
for k, ds in self.defn.items():
try:
sample[k] = ds.collater([s[k] for s in samples])
except NotImplementedError:
sample[k] = default_collate([s[k] for s in samples])
return _unflatten(sample)
def num_tokens(self, index):
"""Return the number of tokens in a sample. This value is used to
enforce ``--max-tokens`` during batching."""
return max(s[index] for s in self.sizes)
def size(self, index):
"""Return an example's size as a float or tuple. This value is used when
filtering a dataset with ``--max-positions``."""
if len(self.sizes) == 1:
return self.sizes[0][index]
else:
return (s[index] for s in self.sizes)
@property
def supports_prefetch(self):
"""Whether this dataset supports prefetching."""
return any(ds.supports_prefetch for ds in self.defn.values())
def prefetch(self, indices):
"""Prefetch the data required for this epoch."""
for ds in self.defn.values():
if getattr(ds, "supports_prefetch", False):
ds.prefetch(indices)
@property
def can_reuse_epoch_itr_across_epochs(self):
return all(ds.can_reuse_epoch_itr_across_epochs for ds in self.defn.values())
def set_epoch(self, epoch):
super().set_epoch(epoch)
for ds in self.defn.values():
ds.set_epoch(epoch)
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import numpy as np
import torch
from fairseq.data import data_utils
class WordNoising(object):
"""Generate a noisy version of a sentence, without changing words themselves."""
def __init__(self, dictionary, bpe_cont_marker="@@", bpe_end_marker=None):
self.dictionary = dictionary
self.bpe_end = None
if bpe_cont_marker:
self.bpe_end = np.array(
[
not self.dictionary[i].endswith(bpe_cont_marker)
for i in range(len(self.dictionary))
]
)
elif bpe_end_marker:
self.bpe_end = np.array(
[
self.dictionary[i].endswith(bpe_end_marker)
for i in range(len(self.dictionary))
]
)
self.get_word_idx = (
self._get_bpe_word_idx if self.bpe_end is not None else self._get_token_idx
)
def noising(self, x, lengths, noising_prob=0.0):
raise NotImplementedError()
def _get_bpe_word_idx(self, x):
"""
Given a list of BPE tokens, for every index in the tokens list,
return the index of the word grouping that it belongs to.
For example, for input x corresponding to ["how", "are", "y@@", "ou"],
return [[0], [1], [2], [2]].
"""
# x: (T x B)
bpe_end = self.bpe_end[x]
if x.size(0) == 1 and x.size(1) == 1:
# Special case when we only have one word in x. If x = [[N]],
# bpe_end is a scalar (bool) instead of a 2-dim array of bools,
# which makes the sum operation below fail.
return np.array([[0]])
# do a reduce front sum to generate word ids
word_idx = bpe_end[::-1].cumsum(0)[::-1]
word_idx = word_idx.max(0)[None, :] - word_idx
return word_idx
def _get_token_idx(self, x):
"""
This is to extend noising functions to be able to apply to non-bpe
tokens, e.g. word or characters.
"""
x = torch.t(x)
word_idx = np.array([range(len(x_i)) for x_i in x])
return np.transpose(word_idx)
class WordDropout(WordNoising):
"""Randomly drop input words. If not passing blank_idx (default is None),
then dropped words will be removed. Otherwise, it will be replaced by the
blank_idx."""
def __init__(
self,
dictionary,
default_dropout_prob=0.1,
bpe_cont_marker="@@",
bpe_end_marker=None,
):
super().__init__(dictionary, bpe_cont_marker, bpe_end_marker)
self.default_dropout_prob = default_dropout_prob
def noising(self, x, lengths, dropout_prob=None, blank_idx=None):
if dropout_prob is None:
dropout_prob = self.default_dropout_prob
# x: (T x B), lengths: B
if dropout_prob == 0:
return x, lengths
assert 0 < dropout_prob < 1
# be sure to drop entire words
word_idx = self.get_word_idx(x)
sentences = []
modified_lengths = []
for i in range(lengths.size(0)):
# Since dropout probabilities need to apply over non-pad tokens,
# it is not trivial to generate the keep mask without consider
# input lengths; otherwise, this could be done outside the loop
# We want to drop whole words based on word_idx grouping
num_words = max(word_idx[:, i]) + 1
# ith example: [x0, x1, ..., eos, pad, ..., pad]
# We should only generate keep probs for non-EOS tokens. Thus if the
# input sentence ends in EOS, the last word idx is not included in
# the dropout mask generation and we append True to always keep EOS.
# Otherwise, just generate the dropout mask for all word idx
# positions.
has_eos = x[lengths[i] - 1, i] == self.dictionary.eos()
if has_eos: # has eos?
keep = np.random.rand(num_words - 1) >= dropout_prob
keep = np.append(keep, [True]) # keep EOS symbol
else:
keep = np.random.rand(num_words) >= dropout_prob
words = x[: lengths[i], i].tolist()
# TODO: speed up the following loop
# drop words from the input according to keep
new_s = [
w if keep[word_idx[j, i]] else blank_idx for j, w in enumerate(words)
]
new_s = [w for w in new_s if w is not None]
# we need to have at least one word in the sentence (more than the
# start / end sentence symbols)
if len(new_s) <= 1:
# insert at beginning in case the only token left is EOS
# EOS should be at end of list.
new_s.insert(0, words[np.random.randint(0, len(words))])
assert len(new_s) >= 1 and (
not has_eos # Either don't have EOS at end or last token is EOS
or (len(new_s) >= 2 and new_s[-1] == self.dictionary.eos())
), "New sentence is invalid."
sentences.append(new_s)
modified_lengths.append(len(new_s))
# re-construct input
modified_lengths = torch.LongTensor(modified_lengths)
modified_x = torch.LongTensor(
modified_lengths.max(), modified_lengths.size(0)
).fill_(self.dictionary.pad())
for i in range(modified_lengths.size(0)):
modified_x[: modified_lengths[i], i].copy_(torch.LongTensor(sentences[i]))
return modified_x, modified_lengths
class WordShuffle(WordNoising):
"""Shuffle words by no more than k positions."""
def __init__(
self,
dictionary,
default_max_shuffle_distance=3,
bpe_cont_marker="@@",
bpe_end_marker=None,
):
super().__init__(dictionary, bpe_cont_marker, bpe_end_marker)
self.default_max_shuffle_distance = 3
def noising(self, x, lengths, max_shuffle_distance=None):
if max_shuffle_distance is None:
max_shuffle_distance = self.default_max_shuffle_distance
# x: (T x B), lengths: B
if max_shuffle_distance == 0:
return x, lengths
# max_shuffle_distance < 1 will return the same sequence
assert max_shuffle_distance > 1
# define noise word scores
noise = np.random.uniform(
0,
max_shuffle_distance,
size=(x.size(0), x.size(1)),
)
noise[0] = -1 # do not move start sentence symbol
# be sure to shuffle entire words
word_idx = self.get_word_idx(x)
x2 = x.clone()
for i in range(lengths.size(0)):
length_no_eos = lengths[i]
if x[lengths[i] - 1, i] == self.dictionary.eos():
length_no_eos = lengths[i] - 1
# generate a random permutation
scores = word_idx[:length_no_eos, i] + noise[word_idx[:length_no_eos, i], i]
# ensure no reordering inside a word
scores += 1e-6 * np.arange(length_no_eos.item())
permutation = scores.argsort()
# shuffle words
x2[:length_no_eos, i].copy_(
x2[:length_no_eos, i][torch.from_numpy(permutation)]
)
return x2, lengths
class UnsupervisedMTNoising(WordNoising):
"""
Implements the default configuration for noising in UnsupervisedMT
(github.com/facebookresearch/UnsupervisedMT)
"""
def __init__(
self,
dictionary,
max_word_shuffle_distance,
word_dropout_prob,
word_blanking_prob,
bpe_cont_marker="@@",
bpe_end_marker=None,
):
super().__init__(dictionary)
self.max_word_shuffle_distance = max_word_shuffle_distance
self.word_dropout_prob = word_dropout_prob
self.word_blanking_prob = word_blanking_prob
self.word_dropout = WordDropout(
dictionary=dictionary,
bpe_cont_marker=bpe_cont_marker,
bpe_end_marker=bpe_end_marker,
)
self.word_shuffle = WordShuffle(
dictionary=dictionary,
bpe_cont_marker=bpe_cont_marker,
bpe_end_marker=bpe_end_marker,
)
def noising(self, x, lengths):
# 1. Word Shuffle
noisy_src_tokens, noisy_src_lengths = self.word_shuffle.noising(
x=x,
lengths=lengths,
max_shuffle_distance=self.max_word_shuffle_distance,
)
# 2. Word Dropout
noisy_src_tokens, noisy_src_lengths = self.word_dropout.noising(
x=noisy_src_tokens,
lengths=noisy_src_lengths,
dropout_prob=self.word_dropout_prob,
)
# 3. Word Blanking
noisy_src_tokens, noisy_src_lengths = self.word_dropout.noising(
x=noisy_src_tokens,
lengths=noisy_src_lengths,
dropout_prob=self.word_blanking_prob,
blank_idx=self.dictionary.unk(),
)
return noisy_src_tokens
class NoisingDataset(torch.utils.data.Dataset):
def __init__(
self,
src_dataset,
src_dict,
seed,
noiser=None,
noising_class=UnsupervisedMTNoising,
**kwargs
):
"""
Wrap a :class:`~torch.utils.data.Dataset` and apply noise to the
samples based on the supplied noising configuration.
Args:
src_dataset (~torch.utils.data.Dataset): dataset to wrap.
to build self.src_dataset --
a LanguagePairDataset with src dataset as the source dataset and
None as the target dataset. Should NOT have padding so that
src_lengths are accurately calculated by language_pair_dataset
collate function.
We use language_pair_dataset here to encapsulate the tgt_dataset
so we can re-use the LanguagePairDataset collater to format the
batches in the structure that SequenceGenerator expects.
src_dict (~fairseq.data.Dictionary): source dictionary
seed (int): seed to use when generating random noise
noiser (WordNoising): a pre-initialized :class:`WordNoising`
instance. If this is None, a new instance will be created using
*noising_class* and *kwargs*.
noising_class (class, optional): class to use to initialize a
default :class:`WordNoising` instance.
kwargs (dict, optional): arguments to initialize the default
:class:`WordNoising` instance given by *noiser*.
"""
self.src_dataset = src_dataset
self.src_dict = src_dict
self.seed = seed
self.noiser = (
noiser
if noiser is not None
else noising_class(
dictionary=src_dict,
**kwargs,
)
)
def __getitem__(self, index):
"""
Returns a single noisy sample. Multiple samples are fed to the collater
create a noising dataset batch.
"""
src_tokens = self.src_dataset[index]
src_lengths = torch.LongTensor([len(src_tokens)])
src_tokens = src_tokens.unsqueeze(0)
# Transpose src tokens to fit expected shape of x in noising function
# (batch size, sequence length) -> (sequence length, batch size)
src_tokens_t = torch.t(src_tokens)
with data_utils.numpy_seed(self.seed + index):
noisy_src_tokens = self.noiser.noising(src_tokens_t, src_lengths)
# Transpose back to expected src_tokens format
# (sequence length, 1) -> (1, sequence length)
noisy_src_tokens = torch.t(noisy_src_tokens)
return noisy_src_tokens[0]
def __len__(self):
"""
The length of the noising dataset is the length of src.
"""
return len(self.src_dataset)
@property
def supports_prefetch(self):
return self.src_dataset.supports_prefetch
def prefetch(self, indices):
if self.src_dataset.supports_prefetch:
self.src_dataset.prefetch(indices)
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from . import FairseqDataset
class NumSamplesDataset(FairseqDataset):
def __getitem__(self, index):
return 1
def __len__(self):
return 0
def collater(self, samples):
return sum(samples)
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import numpy as np
import torch
from . import BaseWrapperDataset
class NumelDataset(BaseWrapperDataset):
def __init__(self, dataset, reduce=False):
super().__init__(dataset)
self.reduce = reduce
def __getitem__(self, index):
item = self.dataset[index]
if torch.is_tensor(item):
return torch.numel(item)
else:
return np.size(item)
def __len__(self):
return len(self.dataset)
def collater(self, samples):
if self.reduce:
return sum(samples)
else:
return torch.tensor(samples)
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from . import BaseWrapperDataset
class OffsetTokensDataset(BaseWrapperDataset):
def __init__(self, dataset, offset):
super().__init__(dataset)
self.offset = offset
def __getitem__(self, idx):
return self.dataset[idx] + self.offset
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from fairseq.data import data_utils
from . import BaseWrapperDataset
class PadDataset(BaseWrapperDataset):
def __init__(self, dataset, pad_idx, left_pad):
super().__init__(dataset)
self.pad_idx = pad_idx
self.left_pad = left_pad
def collater(self, samples):
return data_utils.collate_tokens(samples, self.pad_idx, left_pad=self.left_pad)
class LeftPadDataset(PadDataset):
def __init__(self, dataset, pad_idx):
super().__init__(dataset, pad_idx, left_pad=True)
class RightPadDataset(PadDataset):
def __init__(self, dataset, pad_idx):
super().__init__(dataset, pad_idx, left_pad=False)
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import subprocess
import tempfile
class PlasmaArray(object):
"""
Wrapper around numpy arrays that automatically moves the data to shared
memory upon serialization. This is particularly helpful when passing numpy
arrays through multiprocessing, so that data is not unnecessarily
duplicated or pickled.
"""
def __init__(self, array):
super().__init__()
self.array = array
self.disable = array.nbytes < 134217728 # disable for arrays <128MB
self.object_id = None
self.path = None
# variables with underscores shouldn't be pickled
self._client = None
self._server = None
self._server_tmp = None
self._plasma = None
@property
def plasma(self):
if self._plasma is None and not self.disable:
try:
import pyarrow.plasma as plasma
self._plasma = plasma
except ImportError:
self._plasma = None
return self._plasma
def start_server(self):
if self.plasma is None or self._server is not None:
return
assert self.object_id is None
assert self.path is None
self._server_tmp = tempfile.NamedTemporaryFile()
self.path = self._server_tmp.name
self._server = subprocess.Popen(
[
"plasma_store",
"-m",
str(int(1.05 * self.array.nbytes)),
"-s",
self.path,
]
)
@property
def client(self):
if self._client is None:
assert self.path is not None
self._client = self.plasma.connect(self.path)
return self._client
def __getstate__(self):
if self.plasma is None:
return self.__dict__
if self.object_id is None:
self.start_server()
self.object_id = self.client.put(self.array)
state = self.__dict__.copy()
del state["array"]
state["_client"] = None
state["_server"] = None
state["_server_tmp"] = None
state["_plasma"] = None
return state
def __setstate__(self, state):
self.__dict__.update(state)
if self.plasma is None:
return
self.array = self.client.get(self.object_id)
def __del__(self):
if self._server is not None:
self._server.kill()
self._server = None
self._server_tmp.close()
self._server_tmp = None
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import numpy as np
import torch
from . import BaseWrapperDataset
class PrependDataset(BaseWrapperDataset):
def __init__(self, dataset, prepend_getter, ensure_first_token_is=None):
super().__init__(dataset)
self.prepend_getter = prepend_getter
self.ensure_first_token = ensure_first_token_is
def __getitem__(self, idx):
item = self.dataset[idx]
is_tuple = isinstance(item, tuple)
src = item[0] if is_tuple else item
assert self.ensure_first_token is None or src[0] == self.ensure_first_token
prepend_idx = self.prepend_getter(self.dataset, idx)
assert isinstance(prepend_idx, int)
src[0] = prepend_idx
item = tuple((src,) + item[1:]) if is_tuple else src
return item
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import numpy as np
import torch
from . import BaseWrapperDataset
class PrependTokenDataset(BaseWrapperDataset):
def __init__(self, dataset, token=None):
super().__init__(dataset)
self.token = token
if token is not None:
self._sizes = np.array(dataset.sizes) + 1
else:
self._sizes = dataset.sizes
def __getitem__(self, idx):
item = self.dataset[idx]
if self.token is not None:
item = torch.cat([item.new([self.token]), item])
return item
@property
def sizes(self):
return self._sizes
def num_tokens(self, index):
n = self.dataset.num_tokens(index)
if self.token is not None:
n += 1
return n
def size(self, index):
n = self.dataset.size(index)
if self.token is not None:
n += 1
return n
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import torch
from . import FairseqDataset
class RawLabelDataset(FairseqDataset):
def __init__(self, labels):
super().__init__()
self.labels = labels
def __getitem__(self, index):
return self.labels[index]
def __len__(self):
return len(self.labels)
def collater(self, samples):
return torch.tensor(samples)
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from . import BaseWrapperDataset
class ReplaceDataset(BaseWrapperDataset):
"""Replaces tokens found in the dataset by a specified replacement token
Args:
dataset (~torch.utils.data.Dataset): dataset to replace tokens in
replace_map(Dictionary[int,int]): map of token to replace -> replacement token
offsets (List[int]): do not replace tokens before (from left if pos, right if neg) this offset. should be
as many as the number of objects returned by the underlying dataset __getitem__ method.
"""
def __init__(self, dataset, replace_map, offsets):
super().__init__(dataset)
assert len(replace_map) > 0
self.replace_map = replace_map
self.offsets = offsets
def __getitem__(self, index):
item = self.dataset[index]
is_tuple = isinstance(item, tuple)
srcs = item if is_tuple else [item]
for offset, src in zip(self.offsets, srcs):
for k, v in self.replace_map.items():
src_off = src[offset:] if offset >= 0 else src[:offset]
src_off.masked_fill_(src_off == k, v)
item = srcs if is_tuple else srcs[0]
return item
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import logging
import numpy as np
from fairseq.data import BaseWrapperDataset, plasma_utils
logger = logging.getLogger(__name__)
class ResamplingDataset(BaseWrapperDataset):
"""Randomly samples from a given dataset at each epoch.
Sampling is done with or without replacement, depending on the "replace"
parameter.
Optionally, the epoch size can be rescaled. This is potentially desirable
to increase per-epoch coverage of the base dataset (since sampling with
replacement means that many items in the dataset will be left out). In the
case of sampling without replacement, size_ratio should be strictly less
than 1.
Args:
dataset (~torch.utils.data.Dataset): dataset on which to sample.
weights (List[float]): list of probability weights
(default: None, which corresponds to uniform sampling).
replace (bool): sampling mode; True for "with replacement", or False
for "without replacement" (default: True)
size_ratio (float): the ratio to subsample to; must be positive
(default: 1.0).
batch_by_size (bool): whether or not to batch by sequence length
(default: True).
seed (int): RNG seed to use (default: 0).
epoch (int): starting epoch number (default: 1).
"""
def __init__(
self,
dataset,
weights=None,
replace=True,
size_ratio=1.0,
batch_by_size=True,
seed=0,
epoch=1,
):
super().__init__(dataset)
if weights is None:
self.weights = None
else:
assert len(weights) == len(dataset)
weights_arr = np.array(weights, dtype=np.float64)
weights_arr /= weights_arr.sum()
self.weights = plasma_utils.PlasmaArray(weights_arr)
self.replace = replace
assert size_ratio > 0.0
if not self.replace:
assert size_ratio < 1.0
self.size_ratio = float(size_ratio)
self.actual_size = np.ceil(len(dataset) * self.size_ratio).astype(int)
self.batch_by_size = batch_by_size
self.seed = seed
self._cur_epoch = None
self._cur_indices = None
self.set_epoch(epoch)
def __getitem__(self, index):
return self.dataset[self._cur_indices.array[index]]
def __len__(self):
return self.actual_size
@property
def sizes(self):
if isinstance(self.dataset.sizes, list):
return [s[self._cur_indices.array] for s in self.dataset.sizes]
return self.dataset.sizes[self._cur_indices.array]
def num_tokens(self, index):
return self.dataset.num_tokens(self._cur_indices.array[index])
def size(self, index):
return self.dataset.size(self._cur_indices.array[index])
def ordered_indices(self):
if self.batch_by_size:
order = [
np.arange(len(self)),
self.sizes,
] # No need to handle `self.shuffle == True`
return np.lexsort(order)
else:
return np.arange(len(self))
def prefetch(self, indices):
self.dataset.prefetch(self._cur_indices.array[indices])
@property
def can_reuse_epoch_itr_across_epochs(self):
return False
def set_epoch(self, epoch):
logger.debug("ResamplingDataset.set_epoch: {}".format(epoch))
super().set_epoch(epoch)
if epoch == self._cur_epoch:
return
self._cur_epoch = epoch
# Generate a weighted sample of indices as a function of the
# random seed and the current epoch.
rng = np.random.RandomState(
[
42, # magic number
self.seed % (2 ** 32), # global seed
self._cur_epoch, # epoch index
]
)
self._cur_indices = plasma_utils.PlasmaArray(
rng.choice(
len(self.dataset),
self.actual_size,
replace=self.replace,
p=(None if self.weights is None else self.weights.array),
)
)
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import torch
from . import BaseWrapperDataset
class RollDataset(BaseWrapperDataset):
def __init__(self, dataset, shifts):
super().__init__(dataset)
self.shifts = shifts
def __getitem__(self, index):
item = self.dataset[index]
return torch.roll(item, self.shifts)
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from collections import OrderedDict
import numpy as np
from . import FairseqDataset
class RoundRobinZipDatasets(FairseqDataset):
"""Zip multiple :class:`~fairseq.data.FairseqDataset` instances together.
Shorter datasets are repeated in a round-robin fashion to match the length
of the longest one.
Args:
datasets (Dict[~fairseq.data.FairseqDataset]): a dictionary of
:class:`~fairseq.data.FairseqDataset` instances.
eval_key (str, optional): a key used at evaluation time that causes
this instance to pass-through batches from *datasets[eval_key]*.
"""
def __init__(self, datasets, eval_key=None):
super().__init__()
assert isinstance(datasets, OrderedDict)
self.datasets = datasets
self.eval_key = eval_key
self.longest_dataset = None
self.longest_dataset_key = None
for key, dataset in datasets.items():
assert isinstance(dataset, FairseqDataset)
if self.longest_dataset is None or len(dataset) > len(self.longest_dataset):
self.longest_dataset = dataset
self.longest_dataset_key = key
self._ordered_indices = None
def _map_index(self, key, index):
assert (
self._ordered_indices is not None
), "Must call RoundRobinZipDatasets.ordered_indices() first"
return self._ordered_indices[key][index % len(self.datasets[key])]
def __getitem__(self, index):
if self.eval_key is None:
return OrderedDict(
[
(key, dataset[self._map_index(key, index)])
for key, dataset in self.datasets.items()
]
)
else:
# at evaluation time it's useful to pass-through batches from a single key
return self.datasets[self.eval_key][self._map_index(self.eval_key, index)]
def __len__(self):
return len(self.longest_dataset)
def collater(self, samples):
"""Merge a list of samples to form a mini-batch."""
if len(samples) == 0:
return None
if self.eval_key is None:
return OrderedDict(
[
(key, dataset.collater([sample[key] for sample in samples]))
for key, dataset in self.datasets.items()
]
)
else:
# at evaluation time it's useful to pass-through batches from a single key
return self.datasets[self.eval_key].collater(samples)
def num_tokens(self, index):
"""Return an example's length (number of tokens), used for batching."""
# TODO make it configurable whether to use max() or sum() here
return max(
dataset.num_tokens(self._map_index(key, index))
for key, dataset in self.datasets.items()
)
def size(self, index):
"""Return an example's size as a float or tuple. This value is used when
filtering a dataset with ``--max-positions``."""
return {
key: dataset.size(self._map_index(key, index))
for key, dataset in self.datasets.items()
}
def ordered_indices(self):
"""Ordered indices for batching."""
if self._ordered_indices is None:
# Call the underlying dataset's ordered_indices() here, so that we
# get the same random ordering as we would have from using the
# underlying dataset directly.
self._ordered_indices = OrderedDict(
[
(key, dataset.ordered_indices())
for key, dataset in self.datasets.items()
]
)
return np.arange(len(self))
@property
def supports_prefetch(self):
return all(
getattr(dataset, "supports_prefetch", False)
for dataset in self.datasets.values()
)
def prefetch(self, indices):
for key, dataset in self.datasets.items():
dataset.prefetch([self._map_index(key, index) for index in indices])
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import numpy as np
from fairseq.data import data_utils
from . import BaseWrapperDataset
class TruncateDataset(BaseWrapperDataset):
"""Truncate a sequence by returning the first truncation_length tokens"""
def __init__(self, dataset, truncation_length):
super().__init__(dataset)
assert truncation_length is not None
self.truncation_length = truncation_length
self.dataset = dataset
def __getitem__(self, index):
item = self.dataset[index]
item_len = item.size(0)
if item_len > self.truncation_length:
item = item[: self.truncation_length]
return item
@property
def sizes(self):
return np.minimum(self.dataset.sizes, self.truncation_length)
def __len__(self):
return len(self.dataset)
class RandomCropDataset(TruncateDataset):
"""Truncate a sequence by returning a random crop of truncation_length tokens"""
def __init__(self, dataset, truncation_length, seed=1):
super().__init__(dataset, truncation_length)
self.seed = seed
self.epoch = 0
@property
def can_reuse_epoch_itr_across_epochs(self):
return True # only the crop changes, not item sizes
def set_epoch(self, epoch, **unused):
super().set_epoch(epoch)
self.epoch = epoch
def __getitem__(self, index):
with data_utils.numpy_seed(self.seed, self.epoch, index):
item = self.dataset[index]
item_len = item.size(0)
excess = item_len - self.truncation_length
if excess > 0:
start_idx = np.random.randint(0, excess)
item = item[start_idx : start_idx + self.truncation_length]
return item
def maybe_shorten_dataset(
dataset,
split,
shorten_data_split_list,
shorten_method,
tokens_per_sample,
seed,
):
truncate_split = (
split in shorten_data_split_list.split(",") or len(shorten_data_split_list) == 0
)
if shorten_method == "truncate" and truncate_split:
dataset = TruncateDataset(dataset, tokens_per_sample)
elif shorten_method == "random_crop" and truncate_split:
dataset = RandomCropDataset(dataset, tokens_per_sample, seed)
return dataset
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment