Commit 790e49e5 authored by stephenwu

Merge branch 'master' of https://github.com/tensorflow/models into run_superglue

parents 8ab018b0 5bb827c3
#!/bin/bash
# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
#
# Pylint wrapper extracted from the main TensorFlow repo, sharing the same exceptions.
# As this is meant for smaller repos, it drops "modified files" checking in favor
# of full-repo checking.
set -euo pipefail
# Download latest configs from main TensorFlow repo.
wget -q -O /tmp/pylintrc https://raw.githubusercontent.com/tensorflow/tensorflow/master/tensorflow/tools/ci_build/pylintrc
wget -q -O /tmp/pylint_allowlist https://raw.githubusercontent.com/tensorflow/tensorflow/master/tensorflow/tools/ci_build/pylint_allowlist
SCRIPT_DIR=/tmp
# Helper: print an error message and abort the script.
die() {
  echo "$@" >&2
  exit 1
}
num_cpus() {
# Get the number of CPUs
if [[ -f /proc/cpuinfo ]]; then
N_CPUS=$(grep -c ^processor /proc/cpuinfo)
else
# Fallback method
N_CPUS=$(getconf _NPROCESSORS_ONLN)
fi
if [[ -z ${N_CPUS} ]]; then
die "ERROR: Unable to determine the number of CPUs"
fi
echo ${N_CPUS}
}
get_py_files_to_check() {
find . -name '*.py'
}
do_pylint() {
# Get all Python files, regardless of mode.
PYTHON_SRC_FILES=$(get_py_files_to_check)
# This should not happen: if this branch is taken, the repo contains no Python code at all.
if [[ -z ${PYTHON_SRC_FILES} ]]; then
echo "do_pylint found no Python files to check. Returning."
return 0
fi
# Now that we know we have to do work, check if `pylint` is installed
PYLINT_BIN="python3.8 -m pylint"
echo ""
echo "check whether pylint is available or not."
echo ""
${PYLINT_BIN} --version
if [[ $? -eq 0 ]]
then
echo ""
echo "pylint available, proceeding with pylint sanity check."
echo ""
else
echo ""
echo "pylint not available."
echo ""
return 1
fi
# Configure pylint using the following file
PYLINTRC_FILE="${SCRIPT_DIR}/pylintrc"
if [[ ! -f "${PYLINTRC_FILE}" ]]; then
die "ERROR: Cannot find pylint rc file at ${PYLINTRC_FILE}"
fi
# Run pylint in parallel, after some disk setup
NUM_SRC_FILES=$(echo ${PYTHON_SRC_FILES} | wc -w)
NUM_CPUS=$(num_cpus)
echo "Running pylint on ${NUM_SRC_FILES} files with ${NUM_CPUS} "\
"parallel jobs..."
echo ""
PYLINT_START_TIME=$(date +'%s')
OUTPUT_FILE="$(mktemp)_pylint_output.log"
ERRORS_FILE="$(mktemp)_pylint_errors.log"
PERMIT_FILE="$(mktemp)_pylint_permit.log"
FORBID_FILE="$(mktemp)_pylint_forbid.log"
rm -rf ${OUTPUT_FILE}
rm -rf ${ERRORS_FILE}
rm -rf ${PERMIT_FILE}
rm -rf ${FORBID_FILE}
set +e
# When running, filter the output to only the error-code lines. This removes the
# module headers and the context lines that some messages print.
# Don't redirect stderr, as that would hide pylint fatal errors.
${PYLINT_BIN} --rcfile="${PYLINTRC_FILE}" --output-format=parseable \
--jobs=${NUM_CPUS} ${PYTHON_SRC_FILES} | grep '\[[CEFW]' > ${OUTPUT_FILE}
PYLINT_END_TIME=$(date +'%s')
echo ""
echo "pylint took $((PYLINT_END_TIME - PYLINT_START_TIME)) s"
echo ""
# Report only what we care about
# Ref https://pylint.readthedocs.io/en/latest/technical_reference/features.html
# E: all errors
# W0311 bad-indentation
# W0312 mixed-indentation
# C0330 bad-continuation
# C0301 line-too-long
# C0326 bad-whitespace
# W0611 unused-import
# W0622 redefined-builtin
grep -E '(\[E|\[W0311|\[W0312|\[C0330|\[C0301|\[C0326|\[W0611|\[W0622)' ${OUTPUT_FILE} > ${ERRORS_FILE}
# Split the pylint reported errors into permitted ones and those we want to
# block submit on until fixed.
# We use `${ALLOW_LIST_FILE}` to record the errors we temporarily accept. The goal
# is for that file to contain only errors caused by differences between the
# internal and external versions.
ALLOW_LIST_FILE="${SCRIPT_DIR}/pylint_allowlist"
if [[ ! -f "${ALLOW_LIST_FILE}" ]]; then
die "ERROR: Cannot find pylint allowlist file at ${ALLOW_LIST_FILE}"
fi
# We can split with just 2 grep invocations
grep -f ${ALLOW_LIST_FILE} ${ERRORS_FILE} > ${PERMIT_FILE}
grep -v -f ${ALLOW_LIST_FILE} ${ERRORS_FILE} > ${FORBID_FILE}
# Determine counts of errors
N_PERMIT_ERRORS=$(wc -l ${PERMIT_FILE} | cut -d' ' -f1)
N_FORBID_ERRORS=$(wc -l ${FORBID_FILE} | cut -d' ' -f1)
set -e
# First print all allowed errors
echo ""
if [[ ${N_PERMIT_ERRORS} != 0 ]]; then
echo "Found ${N_PERMIT_ERRORS} allowlisted pylint errors:"
cat ${PERMIT_FILE}
fi
# Now, print the errors we should fix
echo ""
if [[ ${N_FORBID_ERRORS} != 0 ]]; then
echo "Found ${N_FORBID_ERRORS} non-allowlisted pylint errors:"
cat ${FORBID_FILE}
fi
echo ""
if [[ ${N_FORBID_ERRORS} != 0 ]]; then
echo "FAIL: Found ${N_FORBID_ERRORS} non-allowlisted errors and ${N_PERMIT_ERRORS} allowlisted errors"
return 1
else
echo "PASS: Found only ${N_PERMIT_ERRORS} allowlisted errors"
fi
}
do_pylint
......@@ -18,6 +18,6 @@
/research/object_detection/ @jch1 @tombstone @pkulzc
/research/pcl_rl/ @ofirnachum
/research/rebar/ @gjtucker
/research/sequence_projection/ @thunderfyc
/research/seq_flow_lite/ @thunderfyc
/research/slim/ @sguada @marksandler2
/research/vid2depth/ @rezama
......@@ -49,6 +49,7 @@ This repository provides a curated list of the GitHub repositories with machine
| Model | Paper | Features | Maintainer |
|-------|-------|----------|------------|
| [Wide & Deep](https://github.com/IntelAI/models/tree/master/benchmarks/recommendation/tensorflow/wide_deep_large_ds) | [Wide & Deep Learning for Recommender Systems](https://arxiv.org/pdf/1606.07792) | • Int8 Inference<br/>• FP32 Inference<br/>• FP32 Training | [Intel](https://github.com/IntelAI) |
| [Wide & Deep](https://github.com/NVIDIA/DeepLearningExamples/tree/master/TensorFlow2/Recommendation/WideAndDeep) | [Wide & Deep Learning for Recommender Systems](https://arxiv.org/pdf/1606.07792) | • Automatic mixed precision<br/>• Multi-GPU training support with Horovod<br/>• XLA | [NVIDIA](https://github.com/NVIDIA) |
## Contributions
......
......@@ -18,7 +18,7 @@ The base trainer implements the Orbit `StandardTrainable` and
`StandardEvaluable` interfaces. Trainers inside this project should be
interchangeable and independent of model architectures and tasks.
"""
import functools
from absl import logging
import gin
import orbit
......@@ -84,10 +84,95 @@ class Recovery:
"%f at step %d.", checkpoint_path, loss_value, global_step)
class _AsyncTrainer(orbit.StandardTrainer, orbit.StandardEvaluator):
"""Trainer class for both sync and async Strategy."""
def init_async(self):
"""Initializes the Async Trainer base class."""
assert isinstance(self._strategy, tf.distribute.Strategy)
self._is_async = isinstance(
self._strategy, tf.distribute.experimental.ParameterServerStrategy)
self._coordinator = None
if self._is_async:
self._coordinator = (
tf.distribute.experimental.coordinator.ClusterCoordinator(
self._strategy))
def join(self):
"""Join all async steps. Only useful in aysnc training."""
if getattr(self, "_is_async", False):
self._coordinator.join()
def create_train_loop_fn(self):
"""Creates a eval loop from the given step function and options."""
train_loop_fn = super().create_train_loop_fn()
if getattr(self, "_is_async", False):
def _async_loop_fn(iterator, num_steps):
self._coordinator.schedule(train_loop_fn, args=(iterator, num_steps))
return _async_loop_fn
else:
return train_loop_fn
def create_eval_loop_fn(self, has_state: bool):
"""Creates a training loop from the given step function and options."""
eval_loop_fn = super().create_eval_loop_fn(has_state)
if getattr(self, "_is_async", False):
if has_state:
raise ValueError(
"Stateful eval loop is not supported in async training.")
def _async_loop_fn(iterator, num_steps, state=None, reduce_fn=None):
assert state is None
assert reduce_fn is None
self._coordinator.schedule(eval_loop_fn, args=(iterator, num_steps))
return _async_loop_fn
else:
return eval_loop_fn
def distribute_dataset(self, dataset_or_fn, *args, **kwargs):
"""A utility function to help create a `tf.distribute.DistributedDataset`.
Args:
dataset_or_fn: An instance of `tf.data.Dataset`, or a "dataset function"
returning a `tf.data.Dataset`. If it is a function, it may optionally
have an argument named `input_context` which will be passed a
`tf.distribute.InputContext` instance.
*args: Any positional arguments to pass through to `dataset_or_fn`.
**kwargs: Any keyword arguments to pass through to `dataset_or_fn`.
Returns:
A distributed Dataset.
"""
if getattr(self, "_is_async", False):
per_worker_dataset_fn = functools.partial(
orbit.utils.make_distributed_dataset, self._strategy, dataset_or_fn,
*args, **kwargs)
per_worker_dataset_fn = tf.function(per_worker_dataset_fn)
return self._coordinator.create_per_worker_dataset(per_worker_dataset_fn)
else:
return orbit.utils.make_distributed_dataset(self._strategy, dataset_or_fn,
*args, **kwargs)
def get_runtime_options(config: ExperimentConfig):
"""Get tf.distribute.RunOptions from config."""
xla_options = {}
if config.runtime.tpu_enable_xla_dynamic_padder is not None:
xla_options["enable_xla_dynamic_padder"] = (
config.runtime.tpu_enable_xla_dynamic_padder)
return tf.distribute.RunOptions(
experimental_xla_options=tf.tpu.XLAOptions(**xla_options))
@gin.configurable
class Trainer(orbit.StandardTrainer, orbit.StandardEvaluator):
class Trainer(_AsyncTrainer):
"""Implements the common trainer shared for TensorFlow models."""
# pylint: disable=super-init-not-called
def __init__(self,
config: ExperimentConfig,
task: base_task.Task,
......@@ -120,6 +205,9 @@ class Trainer(orbit.StandardTrainer, orbit.StandardEvaluator):
self._optimizer = optimizer
self._checkpoint_exporter = checkpoint_exporter
self._recovery = None
# Runtime options are only applied to train_step.
# We use default for eval_step.
self._runtime_options = get_runtime_options(config)
# Creates a shadow copy of the weights to store weights moving average.
if isinstance(self._optimizer, optimization.ExponentialMovingAverage):
......@@ -147,9 +235,11 @@ class Trainer(orbit.StandardTrainer, orbit.StandardEvaluator):
self._validation_metrics = self.task.build_metrics(
training=False) + self.model.metrics
self.init_async()
if train:
train_dataset = orbit.utils.make_distributed_dataset(
self.strategy, self.task.build_inputs, self.config.task.train_data)
train_dataset = self.distribute_dataset(
self.task.build_inputs, self.config.task.train_data)
orbit.StandardTrainer.__init__(
self,
train_dataset,
......@@ -159,9 +249,8 @@ class Trainer(orbit.StandardTrainer, orbit.StandardEvaluator):
use_tpu_summary_optimization=config.trainer.allow_tpu_summary))
if evaluate:
eval_dataset = orbit.utils.make_distributed_dataset(
self.strategy, self.task.build_inputs,
self.config.task.validation_data)
eval_dataset = self.distribute_dataset(
self.task.build_inputs, self.config.task.validation_data)
orbit.StandardEvaluator.__init__(
self,
eval_dataset,
......@@ -270,6 +359,7 @@ class Trainer(orbit.StandardTrainer, orbit.StandardEvaluator):
def train_loop_end(self):
"""See base class."""
self.join()
# Checks if the model numeric status is stable and conducts the checkpoint
# recovery accordingly.
if self._recovery:
......@@ -297,7 +387,8 @@ class Trainer(orbit.StandardTrainer, orbit.StandardEvaluator):
self._train_loss.update_state(logs[self.task.loss])
self.global_step.assign_add(1)
self.strategy.run(step_fn, args=(next(iterator),))
self.strategy.run(
step_fn, args=(next(iterator),), options=self._runtime_options)
def eval_begin(self):
"""Sets up metrics."""
......@@ -324,6 +415,7 @@ class Trainer(orbit.StandardTrainer, orbit.StandardEvaluator):
def eval_end(self, aggregated_logs=None):
"""Processes evaluation results."""
self.join()
logs = {}
for metric in self.validation_metrics:
logs[metric.name] = metric.result()
......
......@@ -14,9 +14,13 @@
"""Tests for tensorflow_models.core.trainers.trainer."""
# pylint: disable=g-direct-tensorflow-import
import multiprocessing
import os
import sys
from absl.testing import parameterized
import numpy as np
import portpicker
import tensorflow as tf
from tensorflow.python.distribute import combinations
......@@ -26,6 +30,9 @@ from official.core import config_definitions as cfg
from official.core import train_lib
from official.utils.testing import mock_task
TPU_TEST = 'test_tpu' in sys.argv[0]
GPU_TEST = 'test_gpu' in sys.argv[0]
def all_strategy_combinations():
return combinations.combine(
......@@ -36,6 +43,113 @@ def all_strategy_combinations():
],)
def create_in_process_cluster(num_workers, num_ps):
"""Creates and starts local servers and returns the cluster_resolver."""
worker_ports = [portpicker.pick_unused_port() for _ in range(num_workers)]
ps_ports = [portpicker.pick_unused_port() for _ in range(num_ps)]
cluster_dict = {}
cluster_dict['worker'] = ['localhost:%s' % port for port in worker_ports]
if num_ps > 0:
cluster_dict['ps'] = ['localhost:%s' % port for port in ps_ports]
cluster_spec = tf.train.ClusterSpec(cluster_dict)
# Workers need some inter-op parallelism threads to work properly.
worker_config = tf.compat.v1.ConfigProto()
if multiprocessing.cpu_count() < num_workers + 1:
worker_config.inter_op_parallelism_threads = num_workers + 1
for i in range(num_workers):
tf.distribute.Server(
cluster_spec,
job_name='worker',
task_index=i,
config=worker_config,
protocol='grpc')
for i in range(num_ps):
tf.distribute.Server(
cluster_spec, job_name='ps', task_index=i, protocol='grpc')
cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
cluster_spec, rpc_layer='grpc')
return cluster_resolver
def dataset_fn(input_context=None):
del input_context
def dummy_data(_):
return tf.zeros((1, 1), dtype=tf.float32)
dataset = tf.data.Dataset.range(1)
dataset = dataset.repeat()
dataset = dataset.map(
dummy_data, num_parallel_calls=tf.data.experimental.AUTOTUNE)
return dataset
class MockAsyncTrainer(trainer_lib._AsyncTrainer):
"""Mock AsyncTrainer to test the _AsyncTrainer class."""
def __init__(self):
self._strategy = tf.distribute.get_strategy()
self.init_async()
self.global_step = tf.Variable(
0,
dtype=tf.int64,
name='global_step',
trainable=False,
aggregation=tf.VariableAggregation.ONLY_FIRST_REPLICA)
self.eval_global_step = tf.Variable(
0,
dtype=tf.int64,
name='eval_global_step',
trainable=False,
aggregation=tf.VariableAggregation.ONLY_FIRST_REPLICA)
train_dataset = self.distribute_dataset(dataset_fn)
trainer_lib.orbit.StandardTrainer.__init__(
self, train_dataset, options=trainer_lib.orbit.StandardTrainerOptions())
eval_dataset = self.distribute_dataset(dataset_fn)
trainer_lib.orbit.StandardEvaluator.__init__(
self,
eval_dataset,
options=trainer_lib.orbit.StandardEvaluatorOptions(
use_tf_while_loop=True))
def train_loop_begin(self):
self.global_step.assign(0)
def train_step(self, iterator):
def replica_step(_):
self.global_step.assign_add(1)
self._strategy.run(replica_step, args=(next(iterator),))
def train_loop_end(self):
self.join()
return self.global_step.numpy()
def eval_begin(self):
self.eval_global_step.assign(0)
def eval_step(self, iterator):
def replica_step(_):
self.eval_global_step.assign_add(1)
self._strategy.run(replica_step, args=(next(iterator),))
def eval_end(self):
self.join()
return self.eval_global_step.numpy()
class TrainerTest(tf.test.TestCase, parameterized.TestCase):
def setUp(self):
......@@ -71,6 +185,55 @@ class TrainerTest(tf.test.TestCase, parameterized.TestCase):
self.assertIn('training_loss', logs)
self.assertIn('learning_rate', logs)
def test_base_async_trainer(self):
if TPU_TEST or GPU_TEST:
self.skipTest('Async training is not available on TPU/GPU.')
num_workers = 3
num_ps = 2
cluster_resolver = create_in_process_cluster(num_workers, num_ps)
distribution = tf.distribute.experimental.ParameterServerStrategy(
cluster_resolver)
with distribution.scope():
trainer = MockAsyncTrainer()
trainer.init_async()
self.assertIsInstance(
trainer._coordinator,
tf.distribute.experimental.coordinator.ClusterCoordinator)
self.assertEqual(trainer.train(tf.constant(10)), 10)
self.assertEqual(trainer.evaluate(tf.constant(11)), 11)
def test_async_trainer_train(self):
if TPU_TEST or GPU_TEST:
self.skipTest('Async training is not available on TPU/GPU.')
num_workers = 3
num_ps = 2
cluster_resolver = create_in_process_cluster(num_workers, num_ps)
distribution = tf.distribute.experimental.ParameterServerStrategy(
cluster_resolver)
with distribution.scope():
config = cfg.ExperimentConfig(**self._config.as_dict())
config.trainer.eval_tf_while_loop = True
trainer = self.create_test_trainer(config)
logs = trainer.train(tf.convert_to_tensor(5, dtype=tf.int32))
self.assertIn('training_loss', logs)
self.assertIn('learning_rate', logs)
def test_async_trainer_validate(self):
if TPU_TEST or GPU_TEST:
self.skipTest('Async training is not available on TPU/GPU.')
num_workers = 3
num_ps = 2
cluster_resolver = create_in_process_cluster(num_workers, num_ps)
distribution = tf.distribute.experimental.ParameterServerStrategy(
cluster_resolver)
with distribution.scope():
config = cfg.ExperimentConfig(**self._config.as_dict())
config.trainer.eval_tf_while_loop = True
trainer = self.create_test_trainer(config)
logs = trainer.evaluate(tf.convert_to_tensor(5, dtype=tf.int32))
self.assertIn('acc', logs)
self.assertIn('validation_loss', logs)
@combinations.generate(all_strategy_combinations())
def test_trainer_validate(self, distribution):
with distribution.scope():
......
......@@ -44,8 +44,10 @@ class DataConfig(base_config.Config):
drop_remainder: Whether the last batch should be dropped in the case it has
fewer than `global_batch_size` elements.
shuffle_buffer_size: The buffer size used for shuffling training data.
cache: Whether to cache dataset examples. Can be used to avoid re-reading
from disk on the second epoch. Requires significant memory overhead.
cache: Whether to cache dataset examples. If `True`, we will cache the
dataset after applying the decode_fn and parse_fn. It can be used to avoid
re-reading from disk, re-decoding and re-parsing the example on the
second epoch, but it requires significant memory overhead.
cycle_length: The number of files that will be processed concurrently when
interleaving files.
block_length: The number of consecutive elements to produce from each input
......@@ -138,6 +140,20 @@ class RuntimeConfig(base_config.Config):
run_eagerly: bool = False
batchnorm_spatial_persistent: bool = False
# XLA runtime params.
# XLA params are only applied to the train_step.
# These options can improve training speed. They can also improve eval, but
# may reduce usability and require code changes on the user side.
# Whether to enable the XLA dynamic padder
# infrastructure to handle dynamic-shape inputs inside XLA. True by
# default. Disabling this may cause correctness issues with dynamic-shape
# inputs, as XLA will then assume the inputs are padded. However, users
# can optionally set it to False to improve device time if masking is
# already handled on the user side.
# If None, the XLA default is respected.
tpu_enable_xla_dynamic_padder: Optional[bool] = None
# Global model parallelism configurations.
num_cores_per_replica: int = 1
default_shard_dim: int = -1
......
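# A minimal sketch of how the flag documented above is consumed: `get_runtime_options`
# (added to the trainer in this commit) turns it into `tf.distribute.RunOptions`,
# which is passed only to the train step. The `False` value below is a hypothetical
# choice for a model that already handles masking itself.
import tensorflow as tf

run_options = tf.distribute.RunOptions(
    experimental_xla_options=tf.tpu.XLAOptions(enable_xla_dynamic_padder=False))
# The trainer then calls:
#   strategy.run(step_fn, args=(next(iterator),), options=run_options)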
......@@ -174,11 +174,13 @@ class InputReader:
dataset = tf.data.Dataset.from_tensor_slices(matched_files)
# Shuffle and repeat at file level.
# If cache is enabled, `reshuffle_each_iteration` is set to False,
# because we will read the same cached data in every iteration anyway.
if self._is_training:
dataset = dataset.shuffle(
len(matched_files),
seed=self._seed,
reshuffle_each_iteration=True)
reshuffle_each_iteration=not self._cache)
# Do not enable sharding if tf.data service is enabled, as sharding will be
# handled inside tf.data service.
......@@ -187,7 +189,9 @@ class InputReader:
not self._enable_tf_data_service):
dataset = dataset.shard(input_context.num_input_pipelines,
input_context.input_pipeline_id)
if self._is_training:
# If cache is enabled, we will call `repeat()` later after `cache()`.
if self._is_training and not self._cache:
dataset = dataset.repeat()
dataset = dataset.interleave(
......@@ -222,7 +226,9 @@ class InputReader:
not self._enable_tf_data_service):
dataset = dataset.shard(input_context.num_input_pipelines,
input_context.input_pipeline_id)
if self._is_training:
# If cache is enabled, we will call `repeat()` later after `cache()`.
if self._is_training and not self._cache:
dataset = dataset.repeat()
return dataset
......@@ -249,7 +255,8 @@ class InputReader:
decoders=decoders,
read_config=read_config)
if self._is_training:
# If cache is enabled, we will call `repeat()` later after `cache()`.
if self._is_training and not self._cache:
dataset = dataset.repeat()
return dataset
......@@ -295,10 +302,8 @@ class InputReader:
raise ValueError('It is unexpected that `tfds_builder` is None and '
'there is also no `matched_files`.')
if self._cache:
dataset = dataset.cache()
if self._is_training:
# If cache is enabled, we will call `shuffle()` later after `cache()`.
if self._is_training and not self._cache:
dataset = dataset.shuffle(self._shuffle_buffer_size)
dataset = _maybe_map_fn(dataset, self._decoder_fn)
......@@ -306,6 +311,12 @@ class InputReader:
dataset = dataset.apply(self._sample_fn)
dataset = _maybe_map_fn(dataset, self._parser_fn)
if self._cache:
dataset = dataset.cache()
if self._is_training:
dataset = dataset.repeat()
dataset = dataset.shuffle(self._shuffle_buffer_size)
if self._transform_and_batch_fn is not None:
dataset = self._transform_and_batch_fn(dataset, input_context)
else:
......
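# A standalone tf.data sketch of the ordering above when `cache` is enabled:
# decode/parse first, then cache, then repeat and shuffle, so every epoch after
# the first reuses the cached, already-parsed examples. The dataset contents and
# buffer size here are toy values, not the real pipeline.
import tensorflow as tf

dataset = tf.data.Dataset.range(10)
dataset = dataset.map(lambda x: x * 2)    # stands in for decoder_fn / parser_fn
dataset = dataset.cache()                 # cache the parsed examples
dataset = dataset.repeat()                # repeat *after* cache (training only)
dataset = dataset.shuffle(buffer_size=8)  # shuffle after cache as well
dataset = dataset.batch(4)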
......@@ -92,6 +92,7 @@ class ProgressiveTrainer(trainer_lib.Trainer):
# it gets a single-replica no-op strategy.
self._strategy = tf.distribute.get_strategy()
self._config = config
self._runtime_options = trainer_lib.get_runtime_options(config)
self._task = prog_task
# Directory for non-progressive checkpoint
......
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for official.nlp.configs.encoders."""
import os
import tensorflow as tf
from official.modeling import hyperparams
from official.nlp.configs import encoders
class EncodersTest(tf.test.TestCase):
def test_encoder_from_yaml(self):
config = encoders.EncoderConfig(
type="bert", bert=encoders.BertEncoderConfig(num_layers=1))
encoder = encoders.build_encoder(config)
ckpt = tf.train.Checkpoint(encoder=encoder)
ckpt_path = ckpt.save(self.get_temp_dir() + "/ckpt")
params_save_path = os.path.join(self.get_temp_dir(), "params.yaml")
hyperparams.save_params_dict_to_yaml(config, params_save_path)
restored_cfg = encoders.EncoderConfig.from_yaml(params_save_path)
restored_encoder = encoders.build_encoder(restored_cfg)
status = tf.train.Checkpoint(encoder=restored_encoder).restore(ckpt_path)
status.assert_consumed()
if __name__ == "__main__":
tf.test.main()
......@@ -18,8 +18,34 @@ from official.core import config_definitions as cfg
from official.core import exp_factory
from official.modeling import optimization
from official.nlp.data import pretrain_dataloader
from official.nlp.data import pretrain_dynamic_dataloader
from official.nlp.tasks import masked_lm
_TRAINER = cfg.TrainerConfig(
train_steps=1000000,
optimizer_config=optimization.OptimizationConfig({
'optimizer': {
'type': 'adamw',
'adamw': {
'weight_decay_rate':
0.01,
'exclude_from_weight_decay': [
'LayerNorm', 'layer_norm', 'bias'
],
}
},
'learning_rate': {
'type': 'polynomial',
'polynomial': {
'initial_learning_rate': 1e-4,
'end_learning_rate': 0.0,
}
},
'warmup': {
'type': 'polynomial'
}
}))
@exp_factory.register_config_factory('bert/pretraining')
def bert_pretraining() -> cfg.ExperimentConfig:
......@@ -29,30 +55,26 @@ def bert_pretraining() -> cfg.ExperimentConfig:
train_data=pretrain_dataloader.BertPretrainDataConfig(),
validation_data=pretrain_dataloader.BertPretrainDataConfig(
is_training=False)),
trainer=cfg.TrainerConfig(
train_steps=1000000,
optimizer_config=optimization.OptimizationConfig({
'optimizer': {
'type': 'adamw',
'adamw': {
'weight_decay_rate':
0.01,
'exclude_from_weight_decay': [
'LayerNorm', 'layer_norm', 'bias'
],
}
},
'learning_rate': {
'type': 'polynomial',
'polynomial': {
'initial_learning_rate': 1e-4,
'end_learning_rate': 0.0,
}
},
'warmup': {
'type': 'polynomial'
}
})),
trainer=_TRAINER,
restrictions=[
'task.train_data.is_training != None',
'task.validation_data.is_training != None'
])
return config
@exp_factory.register_config_factory('bert/pretraining_dynamic')
def bert_dynamic() -> cfg.ExperimentConfig:
"""BERT base with dynamic input sequences.
TPU needs to run with tf.data service with round-robin behavior.
"""
config = cfg.ExperimentConfig(
task=masked_lm.MaskedLMConfig(
train_data=pretrain_dynamic_dataloader.BertPretrainDataConfig(),
validation_data=pretrain_dataloader.BertPretrainDataConfig(
is_training=False)),
trainer=_TRAINER,
restrictions=[
'task.train_data.is_training != None',
'task.validation_data.is_training != None'
......
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Dataset loader for the pre-training with dynamic sequence length."""
from typing import Optional, Tuple
import dataclasses
import tensorflow as tf
from official.core import config_definitions as cfg
from official.core import input_reader
from official.nlp.data import data_loader_factory
from official.nlp.data import pretrain_dataloader
@dataclasses.dataclass
class BertPretrainDataConfig(cfg.DataConfig):
"""Data config for BERT pretraining task (tasks/masked_lm)."""
input_path: str = ''
global_batch_size: int = 512
is_training: bool = True
seq_bucket_lengths: Tuple[int, ...] = (128, 256, 384, 512,)
# TODO(rxsang): `seq_bucket_window_scale` is only useful when round robin
# tf.data service is disabled. Deprecate this flag once we always enable round
# robin tf.data service.
seq_bucket_window_scale: int = 8
use_next_sentence_label: bool = True
use_position_id: bool = False
deterministic: bool = False
enable_tf_data_service: bool = False
enable_round_robin_tf_data_service: bool = False
tf_data_service_job_name: str = 'bert_pretrain'
use_v2_feature_names: bool = False
@data_loader_factory.register_data_loader_cls(BertPretrainDataConfig)
class PretrainingDynamicDataLoader(pretrain_dataloader.BertPretrainDataLoader):
"""Dataset loader for bert-style pretraining with dynamic sequenece length.
Bucketizes the input id features by the seq_bucket_lengths and features are
padded to the bucket boundaries. The mask features are usually short than
input id features and can also be dynamic. We require the mask feature lengths
within a bucket must be the same. For example, with [128, 256] buckets,
the mask features for bucket 128 should always have the length as X and
features for bucket 256 should always have the length as Y.
The dataloader does not filter out empty masks. Make sure to handle this
in the model.
"""
def __init__(self, params):
self._params = params
if len(params.seq_bucket_lengths) < 1:
raise ValueError('The seq_bucket_lengths cannot be empty.')
self._seq_bucket_lengths = params.seq_bucket_lengths
self._seq_bucket_window_scale = params.seq_bucket_window_scale
self._global_batch_size = params.global_batch_size
self._use_next_sentence_label = params.use_next_sentence_label
self._use_position_id = params.use_position_id
self._drop_remainder = params.drop_remainder
self._enable_tf_data_service = params.enable_tf_data_service
self._enable_round_robin_tf_data_service = (
params.enable_round_robin_tf_data_service)
self._mask_keys = [
'masked_lm_positions', 'masked_lm_ids', 'masked_lm_weights'
]
def _decode(self, record: tf.Tensor):
"""Decodes a serialized tf.Example."""
name_to_features = {
'input_ids': tf.io.VarLenFeature(tf.int64),
'input_mask': tf.io.VarLenFeature(tf.int64),
'segment_ids': tf.io.VarLenFeature(tf.int64),
'masked_lm_positions': tf.io.VarLenFeature(tf.int64),
'masked_lm_ids': tf.io.VarLenFeature(tf.int64),
'masked_lm_weights': tf.io.VarLenFeature(tf.float32),
}
if self._use_next_sentence_label:
name_to_features['next_sentence_labels'] = tf.io.FixedLenFeature([1],
tf.int64)
dynamic_keys = ['input_ids', 'input_mask', 'segment_ids']
if self._use_position_id:
name_to_features['position_ids'] = tf.io.VarLenFeature(tf.int64)
dynamic_keys.append('position_ids')
example = tf.io.parse_single_example(record, name_to_features)
for key in dynamic_keys + self._mask_keys:
example[key] = tf.sparse.to_dense(example[key])
# Remove the trailing padding after the last non-pad token along the
# sequence-length dimension. Padding that appears before the last non-pad
# token (counting from the back) must not be removed.
mask = tf.math.greater(
tf.math.cumsum(example['input_ids'], reverse=True), 0)
for key in dynamic_keys:
example[key] = tf.boolean_mask(example[key], mask)
# masked_lm_ids should be 0 padded.
# Change mask features to -1 padding so that we can differentiate
# padding from data or from bucketizing.
mask = tf.math.not_equal(example['masked_lm_ids'], 0)
example['masked_lm_ids'] = tf.where(
mask, example['masked_lm_ids'],
-tf.ones(tf.shape(example['masked_lm_ids']), dtype=example['masked_lm_ids'].dtype))
# tf.Example only supports tf.int64, but the TPU only supports tf.int32.
# So cast all int64 to int32.
# tf.data service uses dataset graph fingerprint to distinguish input
# pipeline jobs, thus we sort the keys here to make sure they are generated
# in a deterministic order each time the dataset function is traced.
for name in sorted(list(example.keys())):
t = example[name]
if t.dtype == tf.int64:
t = tf.cast(t, tf.int32)
example[name] = t
return example
def _bucketize_and_batch(
self,
dataset,
input_context: Optional[tf.distribute.InputContext] = None):
"""Bucketize by sequence length and batch the datasets."""
per_replica_batch_size = input_context.get_per_replica_batch_size(
self._global_batch_size) if input_context else self._global_batch_size
def element_length_func(example, seq_len_dim):
return tf.shape(example['input_word_ids'])[seq_len_dim]
bucket_boundaries = [length + 1 for length in self._seq_bucket_lengths]
bucket_batch_sizes = [per_replica_batch_size] * (len(bucket_boundaries) + 1)
# Bucketize and batch the dataset with per replica batch size first.
dataset = dataset.apply(
tf.data.experimental.bucket_by_sequence_length(
lambda example: tf.cast(element_length_func(example, 0), tf.int32),
bucket_boundaries,
bucket_batch_sizes,
pad_to_bucket_boundary=True,
drop_remainder=self._drop_remainder))
if input_context:
window_size = input_context.num_replicas_in_sync
if self._enable_tf_data_service and (
not self._enable_round_robin_tf_data_service):
# If tf.data service is enabled but round-robin behavior is not enabled,
# different TPU workers may fetch data from one tf.data service worker
# in different speed. We set the window size to be
# `seq_bucket_window_scale` larger to leave buffer if some workers are
# fetching data faster than others, so all the data within the same
# global batch can still have more chances to be in the same bucket.
window_size *= self._seq_bucket_window_scale
# Group `num_replicas_in_sync` batches from same bucket together, so all
# replicas can get the same sequence length for one global step.
dataset = dataset.apply(
tf.data.experimental.group_by_window(
key_func=lambda example: tf.cast( # pylint: disable=g-long-lambda
element_length_func(example, 1), tf.int64),
reduce_func=lambda _, x: tf.data.Dataset.from_tensors(x),
window_size=window_size))
dataset = dataset.flat_map(lambda x: x)
def _remove_pads_from_bucketize(features):
# All mask features must have the same effective length.
# The real masked ids padding token is -1 and 0 comes from
# bucket_by_sequence_length.
mask = tf.math.not_equal(features['masked_lm_ids'], 0)
mask_per_example = tf.math.reduce_sum(tf.cast(mask, tf.int32), axis=1)
normalized = tf.cast(
mask_per_example / tf.math.reduce_max(mask_per_example), tf.int32)
assert_op = tf.debugging.assert_equal(
tf.math.reduce_sum(normalized), per_replica_batch_size,
'Number of non padded mask tokens is not the same for each example '
'in the same sequence length.')
with tf.control_dependencies([assert_op]):
for key in self._mask_keys:
features[key] = tf.reshape(
tf.boolean_mask(
features[key], mask), [per_replica_batch_size, -1])
# Revert masked_lm_ids to be 0-padded.
mask = tf.math.not_equal(features['masked_lm_ids'], -1)
features['masked_lm_ids'] = tf.where(
mask, features['masked_lm_ids'],
tf.zeros(
tf.shape(features['masked_lm_ids']),
dtype=features['masked_lm_ids'].dtype))
return features
dataset = dataset.map(_remove_pads_from_bucketize)
return dataset
def load(self, input_context: Optional[tf.distribute.InputContext] = None):
"""Returns a tf.dataset.Dataset."""
reader = input_reader.InputReader(
params=self._params,
decoder_fn=self._decode,
parser_fn=self._parse,
transform_and_batch_fn=self._bucketize_and_batch)
return reader.read(input_context)
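# The bucketing above is built on `tf.data.experimental.bucket_by_sequence_length`.
# A toy, self-contained sketch of its padding behavior (the lengths, boundaries and
# batch sizes below are made up; the real loader pads to `seq_bucket_lengths`):
import tensorflow as tf

ds = tf.data.Dataset.from_tensor_slices([1, 4, 2, 5])
ds = ds.map(lambda n: tf.ones([n], dtype=tf.int32))  # variable-length elements
ds = ds.apply(
    tf.data.experimental.bucket_by_sequence_length(
        element_length_func=lambda x: tf.shape(x)[0],
        bucket_boundaries=[3, 6],      # buckets: len < 3, 3 <= len < 6, the rest
        bucket_batch_sizes=[2, 2, 2],
        pad_to_bucket_boundary=True))
for batch in ds:
    print(batch.shape)  # one (2, 2) batch from the short bucket, one (2, 5) from the long one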
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for nlp.data.pretrain_dynamic_dataloader."""
import os
from absl import logging
from absl.testing import parameterized
import numpy as np
import orbit
import tensorflow as tf
from tensorflow.python.distribute import combinations
from tensorflow.python.distribute import strategy_combinations
from official.nlp.configs import bert
from official.nlp.configs import encoders
from official.nlp.data import pretrain_dataloader
from official.nlp.data import pretrain_dynamic_dataloader
from official.nlp.tasks import masked_lm
def _create_fake_dataset(output_path, seq_length, num_masked_tokens,
max_seq_length, num_examples):
"""Creates a fake dataset."""
writer = tf.io.TFRecordWriter(output_path)
def create_int_feature(values):
f = tf.train.Feature(int64_list=tf.train.Int64List(value=list(values)))
return f
def create_float_feature(values):
f = tf.train.Feature(float_list=tf.train.FloatList(value=list(values)))
return f
for _ in range(num_examples):
features = {}
padding = np.zeros(shape=(max_seq_length - seq_length), dtype=np.int32)
input_ids = np.random.randint(low=1, high=100, size=(seq_length))
features['input_ids'] = create_int_feature(
np.concatenate((input_ids, padding)))
features['input_mask'] = create_int_feature(
np.concatenate((np.ones_like(input_ids), padding)))
features['segment_ids'] = create_int_feature(
np.concatenate((np.ones_like(input_ids), padding)))
features['position_ids'] = create_int_feature(
np.concatenate((np.ones_like(input_ids), padding)))
features['masked_lm_positions'] = create_int_feature(
np.random.randint(60, size=(num_masked_tokens), dtype=np.int64))
features['masked_lm_ids'] = create_int_feature(
np.random.randint(100, size=(num_masked_tokens), dtype=np.int64))
features['masked_lm_weights'] = create_float_feature(
np.ones((num_masked_tokens,), dtype=np.float32))
features['next_sentence_labels'] = create_int_feature(np.array([0]))
tf_example = tf.train.Example(features=tf.train.Features(feature=features))
writer.write(tf_example.SerializeToString())
writer.close()
class PretrainDynamicDataLoaderTest(tf.test.TestCase, parameterized.TestCase):
@combinations.generate(
combinations.combine(
distribution_strategy=[
strategy_combinations.cloud_tpu_strategy,
],
mode='eager'))
def test_distribution_strategy(self, distribution_strategy):
max_seq_length = 128
batch_size = 8
input_path = os.path.join(self.get_temp_dir(), 'train.tf_record')
_create_fake_dataset(
input_path,
seq_length=60,
num_masked_tokens=20,
max_seq_length=max_seq_length,
num_examples=batch_size)
data_config = pretrain_dynamic_dataloader.BertPretrainDataConfig(
is_training=False,
input_path=input_path,
seq_bucket_lengths=[64, 128],
global_batch_size=batch_size)
dataloader = pretrain_dynamic_dataloader.PretrainingDynamicDataLoader(
data_config)
distributed_ds = orbit.utils.make_distributed_dataset(
distribution_strategy, dataloader.load)
train_iter = iter(distributed_ds)
with distribution_strategy.scope():
config = masked_lm.MaskedLMConfig(
init_checkpoint=self.get_temp_dir(),
model=bert.PretrainerConfig(
encoders.EncoderConfig(
bert=encoders.BertEncoderConfig(
vocab_size=30522, num_layers=1)),
cls_heads=[
bert.ClsHeadConfig(
inner_dim=10, num_classes=2, name='next_sentence')
]),
train_data=data_config)
task = masked_lm.MaskedLMTask(config)
model = task.build_model()
metrics = task.build_metrics()
@tf.function
def step_fn(features):
return task.validation_step(features, model, metrics=metrics)
distributed_outputs = distribution_strategy.run(
step_fn, args=(next(train_iter),))
local_results = tf.nest.map_structure(
distribution_strategy.experimental_local_results, distributed_outputs)
logging.info('Dynamic padding: local_results= %s', str(local_results))
dynamic_metrics = {}
for metric in metrics:
dynamic_metrics[metric.name] = metric.result()
data_config = pretrain_dataloader.BertPretrainDataConfig(
is_training=False,
input_path=input_path,
seq_length=max_seq_length,
max_predictions_per_seq=20,
global_batch_size=batch_size)
dataloader = pretrain_dataloader.BertPretrainDataLoader(data_config)
distributed_ds = orbit.utils.make_distributed_dataset(
distribution_strategy, dataloader.load)
train_iter = iter(distributed_ds)
with distribution_strategy.scope():
metrics = task.build_metrics()
@tf.function
def step_fn_b(features):
return task.validation_step(features, model, metrics=metrics)
distributed_outputs = distribution_strategy.run(
step_fn_b, args=(next(train_iter),))
local_results = tf.nest.map_structure(
distribution_strategy.experimental_local_results, distributed_outputs)
logging.info('Static padding: local_results= %s', str(local_results))
static_metrics = {}
for metric in metrics:
static_metrics[metric.name] = metric.result()
for key in static_metrics:
# We need to investigate the differences in the losses.
if key != 'next_sentence_loss':
self.assertEqual(dynamic_metrics[key], static_metrics[key])
def test_load_dataset(self):
max_seq_length = 128
batch_size = 2
input_path_1 = os.path.join(self.get_temp_dir(), 'train_1.tf_record')
_create_fake_dataset(
input_path_1,
seq_length=60,
num_masked_tokens=20,
max_seq_length=max_seq_length,
num_examples=batch_size)
input_path_2 = os.path.join(self.get_temp_dir(), 'train_2.tf_record')
_create_fake_dataset(
input_path_2,
seq_length=100,
num_masked_tokens=70,
max_seq_length=max_seq_length,
num_examples=batch_size)
input_paths = ','.join([input_path_1, input_path_2])
data_config = pretrain_dynamic_dataloader.BertPretrainDataConfig(
is_training=False,
input_path=input_paths,
seq_bucket_lengths=[64, 128],
use_position_id=True,
global_batch_size=batch_size)
dataset = pretrain_dynamic_dataloader.PretrainingDynamicDataLoader(
data_config).load()
dataset_it = iter(dataset)
features = next(dataset_it)
self.assertCountEqual([
'input_word_ids',
'input_mask',
'input_type_ids',
'next_sentence_labels',
'masked_lm_positions',
'masked_lm_ids',
'masked_lm_weights',
'position_ids',
], features.keys())
# The sequence-length dimension should be bucketized and padded to 64.
self.assertEqual(features['input_word_ids'].shape, (batch_size, 64))
self.assertEqual(features['input_mask'].shape, (batch_size, 64))
self.assertEqual(features['input_type_ids'].shape, (batch_size, 64))
self.assertEqual(features['position_ids'].shape, (batch_size, 64))
self.assertEqual(features['masked_lm_positions'].shape, (batch_size, 20))
features = next(dataset_it)
self.assertEqual(features['input_word_ids'].shape, (batch_size, 128))
self.assertEqual(features['input_mask'].shape, (batch_size, 128))
self.assertEqual(features['input_type_ids'].shape, (batch_size, 128))
self.assertEqual(features['position_ids'].shape, (batch_size, 128))
self.assertEqual(features['masked_lm_positions'].shape, (batch_size, 70))
def test_load_dataset_not_same_masks(self):
max_seq_length = 128
batch_size = 2
input_path_1 = os.path.join(self.get_temp_dir(), 'train_3.tf_record')
_create_fake_dataset(
input_path_1,
seq_length=60,
num_masked_tokens=20,
max_seq_length=max_seq_length,
num_examples=batch_size)
input_path_2 = os.path.join(self.get_temp_dir(), 'train_4.tf_record')
_create_fake_dataset(
input_path_2,
seq_length=60,
num_masked_tokens=15,
max_seq_length=max_seq_length,
num_examples=batch_size)
input_paths = ','.join([input_path_1, input_path_2])
data_config = pretrain_dynamic_dataloader.BertPretrainDataConfig(
is_training=False,
input_path=input_paths,
seq_bucket_lengths=[64, 128],
use_position_id=True,
global_batch_size=batch_size * 2)
dataset = pretrain_dynamic_dataloader.PretrainingDynamicDataLoader(
data_config).load()
dataset_it = iter(dataset)
with self.assertRaisesRegex(
tf.errors.InvalidArgumentError, '.*Number of non padded mask tokens.*'):
next(dataset_it)
if __name__ == '__main__':
tf.test.main()
# NLP Modeling Library
This library provides a set of Keras primitives (Layers, Networks, and Models)
that can be assembled into transformer-based models. They are
flexible, validated, interoperable, and both TF1 and TF2 compatible.
This library provides a set of Keras primitives (`tf.keras.Layer` and
`tf.keras.Model`) that can be assembled into transformer-based models.
They are flexible, validated, interoperable, and both TF1 and TF2 compatible.
* [`layers`](layers) are the fundamental building blocks for NLP models.
They can be used to assemble new layers, networks, or models.
They can be used to assemble new `tf.keras` layers or models.
* [`networks`](networks) are combinations of layers (and possibly other networks). They are sub-units of models that would not be trained alone. They
encapsulate common network structures like a classification head
or a transformer encoder into an easily handled object with a
standardized configuration.
* [`networks`](networks) are combinations of `tf.keras` layers (and possibly
other networks). They are `tf.keras` models that would not be trained alone.
They encapsulate common network structures like a transformer encoder into an
easily handled object with a standardized configuration.
* [`models`](models) are combinations of layers and networks that would be trained. Pre-built canned models are provided as both convenience functions and canonical examples.
* [`models`](models) are combinations of `tf.keras` layers and models that can
be trained. Several pre-built canned models are provided to train encoder
networks. These models are intended as both convenience functions and canonical
examples.
* [`losses`](losses) contains common loss computation used in NLP tasks.
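For example, the pooling head used by classification tasks can be instantiated
directly from the `layers` package. This is a minimal sketch mirroring the
`cls_head` tests in this repo; the shapes and sizes are arbitrary:

```python
import tensorflow as tf
from official.nlp.modeling import layers

# Pool the first token of a (batch, seq_len, hidden) sequence and project to logits.
head = layers.ClassificationHead(inner_dim=5, num_classes=2)
features = tf.zeros(shape=(2, 10, 10), dtype=tf.float32)
logits = head(features)  # shape: (2, 2)
```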
......@@ -22,7 +25,9 @@ Please see the colab
for how to build transformer-based NLP models using above primitives.
Besides the pre-defined primitives, it also provides scaffold classes to allow
easy experimentation with novel architectures, e.g., you don’t need to fork a whole Transformer object to try a different kind of attention primitive.
easy experimentation with novel architectures, e.g., you don’t need to fork a
whole Transformer object to try a different kind of attention primitive.
* [`TransformerScaffold`](layers/transformer_scaffold.py) implements the
Transformer from ["Attention Is All You Need"]
......@@ -43,4 +48,5 @@ Please see the colab
(https://colab.sandbox.google.com/github/tensorflow/models/blob/master/official/colab/nlp/customize_encoder.ipynb)
for how to use scaffold classes to build novel architectures.
BERT and ALBERT models in this repo are implemented using this library. Code examples can be found in the corresponding model folder.
BERT and ALBERT models in this repo are implemented using this library.
Code examples can be found in the corresponding model folder.
......@@ -12,7 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Modeling package definition."""
"""NLP Modeling Library.
This library provides a set of Keras primitives (`tf.keras.Layer` and
`tf.keras.Model`) that can be assembled into transformer-based models.
They are flexible, validated, interoperable, and both TF1 and TF2 compatible.
"""
from official.nlp.modeling import layers
from official.nlp.modeling import losses
from official.nlp.modeling import models
......
# Layers
Layers are the fundamental building blocks for NLP models. They can be used to
assemble new layers, networks, or models.
assemble new `tf.keras` layers or models.
* [MultiHeadAttention](attention.py) implements an optionally masked attention
between query, key, value tensors as described in
......@@ -33,6 +33,10 @@ assemble new layers, networks, or models.
of self multi-head attention, cross multi-head attention and feedforward
network.
* [RandomFeatureGaussianProcess](gaussian_process.py) implements random
feature-based Gaussian process described in ["Random Features for
Large-Scale Kernel Machines"](https://people.eecs.berkeley.edu/~brecht/papers/07.rah.rec.nips.pdf).
* [ReZeroTransformer](rezero_transformer.py) implements Transformer with
ReZero described in
["ReZero is All You Need: Fast Convergence at Large Depth"](https://arxiv.org/abs/2003.04887).
......@@ -47,6 +51,11 @@ assemble new layers, networks, or models.
* [SelfAttentionMask](self_attention_mask.py) creates a 3D attention mask from
a 2D tensor mask.
* [SpectralNormalization](spectral_normalization.py) implements a Keras layer wrapper
that applies spectral normalization regularization to a given layer. See
[Spectral Norm Regularization for Improving the Generalizability of
Deep Learning](https://arxiv.org/abs/1705.10941)
* [MaskedSoftmax](masked_softmax.py) implements a softmax with an optional
masking input. If no mask is provided to this layer, it performs a standard
softmax; however, if a mask tensor is applied (which should be 1 in
......@@ -60,6 +69,11 @@ assemble new layers, networks, or models.
* [ClassificationHead](cls_head.py) A pooling head over a sequence of
embeddings, commonly used by classification tasks.
* [GaussianProcessClassificationHead](cls_head.py) A spectral-normalized
neural Gaussian process (SNGP)-based classification head as described in
["Simple and Principled Uncertainty Estimation with Deterministic Deep
Learning via Distance Awareness"](https://arxiv.org/abs/2006.10108).
* [GatedFeedforward](gated_feedforward.py) implements the gated linear layer
feedforward as described in
["GLU Variants Improve Transformer"](https://arxiv.org/abs/2002.05202).
......
......@@ -12,7 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Layers package definition."""
"""Layers are the fundamental building blocks for NLP models.
They can be used to assemble new `tf.keras` layers or models.
"""
# pylint: disable=wildcard-import
from official.nlp.modeling.layers.attention import *
from official.nlp.modeling.layers.cls_head import *
......
......@@ -18,6 +18,9 @@ import tensorflow as tf
from official.modeling import tf_utils
from official.nlp.modeling.layers import gaussian_process
from official.nlp.modeling.layers import spectral_normalization
class ClassificationHead(tf.keras.layers.Layer):
"""Pooling head for sentence-level classification tasks."""
......@@ -160,3 +163,110 @@ class MultiClsHeads(tf.keras.layers.Layer):
items = {self.dense.name: self.dense}
items.update({v.name: v for v in self.out_projs})
return items
class GaussianProcessClassificationHead(ClassificationHead):
"""Gaussian process-based pooling head for sentence classification.
This class implements a classifier head for BERT encoder that is based on the
spectral-normalized neural Gaussian process (SNGP) [1]. SNGP is a simple
method to improve a neural network's uncertainty quantification ability
without sacrificing accuracy or latency. It applies spectral normalization to
the hidden pooler layer, and then replaces the dense output layer with a
Gaussian process.
[1]: Jeremiah Liu et al. Simple and Principled Uncertainty Estimation with
Deterministic Deep Learning via Distance Awareness.
In _Neural Information Processing Systems_, 2020.
https://arxiv.org/abs/2006.10108
"""
def __init__(self,
inner_dim,
num_classes,
cls_token_idx=0,
activation="tanh",
dropout_rate=0.0,
initializer="glorot_uniform",
use_spec_norm=True,
use_gp_layer=True,
**kwargs):
"""Initializes the `GaussianProcessClassificationHead`.
Args:
inner_dim: The dimensionality of inner projection layer.
num_classes: Number of output classes.
cls_token_idx: The index inside the sequence to pool.
activation: Dense layer activation.
dropout_rate: Dropout probability.
initializer: Initializer for dense layer kernels.
use_spec_norm: Whether to apply spectral normalization to pooler layer.
use_gp_layer: Whether to use Gaussian process as the output layer.
**kwargs: Additional keyword arguments.
"""
# Collects spectral normalization and Gaussian process args from kwargs.
self.use_spec_norm = use_spec_norm
self.use_gp_layer = use_gp_layer
self.spec_norm_kwargs = extract_spec_norm_kwargs(kwargs)
self.gp_layer_kwargs = extract_gp_layer_kwargs(kwargs)
super().__init__(
inner_dim=inner_dim,
num_classes=num_classes,
cls_token_idx=cls_token_idx,
activation=activation,
dropout_rate=dropout_rate,
initializer=initializer,
**kwargs)
# Applies spectral normalization to the pooler layer.
if use_spec_norm:
self.dense = spectral_normalization.SpectralNormalization(
self.dense, inhere_layer_name=True, **self.spec_norm_kwargs)
# Replace Dense output layer with the Gaussian process layer.
if use_gp_layer:
self.out_proj = gaussian_process.RandomFeatureGaussianProcess(
self.num_classes,
kernel_initializer=self.initializer,
name="logits",
**self.gp_layer_kwargs)
def get_config(self):
config = dict(
use_spec_norm=self.use_spec_norm, use_gp_layer=self.use_gp_layer)
config.update(self.spec_norm_kwargs)
config.update(self.gp_layer_kwargs)
config.update(super(GaussianProcessClassificationHead, self).get_config())
return config
def extract_gp_layer_kwargs(kwargs):
"""Extracts Gaussian process layer configs from a given kwarg."""
return dict(
num_inducing=kwargs.pop("num_inducing", 1024),
normalize_input=kwargs.pop("normalize_input", True),
gp_cov_momentum=kwargs.pop("gp_cov_momentum", 0.999),
gp_cov_ridge_penalty=kwargs.pop("gp_cov_ridge_penalty", 1e-6),
scale_random_features=kwargs.pop("scale_random_features", False),
l2_regularization=kwargs.pop("l2_regularization", 0.),
gp_cov_likelihood=kwargs.pop("gp_cov_likelihood", "gaussian"),
return_gp_cov=kwargs.pop("return_gp_cov", True),
return_random_features=kwargs.pop("return_random_features", False),
use_custom_random_features=kwargs.pop("use_custom_random_features", True),
custom_random_features_initializer=kwargs.pop(
"custom_random_features_initializer", "random_normal"),
custom_random_features_activation=kwargs.pop(
"custom_random_features_activation", None))
def extract_spec_norm_kwargs(kwargs):
"""Extracts spectral normalization configs from a given kwarg."""
return dict(
iteration=kwargs.pop("iteration", 1),
norm_multiplier=kwargs.pop("norm_multiplier", .99))
......@@ -58,5 +58,56 @@ class MultiClsHeadsTest(tf.test.TestCase):
self.assertAllEqual(test_layer.get_config(), new_layer.get_config())
class GaussianProcessClassificationHeadTest(tf.test.TestCase):
def setUp(self):
super().setUp()
self.spec_norm_kwargs = dict(norm_multiplier=1.,)
self.gp_layer_kwargs = dict(num_inducing=512)
def test_layer_invocation(self):
test_layer = cls_head.GaussianProcessClassificationHead(
inner_dim=5,
num_classes=2,
use_spec_norm=True,
use_gp_layer=True,
initializer="zeros",
**self.spec_norm_kwargs,
**self.gp_layer_kwargs)
features = tf.zeros(shape=(2, 10, 10), dtype=tf.float32)
output, _ = test_layer(features)
self.assertAllClose(output, [[0., 0.], [0., 0.]])
self.assertSameElements(test_layer.checkpoint_items.keys(),
["pooler_dense"])
def test_layer_serialization(self):
layer = cls_head.GaussianProcessClassificationHead(
inner_dim=5,
num_classes=2,
use_spec_norm=True,
use_gp_layer=True,
**self.spec_norm_kwargs,
**self.gp_layer_kwargs)
new_layer = cls_head.GaussianProcessClassificationHead.from_config(
layer.get_config())
# If the serialization was successful, the new config should match the old.
self.assertAllEqual(layer.get_config(), new_layer.get_config())
def test_sngp_kwargs_serialization(self):
"""Tests if SNGP-specific kwargs are added during serialization."""
layer = cls_head.GaussianProcessClassificationHead(
inner_dim=5,
num_classes=2,
use_spec_norm=True,
use_gp_layer=True,
**self.spec_norm_kwargs,
**self.gp_layer_kwargs)
layer_config = layer.get_config()
# The config values should equal those defined in setUp().
self.assertEqual(layer_config["norm_multiplier"], 1.)
self.assertEqual(layer_config["num_inducing"], 512)
if __name__ == "__main__":
tf.test.main()
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Lint as: python3
"""Definitions for random feature Gaussian process layer.
## References:
[1]: Ali Rahimi and Benjamin Recht. Random Features for Large-Scale Kernel
Machines. In _Neural Information Processing Systems_, 2007.
https://people.eecs.berkeley.edu/~brecht/papers/07.rah.rec.nips.pdf
"""
import math
import tensorflow as tf
_SUPPORTED_LIKELIHOOD = ('binary_logistic', 'poisson', 'gaussian')
class RandomFeatureGaussianProcess(tf.keras.layers.Layer):
"""Gaussian process layer with random feature approximation.
During training, the model updates the maximum a posteriori (MAP) logits
estimates and posterior precision matrix using minibatch statistics. During
inference, the model divides the MAP logit estimates by the predictive
standard deviation, which is equivalent to approximating the posterior mean
of the predictive probability via the mean-field approximation.
Users can specify different types of random features by setting
`use_custom_random_features=True`, and change the initializer and activations
of the custom random features. For example:
MLP Kernel: initializer='random_normal', activation=tf.nn.relu
RBF Kernel: initializer='random_normal', activation=tf.math.cos
A linear kernel can also be specified by setting gp_kernel_type='linear' and
`use_custom_random_features=True`.
Attributes:
units: (int) The dimensionality of layer.
num_inducing: (int) The number of random features for the approximation.
is_training: (tf.bool) Whether the layer is set in training mode. If so the
layer updates the Gaussian process' variance estimate using statistics
computed from the incoming minibatches.
"""
def __init__(self,
units,
num_inducing=1024,
gp_kernel_type='gaussian',
gp_kernel_scale=1.,
gp_output_bias=0.,
normalize_input=True,
gp_kernel_scale_trainable=False,
gp_output_bias_trainable=False,
gp_cov_momentum=0.999,
gp_cov_ridge_penalty=1e-6,
scale_random_features=True,
use_custom_random_features=True,
custom_random_features_initializer=None,
custom_random_features_activation=None,
l2_regularization=0.,
gp_cov_likelihood='gaussian',
return_gp_cov=True,
return_random_features=False,
dtype=None,
name='random_feature_gaussian_process',
**gp_output_kwargs):
"""Initializes a random-feature Gaussian process layer instance.
Args:
units: (int) Number of output units.
num_inducing: (int) Number of random Fourier features used for
approximating the Gaussian process.
gp_kernel_type: (string) The type of kernel function to use for the
Gaussian process. Currently defaults to 'gaussian', which is the Gaussian
RBF kernel.
gp_kernel_scale: (float) The length-scale parameter of a
shift-invariant kernel function, i.e., for RBF kernel:
exp(-|x1 - x2|**2 / gp_kernel_scale).
gp_output_bias: (float) Scalar initial value for the bias vector.
normalize_input: (bool) Whether to normalize the input to Gaussian
process.
gp_kernel_scale_trainable: (bool) Whether the length scale variable is
trainable.
gp_output_bias_trainable: (bool) Whether the bias is trainable.
gp_cov_momentum: (float) A discount factor used to compute the moving
average of the posterior covariance matrix.
gp_cov_ridge_penalty: (float) Initial ridge penalty for the posterior
covariance matrix.
scale_random_features: (bool) Whether to scale the random feature
by sqrt(2. / num_inducing).
use_custom_random_features: (bool) Whether to use custom random
features implemented using tf.keras.layers.Dense.
custom_random_features_initializer: (str or callable) Initializer for
the random features. Defaults to random normal, which approximates an RBF
kernel if the activation function is cosine.
custom_random_features_activation: (callable) Activation function for the
random feature layer. Defaults to cosine, which approximates an RBF
kernel.
l2_regularization: (float) The strength of l2 regularization on the output
weights.
gp_cov_likelihood: (string) Likelihood to use for computing the Laplace
approximation of the covariance matrix. Defaults to `gaussian`.
return_gp_cov: (bool) Whether to also return GP covariance matrix.
If False then no covariance learning is performed.
return_random_features: (bool) Whether to also return random features.
dtype: (tf.DType) Input data type.
name: (string) Layer name.
**gp_output_kwargs: Additional keyword arguments to dense output layer.
"""
super(RandomFeatureGaussianProcess, self).__init__(name=name, dtype=dtype)
self.units = units
self.num_inducing = num_inducing
self.normalize_input = normalize_input
self.gp_input_scale = 1. / tf.sqrt(gp_kernel_scale)
self.gp_feature_scale = tf.sqrt(2. / float(num_inducing))
self.scale_random_features = scale_random_features
self.return_random_features = return_random_features
self.return_gp_cov = return_gp_cov
self.gp_kernel_type = gp_kernel_type
self.gp_kernel_scale = gp_kernel_scale
self.gp_output_bias = gp_output_bias
self.gp_kernel_scale_trainable = gp_kernel_scale_trainable
self.gp_output_bias_trainable = gp_output_bias_trainable
self.use_custom_random_features = use_custom_random_features
self.custom_random_features_initializer = custom_random_features_initializer
self.custom_random_features_activation = custom_random_features_activation
self.l2_regularization = l2_regularization
self.gp_output_kwargs = gp_output_kwargs
self.gp_cov_momentum = gp_cov_momentum
self.gp_cov_ridge_penalty = gp_cov_ridge_penalty
self.gp_cov_likelihood = gp_cov_likelihood
if self.use_custom_random_features:
# Default to Gaussian RBF kernel.
self.random_features_bias_initializer = tf.random_uniform_initializer(
minval=0., maxval=2. * math.pi)
if self.custom_random_features_initializer is None:
self.custom_random_features_initializer = (
tf.keras.initializers.RandomNormal(stddev=1.))
if self.custom_random_features_activation is None:
self.custom_random_features_activation = tf.math.cos
def build(self, input_shape):
# Defines model layers.
if self.normalize_input:
self._input_norm_layer = tf.keras.layers.LayerNormalization(
name='gp_input_normalization')
self._input_norm_layer.build(input_shape)
input_shape = self._input_norm_layer.compute_output_shape(input_shape)
self._random_feature = self._make_random_feature_layer(
name='gp_random_feature')
self._random_feature.build(input_shape)
input_shape = self._random_feature.compute_output_shape(input_shape)
if self.return_gp_cov:
self._gp_cov_layer = LaplaceRandomFeatureCovariance(
momentum=self.gp_cov_momentum,
ridge_penalty=self.gp_cov_ridge_penalty,
likelihood=self.gp_cov_likelihood,
dtype=self.dtype,
name='gp_covariance')
self._gp_cov_layer.build(input_shape)
self._gp_output_layer = tf.keras.layers.Dense(
units=self.units,
use_bias=False,
kernel_regularizer=tf.keras.regularizers.l2(self.l2_regularization),
dtype=self.dtype,
name='gp_output_weights',
**self.gp_output_kwargs)
self._gp_output_layer.build(input_shape)
self._gp_output_bias = tf.Variable(
initial_value=[self.gp_output_bias] * self.units,
dtype=self.dtype,
trainable=self.gp_output_bias_trainable,
name='gp_output_bias')
self.built = True
def _make_random_feature_layer(self, name):
"""Defines random feature layer depending on kernel type."""
if not self.use_custom_random_features:
# Use default RandomFourierFeatures layer from tf.keras.
return tf.keras.layers.experimental.RandomFourierFeatures(
output_dim=self.num_inducing,
kernel_initializer=self.gp_kernel_type,
scale=self.gp_kernel_scale,
trainable=self.gp_kernel_scale_trainable,
dtype=self.dtype,
name=name)
if self.gp_kernel_type.lower() == 'linear':
custom_random_feature_layer = tf.keras.layers.Lambda(
lambda x: x, name=name)
else:
# Use user-supplied configurations.
custom_random_feature_layer = tf.keras.layers.Dense(
units=self.num_inducing,
use_bias=True,
activation=self.custom_random_features_activation,
kernel_initializer=self.custom_random_features_initializer,
bias_initializer=self.random_features_bias_initializer,
trainable=False,
name=name)
return custom_random_feature_layer
def reset_covariance_matrix(self):
"""Resets covariance matrix of the GP layer.
This function is useful for resetting the model's covariance matrix at the
beginning of a new epoch.
"""
self._gp_cov_layer.reset_precision_matrix()
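# A sketch of the intended usage (assumed custom training loop; `model`,
# `train_step`, and `train_dataset` are hypothetical names):
#
#   for epoch in range(num_epochs):
#     # Reset the precision/covariance estimate at the start of each epoch.
#     model.gp_layer.reset_covariance_matrix()
#     for x_batch, y_batch in train_dataset:
#       train_step(x_batch, y_batch)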
def call(self, inputs, global_step=None, training=None):
# Computes random features.
gp_inputs = inputs
if self.normalize_input:
gp_inputs = self._input_norm_layer(gp_inputs)
elif self.use_custom_random_features:
# Supports lengthscale for custom random feature layer by directly
# rescaling the input.
gp_input_scale = tf.cast(self.gp_input_scale, inputs.dtype)
gp_inputs = gp_inputs * gp_input_scale
gp_feature = self._random_feature(gp_inputs)
if self.scale_random_features:
# Scale random features by sqrt(2. / num_inducing) following [1].
# When using the GP layer as the output layer of a neural network, it is
# recommended to turn this scaling off to prevent it from changing the
# learning rate of the hidden layers.
gp_feature_scale = tf.cast(self.gp_feature_scale, inputs.dtype)
gp_feature = gp_feature * gp_feature_scale
# Computes posterior center (i.e., MAP estimate) and variance.
gp_output = self._gp_output_layer(gp_feature) + self._gp_output_bias
if self.return_gp_cov:
gp_covmat = self._gp_cov_layer(gp_feature, gp_output, training)
# Assembles model output.
model_output = [gp_output,]
if self.return_gp_cov:
model_output.append(gp_covmat)
if self.return_random_features:
model_output.append(gp_feature)
return model_output
class LaplaceRandomFeatureCovariance(tf.keras.layers.Layer):
"""Computes the Gaussian Process covariance using Laplace method.
At training time, this layer updates the Gaussian process posterior using
model features in minibatches.
Attributes:
momentum: (float) A discount factor used to compute the moving average for
posterior precision matrix. Analogous to the momentum factor in batch
normalization. If -1 then update covariance matrix using a naive sum
without momentum, which is desirable if the goal is to compute the exact
covariance matrix by passing through data once (say in the final epoch).
ridge_penalty: (float) Initial ridge penalty for the weight covariance
matrix. This value is used to stabilize the eigenvalues of the weight
covariance estimate so that the matrix inverse can be computed for
Cov = inv(t(X) * X + s * I). The ridge factor s cannot be too large, since
otherwise it will dominate the t(X) * X term and make the covariance
estimate meaningless.
likelihood: (str) The likelihood to use for computing Laplace approximation
for the covariance matrix. Can be one of ('binary_logistic', 'poisson',
'gaussian').
"""
def __init__(self,
momentum=0.999,
ridge_penalty=1e-6,
likelihood='gaussian',
dtype=None,
name='laplace_covariance'):
if likelihood not in _SUPPORTED_LIKELIHOOD:
raise ValueError(
f'"likelihood" must be one of {_SUPPORTED_LIKELIHOOD}, got {likelihood}.'
)
self.ridge_penalty = ridge_penalty
self.momentum = momentum
self.likelihood = likelihood
super(LaplaceRandomFeatureCovariance, self).__init__(dtype=dtype, name=name)
def compute_output_shape(self, input_shape):
gp_feature_dim = input_shape[-1]
return tf.TensorShape([gp_feature_dim, gp_feature_dim])
def build(self, input_shape):
gp_feature_dim = input_shape[-1]
# Convert gp_feature_dim to int value for TF1 compatibility.
if isinstance(gp_feature_dim, tf.compat.v1.Dimension):
gp_feature_dim = gp_feature_dim.value
# Posterior precision matrix for the GP's random feature coefficients.
self.initial_precision_matrix = (
self.ridge_penalty * tf.eye(gp_feature_dim, dtype=self.dtype))
self.precision_matrix = (
self.add_weight(
name='gp_precision_matrix',
shape=(gp_feature_dim, gp_feature_dim),
dtype=self.dtype,
initializer=tf.keras.initializers.Identity(self.ridge_penalty),
trainable=False,
aggregation=tf.VariableAggregation.ONLY_FIRST_REPLICA))
self.built = True
def make_precision_matrix_update_op(self,
gp_feature,
logits,
precision_matrix):
"""Defines update op for the precision matrix of feature weights."""
if self.likelihood != 'gaussian':
if logits is None:
raise ValueError(
f'"logits" cannot be None when likelihood={self.likelihood}')
if logits.shape[-1] != 1:
raise ValueError(
f'likelihood={self.likelihood} only supports univariate logits. '
f'Got logits dimension: {logits.shape[-1]}')
batch_size = tf.shape(gp_feature)[0]
batch_size = tf.cast(batch_size, dtype=gp_feature.dtype)
# Computes batch-specific normalized precision matrix.
if self.likelihood == 'binary_logistic':
prob = tf.sigmoid(logits)
prob_multiplier = prob * (1. - prob)
elif self.likelihood == 'poisson':
prob_multiplier = tf.exp(logits)
else:
prob_multiplier = 1.
gp_feature_adjusted = tf.sqrt(prob_multiplier) * gp_feature
precision_matrix_minibatch = tf.matmul(
gp_feature_adjusted, gp_feature_adjusted, transpose_a=True)
# Updates the population-wise precision matrix.
if self.momentum > 0:
# Use moving-average updates to accumulate batch-specific precision
# matrices.
precision_matrix_minibatch = precision_matrix_minibatch / batch_size
precision_matrix_new = (
self.momentum * precision_matrix +
(1. - self.momentum) * precision_matrix_minibatch)
else:
# Compute the exact population-wise covariance without momentum.
# If using this option, make sure to pass through the data only once.
precision_matrix_new = precision_matrix + precision_matrix_minibatch
# Returns the update op.
return precision_matrix.assign(precision_matrix_new)
def reset_precision_matrix(self):
"""Resets precision matrix to its initial value.
This function is useful for resetting the model's covariance matrix at the
beginning of a new epoch.
"""
precision_matrix_reset_op = self.precision_matrix.assign(
self.initial_precision_matrix)
self.add_update(precision_matrix_reset_op)
def compute_predictive_covariance(self, gp_feature):
"""Computes posterior predictive variance.
Approximates the Gaussian process posterior using random features.
Given the training random features Phi_tr (num_train, num_hidden) and the
testing random features Phi_ts (batch_size, num_hidden), the predictive
covariance matrix is computed as (assuming a Gaussian likelihood):
s * Phi_ts @ inv(t(Phi_tr) * Phi_tr + s * I) @ t(Phi_ts),
where s is the ridge factor used for stabilizing the inverse, and I is
the identity matrix with shape (num_hidden, num_hidden).
Args:
gp_feature: (tf.Tensor) The random feature of testing data to be used for
computing the covariance matrix. Shape (batch_size, gp_hidden_size).
Returns:
(tf.Tensor) Predictive covariance matrix, shape (batch_size, batch_size).
"""
# Computes the covariance matrix of the feature coefficient.
feature_cov_matrix = tf.linalg.inv(self.precision_matrix)
# Computes the covariance matrix of the gp prediction.
cov_feature_product = tf.matmul(
feature_cov_matrix, gp_feature, transpose_b=True) * self.ridge_penalty
gp_cov_matrix = tf.matmul(gp_feature, cov_feature_product)
return gp_cov_matrix
def _get_training_value(self, training=None):
if training is None:
training = tf.keras.backend.learning_phase()
if isinstance(training, int):
training = bool(training)
return training
def call(self, inputs, logits=None, training=None):
"""Minibatch updates the GP's posterior precision matrix estimate.
Args:
inputs: (tf.Tensor) GP random features, shape (batch_size,
gp_hidden_size).
logits: (tf.Tensor) Pre-activation output from the model. Needed
for Laplace approximation under a non-Gaussian likelihood.
training: (tf.bool) whether or not the layer is in training mode. If in
training mode, the gp_weight covariance is updated using gp_feature.
Returns:
gp_covmat (tf.Tensor): GP posterior predictive covariance matrix,
shape (batch_size, batch_size).
"""
batch_size = tf.shape(inputs)[0]
training = self._get_training_value(training)
if training:
# Define and register the update op for feature precision matrix.
precision_matrix_update_op = self.make_precision_matrix_update_op(
gp_feature=inputs,
logits=logits,
precision_matrix=self.precision_matrix)
self.add_update(precision_matrix_update_op)
# Return null estimate during training.
return tf.eye(batch_size, dtype=self.dtype)
else:
# Return covariance estimate during inference.
return self.compute_predictive_covariance(gp_feature=inputs)
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Lint as: python3
"""Tests for Gaussian process functions."""
import os
import shutil
from absl.testing import parameterized
import numpy as np
import tensorflow as tf
from official.nlp.modeling.layers import gaussian_process
def exact_gaussian_kernel(x1, x2):
"""Computes exact Gaussian kernel value(s) for tensors x1 and x2."""
x1_squared = tf.reduce_sum(tf.square(x1), list(range(1, len(x1.shape))))
x2_squared = tf.reduce_sum(tf.square(x2), list(range(1, len(x2.shape))))
square = (x1_squared[:, tf.newaxis] + x2_squared[tf.newaxis, :] -
2 * tf.matmul(x1, x2, transpose_b=True))
return tf.math.exp(-square / 2.)
def _generate_normal_data(num_sample, num_dim, loc):
"""Generates random data sampled from i.i.d. normal distribution."""
return np.random.normal(
size=(num_sample, num_dim), loc=loc, scale=1. / np.sqrt(num_dim))
def _generate_rbf_data(x_data, orthogonal=True):
"""Generates high-dim data that is the eigen components of a RBF kernel."""
k_rbf = exact_gaussian_kernel(x_data, x_data)
x_orth, x_diag, _ = np.linalg.svd(k_rbf)
if orthogonal:
return x_orth
return np.diag(np.sqrt(x_diag)).dot(x_orth.T)
def _make_minibatch_iterator(data_numpy, batch_size, num_epoch):
"""Makes a tf.data.Dataset for given batch size and num epoches."""
dataset = tf.data.Dataset.from_tensor_slices(data_numpy)
dataset = dataset.repeat(num_epoch).batch(batch_size)
return iter(dataset)
def _compute_posterior_kernel(x_tr, x_ts, kernel_func, ridge_penalty):
"""Computes the posterior covariance matrix of a Gaussian process."""
num_sample = x_tr.shape[0]
k_tt_inv = tf.linalg.inv(
kernel_func(x_tr, x_tr) + ridge_penalty * np.eye(num_sample))
k_ts = kernel_func(x_tr, x_ts)
k_ss = kernel_func(x_ts, x_ts)
return k_ss - tf.matmul(k_ts, tf.matmul(k_tt_inv, k_ts), transpose_a=True)
class GaussianProcessTest(tf.test.TestCase, parameterized.TestCase):
def setUp(self):
super(GaussianProcessTest, self).setUp()
self.num_data_dim = 10
self.num_inducing = 1024
self.num_train_sample = 1024
self.num_test_sample = 256
self.prec_tolerance = {'atol': 1e-3, 'rtol': 5e-2}
self.cov_tolerance = {'atol': 5e-2, 'rtol': 2.}
self.rbf_kern_func = exact_gaussian_kernel
self.x_tr = _generate_normal_data(
self.num_train_sample, self.num_data_dim, loc=0.)
self.x_ts = _generate_normal_data(
self.num_test_sample, self.num_data_dim, loc=1.)
def test_layer_build(self):
"""Tests if layer.built=True after building."""
rfgp_model = gaussian_process.RandomFeatureGaussianProcess(units=1)
rfgp_model.build(input_shape=self.x_tr.shape)
self.assertTrue(rfgp_model.built)
@parameterized.named_parameters(('rbf_data', False),
('orthogonal_data', True))
def test_laplace_covariance_minibatch(self, generate_orthogonal_data):
"""Tests if model correctly learns population-lvel precision matrix."""
batch_size = 50
epochs = 1000
x_data = _generate_rbf_data(self.x_ts, generate_orthogonal_data)
data_iterator = _make_minibatch_iterator(x_data, batch_size, epochs)
# Estimates precision matrix using minibatch.
cov_estimator = gaussian_process.LaplaceRandomFeatureCovariance(
momentum=0.999, ridge_penalty=0)
for minibatch_data in data_iterator:
_ = cov_estimator(minibatch_data, training=True)
# Evaluation
prec_mat_expected = x_data.T.dot(x_data)
prec_mat_computed = (
cov_estimator.precision_matrix.numpy() * self.num_test_sample)
np.testing.assert_allclose(prec_mat_computed, prec_mat_expected,
**self.prec_tolerance)
def test_random_feature_prior_approximation(self):
"""Tests random feature GP's ability in approximating exact GP prior."""
num_inducing = 10240
rfgp_model = gaussian_process.RandomFeatureGaussianProcess(
units=1,
num_inducing=num_inducing,
normalize_input=False,
gp_kernel_type='gaussian',
return_random_features=True)
# Extract random features.
_, _, gp_feature = rfgp_model(self.x_tr, training=True)
gp_feature_np = gp_feature.numpy()
prior_kernel_computed = gp_feature_np.dot(gp_feature_np.T)
prior_kernel_expected = self.rbf_kern_func(self.x_tr, self.x_tr)
np.testing.assert_allclose(prior_kernel_computed, prior_kernel_expected,
**self.cov_tolerance)
def test_random_feature_posterior_approximation(self):
"""Tests random feature GP's ability in approximating exact GP posterior."""
# Set momentum = 0.5 so posterior precision matrix is 0.5 * (I + K).
gp_cov_momentum = 0.5
gp_cov_ridge_penalty = 1.
num_inducing = 1024
rfgp_model = gaussian_process.RandomFeatureGaussianProcess(
units=1,
num_inducing=num_inducing,
normalize_input=False,
gp_kernel_type='gaussian',
gp_cov_momentum=gp_cov_momentum,
gp_cov_ridge_penalty=gp_cov_ridge_penalty)
# Computes posterior covariance on test data.
_, _ = rfgp_model(self.x_tr, training=True)
_, gp_cov_ts = rfgp_model(self.x_ts, training=False)
# Scale up covariance estimate since prec matrix is down-scaled by momentum.
post_kernel_computed = gp_cov_ts * gp_cov_momentum
post_kernel_expected = _compute_posterior_kernel(self.x_tr, self.x_ts,
self.rbf_kern_func,
gp_cov_ridge_penalty)
np.testing.assert_allclose(post_kernel_computed, post_kernel_expected,
**self.cov_tolerance)
def test_random_feature_linear_kernel(self):
"""Tests if linear kernel indeed leads to an identity mapping."""
# Specify linear kernel
gp_kernel_type = 'linear'
normalize_input = False
scale_random_features = False
use_custom_random_features = True
rfgp_model = gaussian_process.RandomFeatureGaussianProcess(
units=1,
normalize_input=normalize_input,
gp_kernel_type=gp_kernel_type,
scale_random_features=scale_random_features,
use_custom_random_features=use_custom_random_features,
return_random_features=True)
_, _, gp_feature = rfgp_model(self.x_tr, training=True)
# Check if linear kernel leads to identity mapping.
np.testing.assert_allclose(gp_feature, self.x_tr, **self.prec_tolerance)
def test_no_matrix_update_during_test(self):
"""Tests if the precision matrix is not updated during testing."""
rfgp_model = gaussian_process.RandomFeatureGaussianProcess(units=1)
# Training.
_, gp_covmat_null = rfgp_model(self.x_tr, training=True)
precision_mat_before_test = rfgp_model._gp_cov_layer.precision_matrix
# Testing.
_ = rfgp_model(self.x_ts, training=False)
precision_mat_after_test = rfgp_model._gp_cov_layer.precision_matrix
self.assertAllClose(
gp_covmat_null, tf.eye(self.num_train_sample), atol=1e-4)
self.assertAllClose(
precision_mat_before_test, precision_mat_after_test, atol=1e-4)
def test_state_saving_and_loading(self):
"""Tests if the loaded model returns same results."""
input_data = np.random.random((1, 2))
rfgp_model = gaussian_process.RandomFeatureGaussianProcess(units=1)
inputs = tf.keras.Input((2,), batch_size=1)
outputs = rfgp_model(inputs)
model = tf.keras.Model(inputs, outputs)
gp_output, gp_covmat = model.predict(input_data)
# Save and then load the model.
temp_dir = self.get_temp_dir()
self.addCleanup(shutil.rmtree, temp_dir)
saved_model_dir = os.path.join(temp_dir, 'rfgp_model')
model.save(saved_model_dir)
new_model = tf.keras.models.load_model(saved_model_dir)
gp_output_new, gp_covmat_new = new_model.predict(input_data)
self.assertAllClose(gp_output, gp_output_new, atol=1e-4)
self.assertAllClose(gp_covmat, gp_covmat_new, atol=1e-4)
if __name__ == '__main__':
tf.test.main()