Commit 73557d95 authored by yuguo960516

glm

# coding=utf-8
# Copyright 2021 The OneFlow Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import sys
def default_argument_parser(epilog=None):
"""Create a parser with some common arguments used by libai users.
Args:
epilog (str): epilog passed to ArgumentParser describing the usage.
Returns:
argparse.ArgumentParser.
"""
parser = argparse.ArgumentParser(
epilog=epilog
or f"""
Examples:
Run on single machine:
$ python3 -m oneflow.distributed.launch \
--nproc_per_node 8 --nnodes 1 --node_rank 0 --master_addr 127.0.0.1 --master_port 12345 \
{sys.argv[0]} --config-file cfg.yaml
Change some config options:
$ python3 -m oneflow.distributed.launch \
--nproc_per_node 8 --nnodes 1 --node_rank 0 --master_addr 127.0.0.1 --master_port 12345 \
{sys.argv[0]} --config-file cfg.yaml train.load_weight=/path/to/weight.pth optim.lr=0.001
Run on multiple machines:
(machine0)$ python3 -m oneflow.distributed.launch \
--nproc_per_node 8 --nnodes 2 --node_rank 0 --master_addr <URL> --master_port 12345 \
{sys.argv[0]} --config-file cfg.yaml
$ python3 -m oneflow.distributed.launch \
--nproc_per_node 8 --nnodes 2 --node_rank 1 --master_addr <URL> --master_port 12345 \
{sys.argv[0]} --config-file cfg.yaml
""",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("--config-file", default="", metavar="FILE", help="path to config file")
parser.add_argument(
"--resume",
action="store_true",
help="Whether to attempt to resume from the checkpoint directory. "
"See documentation of `DefaultTrainer.resume_or_load()` for what it means.",
)
parser.add_argument("--eval-only", action="store_true", help="Perform evaluation only")
parser.add_argument(
"--fast-dev-run",
action="store_true",
help="Run several batches of train, eval and test to find any bugs, "
"(ie: a sort of unit test)",
)
parser.add_argument(
"opts",
help="""
Modify config options at the end of the command. For Yacs configs, use
space-separated "path.key value" pairs.
For python-based LazyConfig, use "path.key=value".
""".strip(),
default=None,
nargs=argparse.REMAINDER,
)
return parser
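# Example sketch (illustrative): parsing a hypothetical command line with this parser;
# the config path and the override below are made-up values.
#
#     args = default_argument_parser().parse_args(
#         ["--config-file", "configs/bert.py", "--eval-only", "train.train_iter=100"]
#     )
#     # args.config_file == "configs/bert.py"; args.eval_only is True;
#     # args.opts == ["train.train_iter=100"]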
# coding=utf-8
# Copyright 2021 The OneFlow Authors. All rights reserved.
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import inspect
import os
import pkg_resources
from omegaconf import OmegaConf
from .lazy import LazyConfig
# --------------------------------------------------------
# References:
# https://github.com/facebookresearch/detectron2/blob/main/detectron2/config/config.py
# --------------------------------------------------------
def configurable(init_func=None, *, from_config=None):
"""
Decorate a function or a class's __init__ method so that it can be called
with a :class:`CfgNode` object using a :func:`from_config` function that translates
:class:`CfgNode` to arguments.
Examples:
.. code-block:: python
# Usage 1: Decorator on __init__:
class A:
@configurable
def __init__(self, a, b=2, c=3):
pass
@classmethod
def from_config(cls, cfg): # 'cfg' must be the first argument
# Returns kwargs to be passed to __init__
return {"a": cfg.A, "b": cfg.B}
a1 = A(a=1, b=2) # regular construction
a2 = A(cfg) # construct with a cfg
a3 = A(cfg, b=3, c=4) # construct with extra overwrite
# Usage 2: Decorator on any function. Needs an extra from_config argument:
@configurable(from_config=lambda cfg: {"a": cfg.A, "b": cfg.B})
def a_func(a, b=2, c=3):
pass
a1 = a_func(a=1, b=2) # regular call
a2 = a_func(cfg) # call with a cfg
a3 = a_func(cfg, b=3, c=4) # call with extra overwrite
Args:
init_func (callable): a class's ``__init__`` method in usage 1. The
class must have a ``from_config`` classmethod which takes `cfg` as
the first argument.
from_config (callable): the from_config function in usage 2. It must take `cfg`
as its first argument.
"""
if init_func is not None:
assert (
inspect.isfunction(init_func)
and from_config is None
and init_func.__name__ == "__init__"
), "Incorrect use of @configurable. Check API documentation for examples."
@functools.wraps(init_func)
def wrapped(self, *args, **kwargs):
try:
from_config_func = type(self).from_config
except AttributeError as e:
raise AttributeError(
"Class with @configurable must have a 'from_config' classmethod."
) from e
if not inspect.ismethod(from_config_func):
raise TypeError("Class with @configurable must have a 'from_config' classmethod.")
if _called_with_cfg(*args, **kwargs):
explicit_args = _get_args_from_config(from_config_func, *args, **kwargs)
init_func(self, **explicit_args)
else:
init_func(self, *args, **kwargs)
return wrapped
else:
if from_config is None:
return configurable # @configurable() is made equivalent to @configurable
assert inspect.isfunction(
from_config
), "from_config argument of configurable must be a function!"
def wrapper(orig_func):
@functools.wraps(orig_func)
def wrapped(*args, **kwargs):
if _called_with_cfg(*args, **kwargs):
explicit_args = _get_args_from_config(from_config, *args, **kwargs)
return orig_func(**explicit_args)
else:
return orig_func(*args, **kwargs)
wrapped.from_config = from_config
return wrapped
return wrapper
def _get_args_from_config(from_config_func, *args, **kwargs):
"""
Use `from_config` to obtain explicit arguments.
Returns:
dict: arguments to be used for cls.__init__
"""
signature = inspect.signature(from_config_func)
if list(signature.parameters.keys())[0] != "cfg":
if inspect.isfunction(from_config_func):
name = from_config_func.__name__
else:
name = f"{from_config_func.__self__}.from_config"
raise TypeError(f"{name} must take 'cfg' as the first argument!")
support_var_arg = any(
param.kind in [param.VAR_POSITIONAL, param.VAR_KEYWORD]
for param in signature.parameters.values()
)
if support_var_arg: # forward all arguments to from_config, if from_config accepts them
ret = from_config_func(*args, **kwargs)
else:
# forward supported arguments to from_config
supported_arg_names = set(signature.parameters.keys())
extra_kwargs = {}
for name in list(kwargs.keys()):
if name not in supported_arg_names:
extra_kwargs[name] = kwargs.pop(name)
ret = from_config_func(*args, **kwargs)
# forward the other arguments to __init__
ret.update(extra_kwargs)
return ret
def _called_with_cfg(*args, **kwargs):
"""
Returns:
bool: whether the arguments contain CfgNode and should be considered
forwarded to from_config.
"""
from omegaconf import DictConfig
if len(args) and isinstance(args[0], DictConfig):
return True
if isinstance(kwargs.pop("cfg", None), DictConfig):
return True
# `from_config`'s first argument is forced to be "cfg".
# So the above check covers all cases.
return False
def try_get_key(cfg, *keys, default=None):
"""
Try select keys from cfg until the first key that exists. Otherwise return default.
"""
for k in keys:
none = object()
p = OmegaConf.select(cfg, k, default=none)
if p is not none:
return p
return default
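# Example sketch (illustrative): try_get_key falls back across alias keys; the key
# names below are hypothetical.
#
#     cfg = OmegaConf.create({"train": {"log_period": 20}})
#     try_get_key(cfg, "train.log_interval", "train.log_period", default=50)  # -> 20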
def get_config(config_path):
"""
Returns a config object from a config_path.
Args:
config_path (str): config file name relative to libai's "configs/"
directory, e.g., "common/models/bert.py"
Returns:
omegaconf.DictConfig: a config object
"""
cfg_file = pkg_resources.resource_filename("libai.config", os.path.join("configs", config_path))
if not os.path.exists(cfg_file):
raise RuntimeError("{} not available in LiBai configs!".format(config_path))
cfg = LazyConfig.load(cfg_file)
return cfg
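# Example sketch (illustrative): load one of LiBai's packaged configs by its path
# relative to "configs/", using the path mentioned in the docstring above.
#
#     bert_cfg = get_config("common/models/bert.py")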
# coding=utf-8
# Copyright 2021 The OneFlow Authors. All rights reserved.
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import dataclasses
import logging
from collections import abc
from enum import Enum
from typing import Any, Callable, Dict, List, Union
from hydra.errors import InstantiationException
from omegaconf import OmegaConf
from libai.config.lazy import _convert_target_to_string, locate
logger = logging.getLogger(__name__)
__all__ = ["dump_dataclass", "instantiate"]
# --------------------------------------------------------
# References:
# https://github.com/facebookresearch/detectron2/blob/main/detectron2/config/instantiate.py
# --------------------------------------------------------
class _Keys(str, Enum):
"""Special keys in configs used by instantiate."""
TARGET = "_target_"
RECURSIVE = "_recursive_"
def _is_target(x: Any) -> bool:
if isinstance(x, dict):
return _Keys.TARGET in x
if OmegaConf.is_dict(x):
return _Keys.TARGET in x
return False
def _is_dict(cfg: Any) -> bool:
return OmegaConf.is_dict(cfg) or isinstance(cfg, abc.Mapping)
def _is_list(cfg: Any) -> bool:
return OmegaConf.is_list(cfg) or isinstance(cfg, list)
def dump_dataclass(obj: Any):
"""
Dump a dataclass recursively into a dict that can be later instantiated.
Args:
obj: a dataclass object
Returns:
dict
"""
assert dataclasses.is_dataclass(obj) and not isinstance(
obj, type
), "dump_dataclass() requires an instance of a dataclass."
ret = {"_target_": _convert_target_to_string(type(obj))}
for f in dataclasses.fields(obj):
v = getattr(obj, f.name)
if dataclasses.is_dataclass(v):
v = dump_dataclass(v)
if isinstance(v, (list, tuple)):
v = [dump_dataclass(x) if dataclasses.is_dataclass(x) else x for x in v]
ret[f.name] = v
return ret
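# Example sketch (illustrative): dumping a small, hypothetical dataclass; the exact
# "_target_" string depends on where the dataclass is defined.
#
#     @dataclasses.dataclass
#     class Point:
#         x: int = 1
#         y: int = 2
#
#     dump_dataclass(Point(x=3))  # -> {"_target_": "<module>.Point", "x": 3, "y": 2}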
def _prepare_input_dict_or_list(d: Union[Dict[Any, Any], List[Any]]) -> Any:
res: Any
if isinstance(d, dict):
res = {}
for k, v in d.items():
if k == "_target_":
v = _convert_target_to_string(d["_target_"])
elif isinstance(v, (dict, list)):
v = _prepare_input_dict_or_list(v)
res[k] = v
elif isinstance(d, list):
res = []
for v in d:
if isinstance(v, (list, dict)):
v = _prepare_input_dict_or_list(v)
res.append(v)
else:
assert False
return res
def _resolve_target(target):
if isinstance(target, str):
try:
target = locate(target)
except Exception as e:
msg = f"Error locating target '{target}', see chained exception above."
raise InstantiationException(msg) from e
if not callable(target):
msg = f"Expected a callable target, got '{target}' of type '{type(target).__name__}'"
raise InstantiationException(msg)
return target
def _call_target(_target_: Callable[..., Any], kwargs: Dict[str, Any]):
"""Call target (type) with kwargs"""
try:
return _target_(**kwargs)
except Exception as e:
msg = f"Error in call to target '{_convert_target_to_string(_target_)}':\n{repr(e)}"
raise InstantiationException(msg) from e
def instantiate(cfg, **kwargs: Any) -> Any:
"""
Recursively instantiate objects defined in dictionaries by
"_target_" and arguments.
Args:
cfg: a dict-like object with "_target_" that defines the caller, and
other keys that define the arguments
Returns:
object instantiated by cfg
"""
if cfg is None:
return None
if isinstance(cfg, (dict, list)):
cfg = _prepare_input_dict_or_list(cfg)
kwargs = _prepare_input_dict_or_list(kwargs)
if _is_dict(cfg):
if kwargs:
cfg = OmegaConf.merge(cfg, kwargs)
_recursive_ = kwargs.pop(_Keys.RECURSIVE, True)
return instantiate_cfg(cfg, recursive=_recursive_)
elif _is_list(cfg):
_recursive_ = kwargs.pop(_Keys.RECURSIVE, True)
return instantiate_cfg(cfg, recursive=_recursive_)
else:
return cfg # return as-is if don't know what to do
def instantiate_cfg(cfg: Any, recursive: bool = True):
if cfg is None:
return cfg
if _is_dict(cfg):
recursive = cfg[_Keys.RECURSIVE] if _Keys.RECURSIVE in cfg else recursive
if not isinstance(recursive, bool):
msg = f"Instantiation: _recursive_ flag must be a bool, got {type(recursive)}"
raise TypeError(msg)
# If OmegaConf list, create new list of instances if recursive
if OmegaConf.is_list(cfg):
items = [instantiate_cfg(item, recursive=recursive) for item in cfg._iter_ex(resolve=True)]
lst = OmegaConf.create(items, flags={"allow_objects": True})
return lst
elif isinstance(cfg, list):
# Specialize for list, because many classes take
# list[objects] as arguments, such as ResNet, DatasetMapper
return [instantiate_cfg(item, recursive=recursive) for item in cfg]
elif _is_dict(cfg):
exclude_keys = set({"_target_", "_recursive_"})
if _is_target(cfg):
_target_ = instantiate(cfg.get(_Keys.TARGET)) # instantiate lazy target
_target_ = _resolve_target(_target_)
kwargs = {}
for key, value in cfg.items():
if key not in exclude_keys:
if recursive:
value = instantiate_cfg(value, recursive=recursive)
kwargs[key] = value
return _call_target(_target_, kwargs)
else:
return cfg
else:
return cfg # return as-is if don't know what to do
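# Example sketch (illustrative): instantiate() builds an object from a dict carrying a
# "_target_" callable; collections.OrderedDict is used only because it is importable
# everywhere.
#
#     import collections
#     cfg = {"_target_": collections.OrderedDict, "a": 1, "b": 2}
#     instantiate(cfg)  # -> OrderedDict([('a', 1), ('b', 2)])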
# coding=utf-8
# Copyright 2021 The OneFlow Authors. All rights reserved.
# Copyright (c) Facebook, Inc. and its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ast
import builtins
import importlib
import inspect
import logging
import os
import pydoc
import uuid
from collections import abc
from contextlib import contextmanager
from copy import deepcopy
from dataclasses import is_dataclass
from typing import Any, List, Tuple, Union
import cloudpickle
import yaml
from omegaconf import DictConfig, ListConfig, OmegaConf
__all__ = ["LazyCall", "LazyConfig"]
# --------------------------------------------------------
# References:
# https://github.com/facebookresearch/detectron2/blob/main/detectron2/config/lazy.py
# --------------------------------------------------------
def locate(name: str) -> Any:
"""
Locate and return an object ``x`` using an input string ``{x.__module__}.{x.__qualname__}``,
such as "module.submodule.class_name".
Raise Exception if it cannot be found.
"""
obj = pydoc.locate(name)
# Some cases (e.g. flow.optim.sgd.SGD) not handled correctly
# by pydoc.locate. Try a private function from hydra.
if obj is None:
try:
# from hydra.utils import get_method - will print many errors
from hydra.utils import _locate
except ImportError as e:
raise ImportError(f"Cannot dynamically locate object {name}!") from e
else:
obj = _locate(name) # it raises if fails
return obj
def _convert_target_to_string(t: Any) -> str:
"""
Inverse of ``locate()``.
Args:
t: any object with ``__module__`` and ``__qualname__``
"""
module, qualname = t.__module__, t.__qualname__
# Compress the path to this object, e.g. ``module.submodule._impl.class``
# may become ``module.submodule.class``, if the later also resolves to the same
# object. This simplifies the string, and also is less affected by moving the
# class implementation.
module_parts = module.split(".")
for k in range(1, len(module_parts)):
prefix = ".".join(module_parts[:k])
candidate = f"{prefix}.{qualname}"
try:
if locate(candidate) is t:
return candidate
except ImportError:
pass
return f"{module}.{qualname}"
class LazyCall:
"""
Wrap a callable so that when it's called, the call will not be executed,
but returns a dict that describes the call.
LazyCall object has to be called with only keyword arguments. Positional
arguments are not yet supported.
Examples:
.. code-block:: python
from libai.config import instantiate, LazyCall
layer_cfg = LazyCall(nn.Conv2d)(in_channels=32, out_channels=32)
layer_cfg.out_channels = 64 # can edit it afterwards
layer = instantiate(layer_cfg)
"""
def __init__(self, target):
if not (callable(target) or isinstance(target, (str, abc.Mapping))):
raise TypeError(
f"target of LazyCall must be a callable or defines a callable! Got {target}"
)
self._target = target
def __call__(self, **kwargs):
if is_dataclass(self._target):
# omegaconf object cannot hold dataclass type
# https://github.com/omry/omegaconf/issues/784
target = _convert_target_to_string(self._target)
else:
target = self._target
kwargs["_target_"] = target
return DictConfig(content=kwargs, flags={"allow_objects": True})
def _visit_dict_config(cfg, func):
"""
Apply func recursively to all DictConfig in cfg.
"""
if isinstance(cfg, DictConfig):
func(cfg)
for v in cfg.values():
_visit_dict_config(v, func)
elif isinstance(cfg, ListConfig):
for v in cfg:
_visit_dict_config(v, func)
def _validate_py_syntax(filename):
# see also https://github.com/open-mmlab/mmcv/blob/master/mmcv/utils/config.py
with open(filename, "r", encoding="utf-8") as f:
# Setting encoding explicitly to resolve coding issue on windows
content = f.read()
try:
ast.parse(content)
except SyntaxError as e:
raise SyntaxError(f"Config file {filename} has syntax error!") from e
def _cast_to_config(obj):
# if given a dict, return DictConfig instead
if isinstance(obj, dict):
return DictConfig(obj, flags={"allow_objects": True})
return obj
_CFG_PACKAGE_NAME = "libai._cfg_loader"
"""
A namespace to put all imported config into.
"""
def _random_package_name(filename):
# generate a random package name when loading config files
return _CFG_PACKAGE_NAME + str(uuid.uuid4())[:4] + "." + os.path.basename(filename)
@contextmanager
def _patch_import():
"""
Enhance relative import statements in config files, so that they:
1. locate files purely based on relative location, regardless of packages.
e.g. you can import a file without having __init__
2. do not cache modules globally; modifications of module state have no side effect
3. support other storage systems through PathManager
4. imported dicts are turned into omegaconf.DictConfig automatically
"""
old_import = builtins.__import__
def find_relative_file(original_file, relative_import_path, level):
cur_file = os.path.dirname(original_file)
for _ in range(level - 1):
cur_file = os.path.dirname(cur_file)
cur_name = relative_import_path.lstrip(".")
for part in cur_name.split("."):
cur_file = os.path.join(cur_file, part)
# NOTE: directory import is not handled. Because then it's unclear
# if such import should produce python module or DictConfig. This can
# be discussed further if needed.
if not cur_file.endswith(".py"):
cur_file += ".py"
if not os.path.isfile(cur_file):
raise ImportError(
f"Cannot import name {relative_import_path} from "
f"{original_file}: {cur_file} has to exist."
)
return cur_file
def new_import(name, globals=None, locals=None, fromlist=(), level=0):
if (
# Only deal with relative imports inside config files
level != 0
and globals is not None
and (globals.get("__package__", "") or "").startswith(_CFG_PACKAGE_NAME)
):
cur_file = find_relative_file(globals["__file__"], name, level)
_validate_py_syntax(cur_file)
spec = importlib.machinery.ModuleSpec(
_random_package_name(cur_file), None, origin=cur_file
)
module = importlib.util.module_from_spec(spec)
module.__file__ = cur_file
with open(cur_file, "r", encoding="utf-8") as f:
content = f.read()
exec(compile(content, cur_file, "exec"), module.__dict__)
for name in fromlist: # turn imported dict into DictConfig automatically
val = _cast_to_config(module.__dict__[name])
module.__dict__[name] = val
return module
return old_import(name, globals, locals, fromlist=fromlist, level=level)
builtins.__import__ = new_import
yield new_import
builtins.__import__ = old_import
class LazyConfig:
"""
Provide methods to save, load, and overrides an omegaconf config object
which may contain definition of lazily-constructed objects.
"""
@staticmethod
def load_rel(filename: str, keys: Union[None, str, Tuple[str, ...]] = None):
"""
Similar to :meth:`load()`, but load path relative to the caller's
source file.
This has the same functionality as a relative import, except that this method
accepts filename as a string, so more characters are allowed in the filename.
"""
caller_frame = inspect.stack()[1]
caller_fname = caller_frame[0].f_code.co_filename
assert caller_fname != "<string>", "load_rel Unable to find caller"
caller_dir = os.path.dirname(caller_fname)
filename = os.path.join(caller_dir, filename)
return LazyConfig.load(filename, keys)
@staticmethod
def load(filename: str, keys: Union[None, str, Tuple[str, ...]] = None):
"""
Load a config file.
Args:
filename: absolute path or relative path w.r.t. the current working directory
keys: keys to load and return. If not given, return all keys
(whose values are config objects) in a dict.
"""
has_keys = keys is not None
filename = filename.replace("/./", "/") # redundant
if os.path.splitext(filename)[1] not in [".py", ".yaml", ".yml"]:
raise ValueError(f"Config file {filename} has to be a python or yaml file.")
if filename.endswith(".py"):
_validate_py_syntax(filename)
with _patch_import():
# Record the filename
module_namespace = {
"__file__": filename,
"__package__": _random_package_name(filename),
}
with open(filename, "r", encoding="utf-8") as f:
content = f.read()
# Compile first with filename to:
# 1. make filename appears in stacktrace
# 2. make load_rel able to find its parent's (possibly remote) location
exec(compile(content, filename, "exec"), module_namespace)
ret = module_namespace
else:
with open(filename, "r", encoding="utf-8") as f:
obj = yaml.unsafe_load(f)
ret = OmegaConf.create(obj, flags={"allow_objects": True})
if has_keys:
if isinstance(keys, str):
return _cast_to_config(ret[keys])
else:
return tuple(_cast_to_config(ret[a]) for a in keys)
else:
if filename.endswith(".py"):
# when not specified, only load those that are config objects
ret = DictConfig(
{
name: _cast_to_config(value)
for name, value in ret.items()
if isinstance(value, (DictConfig, ListConfig, dict))
and not name.startswith("_")
},
flags={"allow_objects": True},
)
return ret
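# Example sketch (illustrative): loading a python config file and selecting one of its
# exported objects; the path and the "model" key below are hypothetical.
#
#     model_cfg = LazyConfig.load("configs/common/models/bert.py", keys="model")
#     model = instantiate(model_cfg)  # instantiate() is exported by libai.config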
@staticmethod
def save(cfg, filename: str):
"""
Save a config object to a yaml file.
Note that when the config dictionary contains complex objects (e.g. lambda),
it can't be saved to yaml. In that case we will print an error and
attempt to save to a pkl file instead.
Args:
cfg: an omegaconf config object
filename: yaml file name to save the config file
"""
logger = logging.getLogger(__name__)
try:
cfg = deepcopy(cfg)
except Exception:
pass
else:
# if it's deep-copyable, then...
def _replace_type_by_name(x):
if "_target_" in x and callable(x._target_):
try:
x._target_ = _convert_target_to_string(x._target_)
except AttributeError:
pass
# not necessary, but makes the yaml look nicer
_visit_dict_config(cfg, _replace_type_by_name)
save_pkl = False
try:
cfg_dict = OmegaConf.to_container(cfg, resolve=False)
dumped = yaml.dump(cfg_dict, default_flow_style=None, allow_unicode=True, width=9999)
with open(filename, "w") as f:
f.write(dumped)
try:
_ = yaml.unsafe_load(dumped) # test that it is loadable
except Exception:
logger.warning(
"The config contains objects that cannot serialize to a valid yaml. "
f"{filename} is human-readable but cannot be loaded."
)
save_pkl = True
except Exception:
logger.exception("Unable to serialize the config to yaml. Error:")
save_pkl = True
if save_pkl:
new_filename = filename + ".pkl"
try:
# retry by pickle
with open(new_filename, "wb") as f:
cloudpickle.dump(cfg, f)
logger.warning(f"Config is saved using cloudpickle at {new_filename}.")
except Exception:
pass
@staticmethod
def apply_overrides(cfg, overrides: List[str]):
"""
In-place override contents of cfg.
Args:
cfg: an omegaconf config object
overrides: list of strings in the format of "a=b" to override configs.
See https://hydra.cc/docs/next/advanced/override_grammar/basic/ for syntax.
Returns:
the cfg object
"""
def safe_update(cfg, key, value):
parts = key.split(".")
for idx in range(1, len(parts)):
prefix = ".".join(parts[:idx])
v = OmegaConf.select(cfg, prefix, default=None)
if v is None:
break
if not OmegaConf.is_config(v):
raise KeyError(
f"Trying to update key {key}, but {prefix} "
f"is not a config, but has type {type(v)}."
)
OmegaConf.update(cfg, key, value, merge=True)
from hydra.core.override_parser.overrides_parser import OverridesParser
parser = OverridesParser.create()
overrides = parser.parse_overrides(overrides)
for o in overrides:
key = o.key_or_group
value = o.value()
if o.is_delete():
# TODO support this
raise NotImplementedError("deletion is not yet a supported override")
safe_update(cfg, key, value)
return cfg
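# Example sketch (illustrative): applying hydra-style "key=value" overrides; the keys
# below are hypothetical.
#
#     cfg = LazyConfig.load("configs/my_config.py")
#     cfg = LazyConfig.apply_overrides(cfg, ["train.lr=0.001", "train.train_iter=1000"])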
@staticmethod
def to_py(cfg, prefix: str = "cfg."):
"""
Try to convert a config object into Python-like pseudo code.
Note that perfect conversion is not always possible. So the returned
results are mainly meant to be human-readable, and not meant to be executed.
Args:
cfg: an omegaconf config object
prefix: root name for the resulting code (default: "cfg.")
Returns:
str of formatted Python code
"""
import black
cfg = OmegaConf.to_container(cfg, resolve=True)
def _to_str(obj, prefix=None, inside_call=False):
if prefix is None:
prefix = []
if isinstance(obj, abc.Mapping) and "_target_" in obj:
# Dict representing a function call
target = _convert_target_to_string(obj.pop("_target_"))
args = []
for k, v in sorted(obj.items()):
args.append(f"{k}={_to_str(v, inside_call=True)}")
args = ", ".join(args)
call = f"{target}({args})"
return "".join(prefix) + call
elif isinstance(obj, abc.Mapping) and not inside_call:
# Dict that is not inside a call is a list of top-level config objects that we
# render as one object per line with dot separated prefixes
key_list = []
for k, v in sorted(obj.items()):
if isinstance(v, abc.Mapping) and "_target_" not in v:
key_list.append(_to_str(v, prefix=prefix + [k + "."]))
else:
key = "".join(prefix) + k
key_list.append(f"{key}={_to_str(v)}")
return "\n".join(key_list)
elif isinstance(obj, abc.Mapping):
# Dict that is inside a call is rendered as a regular dict
return (
"{"
+ ",".join(
f"{repr(k)}: {_to_str(v, inside_call=inside_call)}"
for k, v in sorted(obj.items())
)
+ "}"
)
elif isinstance(obj, list):
return "[" + ",".join(_to_str(x, inside_call=inside_call) for x in obj) + "]"
else:
return repr(obj)
py_str = _to_str(cfg, prefix=[prefix])
try:
return black.format_str(py_str, mode=black.Mode())
except black.InvalidInput:
return py_str
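# Example sketch (illustrative): a config entry built with
# LazyCall(flow.nn.Linear)(in_features=512, out_features=10) stored under the key
# "model" would be rendered by to_py() roughly as:
#
#     cfg.model = oneflow.nn.Linear(in_features=512, out_features=10)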
# coding=utf-8
# Copyright 2021 The OneFlow Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .structures import DistTensorData, Instance
from .build import (
build_image_train_loader,
build_image_test_loader,
build_nlp_train_val_test_loader,
build_nlp_test_loader,
)
# coding=utf-8
# Copyright 2021 The OneFlow Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from omegaconf import OmegaConf
from oneflow.utils.data import DataLoader
from oneflow.utils.data.dataset import ConcatDataset
from libai.config import LazyCall, instantiate
from libai.utils import distributed as dist
from .data_utils import get_train_valid_test_split_
from .samplers import CyclicSampler, SingleRoundSampler
from .structures import Instance
def build_nlp_train_val_test_loader(
dataset,
splits,
weights,
train_val_test_num_samples,
train_batch_size,
test_batch_size,
train_sampler=LazyCall(CyclicSampler)(shuffle=True),
test_sampler=LazyCall(SingleRoundSampler)(shuffle=False, drop_last=False),
num_workers=4,
consumed_samples=0,
seed=0,
collate_fn=None,
dataset_mixer=ConcatDataset,
):
"""
Build the NLP train/valid/test dataloaders, used for datasets that lack separate valid/test sets
Returns:
It will return train/valid/test dataloader
* train_loader: dataloader for training
* valid_loader: dataloader for validation
* test_loader: dataloader for testing
Arguments:
dataset: dataset from which to load the data. e.g.: dataset or [dataset1, dataset2, ...]
splits: ratio config for splitting each dataset into train/valid/test. e.g.: [[7, 2, 1], ...]
weights: ratio config for concatenating the dataset list (not supported yet). e.g.: [1.0, ...]
train_batch_size: how many samples per batch to load in training (micro-batch-size per GPU).
test_batch_size: how many samples per batch to load in testing (micro-batch-size per GPU).
train_sampler/test_sampler: define the strategy to draw samples from the dataset
for training/testing. Can be any ``Iterable`` with ``__len__``
implemented.
num_workers: how many subprocesses to use for data
loading. ``0`` means that the data will be loaded in the main process.
(default: ``4``).
consumed_samples: the number of samples that have been trained at the current time,
used for resuming training (default: ``0``).
seed: random seed, used for reproducing experiments (default: ``0``).
collate_fn: merges a list of samples to form a
mini-batch of Tensor(s). Used when using batched loading from a
map-style dataset.
dataset_mixer: function for concatenating a list of datasets.
"""
def build_dataset(index, dataset):
doc_idx_ptr = indexed_dataset.get_doc_idx()
start_index = ds_splits[index]
end_index = ds_splits[index + 1] + 1
indexed_dataset.set_doc_idx(doc_idx_ptr[start_index:end_index])
dataset.indexed_dataset = indexed_dataset
dataset.max_num_samples = train_val_test_num_samples[index]
dataset = instantiate(dataset)
# Set the original pointer so dataset remains the main dataset.
indexed_dataset.set_doc_idx(doc_idx_ptr)
# check
assert indexed_dataset.doc_idx[0] == 0
assert indexed_dataset.doc_idx.shape[0] == (total_num_of_documents + 1)
return dataset
if OmegaConf.is_list(dataset):
dataset = list(dataset)
elif not isinstance(dataset, list):
dataset = [dataset]
assert len(dataset) == len(splits), "datasets length must equal splits length"
assert len(dataset) == len(weights), "datasets length must equal weights length"
train_datasets, val_datasets, test_datasets = [], [], []
for dst, split in zip(dataset, splits):
indexed_dataset = instantiate(dst.indexed_dataset)
total_num_of_documents = indexed_dataset.doc_idx.shape[0] - 1
ds_splits = get_train_valid_test_split_(total_num_of_documents, split)
train_dataset = build_dataset(0, dst)
val_dataset = build_dataset(1, dst)
test_dataset = build_dataset(2, dst)
train_datasets.append(train_dataset)
val_datasets.append(val_dataset)
test_datasets.append(test_dataset)
# [dataset, dataset] -> dataset -> dataloader
train_dataset = dataset_mixer(train_datasets)
val_dataset = dataset_mixer(val_datasets)
test_dataset = dataset_mixer(test_datasets)
collate_fn = trivial_batch_collator if collate_fn is None else collate_fn
train_loader, _, _ = build_nlp_train_loader(
dataset=train_dataset,
train_batch_size=train_batch_size,
test_batch_size=None,
sampler=train_sampler,
num_workers=num_workers,
consumed_samples=consumed_samples,
seed=seed,
collate_fn=collate_fn,
)
valid_loader = build_nlp_test_loader(
dataset=val_dataset,
test_batch_size=test_batch_size,
sampler=test_sampler,
num_workers=num_workers,
seed=seed,
collate_fn=collate_fn,
)
test_loader = build_nlp_test_loader(
dataset=test_dataset,
test_batch_size=test_batch_size,
sampler=test_sampler,
num_workers=num_workers,
seed=seed,
collate_fn=collate_fn,
)
return train_loader, valid_loader, test_loader
def build_nlp_train_loader(
dataset,
train_batch_size,
test_batch_size=None,
sampler=LazyCall(CyclicSampler)(shuffle=True),
num_workers=4,
consumed_samples=0,
seed=0,
collate_fn=None,
dataset_mixer=ConcatDataset,
**kwargs
):
"""
Build the NLP train dataloader, used for the training dataset
Returns:
It will return the train dataloader, and None for the valid/test dataloaders
* train_loader: dataloader for training
* None: Nonetype
* None: Nonetype
Arguments:
dataset: dataset from which to load the data. e.g.: dataset or [dataset1, dataset2, ...]
train_batch_size: how many samples per batch to load in training (micro-batch-size per GPU).
test_batch_size: unused; keep the default of None.
sampler: defines the strategy to draw
samples from the dataset. Can be any ``Iterable`` with ``__len__``
implemented.
num_workers: how many subprocesses to use for data
loading. ``0`` means that the data will be loaded in the main process.
(default: ``4``).
consumed_samples: the number of samples that have been trained at the current time,
used for resuming training (default: ``0``).
seed: random seed, used for reproducing experiments (default: ``0``).
collate_fn: merges a list of samples to form a
mini-batch of Tensor(s). Used when using batched loading from a
map-style dataset.
dataset_mixer: function for concatenating a list of datasets.
"""
dataset = instantiate(dataset)
if OmegaConf.is_list(dataset):
dataset = list(dataset)
elif not isinstance(dataset, list):
dataset = [dataset]
if len(dataset) > 1:
dataset = dataset_mixer(dataset)
else:
dataset = dataset[0]
sampler.dataset = dataset
sampler.micro_batch_size = train_batch_size
sampler.consumed_samples = consumed_samples
sampler.data_parallel_rank = dist.get_data_parallel_rank()
sampler.data_parallel_size = dist.get_data_parallel_size()
sampler.seed = seed
sampler = instantiate(sampler)
dataloader = DataLoader(
dataset,
batch_sampler=sampler,
num_workers=num_workers,
persistent_workers=True if num_workers > 0 else False,
collate_fn=trivial_batch_collator if collate_fn is None else collate_fn,
**kwargs,
)
return dataloader, None, None
def build_nlp_test_loader(
dataset,
test_batch_size,
sampler=LazyCall(SingleRoundSampler)(shuffle=False, drop_last=False),
num_workers=4,
seed=0,
collate_fn=None,
):
"""
Build the NLP test dataloader, used for the test dataset
Returns:
It will return test dataloader
* test_loader: dataloader for testing
Arguments:
dataset: dataset from which to load the data. e.g.: dataset or [dataset1, dataset2, ...]
test_batch_size: how many samples per batch to load in testing (micro-batch-size per GPU).
sampler: defines the strategy to draw
samples from the dataset. Can be any ``Iterable`` with ``__len__``
implemented.
num_workers: how many subprocesses to use for data
loading. ``0`` means that the data will be loaded in the main process.
(default: ``4``).
seed: random seed, used for reproducing experiments (default: ``0``).
collate_fn: merges a list of samples to form a
mini-batch of Tensor(s). Used when using batched loading from a
map-style dataset.
"""
dataset = instantiate(dataset)
collate_fn = trivial_batch_collator if collate_fn is None else collate_fn
sampler.dataset = dataset
sampler.micro_batch_size = test_batch_size
sampler.data_parallel_rank = dist.get_data_parallel_rank()
sampler.data_parallel_size = dist.get_data_parallel_size()
sampler.seed = seed
sampler = instantiate(sampler)
test_loader = DataLoader(
dataset,
batch_sampler=sampler,
num_workers=num_workers,
persistent_workers=True if num_workers > 0 else False,
collate_fn=collate_fn,
)
return test_loader
def build_image_train_loader(
dataset,
train_batch_size,
test_batch_size=None,
sampler=LazyCall(CyclicSampler)(shuffle=True),
num_workers=4,
consumed_samples=0,
seed=0,
collate_fn=None,
dataset_mixer=ConcatDataset,
mixup_func=None,
**kwargs
):
"""
Build the image train dataloader, used for the training dataset
Returns:
It will return the train dataloader, and None for the valid/test dataloaders
* train_loader: dataloader for training
* None: Nonetype
* None: Nonetype
Arguments:
dataset: dataset from which to load the data. e.g.: dataset or [dataset1, dataset2, ...]
train_batch_size: how many samples per batch to load in training (micro-batch-size per GPU).
test_batch_size: unused; keep the default of None.
sampler: defines the strategy to draw
samples from the dataset. Can be any ``Iterable`` with ``__len__``
implemented.
num_workers: how many subprocesses to use for data
loading. ``0`` means that the data will be loaded in the main process.
(default: ``4``).
consumed_samples: the number of samples that have been trained at the current time,
used for resuming training (default: ``0``).
seed: random seed, used for reproducing experiments (default: ``0``).
collate_fn: merges a list of samples to form a
mini-batch of Tensor(s). Used when using batched loading from a
map-style dataset.
dataset_mixer: function for concatenating a list of datasets.
mixup_func: function for data augmentation.
"""
dataset = instantiate(dataset)
if OmegaConf.is_list(dataset):
dataset = list(dataset)
elif not isinstance(dataset, list):
dataset = [dataset]
if len(dataset) > 1:
dataset = dataset_mixer(dataset)
else:
dataset = dataset[0]
sampler.dataset = dataset
sampler.micro_batch_size = train_batch_size
sampler.consumed_samples = consumed_samples
sampler.data_parallel_rank = dist.get_data_parallel_rank()
sampler.data_parallel_size = dist.get_data_parallel_size()
sampler.seed = seed
sampler = instantiate(sampler)
dataloader = DataLoader(
dataset,
batch_sampler=sampler,
num_workers=num_workers,
persistent_workers=True if num_workers > 0 else False,
collate_fn=trivial_batch_collator if collate_fn is None else collate_fn,
**kwargs,
)
# Bind mixup_func to the dataloader; it will be used in Trainer.get_batch
dataloader.mixup_func = instantiate(mixup_func)
return dataloader, None, None
def build_image_test_loader(
dataset,
test_batch_size,
sampler=LazyCall(SingleRoundSampler)(shuffle=True, drop_last=False),
num_workers=4,
seed=0,
collate_fn=None,
**kwargs
):
"""
Build the image test dataloader, used for the test dataset
Returns:
It will return test dataloader
* test_loader: dataloader for testing
Arguments:
dataset: dataset from which to load the data. e.g.: dataset or [dataset1, dataset2, ...]
test_batch_size: how many samples per batch to load in testing (micro-batch-size per GPU).
sampler: defines the strategy to draw
samples from the dataset. Can be any ``Iterable`` with ``__len__``
implemented.
num_workers: how many subprocesses to use for data
loading. ``0`` means that the data will be loaded in the main process.
(default: ``4``).
seed: random seed, used for reproducing experiments (default: ``0``).
collate_fn: merges a list of samples to form a
mini-batch of Tensor(s). Used when using batched loading from a
map-style dataset.
"""
dataset = instantiate(dataset)
sampler.dataset = dataset
sampler.micro_batch_size = test_batch_size
sampler.data_parallel_rank = dist.get_data_parallel_rank()
sampler.data_parallel_size = dist.get_data_parallel_size()
sampler.seed = seed
sampler = instantiate(sampler)
return DataLoader(
dataset,
batch_sampler=sampler,
num_workers=num_workers,
persistent_workers=True if num_workers > 0 else False,
collate_fn=trivial_batch_collator if collate_fn is None else collate_fn,
**kwargs,
)
def trivial_batch_collator(batch):
assert isinstance(batch[0], Instance), "batch[0] must be `Instance` for trivial batch collator"
batch = Instance.stack(batch)
return batch
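# Example sketch (illustrative): a LazyCall-based dataloader config that these builders
# consume; MyImageDataset and its arguments are hypothetical.
#
#     from libai.config import LazyCall
#     dataloader = dict(
#         train=LazyCall(build_image_train_loader)(
#             dataset=LazyCall(MyImageDataset)(root="./data", train=True),
#             train_batch_size=32,
#             num_workers=4,
#         ),
#     )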
CXXFLAGS += -O3 -Wall -shared -std=c++11 -fPIC -fdiagnostics-color
CPPFLAGS += $(shell python3 -m pybind11 --includes)
LIBNAME = helpers
LIBEXT = $(shell python3-config --extension-suffix)
default: $(LIBNAME)$(LIBEXT)
%$(LIBEXT): %.cpp
$(CXX) $(CXXFLAGS) $(CPPFLAGS) $< -o $@
# coding=utf-8
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .dataset_utils import (
compile_helper,
create_masked_lm_predictions,
get_samples_mapping,
get_train_valid_test_split_,
)
from .indexed_dataset import (
IndexedCachedDataset,
IndexedDataset,
MMapIndexedDataset,
get_indexed_dataset,
)
# coding=utf-8
# Copyright 2018 The Google AI Language Team Authors, and NVIDIA.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import logging
import os
import re
import time
import numpy as np
import oneflow as flow
from libai.utils import distributed as dist
logger = logging.getLogger(__name__)
# Most of the code here has been copied from:
# https://github.com/google-research/albert/blob/master/create_pretraining_data.py
# with some modifications.
def compile_helper():
"""Compile helper function at runtime. Make sure this
is invoked on a single process."""
import os
import subprocess
path = os.path.abspath(os.path.dirname(__file__))
ret = subprocess.run(["make", "-C", path])
if ret.returncode != 0:
logger.info("Making C++ dataset helpers module failed, exiting.")
import sys
sys.exit(1)
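# Example sketch (illustrative): compile_helper() is meant to run on a single local
# process before the compiled extension is imported, roughly:
#
#     if flow.env.get_local_rank() == 0:
#         compile_helper()
#     dist.synchronize()
#     from libai.data.data_utils import helpers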
MaskedLmInstance = collections.namedtuple("MaskedLmInstance", ["index", "label"])
def is_start_piece(piece):
"""Check if the current word piece is the starting piece (BERT)."""
# When a word has been split into
# WordPieces, the first token does not have any marker and any subsequence
# tokens are prefixed with ##. So whenever we see the ## token, we
# append it to the previous set of word indexes.
return not piece.startswith("##")
def create_masked_lm_predictions(
tokenizer,
tokens,
vocab_id_list,
vocab_id_to_token_dict,
masked_lm_prob,
cls_id,
sep_id,
mask_id,
max_predictions_per_seq,
np_rng,
max_ngrams=3,
do_whole_word_mask=True,
favor_longer_ngram=False,
do_permutation=False,
geometric_dist=False,
masking_style="bert",
):
"""Creates the predictions for the masked LM objective.
Note: Tokens here are vocab ids and not text tokens."""
cand_indexes = []
# Note(mingdachen): We create a list for recording if the piece is
# the starting piece of current token, where 1 means true, so that
# on-the-fly whole word masking is possible.
token_boundary = [0] * len(tokens)
for (i, token) in enumerate(tokens):
if token == cls_id or token == sep_id:
token_boundary[i] = 1
continue
# Whole Word Masking means that we mask all of the wordpieces
# corresponding to an original word.
#
# Note that Whole Word Masking does *not* change the training code
# at all -- we still predict each WordPiece independently, softmaxed
# over the entire vocabulary.
if (
do_whole_word_mask
and len(cand_indexes) >= 1
and not is_start_piece(vocab_id_to_token_dict[token])
):
cand_indexes[-1].append(i)
else:
cand_indexes.append([i])
if is_start_piece(vocab_id_to_token_dict[token]):
token_boundary[i] = 1
output_tokens = list(tokens)
# add by ganruyi
if masking_style == "bert-cn-wwm":
# For Chinese wordpieces (tokens of the form "##<chinese char>"), strip the
# "##" prefix that tokenization added and map back to a token id
new_token_ids = []
for token_id in output_tokens:
token = tokenizer.convert_ids_to_tokens([token_id])[0]
if len(re.findall("##[\u4E00-\u9FA5]", token)) > 0:
token = token[2:]
new_token_id = tokenizer.convert_tokens_to_ids([token])[0]
new_token_ids.append(new_token_id)
output_tokens = new_token_ids
masked_lm_positions = []
masked_lm_labels = []
if masked_lm_prob == 0:
return (output_tokens, masked_lm_positions, masked_lm_labels, token_boundary)
num_to_predict = min(max_predictions_per_seq, max(1, int(round(len(tokens) * masked_lm_prob))))
ngrams = np.arange(1, max_ngrams + 1, dtype=np.int64)
if not geometric_dist:
# Note(mingdachen):
# By default, we set the probabilities to favor shorter ngram sequences.
pvals = 1.0 / np.arange(1, max_ngrams + 1)
pvals /= pvals.sum(keepdims=True)
if favor_longer_ngram:
pvals = pvals[::-1]
ngram_indexes = []
for idx in range(len(cand_indexes)):
ngram_index = []
for n in ngrams:
ngram_index.append(cand_indexes[idx : idx + n])
ngram_indexes.append(ngram_index)
np_rng.shuffle(ngram_indexes)
(masked_lms, masked_spans) = ([], [])
covered_indexes = set()
for cand_index_set in ngram_indexes:
if len(masked_lms) >= num_to_predict:
break
if not cand_index_set:
continue
# Note(mingdachen):
# Skip current piece if they are covered in lm masking or previous ngrams.
for index_set in cand_index_set[0]:
for index in index_set:
if index in covered_indexes:
continue
if not geometric_dist:
n = np_rng.choice(
ngrams[: len(cand_index_set)],
p=pvals[: len(cand_index_set)] / pvals[: len(cand_index_set)].sum(keepdims=True),
)
else:
# Sampling "n" from the geometric distribution and clipping it to
# the max_ngrams. Using p=0.2 default from the SpanBERT paper
# https://arxiv.org/pdf/1907.10529.pdf (Sec 3.1)
n = min(np_rng.geometric(0.2), max_ngrams)
index_set = sum(cand_index_set[n - 1], [])
n -= 1
# Note(mingdachen):
# Repeatedly looking for a candidate that does not exceed the
# maximum number of predictions by trying shorter ngrams.
while len(masked_lms) + len(index_set) > num_to_predict:
if n == 0:
break
index_set = sum(cand_index_set[n - 1], [])
n -= 1
# If adding a whole-word mask would exceed the maximum number of
# predictions, then just skip this candidate.
if len(masked_lms) + len(index_set) > num_to_predict:
continue
is_any_index_covered = False
for index in index_set:
if index in covered_indexes:
is_any_index_covered = True
break
if is_any_index_covered:
continue
for index in index_set:
covered_indexes.add(index)
masked_token = None
if masking_style == "bert":
# 80% of the time, replace with [MASK]
if np_rng.random() < 0.8:
masked_token = mask_id
else:
# 10% of the time, keep original
if np_rng.random() < 0.5:
masked_token = tokens[index]
# 10% of the time, replace with random word
else:
masked_token = vocab_id_list[np_rng.randint(0, len(vocab_id_list))]
elif masking_style == "bert-cn-wwm":
# 80% of the time, replace with [MASK]
if np_rng.random() < 0.8:
masked_token = mask_id
else:
# 10% of the time, keep original
if np_rng.random() < 0.5:
# if it's Chinese WWM, remove "##" in tokens
token_id = tokens[index]
token = tokenizer.convert_ids_to_tokens([token_id])[0]
if len(re.findall("##[\u4E00-\u9FA5]", token)) > 0:
token = token[2:]
new_token_id = tokenizer.convert_tokens_to_ids([token])[0]
masked_token = new_token_id
# 10% of the time, replace with random word
else:
masked_token = vocab_id_list[np_rng.randint(0, len(vocab_id_list))]
elif masking_style == "t5":
masked_token = mask_id
else:
raise ValueError("invalid value of masking style")
output_tokens[index] = masked_token
masked_lms.append(MaskedLmInstance(index=index, label=tokens[index]))
masked_spans.append(
MaskedLmInstance(index=index_set, label=[tokens[index] for index in index_set])
)
assert len(masked_lms) <= num_to_predict
np_rng.shuffle(ngram_indexes)
select_indexes = set()
if do_permutation:
for cand_index_set in ngram_indexes:
if len(select_indexes) >= num_to_predict:
break
if not cand_index_set:
continue
# Note(mingdachen):
# Skip current piece if they are covered in lm masking or previous ngrams.
for index_set in cand_index_set[0]:
for index in index_set:
if index in covered_indexes or index in select_indexes:
continue
n = np.random.choice(
ngrams[: len(cand_index_set)],
p=pvals[: len(cand_index_set)] / pvals[: len(cand_index_set)].sum(keepdims=True),
)
index_set = sum(cand_index_set[n - 1], [])
n -= 1
while len(select_indexes) + len(index_set) > num_to_predict:
if n == 0:
break
index_set = sum(cand_index_set[n - 1], [])
n -= 1
# If adding a whole-word mask would exceed the maximum number of
# predictions, then just skip this candidate.
if len(select_indexes) + len(index_set) > num_to_predict:
continue
is_any_index_covered = False
for index in index_set:
if index in covered_indexes or index in select_indexes:
is_any_index_covered = True
break
if is_any_index_covered:
continue
for index in index_set:
select_indexes.add(index)
assert len(select_indexes) <= num_to_predict
select_indexes = sorted(select_indexes)
permute_indexes = list(select_indexes)
np_rng.shuffle(permute_indexes)
orig_token = list(output_tokens)
for src_i, tgt_i in zip(select_indexes, permute_indexes):
output_tokens[src_i] = orig_token[tgt_i]
masked_lms.append(MaskedLmInstance(index=src_i, label=orig_token[src_i]))
masked_lms = sorted(masked_lms, key=lambda x: x.index)
# Sort the spans by the index of the first span
masked_spans = sorted(masked_spans, key=lambda x: x.index[0])
for p in masked_lms:
masked_lm_positions.append(p.index)
masked_lm_labels.append(p.label)
return (
output_tokens,
masked_lm_positions,
masked_lm_labels,
token_boundary,
masked_spans,
)
def get_samples_mapping(
indexed_dataset,
data_prefix,
num_epochs,
max_num_samples,
max_seq_length,
short_seq_prob,
seed,
name,
binary_head,
):
"""Get a list that maps a sample index to a starting sentence index,
end sentence index, and length"""
if not num_epochs:
if not max_num_samples:
raise ValueError("Need to specify either max_num_samples " "or num_epochs")
num_epochs = np.iinfo(np.int32).max - 1
if not max_num_samples:
max_num_samples = np.iinfo(np.int64).max - 1
# Filename of the index mapping
indexmap_filename = data_prefix
indexmap_filename += "_{}_indexmap".format(name)
if num_epochs != (np.iinfo(np.int32).max - 1):
indexmap_filename += "_{}ep".format(num_epochs)
if max_num_samples != (np.iinfo(np.int64).max - 1):
indexmap_filename += "_{}mns".format(max_num_samples)
indexmap_filename += "_{}msl".format(max_seq_length)
indexmap_filename += "_{:0.2f}ssp".format(short_seq_prob)
indexmap_filename += "_{}s".format(seed)
indexmap_filename += ".npy"
# Build the indexed mapping if not exist.
# NOTE: use `get_local_rank() == 0` to ensure the samples mapping is built on each node.
if flow.env.get_local_rank() == 0 and not os.path.isfile(indexmap_filename):
logger.info(
" > WARNING: could not find index map file {}, building "
"the indices on rank 0 ...".format(indexmap_filename)
)
# Make sure the types match the helpers input types.
assert indexed_dataset.doc_idx.dtype == np.int64
assert indexed_dataset.sizes.dtype == np.int32
# Build samples mapping
verbose = flow.env.get_local_rank() == 0
start_time = time.time()
logger.info(" > building samples index mapping for {} ...".format(name))
# First compile and then import.
from libai.data.data_utils import helpers
samples_mapping = helpers.build_mapping(
indexed_dataset.doc_idx,
indexed_dataset.sizes,
num_epochs,
max_num_samples,
max_seq_length,
short_seq_prob,
seed,
verbose,
2 if binary_head else 1,
)
logger.info(" > done building samples index maping")
np.save(indexmap_filename, samples_mapping, allow_pickle=True)
logger.info(" > saved the index mapping in {}".format(indexmap_filename))
# Make sure all the ranks have built the mapping
logger.info(
" > elapsed time to build and save samples mapping "
"(seconds): {:4f}".format(time.time() - start_time)
)
# This should be a barrier but nccl barrier assumes
# device_index=rank which is not the case for model
# parallel case
dist.synchronize()
# Load indexed dataset.
logger.info(" > loading indexed mapping from {}".format(indexmap_filename))
start_time = time.time()
samples_mapping = np.load(indexmap_filename, allow_pickle=True, mmap_mode="r")
logger.info(" loaded indexed file in {:3.3f} seconds".format(time.time() - start_time))
logger.info(" total number of samples: {}".format(samples_mapping.shape[0]))
return samples_mapping
def get_train_valid_test_split_(size, splits=None):
"""
Compute split indices for dividing a dataset of ``size`` samples into
train/valid/test subsets according to the given proportions.
Purpose: useful for creating train/valid/test splits.
Arguments:
size (int): total number of samples to split.
splits (1D array-like): proportions for train/valid/test; they are normalized,
so only ``sum(splits) > 0`` is required (default: ``[0.8, 0.2, 0.0]``).
Returns:
list of four cumulative indices ``[0, train_end, valid_end, size]``.
"""
if splits is None:
splits = [0.8, 0.2, 0.0]
while len(splits) < 3:
splits.append(0.0)
splits = splits[:3]
splits_sum = sum(splits)
assert splits_sum > 0.0, "Split sum must be larger than 0."
splits = [split / splits_sum for split in splits]
splits_index = [0]
for index, split in enumerate(splits):
splits_index.append(splits_index[index] + int(round(split * float(size))))
diff = splits_index[-1] - size
for index in range(1, len(splits_index)):
splits_index[index] -= diff
assert len(splits_index) == 4
assert splits_index[-1] == size
return splits_index
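# Example sketch (illustrative): with size=100 and splits=[7, 2, 1], the proportions
# normalize to [0.7, 0.2, 0.1] and the returned indices are [0, 70, 90, 100]:
#
#     get_train_valid_test_split_(100, [7, 2, 1])  # -> [0, 70, 90, 100]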
/*
coding=utf-8
Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/* Helper methods for fast index mapping builds */
#include <algorithm>
#include <iostream>
#include <limits>
#include <math.h>
#include <stdexcept>
#include <pybind11/pybind11.h>
#include <pybind11/numpy.h>
#include <random>
namespace py = pybind11;
using namespace std;
const int32_t LONG_SENTENCE_LEN = 512;
void build_blending_indices(py::array_t<uint8_t>& dataset_index,
py::array_t<int64_t>& dataset_sample_index,
const py::array_t<double>& weights, const int32_t num_datasets,
const int64_t size, const bool verbose) {
/* Given multiple datasets and a weighting array, build samples
such that they follow those weights. */
if (verbose) { std::cout << "> building indices for blendable datasets ..." << std::endl; }
// Get the pointer access without the checks.
auto dataset_index_ptr = dataset_index.mutable_unchecked<1>();
auto dataset_sample_index_ptr = dataset_sample_index.mutable_unchecked<1>();
auto weights_ptr = weights.unchecked<1>();
// Initialize buffer for number of samples used for each dataset.
int64_t current_samples[num_datasets];
for (int64_t i = 0; i < num_datasets; ++i) { current_samples[i] = 0; }
// For each sample:
for (int64_t sample_idx = 0; sample_idx < size; ++sample_idx) {
// Determine where the max error in sampling is happening.
auto sample_idx_double = std::max(static_cast<double>(sample_idx), 1.0);
int64_t max_error_index = 0;
double max_error = weights_ptr[0] * sample_idx_double - static_cast<double>(current_samples[0]);
for (int64_t dataset_idx = 1; dataset_idx < num_datasets; ++dataset_idx) {
double error = weights_ptr[dataset_idx] * sample_idx_double
- static_cast<double>(current_samples[dataset_idx]);
if (error > max_error) {
max_error = error;
max_error_index = dataset_idx;
}
}
// Populate the indices.
dataset_index_ptr[sample_idx] = static_cast<uint8_t>(max_error_index);
dataset_sample_index_ptr[sample_idx] = current_samples[max_error_index];
// Update the total samples.
current_samples[max_error_index] += 1;
}
// print info
if (verbose) {
std::cout << " > sample ratios:" << std::endl;
for (int64_t dataset_idx = 0; dataset_idx < num_datasets; ++dataset_idx) {
auto ratio = static_cast<double>(current_samples[dataset_idx]) / static_cast<double>(size);
std::cout << " dataset " << dataset_idx << ", input: " << weights_ptr[dataset_idx]
<< ", achieved: " << ratio << std::endl;
}
}
}
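/* Usage sketch (illustrative only; buffer names are not from this repo): once
this file is compiled into the `helpers` extension, the function is driven from
Python with pre-allocated numpy buffers matching the signature above:
    dataset_index = np.zeros(size, dtype=np.uint8)
    dataset_sample_index = np.zeros(size, dtype=np.int64)
    helpers.build_blending_indices(dataset_index, dataset_sample_index,
                                   weights, len(weights), size, verbose)
Afterwards dataset_index[i] names the dataset that sample i is drawn from and
dataset_sample_index[i] is the index of sample i within that dataset. */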
py::array build_sample_idx(const py::array_t<int32_t>& sizes_, const py::array_t<int32_t>& doc_idx_,
const int32_t seq_length, const int32_t num_epochs,
const int64_t tokens_per_epoch) {
/* Sample index (sample_idx) is used for GPT-2 style datasets, for which
the documents are flattened and the samples are built from this
1-D flattened array. It is a 2D array with shape [number-of-samples + 1, 2]
where [..., 0] contains the index into `doc_idx` and [..., 1] is the
starting offset in that document.*/
// Consistency checks.
assert(seq_length > 1);
assert(num_epochs > 0);
assert(tokens_per_epoch > 1);
// Remove bound checks.
auto sizes = sizes_.unchecked<1>();
auto doc_idx = doc_idx_.unchecked<1>();
// Mapping and its length (1D).
int64_t num_samples = (num_epochs * tokens_per_epoch - 1) / seq_length;
int32_t* sample_idx = new int32_t[2 * (num_samples + 1)];
cout << " using:" << endl << std::flush;
cout << " number of documents: " << doc_idx_.shape(0) / num_epochs << endl
<< std::flush;
cout << " number of epochs: " << num_epochs << endl << std::flush;
cout << " sequence length: " << seq_length << endl << std::flush;
cout << " total number of samples: " << num_samples << endl << std::flush;
// Index into sample_idx.
int64_t sample_index = 0;
// Index into doc_idx.
int64_t doc_idx_index = 0;
// Beginning offset for each document.
int32_t doc_offset = 0;
// Start with first document and no offset.
sample_idx[2 * sample_index] = doc_idx_index;
sample_idx[2 * sample_index + 1] = doc_offset;
++sample_index;
int count = 0;
while (sample_index <= num_samples) {
count++;
// Start with a fresh sequence.
int32_t remaining_seq_length = seq_length + 1;
while (remaining_seq_length != 0) {
// Get the document length.
auto doc_id = doc_idx[doc_idx_index];
auto doc_length = sizes[doc_id] - doc_offset;
// And add it to the current sequence.
remaining_seq_length -= doc_length;
// If we have more than a full sequence, adjust offset and set
// remaining length to zero so we return from the while loop.
// Note that -1 here is for the same reason we have -1 in
// `_num_epochs` calculations.
if (remaining_seq_length <= 0) {
doc_offset += (remaining_seq_length + doc_length - 1);
remaining_seq_length = 0;
} else {
// Otherwise, start from the beginning of the next document.
++doc_idx_index;
doc_offset = 0;
}
}
// cout << "count: " << count << endl;
// Record the sequence.
sample_idx[2 * sample_index] = doc_idx_index;
sample_idx[2 * sample_index + 1] = doc_offset;
++sample_index;
}
// Method to deallocate memory.
py::capsule free_when_done(sample_idx, [](void* mem_) {
int32_t* mem = reinterpret_cast<int32_t*>(mem_);
delete[] mem;
});
// Return the numpy array.
const auto byte_size = sizeof(int32_t);
return py::array(std::vector<int64_t>{num_samples + 1, 2}, // shape
{2 * byte_size, byte_size}, // C-style contiguous strides
sample_idx, // the data pointer
free_when_done); // numpy array references
}
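/* Worked example (illustrative): with one epoch over two documents of sizes
[5, 4] (tokens_per_epoch = 9), seq_length = 4 and, for simplicity, an
unshuffled doc_idx = [0, 1], num_samples = (9 - 1) / 4 = 2 and the returned
array is
    [[0, 0],   // sample 0 starts at doc_idx position 0, token offset 0
     [0, 4],   // sample 1 starts at doc_idx position 0, token offset 4
     [1, 3]]   // end marker: the last sample stops at doc_idx position 1, offset 3
so each sample covers the seq_length + 1 tokens between consecutive rows, with
the last token of one sample reused as the first token of the next. */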
inline int32_t get_target_sample_len(const int32_t short_seq_ratio, const int32_t max_length,
std::mt19937& rand32_gen) {
/* Training sample length. */
if (short_seq_ratio == 0) { return max_length; }
const auto random_number = rand32_gen();
if ((random_number % short_seq_ratio) == 0) { return 2 + random_number % (max_length - 1); }
return max_length;
}
template<typename DocIdx>
py::array build_mapping_impl(const py::array_t<int64_t>& docs_, const py::array_t<int32_t>& sizes_,
const int32_t num_epochs, const uint64_t max_num_samples,
const int32_t max_seq_length, const double short_seq_prob,
const int32_t seed, const bool verbose, const int32_t min_num_sent) {
/* Build a mapping of (start-index, end-index, sequence-length) where
start and end index are the indices of the sentences in the sample
and sequence-length is the target sequence length.
*/
// Consistency checks.
assert(num_epochs > 0);
assert(max_seq_length > 1);
assert(short_seq_prob >= 0.0);
assert(short_seq_prob <= 1.0);
assert(seed > 0);
// Remove bound checks.
auto docs = docs_.unchecked<1>();
auto sizes = sizes_.unchecked<1>();
// For efficiency, convert probability to ratio. Note: rand() generates int.
int32_t short_seq_ratio = 0;
if (short_seq_prob > 0) { short_seq_ratio = static_cast<int32_t>(round(1.0 / short_seq_prob)); }
if (verbose) {
const auto sent_start_index = docs[0];
const auto sent_end_index = docs[docs_.shape(0) - 1];
const auto num_sentences = sent_end_index - sent_start_index;
cout << " using:" << endl << std::flush;
cout << " number of documents: " << docs_.shape(0) - 1 << endl << std::flush;
cout << " sentences range: [" << sent_start_index << ", " << sent_end_index
<< ")" << endl
<< std::flush;
cout << " total number of sentences: " << num_sentences << endl << std::flush;
cout << " number of epochs: " << num_epochs << endl << std::flush;
cout << " maximum number of samples: " << max_num_samples << endl << std::flush;
cout << " maximum sequence length: " << max_seq_length << endl << std::flush;
cout << " short sequence probability: " << short_seq_prob << endl << std::flush;
cout << " short sequence ration (1/prob): " << short_seq_ratio << endl << std::flush;
cout << " seed: " << seed << endl << std::flush;
}
// Mapping and its length (1D).
int64_t num_samples = -1;
DocIdx* maps = NULL;
// Perform two iterations: in the first, compute the size and allocate
// memory; in the second, populate the map.
bool second = false;
for (int32_t iteration = 0; iteration < 2; ++iteration) {
// Set the seed so both iterations produce the same results.
std::mt19937 rand32_gen(seed);
// Set the flag on second iteration.
second = (iteration == 1);
// Counters:
uint64_t empty_docs = 0;
uint64_t one_sent_docs = 0;
uint64_t long_sent_docs = 0;
// Current map index.
uint64_t map_index = 0;
// For each epoch:
for (int32_t epoch = 0; epoch < num_epochs; ++epoch) {
if (map_index >= max_num_samples) {
if (verbose && (!second)) {
cout << " reached " << max_num_samples << " samples after " << epoch << " epochs ..."
<< endl
<< std::flush;
}
break;
}
// For each document:
for (int32_t doc = 0; doc < (docs.shape(0) - 1); ++doc) {
// Document sentences are in [sent_index_first, sent_index_last)
const auto sent_index_first = docs[doc];
const auto sent_index_last = docs[doc + 1];
// At the beginning of the document, the previous index is the
// start index.
auto prev_start_index = sent_index_first;
// Remaining sentences in the document.
auto num_remain_sent = sent_index_last - sent_index_first;
// Some bookkeeping
if ((epoch == 0) && (!second)) {
if (num_remain_sent == 0) { ++empty_docs; }
if (num_remain_sent == 1) { ++one_sent_docs; }
}
// Detect documents with long sentences.
bool contains_long_sentence = false;
if (num_remain_sent > 1) {
for (auto sent_index = sent_index_first; sent_index < sent_index_last; ++sent_index) {
if (sizes[sent_index] > LONG_SENTENCE_LEN) {
if ((epoch == 0) && (!second)) { ++long_sent_docs; }
contains_long_sentence = true;
break;
}
}
}
// If we have at least the minimum number of sentences and no long sentences.
if ((num_remain_sent >= min_num_sent) && (!contains_long_sentence)) {
// Set values.
auto seq_len = int32_t{0};
auto num_sent = int32_t{0};
auto target_seq_len = get_target_sample_len(short_seq_ratio, max_seq_length, rand32_gen);
// Loop through sentences.
for (auto sent_index = sent_index_first; sent_index < sent_index_last; ++sent_index) {
// Add the size and number of sentences.
seq_len += sizes[sent_index];
++num_sent;
--num_remain_sent;
// If we have reached the target length,
// and more than one sentence is left in the document,
// and we have at least the minimum number of sentences,
// or if we have reached the end of the document.
if (((seq_len >= target_seq_len) && (num_remain_sent > 1) && (num_sent >= min_num_sent))
|| (num_remain_sent == 0)) {
// Check for overflow.
if ((3 * map_index + 2) > std::numeric_limits<int64_t>::max()) {
cout << "number of samples exceeded maximum "
<< "allowed by type int64: " << std::numeric_limits<int64_t>::max() << endl;
throw std::overflow_error("Number of samples");
}
// Populate the map.
if (second) {
const auto map_index_0 = 3 * map_index;
maps[map_index_0] = static_cast<DocIdx>(prev_start_index);
maps[map_index_0 + 1] = static_cast<DocIdx>(sent_index + 1);
maps[map_index_0 + 2] = static_cast<DocIdx>(target_seq_len);
}
// Update indices / counters.
++map_index;
prev_start_index = sent_index + 1;
target_seq_len = get_target_sample_len(short_seq_ratio, max_seq_length, rand32_gen);
seq_len = 0;
num_sent = 0;
}
} // for (auto sent_index=sent_index_first; ...
} // if (num_remain_sent >= min_num_sent && !contains_long_sentence)
} // for (int doc=0; doc < num_docs; ++doc) {
} // for (int epoch=0; epoch < num_epochs; ++epoch) {
if (!second) {
if (verbose) {
cout << " number of empty documents: " << empty_docs << endl << std::flush;
cout << " number of documents with one sentence: " << one_sent_docs << endl << std::flush;
cout << " number of documents with long sentences: " << long_sent_docs << endl
<< std::flush;
cout << " will create mapping for " << map_index << " samples" << endl << std::flush;
}
assert(maps == NULL);
assert(num_samples < 0);
maps = new DocIdx[3 * map_index];
num_samples = static_cast<int64_t>(map_index);
}
} // for (int iteration=0; iteration < 2; ++iteration) {
// Shuffle.
// We need a 64 bit random number generator as we might have more
// than 2 billion samples.
std::mt19937_64 rand64_gen(seed + 1);
for (auto i = (num_samples - 1); i > 0; --i) {
const auto j = static_cast<int64_t>(rand64_gen() % (i + 1));
const auto i0 = 3 * i;
const auto j0 = 3 * j;
// Swap values.
swap(maps[i0], maps[j0]);
swap(maps[i0 + 1], maps[j0 + 1]);
swap(maps[i0 + 2], maps[j0 + 2]);
}
// Method to deallocate memory.
py::capsule free_when_done(maps, [](void* mem_) {
DocIdx* mem = reinterpret_cast<DocIdx*>(mem_);
delete[] mem;
});
// Return the numpy array.
const auto byte_size = sizeof(DocIdx);
return py::array(std::vector<int64_t>{num_samples, 3}, // shape
{3 * byte_size, byte_size}, // C-style contiguous strides
maps, // the data pointer
free_when_done); // numpy array references
}
py::array build_mapping(const py::array_t<int64_t>& docs_, const py::array_t<int>& sizes_,
const int num_epochs, const uint64_t max_num_samples,
const int max_seq_length, const double short_seq_prob, const int seed,
const bool verbose, const int32_t min_num_sent) {
if (sizes_.size() > std::numeric_limits<uint32_t>::max()) {
if (verbose) { cout << " using uint64 for data mapping..." << endl << std::flush; }
return build_mapping_impl<uint64_t>(docs_, sizes_, num_epochs, max_num_samples, max_seq_length,
short_seq_prob, seed, verbose, min_num_sent);
} else {
if (verbose) { cout << " using uint32 for data mapping..." << endl << std::flush; }
return build_mapping_impl<uint32_t>(docs_, sizes_, num_epochs, max_num_samples, max_seq_length,
short_seq_prob, seed, verbose, min_num_sent);
}
}
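/* Usage sketch (argument names are illustrative): from Python the returned
array has shape [num_samples, 3]; each row is (index of the first sentence in
the sample, index one past the last sentence, target sequence length), and the
rows are already shuffled:
    samples = helpers.build_mapping(docs, sizes, num_epochs, max_num_samples,
                                    max_seq_length, short_seq_prob, seed,
                                    verbose, min_num_sent)
    start, end, target_len = samples[0]
*/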
template<typename DocIdx>
py::array build_blocks_mapping_impl(const py::array_t<int64_t>& docs_,
const py::array_t<int32_t>& sizes_,
const py::array_t<int32_t>& titles_sizes_,
const int32_t num_epochs, const uint64_t max_num_samples,
const int32_t max_seq_length, const int32_t seed,
const bool verbose, const bool use_one_sent_blocks) {
/* Build a mapping of (start-index, end-index, sequence-length) where
start and end index are the indices of the sentences in the sample
and sequence-length is the target sequence length.
*/
// Consistency checks.
assert(num_epochs > 0);
assert(max_seq_length > 1);
assert(seed > 0);
// Remove bound checks.
auto docs = docs_.unchecked<1>();
auto sizes = sizes_.unchecked<1>();
auto titles_sizes = titles_sizes_.unchecked<1>();
if (verbose) {
const auto sent_start_index = docs[0];
const auto sent_end_index = docs[docs_.shape(0) - 1];
const auto num_sentences = sent_end_index - sent_start_index;
cout << " using:" << endl << std::flush;
cout << " number of documents: " << docs_.shape(0) - 1 << endl << std::flush;
cout << " sentences range: [" << sent_start_index << ", " << sent_end_index
<< ")" << endl
<< std::flush;
cout << " total number of sentences: " << num_sentences << endl << std::flush;
cout << " number of epochs: " << num_epochs << endl << std::flush;
cout << " maximum number of samples: " << max_num_samples << endl << std::flush;
cout << " maximum sequence length: " << max_seq_length << endl << std::flush;
cout << " seed: " << seed << endl << std::flush;
}
// Mapping and its length (1D).
int64_t num_samples = -1;
DocIdx* maps = NULL;
// Acceptable number of sentences per block.
int min_num_sent = 2;
if (use_one_sent_blocks) { min_num_sent = 1; }
// Perform two iterations: in the first, compute the size and allocate
// memory; in the second, populate the map.
bool second = false;
for (int32_t iteration = 0; iteration < 2; ++iteration) {
// Set the flag on second iteration.
second = (iteration == 1);
// Current map index.
uint64_t map_index = 0;
uint64_t empty_docs = 0;
uint64_t one_sent_docs = 0;
uint64_t long_sent_docs = 0;
// For each epoch:
for (int32_t epoch = 0; epoch < num_epochs; ++epoch) {
// assign every block a unique id
int32_t block_id = 0;
if (map_index >= max_num_samples) {
if (verbose && (!second)) {
cout << " reached " << max_num_samples << " samples after " << epoch << " epochs ..."
<< endl
<< std::flush;
}
break;
}
// For each document:
for (int32_t doc = 0; doc < (docs.shape(0) - 1); ++doc) {
// Document sentences are in [sent_index_first, sent_index_last)
const auto sent_index_first = docs[doc];
const auto sent_index_last = docs[doc + 1];
const auto target_seq_len = max_seq_length - titles_sizes[doc];
// At the beginning of the document, the previous index is the
// start index.
auto prev_start_index = sent_index_first;
// Remaining sentences in the document.
auto num_remain_sent = sent_index_last - sent_index_first;
// Some bookkeeping
if ((epoch == 0) && (!second)) {
if (num_remain_sent == 0) { ++empty_docs; }
if (num_remain_sent == 1) { ++one_sent_docs; }
}
// Detect documents with long sentences.
bool contains_long_sentence = false;
if (num_remain_sent >= min_num_sent) {
for (auto sent_index = sent_index_first; sent_index < sent_index_last; ++sent_index) {
if (sizes[sent_index] > LONG_SENTENCE_LEN) {
if ((epoch == 0) && (!second)) { ++long_sent_docs; }
contains_long_sentence = true;
break;
}
}
}
// If we have enough sentences and no long sentences.
if ((num_remain_sent >= min_num_sent) && (!contains_long_sentence)) {
// Set values.
auto seq_len = int32_t{0};
auto num_sent = int32_t{0};
// Loop through sentences.
for (auto sent_index = sent_index_first; sent_index < sent_index_last; ++sent_index) {
// Add the size and number of sentences.
seq_len += sizes[sent_index];
++num_sent;
--num_remain_sent;
// If we have reached the target length,
// and an acceptable number of sentences is left,
// and we have at least the minimum number of sentences,
// or if we have reached the end of the document.
if (((seq_len >= target_seq_len) && (num_remain_sent >= min_num_sent)
&& (num_sent >= min_num_sent))
|| (num_remain_sent == 0)) {
// Populate the map.
if (second) {
const auto map_index_0 = 4 * map_index;
// Each sample has 4 items: the starting sentence index, ending sentence index,
// the index of the document from which the block comes (used for fetching titles)
// and the unique id of the block (used for creating block indexes)
maps[map_index_0] = static_cast<DocIdx>(prev_start_index);
maps[map_index_0 + 1] = static_cast<DocIdx>(sent_index + 1);
maps[map_index_0 + 2] = static_cast<DocIdx>(doc);
maps[map_index_0 + 3] = static_cast<DocIdx>(block_id);
}
// Update indices / counters.
++map_index;
++block_id;
prev_start_index = sent_index + 1;
seq_len = 0;
num_sent = 0;
}
} // for (auto sent_index=sent_index_first; ...
} // if (num_remain_sent >= min_num_sent && !contains_long_sentence)
} // for (int doc=0; doc < num_docs; ++doc) {
} // for (int epoch=0; epoch < num_epochs; ++epoch) {
if (!second) {
if (verbose) {
cout << " number of empty documents: " << empty_docs << endl << std::flush;
cout << " number of documents with one sentence: " << one_sent_docs << endl << std::flush;
cout << " number of documents with long sentences: " << long_sent_docs << endl
<< std::flush;
cout << " will create mapping for " << map_index << " samples" << endl << std::flush;
}
assert(maps == NULL);
assert(num_samples < 0);
maps = new DocIdx[4 * map_index];
num_samples = static_cast<int64_t>(map_index);
}
} // for (int iteration=0; iteration < 2; ++iteration) {
// Shuffle.
// We need a 64 bit random number generator as we might have more
// than 2 billion samples.
std::mt19937_64 rand64_gen(seed + 1);
for (auto i = (num_samples - 1); i > 0; --i) {
const auto j = static_cast<int64_t>(rand64_gen() % (i + 1));
const auto i0 = 4 * i;
const auto j0 = 4 * j;
// Swap values.
swap(maps[i0], maps[j0]);
swap(maps[i0 + 1], maps[j0 + 1]);
swap(maps[i0 + 2], maps[j0 + 2]);
swap(maps[i0 + 3], maps[j0 + 3]);
}
// Method to deallocate memory.
py::capsule free_when_done(maps, [](void* mem_) {
DocIdx* mem = reinterpret_cast<DocIdx*>(mem_);
delete[] mem;
});
// Return the numpy array.
const auto byte_size = sizeof(DocIdx);
return py::array(std::vector<int64_t>{num_samples, 4}, // shape
{4 * byte_size, byte_size}, // C-style contiguous strides
maps, // the data pointer
free_when_done); // numpy array references
}
py::array build_blocks_mapping(const py::array_t<int64_t>& docs_, const py::array_t<int>& sizes_,
const py::array_t<int>& titles_sizes_, const int num_epochs,
const uint64_t max_num_samples, const int max_seq_length,
const int seed, const bool verbose, const bool use_one_sent_blocks) {
if (sizes_.size() > std::numeric_limits<uint32_t>::max()) {
if (verbose) { cout << " using uint64 for data mapping..." << endl << std::flush; }
return build_blocks_mapping_impl<uint64_t>(docs_, sizes_, titles_sizes_, num_epochs,
max_num_samples, max_seq_length, seed, verbose,
use_one_sent_blocks);
} else {
if (verbose) { cout << " using uint32 for data mapping..." << endl << std::flush; }
return build_blocks_mapping_impl<uint32_t>(docs_, sizes_, titles_sizes_, num_epochs,
max_num_samples, max_seq_length, seed, verbose,
use_one_sent_blocks);
}
}
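/* Usage sketch (illustrative): same calling pattern as build_mapping above,
but each returned row carries four entries, (first sentence index, one past the
last sentence index, owning document index, block id):
    blocks = helpers.build_blocks_mapping(docs, sizes, titles_sizes, num_epochs,
                                          max_num_samples, max_seq_length, seed,
                                          verbose, use_one_sent_blocks)
*/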
PYBIND11_MODULE(helpers, m) {
m.def("build_mapping", &build_mapping);
m.def("build_blocks_mapping", &build_blocks_mapping);
m.def("build_sample_idx", &build_sample_idx);
m.def("build_blending_indices", &build_blending_indices);
}
# coding=utf-8
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
# copied from fairseq/fairseq/data/indexed_dataset.py
# Removed IndexedRawTextDataset since it relied on Fairseq dictionary
# other slight modifications to remove fairseq dependencies
# Added document index to index file and made it accessible.
# An empty sentence no longer separates documents.
import logging
import os
import shutil
import struct
import time
from functools import lru_cache
from itertools import accumulate
import numpy as np
import oneflow as flow
logger = logging.getLogger(__name__)
def __best_fitting_dtype(vocab_size=None):
if vocab_size is not None and vocab_size < 65500:
return np.uint16
else:
return np.int32
def get_available_dataset_impl():
return ["lazy", "cached", "mmap"]
def infer_dataset_impl(path):
if IndexedDataset.exists(path):
with open(index_file_path(path), "rb") as f:
magic = f.read(8)
if magic == IndexedDataset._HDR_MAGIC:
return "cached"
elif magic == MMapIndexedDataset.Index._HDR_MAGIC[:8]:
return "mmap"
else:
return None
else:
logger.info(f"Dataset does not exist: {path}")
logger.info(
"Path should be a basename that both .idx and .bin can be "
"appended to get full filenames."
)
return None
def make_builder(out_file, impl, vocab_size=None):
if impl == "mmap":
return MMapIndexedDatasetBuilder(out_file, dtype=__best_fitting_dtype(vocab_size))
else:
return IndexedDatasetBuilder(out_file)
def make_dataset(path, impl, skip_warmup=False):
if not IndexedDataset.exists(path):
logger.info(f"Dataset does not exist: {path}")
logger.info(
"Path should be a basename that both .idx and .bin can be "
"appended to get full filenames."
)
raise ValueError(f"Dataset does not exist: {path}")
if impl == "infer":
impl = infer_dataset_impl(path)
if impl == "lazy" and IndexedDataset.exists(path):
return IndexedDataset(path)
elif impl == "cached" and IndexedDataset.exists(path):
return IndexedCachedDataset(path)
elif impl == "mmap" and MMapIndexedDataset.exists(path):
return MMapIndexedDataset(path, skip_warmup)
logger.info(f"Unknown dataset implementation: {impl}")
return None
def dataset_exists(path, impl):
if impl == "mmap":
return MMapIndexedDataset.exists(path)
else:
return IndexedDataset.exists(path)
def read_longs(f, n):
a = np.empty(n, dtype=np.int64)
f.readinto(a)
return a
def write_longs(f, a):
f.write(np.array(a, dtype=np.int64))
dtypes = {
1: np.uint8,
2: np.int8,
3: np.int16,
4: np.int32,
5: np.int64,
6: np.float32,
7: np.double,
8: np.uint16,
}
def code(dtype):
for k in dtypes.keys():
if dtypes[k] == dtype:
return k
raise ValueError(dtype)
def index_file_path(prefix_path):
return prefix_path + ".idx"
def data_file_path(prefix_path):
return prefix_path + ".bin"
def create_doc_idx(sizes):
doc_idx = [0]
for i, s in enumerate(sizes):
if s == 0:
doc_idx.append(i + 1)
return doc_idx
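# Small example of the legacy convention handled by create_doc_idx: with
# sizes = [3, 4, 0, 2] (a zero-length entry marking a document boundary),
# create_doc_idx(sizes) returns [0, 3], i.e. the second document starts at
# sentence index 3. The builders below instead record boundaries explicitly
# via end_document(), as noted in the header above.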
class IndexedDataset(flow.utils.data.Dataset):
"""Loader for IndexedDataset"""
_HDR_MAGIC = b"TNTIDX\x00\x00"
def __init__(self, path):
super().__init__()
self.path = path
self.data_file = None
self.read_index(path)
def read_index(self, path):
with open(index_file_path(path), "rb") as f:
magic = f.read(8)
assert magic == self._HDR_MAGIC, (
"Index file doesn't match expected format. "
"Make sure that --dataset-impl is configured properly."
)
version = f.read(8)
assert struct.unpack("<Q", version) == (1,)
code, self.element_size = struct.unpack("<QQ", f.read(16))
self.dtype = dtypes[code]
self._len, self.s = struct.unpack("<QQ", f.read(16))
self.doc_count = struct.unpack("<Q", f.read(8))
self.dim_offsets = read_longs(f, self._len + 1)
self.data_offsets = read_longs(f, self._len + 1)
self.sizes = read_longs(f, self.s)
self.doc_idx = read_longs(f, self.doc_count)
def read_data(self, path):
self.data_file = open(data_file_path(path), "rb", buffering=0)
def check_index(self, i):
if i < 0 or i >= self._len:
raise IndexError("index out of range")
def __del__(self):
if self.data_file:
self.data_file.close()
# @lru_cache(maxsize=8)
def __getitem__(self, idx):
if not self.data_file:
self.read_data(self.path)
if isinstance(idx, int):
i = idx
self.check_index(i)
tensor_size = self.sizes[self.dim_offsets[i] : self.dim_offsets[i + 1]]
a = np.empty(tensor_size, dtype=self.dtype)
self.data_file.seek(self.data_offsets[i] * self.element_size)
self.data_file.readinto(a)
return a
elif isinstance(idx, slice):
start, stop, step = idx.indices(len(self))
if step != 1:
raise ValueError("Slices into indexed_dataset must be contiguous")
sizes = self.sizes[self.dim_offsets[start] : self.dim_offsets[stop]]
size = sum(sizes)
a = np.empty(size, dtype=self.dtype)
self.data_file.seek(self.data_offsets[start] * self.element_size)
self.data_file.readinto(a)
offsets = list(accumulate(sizes))
sents = np.split(a, offsets[:-1])
return sents
def __len__(self):
return self._len
def num_tokens(self, index):
return self.sizes[index]
def size(self, index):
return self.sizes[index]
@staticmethod
def exists(path):
return os.path.exists(index_file_path(path)) and os.path.exists(data_file_path(path))
@property
def supports_prefetch(self):
return False # avoid prefetching to save memory
class IndexedCachedDataset(IndexedDataset):
def __init__(self, path):
super().__init__(path)
self.cache = None
self.cache_index = {}
@property
def supports_prefetch(self):
return True
def prefetch(self, indices):
if all(i in self.cache_index for i in indices):
return
if not self.data_file:
self.read_data(self.path)
indices = sorted(set(indices))
total_size = 0
for i in indices:
total_size += self.data_offsets[i + 1] - self.data_offsets[i]
self.cache = np.empty(total_size, dtype=self.dtype)
ptx = 0
self.cache_index.clear()
for i in indices:
self.cache_index[i] = ptx
size = self.data_offsets[i + 1] - self.data_offsets[i]
a = self.cache[ptx : ptx + size]
self.data_file.seek(self.data_offsets[i] * self.element_size)
self.data_file.readinto(a)
ptx += size
if self.data_file:
# close and delete data file after prefetch so we can pickle
self.data_file.close()
self.data_file = None
# @lru_cache(maxsize=8)
def __getitem__(self, idx):
if isinstance(idx, int):
i = idx
self.check_index(i)
tensor_size = self.sizes[self.dim_offsets[i] : self.dim_offsets[i + 1]]
a = np.empty(tensor_size, dtype=self.dtype)
ptx = self.cache_index[i]
np.copyto(a, self.cache[ptx : ptx + a.size])
return a
elif isinstance(idx, slice):
# Hack just to make this work; can optimize later if necessary
sents = []
for i in range(*idx.indices(len(self))):
sents.append(self[i])
return sents
class IndexedDatasetBuilder(object):
element_sizes = {
np.uint8: 1,
np.int8: 1,
np.int16: 2,
np.int32: 4,
np.int64: 8,
np.float32: 4,
np.double: 8,
}
def __init__(self, out_file, dtype=np.int32):
self.out_file = open(out_file, "wb")
self.dtype = dtype
self.data_offsets = [0]
self.dim_offsets = [0]
self.sizes = []
self.element_size = self.element_sizes[self.dtype]
self.doc_idx = [0]
def add_item(self, tensor):
bytes = self.out_file.write(np.array(tensor.numpy(), dtype=self.dtype))
self.data_offsets.append(self.data_offsets[-1] + bytes / self.element_size)
for s in tensor.size():
self.sizes.append(s)
self.dim_offsets.append(self.dim_offsets[-1] + len(tensor.size()))
def end_document(self):
self.doc_idx.append(len(self.sizes))
def merge_file_(self, another_file):
index = IndexedDataset(another_file)
assert index.dtype == self.dtype
begin = self.data_offsets[-1]
for offset in index.data_offsets[1:]:
self.data_offsets.append(begin + offset)
self.sizes.extend(index.sizes)
begin = self.dim_offsets[-1]
for dim_offset in index.dim_offsets[1:]:
self.dim_offsets.append(begin + dim_offset)
with open(data_file_path(another_file), "rb") as f:
while True:
data = f.read(1024)
if data:
self.out_file.write(data)
else:
break
def finalize(self, index_file):
self.out_file.close()
index = open(index_file, "wb")
index.write(b"TNTIDX\x00\x00")
index.write(struct.pack("<Q", 1))
index.write(struct.pack("<QQ", code(self.dtype), self.element_size))
index.write(struct.pack("<QQ", len(self.data_offsets) - 1, len(self.sizes)))
index.write(struct.pack("<Q", len(self.doc_idx)))
write_longs(index, self.dim_offsets)
write_longs(index, self.data_offsets)
write_longs(index, self.sizes)
write_longs(index, self.doc_idx)
index.close()
def _warmup_mmap_file(path):
with open(path, "rb") as stream:
while stream.read(100 * 1024 * 1024):
pass
class MMapIndexedDataset(flow.utils.data.Dataset):
class Index(object):
_HDR_MAGIC = b"MMIDIDX\x00\x00"
@classmethod
def writer(cls, path, dtype):
class _Writer(object):
def __enter__(self):
self._file = open(path, "wb")
self._file.write(cls._HDR_MAGIC)
self._file.write(struct.pack("<Q", 1))
self._file.write(struct.pack("<B", code(dtype)))
return self
@staticmethod
def _get_pointers(sizes):
dtype_size = dtype().itemsize
address = 0
pointers = []
for size in sizes:
pointers.append(address)
address += size * dtype_size
return pointers
def write(self, sizes, doc_idx):
pointers = self._get_pointers(sizes)
self._file.write(struct.pack("<Q", len(sizes)))
self._file.write(struct.pack("<Q", len(doc_idx)))
sizes = np.array(sizes, dtype=np.int32)
self._file.write(sizes.tobytes(order="C"))
del sizes
pointers = np.array(pointers, dtype=np.int64)
self._file.write(pointers.tobytes(order="C"))
del pointers
doc_idx = np.array(doc_idx, dtype=np.int64)
self._file.write(doc_idx.tobytes(order="C"))
def __exit__(self, exc_type, exc_val, exc_tb):
self._file.close()
return _Writer()
def __init__(self, path, skip_warmup=False):
with open(path, "rb") as stream:
magic_test = stream.read(9)
assert self._HDR_MAGIC == magic_test, (
"Index file doesn't match expected format. "
"Make sure that --dataset-impl is configured properly."
)
version = struct.unpack("<Q", stream.read(8))
assert (1,) == version
(dtype_code,) = struct.unpack("<B", stream.read(1))
self._dtype = dtypes[dtype_code]
self._dtype_size = self._dtype().itemsize
self._len = struct.unpack("<Q", stream.read(8))[0]
self._doc_count = struct.unpack("<Q", stream.read(8))[0]
offset = stream.tell()
if not skip_warmup:
logger.info("warming up index mmap file...")
_warmup_mmap_file(path)
self._bin_buffer_mmap = np.memmap(path, mode="r", order="C")
self._bin_buffer = memoryview(self._bin_buffer_mmap)
logger.info("reading sizes...")
self._sizes = np.frombuffer(
self._bin_buffer, dtype=np.int32, count=self._len, offset=offset
)
logger.info("reading pointers...")
self._pointers = np.frombuffer(
self._bin_buffer,
dtype=np.int64,
count=self._len,
offset=offset + self._sizes.nbytes,
)
logger.info("reading document index...")
self._doc_idx = np.frombuffer(
self._bin_buffer,
dtype=np.int64,
count=self._doc_count,
offset=offset + self._sizes.nbytes + self._pointers.nbytes,
)
def __del__(self):
self._bin_buffer_mmap._mmap.close()
del self._bin_buffer_mmap
@property
def dtype(self):
return self._dtype
@property
def sizes(self):
return self._sizes
@property
def doc_idx(self):
return self._doc_idx
@lru_cache(maxsize=8)
def __getitem__(self, i):
return self._pointers[i], self._sizes[i]
def __len__(self):
return self._len
def __init__(self, path, skip_warmup=False):
super().__init__()
self._path = None
self._index = None
self._bin_buffer = None
self._do_init(path, skip_warmup)
def __getstate__(self):
return self._path
def __setstate__(self, state):
# Only the path is pickled (see __getstate__); skip the mmap warmup on restore.
self._do_init(state, skip_warmup=True)
def _do_init(self, path, skip_warmup):
self._path = path
self._index = self.Index(index_file_path(self._path), skip_warmup)
if not skip_warmup:
logger.info("warming up data mmap file...")
_warmup_mmap_file(data_file_path(self._path))
logger.info("creating numpy buffer of mmap...")
self._bin_buffer_mmap = np.memmap(data_file_path(self._path), mode="r", order="C")
logger.info("creating memory view of numpy buffer...")
self._bin_buffer = memoryview(self._bin_buffer_mmap)
def __del__(self):
self._bin_buffer_mmap._mmap.close()
del self._bin_buffer_mmap
del self._index
def __len__(self):
return len(self._index)
# @lru_cache(maxsize=8)
def __getitem__(self, idx):
if isinstance(idx, int):
ptr, size = self._index[idx]
np_array = np.frombuffer(
self._bin_buffer, dtype=self._index.dtype, count=size, offset=ptr
)
return np_array
elif isinstance(idx, slice):
start, stop, step = idx.indices(len(self))
if step != 1:
raise ValueError("Slices into indexed_dataset must be contiguous")
ptr = self._index._pointers[start]
sizes = self._index._sizes[idx]
offsets = list(accumulate(sizes))
total_size = sum(sizes)
np_array = np.frombuffer(
self._bin_buffer, dtype=self._index.dtype, count=total_size, offset=ptr
)
sents = np.split(np_array, offsets[:-1])
return sents
def get(self, idx, offset=0, length=None):
"""Retrieves a single item from the dataset with the option to only
return a portion of the item.
get(idx) is the same as [idx] but get() does not support slicing.
"""
ptr, size = self._index[idx]
if length is None:
length = size - offset
ptr += offset * np.dtype(self._index.dtype).itemsize
np_array = np.frombuffer(
self._bin_buffer, dtype=self._index.dtype, count=length, offset=ptr
)
return np_array
@property
def sizes(self):
return self._index.sizes
@property
def doc_idx(self):
return self._index.doc_idx
def get_doc_idx(self):
return self._index._doc_idx
def set_doc_idx(self, doc_idx_):
self._index._doc_idx = doc_idx_
@property
def supports_prefetch(self):
return False
@staticmethod
def exists(path):
return os.path.exists(index_file_path(path)) and os.path.exists(data_file_path(path))
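# Reading sketch (the corpus path is a placeholder): given my_corpus.bin/.idx
# files written with the mmap implementation,
#     ds = make_dataset("/path/to/my_corpus", impl="mmap", skip_warmup=True)
#     first = ds[0]                          # full first entry as a numpy array
#     head = ds.get(0, offset=0, length=8)   # only the first 8 tokens
# contiguous slices such as ds[a:b] are also supported and return a list of arrays.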
class MMapIndexedDatasetBuilder(object):
def __init__(self, out_file, dtype=np.int64):
self._data_file = open(out_file, "wb")
self._dtype = dtype
self._sizes = []
self._doc_idx = [0]
def add_item(self, tensor):
np_array = np.array(tensor.numpy(), dtype=self._dtype)
self._data_file.write(np_array.tobytes(order="C"))
self._sizes.append(np_array.size)
def end_document(self):
self._doc_idx.append(len(self._sizes))
def merge_file_(self, another_file):
# Concatenate index
index = MMapIndexedDataset.Index(index_file_path(another_file))
assert index.dtype == self._dtype
for size in index.sizes:
self._sizes.append(size)
# Concatenate data
with open(data_file_path(another_file), "rb") as f:
shutil.copyfileobj(f, self._data_file)
def finalize(self, index_file):
self._data_file.close()
with MMapIndexedDataset.Index.writer(index_file, self._dtype) as index:
index.write(self._sizes, self._doc_idx)
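# Writing sketch (the corpus path and token ids are placeholders):
#     builder = make_builder(data_file_path("/path/to/my_corpus"), impl="mmap",
#                            vocab_size=30000)
#     for document in tokenized_documents:          # each document: lists of ids
#         for sentence_ids in document:
#             builder.add_item(flow.tensor(sentence_ids, dtype=flow.int64))
#         builder.end_document()
#     builder.finalize(index_file_path("/path/to/my_corpus"))
# add_item expects a tensor because it calls .numpy() (and .size() in the
# non-mmap builder); finalize writes the .idx file that pairs with the .bin data.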
def get_indexed_dataset(data_prefix, data_impl, skip_warmup):
logger.info("building dataset index ...")
start_time = time.time()
indexed_dataset = make_dataset(data_prefix, data_impl, skip_warmup)
assert indexed_dataset.sizes.shape[0] == indexed_dataset.doc_idx[-1]
logger.info(
"Finished creating indexed dataset in {:4f} " "seconds".format(time.time() - start_time)
)
logger.info("indexed dataset stats:")
logger.info("number of documents: {}".format(indexed_dataset.doc_idx.shape[0] - 1))
logger.info("number of sentences: {}".format(indexed_dataset.sizes.shape[0]))
return indexed_dataset
# coding=utf-8
# Copyright 2021 The OneFlow Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .cifar import CIFAR10Dataset, CIFAR100Dataset
from .imagenet import ImageNetDataset
from .mnist import MNISTDataset
from .bert_dataset import BertDataset
from .roberta_dataset import RobertaDataset
from .gpt_dataset import GPT2Dataset
from .t5_dataset import T5Dataset
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""BERT Style dataset."""
import numpy as np
import oneflow as flow
from libai.data.structures import DistTensorData, Instance
from ..data_utils import create_masked_lm_predictions, get_samples_mapping
class BertDataset(flow.utils.data.Dataset):
"""Dataset containing sentence pairs for BERT training.
Each index corresponds to a randomly generated sentence pair.
Args:
name: Name of dataset for clarification.
tokenizer: Tokenizer to use.
data_prefix: Path to the training dataset.
indexed_dataset: Indexed dataset to use.
max_seq_length: Maximum length of the sequence. All samples are padded to
this length.
max_num_samples: Maximum number of samples to draw from the indexed dataset.
mask_lm_prob: Probability of masking each token; the maximum number of
masked tokens per sample is derived from it.
short_seq_prob: Probability of producing a short sequence. Defaults to 0.0.
seed: Seed for random number generator for reproducibility. Defaults to 1234.
binary_head: Specifies whether the underlying dataset
generates a pair of blocks along with a sentence_target or not.
Setting it to True assumes that the underlying dataset generates a
label for the pair of sentences which is surfaced as
sentence_target. Defaults to True.
"""
def __init__(
self,
name,
tokenizer,
indexed_dataset,
data_prefix,
max_num_samples,
mask_lm_prob,
max_seq_length,
short_seq_prob=0.0,
seed=1234,
binary_head=True,
masking_style="bert",
):
# Params to store.
self.name = name
self.seed = seed
self.masked_lm_prob = mask_lm_prob
self.max_seq_length = max_seq_length
self.binary_head = binary_head
self.masking_style = masking_style
# Dataset.
self.indexed_dataset = indexed_dataset
# Build the samples mapping.
self.samples_mapping = get_samples_mapping(
self.indexed_dataset,
data_prefix,
None,
max_num_samples,
self.max_seq_length - 3, # account for added tokens
short_seq_prob,
self.seed,
self.name,
self.binary_head,
)
# Vocab stuff.
self.tokenizer = tokenizer
self.vocab_id_list = list(tokenizer.get_vocab().values())
self.vocab_id_to_token_dict = {v: k for k, v in tokenizer.get_vocab().items()}
self.cls_id = tokenizer.cls_token_id
self.sep_id = tokenizer.sep_token_id
self.mask_id = tokenizer.mask_token_id
self.pad_id = tokenizer.pad_token_id
def __len__(self):
return self.samples_mapping.shape[0]
def __getitem__(self, idx):
start_idx, end_idx, seq_length = self.samples_mapping[idx]
sample = [self.indexed_dataset[i] for i in range(start_idx, end_idx)]
# Note that this rng state should be numpy and not python since
# python randint is inclusive whereas the numpy one is exclusive.
# We % 2**32 since numpy requires the seed to be between 0 and 2**32 - 1
np_rng = np.random.RandomState(seed=((self.seed + idx) % 2 ** 32))
return build_training_sample(
self.tokenizer,
sample,
seq_length,
self.max_seq_length, # needed for padding
self.vocab_id_list,
self.vocab_id_to_token_dict,
self.cls_id,
self.sep_id,
self.mask_id,
self.pad_id,
self.masked_lm_prob,
np_rng,
self.binary_head,
masking_style=self.masking_style,
)
def build_training_sample(
tokenizer,
sample,
target_seq_length,
max_seq_length,
vocab_id_list,
vocab_id_to_token_dict,
cls_id,
sep_id,
mask_id,
pad_id,
masked_lm_prob,
np_rng,
binary_head,
masking_style="bert",
):
"""Build training sample.
Arguments:
sample: A list of sentences in which each sentence is a list token ids.
target_seq_length: Desired sequence length.
max_seq_length: Maximum length of the sequence. All values are padded to
this length.
vocab_id_list: List of vocabulary ids. Used to pick a random id.
vocab_id_to_token_dict: A dictionary from vocab ids to text tokens.
cls_id: Start of example id.
sep_id: Separator id.
mask_id: Mask token id.
pad_id: Padding token id.
masked_lm_prob: Probability to mask tokens.
np_rng: Random number generator. Note that this rng state should be
numpy and not python since python randint is inclusive for
the upper bound whereas the numpy one is exclusive.
"""
if binary_head:
# We assume that we have at least two sentences in the sample
assert len(sample) > 1
assert target_seq_length <= max_seq_length
# Divide sample into two segments (A and B).
if binary_head:
tokens_a, tokens_b, is_next_random = get_a_and_b_segments(sample, np_rng)
else:
tokens_a = []
for j in range(len(sample)):
tokens_a.extend(sample[j])
tokens_b = []
is_next_random = False
# Truncate to `target_seq_length`.
max_num_tokens = target_seq_length
truncate_segments(tokens_a, tokens_b, len(tokens_a), len(tokens_b), max_num_tokens, np_rng)
# Build tokens and toketypes.
tokens, tokentypes = create_tokens_and_tokentypes(tokens_a, tokens_b, cls_id, sep_id)
# Masking.
max_predictions_per_seq = masked_lm_prob * max_num_tokens
(tokens, masked_positions, masked_labels, _, _) = create_masked_lm_predictions(
tokenizer,
tokens,
vocab_id_list,
vocab_id_to_token_dict,
masked_lm_prob,
cls_id,
sep_id,
mask_id,
max_predictions_per_seq,
np_rng,
masking_style=masking_style,
)
# Padding.
tokens_np, tokentypes_np, labels_np, padding_mask_np, loss_mask_np = pad_and_convert_to_numpy(
tokens, tokentypes, masked_positions, masked_labels, pad_id, max_seq_length
)
train_sample = Instance(
input_ids=DistTensorData(flow.tensor(tokens_np)),
attention_mask=DistTensorData(flow.tensor(padding_mask_np)),
tokentype_ids=DistTensorData(flow.tensor(tokentypes_np)),
ns_labels=DistTensorData(
flow.tensor(int(is_next_random), dtype=flow.long), placement_idx=-1
),
lm_labels=DistTensorData(flow.tensor(labels_np), placement_idx=-1),
loss_mask=DistTensorData(flow.tensor(loss_mask_np), placement_idx=-1),
)
return train_sample
def pad_and_convert_to_numpy(
tokens, tokentypes, masked_positions, masked_labels, pad_id, max_seq_length
):
"""Pad sequences and convert them to numpy."""
# Some checks.
num_tokens = len(tokens)
padding_length = max_seq_length - num_tokens
assert padding_length >= 0
assert len(tokentypes) == num_tokens
assert len(masked_positions) == len(masked_labels)
# Tokens and token types.
filler = [pad_id] * padding_length
tokens_np = np.array(tokens + filler, dtype=np.int64)
tokentypes_np = np.array(tokentypes + filler, dtype=np.int64)
# Padding mask.
padding_mask_np = np.array([1] * num_tokens + [0] * padding_length, dtype=np.bool_)
# Labels and loss mask.
labels = [-1] * max_seq_length
loss_mask = [0] * max_seq_length
for i in range(len(masked_positions)):
assert masked_positions[i] < num_tokens
labels[masked_positions[i]] = masked_labels[i]
loss_mask[masked_positions[i]] = 1
labels_np = np.array(labels, dtype=np.int64)
loss_mask_np = np.array(loss_mask, dtype=np.bool_)
return tokens_np, tokentypes_np, labels_np, padding_mask_np, loss_mask_np
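# Tiny worked example (ids are made up): with tokens=[101, 7, 8, 102],
# tokentypes=[0, 0, 0, 0], masked_positions=[1], masked_labels=[7], pad_id=0
# and max_seq_length=6, the outputs are
#     tokens_np       = [101, 7, 8, 102, 0, 0]
#     tokentypes_np   = [0, 0, 0, 0, 0, 0]
#     labels_np       = [-1, 7, -1, -1, -1, -1]
#     padding_mask_np = [1, 1, 1, 1, 0, 0]
#     loss_mask_np    = [0, 1, 0, 0, 0, 0]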
def get_a_and_b_segments(sample, np_rng):
"""Divide sample into a and b segments."""
# Number of sentences in the sample.
n_sentences = len(sample)
# Make sure we always have two sentences.
assert n_sentences > 1, "make sure each sample has at least two sentences."
# First part:
# `a_end` is how many sentences go into the `A`.
a_end = 1
if n_sentences >= 3:
# Note that randint in numpy is exclusive on the upper bound.
a_end = np_rng.randint(1, n_sentences)
tokens_a = []
for j in range(a_end):
tokens_a.extend(sample[j])
# Second part:
tokens_b = []
for j in range(a_end, n_sentences):
tokens_b.extend(sample[j])
# Random next:
is_next_random = False
if np_rng.random() < 0.5:
is_next_random = True
tokens_a, tokens_b = tokens_b, tokens_a
return tokens_a, tokens_b, is_next_random
def truncate_segments(tokens_a, tokens_b, len_a, len_b, max_num_tokens, np_rng):
"""Truncates a pair of sequences to a maximum sequence length."""
assert len_a > 0
if len_a + len_b <= max_num_tokens:
return False
while len_a + len_b > max_num_tokens:
if len_a > len_b:
len_a -= 1
tokens = tokens_a
else:
len_b -= 1
tokens = tokens_b
if np_rng.random() < 0.5:
del tokens[0]
else:
tokens.pop()
return True
def create_tokens_and_tokentypes(tokens_a, tokens_b, cls_id, sep_id):
"""Merge segments A and B, add [CLS] and [SEP] and build tokentypes."""
tokens = []
tokentypes = []
# [CLS].
tokens.append(cls_id)
tokentypes.append(0)
# Segment A.
for token in tokens_a:
tokens.append(token)
tokentypes.append(0)
# [SEP].
tokens.append(sep_id)
tokentypes.append(0)
# Segment B.
for token in tokens_b:
tokens.append(token)
tokentypes.append(1)
if tokens_b:
# [SEP].
tokens.append(sep_id)
tokentypes.append(1)
return tokens, tokentypes
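# Tiny example of the layout produced above (ids are made up): with
# tokens_a=[7, 8], tokens_b=[9], cls_id=101 and sep_id=102 the result is
#     tokens     = [101, 7, 8, 102, 9, 102]
#     tokentypes = [0, 0, 0, 0, 1, 1]
# i.e. [CLS] A [SEP] B [SEP], with segment id 0 for A and 1 for B.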
# coding=utf-8
# Copyright 2021 The OneFlow Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Callable, Optional
import oneflow as flow
from flowvision import datasets
from libai.data.structures import DistTensorData, Instance
class CIFAR10Dataset(datasets.CIFAR10):
r"""`CIFAR10 <https://www.cs.toronto.edu/~kriz/cifar.html>`_ Dataset in LiBai.
Args:
root (string): Root directory of dataset where directory
``cifar-10-batches-py`` exists or will be saved to if download is set to True.
train (bool, optional): If True, creates dataset from training set, otherwise
creates from test set.
transform (callable, optional): A function/transform that takes in a PIL image
and returns a transformed version. E.g, ``transforms.RandomCrop``
download (bool, optional): If true, downloads the dataset from the internet and
puts it in root directory. If the dataset is already downloaded, it will not be
downloaded again.
"""
def __init__(
self,
root: str,
train: bool = True,
transform: Optional[Callable] = None,
download: bool = False,
**kwargs
):
super(CIFAR10Dataset, self).__init__(
root=root, train=train, transform=transform, download=download, **kwargs
)
def __getitem__(self, index: int):
img, target = super().__getitem__(index)
data_sample = Instance(
images=DistTensorData(img, placement_idx=0),
labels=DistTensorData(flow.tensor(target, dtype=flow.long), placement_idx=-1),
)
return data_sample
class CIFAR100Dataset(datasets.CIFAR100):
r"""`CIFAR100 <https://www.cs.toronto.edu/~kriz/cifar.html>`_ Dataset in LiBai.
Args:
root (string): Root directory of dataset where directory
``cifar-100-python`` exists or will be saved to if download is set to True.
train (bool, optional): If True, creates dataset from training set, otherwise
creates from test set.
transform (callable, optional): A function/transform that takes in a PIL image
and returns a transformed version. E.g, ``transforms.RandomCrop``
download (bool, optional): If true, downloads the dataset from the internet and
puts it in root directory. If the dataset is already downloaded, it will not be
downloaded again.
dataset_name (str, optional): Name for the dataset as an identifier. E.g, ``cifar100``
"""
def __init__(
self,
root: str,
train: bool = True,
transform: Optional[Callable] = None,
download: bool = False,
**kwargs
):
super(CIFAR100Dataset, self).__init__(
root=root, train=train, transform=transform, download=download, **kwargs
)
def __getitem__(self, index: int):
img, target = super().__getitem__(index)
data_sample = Instance(
images=DistTensorData(img, placement_idx=0),
labels=DistTensorData(flow.tensor(target, dtype=flow.long), placement_idx=-1),
)
return data_sample
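# Usage sketch (a minimal example; `my_transform` is a placeholder and is
# assumed to convert the PIL image into a tensor):
#     ds = CIFAR10Dataset(root="./datasets", train=True, download=True,
#                         transform=my_transform)
#     sample = ds[0]   # Instance with "images" and "labels" DistTensorData fields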
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""GPT style dataset."""
import logging
import os
import time
import numpy as np
import oneflow as flow
from libai.data.structures import DistTensorData, Instance
from libai.utils import distributed as dist
logger = logging.getLogger(__name__)
class GPT2Dataset(flow.utils.data.Dataset):
def __init__(
self,
name,
tokenizer,
data_prefix,
indexed_dataset,
max_num_samples,
max_seq_length,
seed=1234,
):
self.name = name
self.tokenizer = tokenizer
self.indexed_dataset = indexed_dataset
documents = np.arange(start=0, stop=indexed_dataset.sizes.shape[0], step=1, dtype=np.int32)
# Build index mappings.
self.doc_idx, self.sample_idx, self.shuffle_idx = _build_index_mappings(
self.name,
data_prefix,
documents,
self.indexed_dataset.sizes,
max_num_samples,
max_seq_length,
seed,
)
def __len__(self):
# -1 is due to data structure used to retrieve the index:
# sample i --> [sample_idx[i], sample_idx[i+1])
return self.sample_idx.shape[0] - 1
def __getitem__(self, idx):
# Get the shuffled index.
idx = self.shuffle_idx[idx]
# Start and end documents and offsets.
doc_index_f = self.sample_idx[idx][0]
doc_index_l = self.sample_idx[idx + 1][0]
offset_f = self.sample_idx[idx][1]
offset_l = self.sample_idx[idx + 1][1]
# If we are within the same document, just extract the chunk.
if doc_index_f == doc_index_l:
sample = self.indexed_dataset.get(
self.doc_idx[doc_index_f], offset=offset_f, length=offset_l - offset_f + 1
)
else:
# Otherwise, get the rest of the initial document.
sample_list = [self.indexed_dataset.get(self.doc_idx[doc_index_f], offset=offset_f)]
# Loop over all in between documents and add the entire document.
for i in range(doc_index_f + 1, doc_index_l):
sample_list.append(self.indexed_dataset.get(self.doc_idx[i]))
# And finally add the relevant portion of last document.
sample_list.append(
self.indexed_dataset.get(self.doc_idx[doc_index_l], length=offset_l + 1)
)
sample = np.concatenate(sample_list)
input_ids = flow.tensor(np.array(sample[:-1], dtype=np.int64))
lm_labels = flow.tensor(np.array(sample[1:], dtype=np.int64))
sample = Instance(
input_ids=DistTensorData(input_ids),
labels=DistTensorData(lm_labels, placement_idx=-1),
)
return sample
def _build_index_mappings(name, data_prefix, documents, sizes, num_samples, seq_length, seed):
"""Build doc-idx, sample-idx, and shuffle-idx.
doc-idx: is an array (ordered) of documents to be used in training.
sample-idx: is the start document index and document offset for each
training sample.
shuffle-idx: maps the sample index into a random index into sample-idx.
"""
# Number of tokens in each epoch and number of required epochs.
tokens_per_epoch = _num_tokens(documents, sizes)
num_epochs = _num_epochs(tokens_per_epoch, seq_length, num_samples)
# rng state
np_rng = np.random.RandomState(seed=seed)
# Filename of the index mappings.
_filename = data_prefix
_filename += "_{}_indexmap".format(name)
_filename += "_{}ns".format(num_samples)
_filename += "_{}sl".format(seq_length)
_filename += "_{}s".format(seed)
doc_idx_filename = _filename + "_doc_idx.npy"
sample_idx_filename = _filename + "_sample_idx.npy"
shuffle_idx_filename = _filename + "_shuffle_idx.npy"
# Build the indexed mapping if not exist.
# NOTE: use `get_local_rank() == 0` to ensure the index files are built once on each node.
if flow.env.get_local_rank() == 0:
if (
(not os.path.isfile(doc_idx_filename))
or (not os.path.isfile(sample_idx_filename))
or (not os.path.isfile(shuffle_idx_filename))
):
logger.info(
" > WARNING: could not find index map files, building " "the indices on rank 0 ..."
)
# For the last epoch, decide whether include the entire epoch
# in the global shuffle or not.
# If we need only one epoch, then separating last epoch does
# not mean anything.
if num_epochs == 1:
separate_last_epoch = False
logger.info(" > only one epoch required, setting " "separate_last_epoch to False")
else:
# Get the number of samples for the last epoch
num_samples_from_epochs_minus_one = (
(num_epochs - 1) * tokens_per_epoch - 1
) // seq_length
last_epoch_num_samples = num_samples - num_samples_from_epochs_minus_one
assert (
last_epoch_num_samples >= 0
), "last epoch number of samples should be non-negative."
num_samples_per_epoch = (tokens_per_epoch - 1) // seq_length
assert last_epoch_num_samples < (
num_samples_per_epoch + 1
), "last epoch number of samples exceeded max value."
# If we have less than 80% of the samples for the last epoch,
# separate out the epoch and treat it differently.
# Note: the 80% number is just based on common sense and can
# be adjusted if needed.
separate_last_epoch = last_epoch_num_samples < int(0.80 * num_samples_per_epoch)
if separate_last_epoch:
string = (
" > last epoch number of samples ({}) is smaller "
"than 80% of number of samples per epoch ({}), "
"setting separate_last_epoch to True"
)
else:
string = (
" > last epoch number of samples ({}) is larger "
"than 80% of number of samples per epoch ({}), "
"setting separate_last_epoch to False"
)
logger.info(string.format(last_epoch_num_samples, num_samples_per_epoch))
# doc-idx.
logger.info("start to build and save doc-idx mapping ...")
start_time = time.time()
doc_idx = _build_doc_idx(documents, num_epochs, np_rng, separate_last_epoch)
np.save(doc_idx_filename, doc_idx, allow_pickle=True)
logger.info(
" > elapsed time to build and save doc-idx mapping "
"(seconds): {:4f}".format(time.time() - start_time)
)
# sample-idx.
logger.info("start to build and save sample-idx mapping ...")
start_time = time.time()
# Use C++ implementation for speed.
# First compile and then import.
from libai.data.data_utils import helpers
assert doc_idx.dtype == np.int32
assert sizes.dtype == np.int32
sample_idx = helpers.build_sample_idx(
sizes, doc_idx, seq_length, num_epochs, tokens_per_epoch
)
# sample_idx = _build_sample_idx(sizes, doc_idx, seq_length,
# num_epochs, tokens_per_epoch)
np.save(sample_idx_filename, sample_idx, allow_pickle=True)
logger.info(
" > elapsed time to build and save sample-idx mapping "
"(seconds): {:4f}".format(time.time() - start_time)
)
# shuffle-idx.
start_time = time.time()
# -1 is due to data structure used to retrieve the index:
# sample i --> [sample_idx[i], sample_idx[i+1])
if separate_last_epoch:
num_samples_ = num_samples_from_epochs_minus_one
else:
num_samples_ = sample_idx.shape[0] - 1
shuffle_idx = _build_shuffle_idx(num_samples_, sample_idx.shape[0] - 1, np_rng)
np.save(shuffle_idx_filename, shuffle_idx, allow_pickle=True)
logger.info(
" > elapsed time to build and save shuffle-idx mapping"
" (seconds): {:4f}".format(time.time() - start_time)
)
# This should be a barrier, but the nccl barrier assumes
# device_index == rank, which does not hold in the
# model-parallel case.
dist.synchronize()
# Load mappings.
start_time = time.time()
logger.info(" > loading doc-idx mapping from {}".format(doc_idx_filename))
doc_idx = np.load(doc_idx_filename, allow_pickle=True, mmap_mode="r")
logger.info(" > loading sample-idx mapping from {}".format(sample_idx_filename))
sample_idx = np.load(sample_idx_filename, allow_pickle=True, mmap_mode="r")
logger.info(" > loading shuffle-idx mapping from {}".format(shuffle_idx_filename))
shuffle_idx = np.load(shuffle_idx_filename, allow_pickle=True, mmap_mode="r")
logger.info(" loaded indexed file in {:3.3f} seconds".format(time.time() - start_time))
logger.info(" total number of samples: {}".format(sample_idx.shape[0]))
logger.info(" total number of epochs: {}".format(num_epochs))
return doc_idx, sample_idx, shuffle_idx
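# Cached-file naming example (values are illustrative): with
# data_prefix="/data/corpus", name="train", num_samples=1000, seq_length=1024
# and seed=1234, the three mappings are saved as
#     /data/corpus_train_indexmap_1000ns_1024sl_1234s_doc_idx.npy
#     /data/corpus_train_indexmap_1000ns_1024sl_1234s_sample_idx.npy
#     /data/corpus_train_indexmap_1000ns_1024sl_1234s_shuffle_idx.npy
# so later runs with the same settings reload the saved indices instead of
# rebuilding them.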
def _num_tokens(documents, sizes):
"""Total number of tokens in the dataset."""
return np.sum(sizes[documents])
def _num_epochs(tokens_per_epoch, seq_length, num_samples):
"""Based on number of samples and sequence length, calculate how many
epochs will be needed."""
num_epochs = 0
total_tokens = 0
while True:
num_epochs += 1
total_tokens += tokens_per_epoch
# -1 is because we need to retrieve seq_length + 1 token each time
# but the last token will overlap with the first token of the next
# sample except for the last sample.
if ((total_tokens - 1) // seq_length) >= num_samples:
return num_epochs
def _build_doc_idx(documents, num_epochs, np_rng, separate_last_epoch):
"""Build an array with length = number-of-epochs * number-of-documents.
Each index is mapped to a corresponding document."""
if not separate_last_epoch or num_epochs == 1:
doc_idx = np.mgrid[0:num_epochs, 0 : len(documents)][1]
doc_idx[:] = documents
doc_idx = doc_idx.reshape(-1)
doc_idx = doc_idx.astype(np.int32)
np_rng.shuffle(doc_idx)
return doc_idx
doc_idx_first = _build_doc_idx(documents, num_epochs - 1, np_rng, False)
doc_idx_last = _build_doc_idx(documents, 1, np_rng, False)
return np.concatenate((doc_idx_first, doc_idx_last))
def _build_shuffle_idx(num_samples, total_size, np_rng):
"""Build the range [0, size) and shuffle."""
logger.info(
" > building shuffle index with split [0, {}) and [{}, {}) "
"...".format(num_samples, num_samples, total_size)
)
dtype_ = np.uint32
if total_size >= (np.iinfo(np.uint32).max - 1):
dtype_ = np.int64
shuffle_idx_first = np.arange(start=0, stop=num_samples, step=1, dtype=dtype_)
np_rng.shuffle(shuffle_idx_first)
if num_samples == total_size:
return shuffle_idx_first
shuffle_idx_last = np.arange(start=num_samples, stop=total_size, step=1, dtype=dtype_)
np_rng.shuffle(shuffle_idx_last)
return np.concatenate((shuffle_idx_first, shuffle_idx_last))
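# Construction sketch (the tokenizer and path are placeholders; the indexed
# dataset would normally come from the indexed-dataset helpers in data_utils):
#     indexed = get_indexed_dataset("/path/to/gpt_corpus", "mmap", skip_warmup=True)
#     ds = GPT2Dataset("train", tokenizer, "/path/to/gpt_corpus", indexed,
#                      max_num_samples=1000, max_seq_length=1024, seed=1234)
#     sample = ds[0]   # Instance whose labels are input_ids shifted by one token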
# coding=utf-8
# Copyright 2021 The OneFlow Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from typing import Callable, Optional
import oneflow as flow
from flowvision import datasets
from libai.data.structures import DistTensorData, Instance
class ImageNetDataset(datasets.ImageFolder):
r"""`ImageNet <http://image-net.org/>`_ 2012 Classification Dataset in LiBai.
Args:
root (string): Root directory of the ImageNet Dataset.
        train (bool, optional): If True, creates dataset from the training set, otherwise
            creates from the validation (``val``) set.
transform (callable, optional): A function/transform that takes in a PIL image
            and returns a transformed version. E.g., ``transforms.RandomCrop``
"""
def __init__(
self, root: str, train: bool = True, transform: Optional[Callable] = None, **kwargs
):
prefix = "train" if train else "val"
root = os.path.join(root, prefix)
super(ImageNetDataset, self).__init__(root=root, transform=transform, **kwargs)
def __getitem__(self, index: int):
sample, target = super().__getitem__(index)
data_sample = Instance(
images=DistTensorData(sample, placement_idx=0),
labels=DistTensorData(flow.tensor(target, dtype=flow.long), placement_idx=-1),
)
return data_sample
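# Illustrative usage (a minimal sketch; the dataset root and the flowvision
# transform pipeline below are assumptions, not part of this module):
#
#   from flowvision import transforms
#   transform = transforms.Compose(
#       [transforms.RandomResizedCrop(224), transforms.ToTensor()]
#   )
#   dataset = ImageNetDataset(root="/path/to/imagenet", train=True, transform=transform)
#   sample = dataset[0]  # an Instance holding "images" and "labels" DistTensorData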
# coding=utf-8
# Copyright 2021 The OneFlow Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Callable, Optional
import oneflow as flow
from flowvision import datasets
from libai.data.structures import DistTensorData, Instance
class MNISTDataset(datasets.MNIST):
r"""`MNIST <http://yann.lecun.com/exdb/mnist/>`_ Dataset in LiBai.
Args:
root (string): Root directory of dataset where ``MNIST/processed/training.pt``
and ``MNIST/processed/test.pt`` exist.
train (bool, optional): If True, creates dataset from ``training.pt``,
otherwise from ``test.pt``.
download (bool, optional): If true, downloads the dataset from the internet and
puts it in root directory. If the dataset is already downloaded, it will not be
downloaded again.
transform (callable, optional): A function/transform that takes in a PIL image
            and returns a transformed version. E.g., ``transforms.RandomCrop``
        dataset_name (str, optional): Name for the dataset as an identifier. E.g., ``mnist``
"""
def __init__(
self,
root: str,
train: bool = True,
transform: Optional[Callable] = None,
download: bool = False,
**kwargs
):
super(MNISTDataset, self).__init__(
root=root, train=train, transform=transform, download=download, **kwargs
)
def __getitem__(self, index: int):
img, target = super().__getitem__(index)
data_sample = Instance(
images=DistTensorData(img, placement_idx=0),
labels=DistTensorData(flow.tensor(target, dtype=flow.long), placement_idx=-1),
)
return data_sample
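# Illustrative usage (sketch only; the download directory is an assumption):
#
#   from flowvision import transforms
#   dataset = MNISTDataset(
#       root="./mnist_data", train=True, transform=transforms.ToTensor(), download=True
#   )
#   sample = dataset[0]  # an Instance holding "images" and "labels" DistTensorData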
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Roberta Style dataset."""
import numpy as np
import oneflow as flow
from libai.data.structures import DistTensorData, Instance
from ..data_utils import create_masked_lm_predictions, get_samples_mapping
from .bert_dataset import pad_and_convert_to_numpy
class RobertaDataset(flow.utils.data.Dataset):
"""Dataset containing sentence for RoBERTa training.
Each index corresponds to a randomly selected sentence.
Args:
name: Name of dataset for clarification.
tokenizer: Tokenizer to use.
data_prefix: Path to the training dataset.
indexed_dataset: Indexed dataset to use.
max_seq_length: Maximum length of the sequence. All values are padded to
this length. Defaults to 512.
mask_lm_prob: Probability to mask tokens. Defaults to 0.15.
short_seq_prob: Probability of producing a short sequence. Defaults to 0.0.
max_predictions_per_seq: Maximum number of mask tokens in each sentence. Defaults to None.
seed: Seed for random number generator for reproducibility. Defaults to 1234.
"""
def __init__(
self,
name,
tokenizer,
indexed_dataset,
data_prefix,
max_num_samples,
mask_lm_prob,
max_seq_length,
short_seq_prob=0.0,
seed=1234,
masking_style="bert",
):
super().__init__()
# Params to store.
self.name = name
self.seed = seed
self.masked_lm_prob = mask_lm_prob
self.max_seq_length = max_seq_length
self.masking_style = masking_style
# Dataset.
self.indexed_dataset = indexed_dataset
# Build the samples mapping.
self.samples_mapping = get_samples_mapping(
self.indexed_dataset,
data_prefix,
None,
max_num_samples,
self.max_seq_length - 2, # account for added tokens
short_seq_prob,
self.seed,
self.name,
binary_head=False,
)
# Vocab stuff.
self.tokenizer = tokenizer
self.vocab_id_list = list(tokenizer.get_vocab().values())
self.vocab_id_to_token_dict = {v: k for k, v in tokenizer.get_vocab().items()}
self.cls_id = tokenizer.cls_token_id
self.sep_id = tokenizer.sep_token_id
self.mask_id = tokenizer.mask_token_id
self.pad_id = tokenizer.pad_token_id
def __len__(self):
return self.samples_mapping.shape[0]
def __getitem__(self, idx):
start_idx, end_idx, seq_length = self.samples_mapping[idx]
sample = [self.indexed_dataset[i] for i in range(start_idx, end_idx)]
# Note that this rng state should be numpy and not python since
# python randint is inclusive whereas the numpy one is exclusive.
# We % 2**32 since numpy requires the seed to be between 0 and 2**32 - 1
np_rng = np.random.RandomState(seed=((self.seed + idx) % 2 ** 32))
return build_training_sample(
self.tokenizer,
sample,
seq_length,
self.max_seq_length, # needed for padding
self.vocab_id_list,
self.vocab_id_to_token_dict,
self.cls_id,
self.sep_id,
self.mask_id,
self.pad_id,
self.masked_lm_prob,
np_rng,
masking_style=self.masking_style,
)
def build_training_sample(
tokenizer,
sample,
target_seq_length,
max_seq_length,
vocab_id_list,
vocab_id_to_token_dict,
cls_id,
sep_id,
mask_id,
pad_id,
masked_lm_prob,
np_rng,
masking_style="bert",
):
"""Build training sample.
Arguments:
sample: A list of sentences in which each sentence is a list token ids.
target_seq_length: Desired sequence length.
max_seq_length: Maximum length of the sequence. All values are padded to
this length.
vocab_id_list: List of vocabulary ids. Used to pick a random id.
vocab_id_to_token_dict: A dictionary from vocab ids to text tokens.
cls_id: Start of example id.
sep_id: Separator id.
mask_id: Mask token id.
pad_id: Padding token id.
masked_lm_prob: Probability to mask tokens.
        np_rng: Random number generator. Note that this rng state should be
numpy and not python since python randint is inclusive for
the upper bound whereas the numpy one is exclusive.
"""
assert target_seq_length <= max_seq_length
tokens = []
for j in range(len(sample)):
tokens.extend(sample[j])
max_num_tokens = target_seq_length
truncate_segments(tokens, len(tokens), max_num_tokens, np_rng)
# create tokens and tokentypes
tokens, tokentypes = create_tokens_and_tokentypes(tokens, cls_id, sep_id)
# Masking
max_predictions_per_seq = masked_lm_prob * max_num_tokens
(tokens, masked_positions, masked_labels, _, _) = create_masked_lm_predictions(
tokenizer,
tokens,
vocab_id_list,
vocab_id_to_token_dict,
masked_lm_prob,
cls_id,
sep_id,
mask_id,
max_predictions_per_seq,
np_rng,
masking_style=masking_style,
)
# Padding.
tokens_np, tokentypes_np, labels_np, padding_mask_np, loss_mask_np = pad_and_convert_to_numpy(
tokens, tokentypes, masked_positions, masked_labels, pad_id, max_seq_length
)
train_sample = Instance(
input_ids=DistTensorData(flow.tensor(tokens_np)),
attention_mask=DistTensorData(flow.tensor(padding_mask_np)),
tokentype_ids=DistTensorData(flow.tensor(tokentypes_np)),
lm_labels=DistTensorData(flow.tensor(labels_np), placement_idx=-1),
loss_mask=DistTensorData(flow.tensor(loss_mask_np), placement_idx=-1),
)
return train_sample
def truncate_segments(tokens, len_tokens, max_num_tokens, np_rng):
"""Truncates a sequences to a maximum sequence length."""
assert len_tokens > 0
if len_tokens <= max_num_tokens:
return False
while len_tokens > max_num_tokens:
if np_rng.random() < 0.5:
del tokens[0]
else:
tokens.pop()
len_tokens -= 1
return True
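# Example (illustrative): truncate_segments mutates the token list in place,
# randomly dropping tokens from the front or the back until the budget is met,
# e.g. [5, 6, 7, 8, 9] with max_num_tokens=3 may end up as [6, 7, 8], [5, 6, 7]
# or [7, 8, 9]; the boolean return value only reports whether truncation happened.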
def create_tokens_and_tokentypes(tokens, cls_id, sep_id):
"""Add [CLS] and [SEP] and build tokentypes."""
# [CLS].
tokens.insert(0, cls_id)
    # [SEP].
tokens.append(sep_id)
tokentypes = [0] * len(tokens)
return tokens, tokentypes
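# Example (illustrative, doctest-style):
#   >>> create_tokens_and_tokentypes([7, 8, 9], cls_id=101, sep_id=102)
#   ([101, 7, 8, 9, 102], [0, 0, 0, 0, 0])
# RoBERTa-style samples use a single segment, so every token type is 0.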
# coding=utf-8
# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""T5 Style dataset."""
import collections
import numpy as np
import oneflow as flow
from libai.data.structures import DistTensorData, Instance
from ..data_utils import create_masked_lm_predictions, get_samples_mapping
class T5Dataset(flow.utils.data.Dataset):
"""
Dataset containing sentences for T5 training.
Args:
name: Name of dataset.
tokenizer: Tokenizer to use.
data_prefix (str): Path to the training dataset.
indexed_dataset: Indexed dataset to use.
max_seq_length (int, optional): Maximum length of the sequence passing into encoder.
All values are padded to this length. Defaults to 512.
max_seq_length_dec (int, optional): Maximum length of the sequence passing into decoder.
All values are padded to this length. Defaults to 128.
mask_lm_prob (float, optional): Probability to mask tokens. Defaults to 0.15.
max_preds_per_seq (int, optional): Maximum number of masked tokens in each sentence.
Defaults to None.
short_seq_prob (float, optional):
Probability of producing a short sequence. Defaults to 0.0.
seed (int, optional):
Seed for random number generator for reproducibility. Defaults to 1234.
"""
def __init__(
self,
name,
tokenizer,
indexed_dataset,
data_prefix,
max_num_samples,
masked_lm_prob,
max_seq_length,
max_seq_length_dec,
short_seq_prob,
seed,
):
# Params to store.
self.name = name
self.seed = seed
self.masked_lm_prob = masked_lm_prob
self.max_seq_length = max_seq_length
self.max_seq_length_dec = max_seq_length_dec
# Dataset.
self.indexed_dataset = indexed_dataset
# Build the samples mapping.
self.samples_mapping = get_samples_mapping(
self.indexed_dataset,
data_prefix,
None,
max_num_samples,
self.max_seq_length - 2, # account for added tokens
short_seq_prob,
self.seed,
self.name,
False,
)
# Vocab stuff.
self.tokenizer = tokenizer
tokenizer.add_tokens(
[tokenizer._bos_token, tokenizer._eos_token, *tokenizer._additional_special_tokens]
)
vocab = tokenizer.get_vocab()
inv_vocab = {v: k for k, v in vocab.items()}
self.vocab_id_list = list(inv_vocab.keys())
self.vocab_id_to_token_dict = inv_vocab
self.cls_id = vocab[tokenizer._cls_token]
self.sep_id = vocab[tokenizer._sep_token]
self.mask_id = vocab[tokenizer._mask_token]
self.pad_id = vocab[tokenizer._pad_token]
self.bos_id = vocab[tokenizer._bos_token]
self.eos_id = vocab[tokenizer._eos_token]
self.sentinel_tokens = [vocab[x] for x in tokenizer._additional_special_tokens]
assert len(self.sentinel_tokens) > 0
def __len__(self):
return self.samples_mapping.shape[0]
def __getitem__(self, idx):
start_index, end_index, seq_length = self.samples_mapping[idx]
sample = []
for index in range(start_index, end_index):
sample.append(self.indexed_dataset[index])
# Note that this rng state should be numpy and not python since
# python randint is inclusive whereas the numpy one is exclusive.
np_rng = np.random.RandomState(seed=(self.seed + idx))
return build_training_sample(
self.tokenizer,
sample,
seq_length,
self.max_seq_length, # needed for padding
self.max_seq_length_dec,
self.vocab_id_list,
self.vocab_id_to_token_dict,
self.cls_id,
self.sep_id,
self.mask_id,
self.pad_id,
self.masked_lm_prob,
np_rng,
self.bos_id,
self.eos_id,
self.sentinel_tokens,
)
def build_training_sample(
tokenizer,
sample,
target_seq_length,
max_seq_length,
max_seq_length_dec,
vocab_id_list,
vocab_id_to_token_dict,
cls_id,
sep_id,
mask_id,
pad_id,
masked_lm_prob,
np_rng,
bos_id=None,
eos_id=None,
sentinel_tokens=None,
):
"""Build training sample.
Arguments:
sample: A list of sentences in which each sentence is a list token ids.
target_seq_length: Desired sequence length.
max_seq_length: Maximum length of the sequence. All values are padded to
this length.
vocab_id_list: List of vocabulary ids. Used to pick a random id.
vocab_id_to_token_dict: A dictionary from vocab ids to text tokens.
cls_id: Start of example id.
sep_id: Separator id.
mask_id: Mask token id.
pad_id: Padding token id.
masked_lm_prob: Probability to mask tokens.
        np_rng: Random number generator. Note that this rng state should be
            numpy and not python since python randint is inclusive for
            the upper bound whereas the numpy one is exclusive.
bos_id: start of decoder example id
eos_id: end of generation id
sentinel_tokens: unique value to be substituted for every replaced span
"""
assert target_seq_length <= max_seq_length
# flatten sentences into one list
tokens = [token for sentence in sample for token in sentence]
# Truncate to `target_sequence_length`.
max_num_tokens = target_seq_length
tokens = tokens[:max_num_tokens]
# Masking.
max_predictions_per_seq = masked_lm_prob * max_num_tokens
(tokens, masked_positions, masked_labels, _, masked_spans) = create_masked_lm_predictions(
tokenizer,
tokens,
vocab_id_list,
vocab_id_to_token_dict,
masked_lm_prob,
cls_id,
sep_id,
mask_id,
max_predictions_per_seq,
np_rng,
max_ngrams=10,
geometric_dist=True,
masking_style="t5",
)
# Padding.
(
tokens_enc,
tokens_dec_in,
labels,
enc_mask,
dec_mask,
enc_dec_mask,
loss_mask,
) = pad_and_convert_to_numpy(
tokens,
masked_positions,
masked_labels,
pad_id,
max_seq_length,
max_seq_length_dec,
masked_spans,
bos_id,
eos_id,
sentinel_tokens,
)
sample = Instance(
encoder_input_ids=DistTensorData(tokens_enc),
decoder_input_ids=DistTensorData(tokens_dec_in),
encoder_attn_mask=DistTensorData(enc_mask),
decoder_attn_mask=DistTensorData(dec_mask),
encoder_decoder_attn_mask=DistTensorData(enc_dec_mask),
lm_labels=DistTensorData(labels, placement_idx=-1),
loss_mask=DistTensorData(loss_mask, placement_idx=-1),
)
return sample
def pad_and_convert_to_numpy(
tokens,
masked_positions,
masked_labels,
pad_id,
max_seq_length,
max_seq_length_dec,
masked_spans=None,
bos_id=None,
eos_id=None,
sentinel_tokens=None,
):
"""Pad sequences and convert them to numpy."""
sentinel_tokens = collections.deque(sentinel_tokens)
t5_input = []
(t5_decoder_in, t5_decoder_out) = ([bos_id], [])
(start_index, end_index) = (0, None)
for span in masked_spans:
flag = sentinel_tokens.popleft()
# Append the same tokens in decoder input and output
t5_decoder_in.append(flag)
t5_decoder_in.extend(span.label)
t5_decoder_out.append(flag)
t5_decoder_out.extend(span.label)
end_index = span.index[0]
t5_input.extend(tokens[start_index:end_index])
t5_input.append(flag)
# the next start index is the token after the last span token
start_index = span.index[-1] + 1
# Add <eos> token to the t5_decoder_out
t5_decoder_out.append(eos_id)
# Add the remaining tokens to the t5 input
t5_input.extend(tokens[start_index:])
# assert (len(t5_input) - len(masked_spans)) + \
# (len(t5_decoder_in) - (len(masked_spans) + 1)) == len(tokens)
# Some checks.
# Encoder-side padding mask.
num_tokens = len(t5_input)
padding_length = max_seq_length - num_tokens
assert padding_length >= 0
assert len(masked_positions) == len(masked_labels)
    # Tokens.
filler = [pad_id] * padding_length
tokens_enc = np.array(t5_input + filler, dtype=np.int64)
# Decoder-side padding mask.
num_tokens_dec = len(t5_decoder_in)
padding_length_dec = max_seq_length_dec - num_tokens_dec
assert padding_length_dec >= 0
filler_dec = [pad_id] * padding_length_dec
tokens_dec_in = np.array(t5_decoder_in + filler_dec, dtype=np.int64)
# Create attention masks
enc_mask = make_attention_mask(tokens_enc, tokens_enc)
enc_dec_mask = make_attention_mask(tokens_dec_in, tokens_enc)
dec_mask = make_attention_mask(tokens_dec_in, tokens_dec_in)
dec_mask = dec_mask * make_history_mask(tokens_dec_in)
# Labels mask.
labels = t5_decoder_out + ([-1] * padding_length_dec)
labels = np.array(labels, dtype=np.int64)
# Loss mask
loss_mask = ([1] * num_tokens_dec) + ([0] * padding_length_dec)
    loss_mask = np.array(loss_mask, dtype=bool)
tokens_enc = flow.tensor(tokens_enc, dtype=flow.long)
tokens_dec_in = flow.tensor(tokens_dec_in, dtype=flow.long)
labels = flow.tensor(labels, dtype=flow.long)
enc_mask = flow.tensor(enc_mask, dtype=flow.bool)
dec_mask = flow.tensor(dec_mask, dtype=flow.bool)
enc_dec_mask = flow.tensor(enc_dec_mask, dtype=flow.bool)
loss_mask = flow.tensor(loss_mask, dtype=flow.bool)
return tokens_enc, tokens_dec_in, labels, enc_mask, dec_mask, enc_dec_mask, loss_mask
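# Illustrative sketch of the span-corruption layout produced above (token and
# sentinel ids are made up for the example): for tokens = [t1, t2, t3, t4, t5]
# with one masked span covering t2, t3 and sentinel <s0>, the encoder input is
# [t1, <s0>, t4, t5, <pad>, ...], the decoder input is [<bos>, <s0>, t2, t3,
# <pad>, ...] and the labels are [<s0>, t2, t3, <eos>, -1, ...].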
def make_attention_mask(source_block, target_block):
"""
Returns a 2-dimensional (2-D) attention mask
:param source_block: 1-D array
:param target_block: 1-D array
"""
mask = (target_block[None, :] >= 1) * (source_block[:, None] >= 1)
mask = mask.astype(np.int64)
# (source_length, target_length)
return mask
def make_history_mask(block):
length = block.shape[0]
arange = np.arange(length)
history_mask = (
arange[
None,
]
<= arange[:, None]
)
history_mask = history_mask.astype(np.int64)
return history_mask
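# Example (illustrative, doctest-style):
#   >>> make_history_mask(np.array([5, 6, 7]))
#   array([[1, 0, 0],
#          [1, 1, 0],
#          [1, 1, 1]])
# make_attention_mask zeroes out positions whose id is 0 (padding), assuming
# real token ids are >= 1, so for [5, 6, 0, 0] only the top-left 2x2 block is 1.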
# coding=utf-8
# Copyright 2021 The OneFlow Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .samplers import CyclicSampler, SingleRoundSampler