Commit 892dac23 authored by Frederick Liu's avatar Frederick Liu Committed by A. Unique TensorFlower


[reuse] Add layers used in [Leveraging redundancy in attention with Reuse Transformers](https://arxiv.org/abs/2110.06821).

PiperOrigin-RevId: 408969659
parent a241b9ae
......@@ -50,6 +50,14 @@ assemble new `tf.keras` layers or models.
feature-based Gaussian process described in ["Random Features for
Large-Scale Kernel Machines"](https://people.eecs.berkeley.edu/~brecht/papers/07.rah.rec.nips.pdf).
* [ReuseMultiHeadAttention](reuse_attention.py) supports passing in
precomputed attention scores to be reused, avoiding recomputation, as
described in
["Leveraging redundancy in attention with Reuse Transformers"](https://arxiv.org/abs/2110.06821).
* [ReuseTransformer](reuse_transformer.py) supports reusing attention scores
from lower layers in higher layers to avoid recomputing them, as described in
["Leveraging redundancy in attention with Reuse Transformers"](https://arxiv.org/abs/2110.06821).
* [ReZeroTransformer](rezero_transformer.py) implements Transformer with
ReZero described in
["ReZero is All You Need: Fast Convergence at Large Depth"](https://arxiv.org/abs/2003.04887).
......
......@@ -39,6 +39,8 @@ from official.nlp.modeling.layers.position_embedding import RelativePositionBias
from official.nlp.modeling.layers.position_embedding import RelativePositionEmbedding
from official.nlp.modeling.layers.relative_attention import MultiHeadRelativeAttention
from official.nlp.modeling.layers.relative_attention import TwoStreamRelativeAttention
from official.nlp.modeling.layers.reuse_attention import ReuseMultiHeadAttention
from official.nlp.modeling.layers.reuse_transformer import ReuseTransformer
from official.nlp.modeling.layers.rezero_transformer import ReZeroTransformer
from official.nlp.modeling.layers.self_attention_mask import SelfAttentionMask
from official.nlp.modeling.layers.spectral_normalization import *
......
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for the attention layer."""
from absl.testing import parameterized
import numpy as np
import tensorflow as tf
from official.nlp.modeling.layers import reuse_attention as attention
class ReuseMultiHeadAttentionTest(tf.test.TestCase, parameterized.TestCase):
@parameterized.named_parameters(
("key_value_same_proj", None, None, [40, 80]),
("key_value_different_proj", 32, 60, [40, 60]),
)
def test_non_masked_attention(self, value_dim, output_shape, output_dims):
"""Test that the attention layer can be created without a mask tensor."""
test_layer = attention.ReuseMultiHeadAttention(
num_heads=12,
key_dim=64,
value_dim=value_dim,
output_shape=output_shape)
# Create a 3-dimensional input (the first dimension is implicit).
query = tf.keras.Input(shape=(40, 80))
value = tf.keras.Input(shape=(20, 80))
output = test_layer(query=query, value=value)
self.assertEqual(output.shape.as_list(), [None] + output_dims)
def test_non_masked_self_attention(self):
"""Test with one input (self-attenntion) and no mask tensor."""
test_layer = attention.ReuseMultiHeadAttention(
num_heads=12, key_dim=64)
# Create a 3-dimensional input (the first dimension is implicit).
query = tf.keras.Input(shape=(40, 80))
output = test_layer(query, query)
self.assertEqual(output.shape.as_list(), [None, 40, 80])
def test_attention_scores(self):
"""Test attention outputs with coefficients."""
test_layer = attention.ReuseMultiHeadAttention(
num_heads=12, key_dim=64)
# Create a 3-dimensional input (the first dimension is implicit).
query = tf.keras.Input(shape=(40, 80))
output, coef = test_layer(query, query, return_attention_scores=True)
self.assertEqual(output.shape.as_list(), [None, 40, 80])
self.assertEqual(coef.shape.as_list(), [None, 12, 40, 40])
def test_attention_scores_with_values(self):
"""Test attention outputs with coefficients."""
test_layer = attention.ReuseMultiHeadAttention(
num_heads=12, key_dim=64)
# Create a 3-dimensional input (the first dimension is implicit).
query = tf.keras.Input(shape=(40, 80))
value = tf.keras.Input(shape=(60, 80))
output, coef = test_layer(query, value, return_attention_scores=True)
self.assertEqual(output.shape.as_list(), [None, 40, 80])
self.assertEqual(coef.shape.as_list(), [None, 12, 40, 60])
@parameterized.named_parameters(
("with_bias", True, 0), ("no_bias", False, 0),
("reuse_all_with_bias", True, -1), ("reuse_all_no_bias", False, -1),
("reuse_partial_with_bias", True, 1),
("reuse_partial_no_bias", False, 1))
def test_masked_attention(self, use_bias, reuse_attention):
"""Test with a mask tensor."""
test_layer = attention.ReuseMultiHeadAttention(
num_heads=2, key_dim=2, use_bias=use_bias,
reuse_attention=reuse_attention)
# Create a 3-dimensional input (the first dimension is implicit).
batch_size = 3
query = tf.keras.Input(shape=(4, 8))
value = tf.keras.Input(shape=(2, 8))
mask_tensor = tf.keras.Input(shape=(4, 2))
reuse_attention_scores = tf.keras.Input(shape=(2, 4, 2))
output = test_layer(query=query, value=value, attention_mask=mask_tensor,
reuse_attention_scores=reuse_attention_scores)
# Create a model containing the test layer.
model = tf.keras.Model(
[query, value, mask_tensor, reuse_attention_scores], output)
# Generate data for the input (non-mask) tensors.
from_data = 10 * np.random.random_sample((batch_size, 4, 8))
to_data = 10 * np.random.random_sample((batch_size, 2, 8))
reuse_scores = np.random.random_sample((batch_size, 2, 4, 2))
# Invoke the data with a random set of mask data. This should mask at least
# one element.
mask_data = np.random.randint(2, size=(batch_size, 4, 2))
masked_output_data = model.predict(
[from_data, to_data, mask_data, reuse_scores])
# Invoke the same data, but with a null mask (where no elements are masked).
null_mask_data = np.ones((batch_size, 4, 2))
unmasked_output_data = model.predict(
[from_data, to_data, null_mask_data, reuse_scores])
# If all attention scores are reused (reuse_attention == -1), the mask is never
# applied, so masked and unmasked outputs match; otherwise the masked output
# should differ from the unmasked one.
if reuse_attention == -1:
self.assertAllEqual(masked_output_data, unmasked_output_data)
else:
self.assertNotAllClose(masked_output_data, unmasked_output_data)
# Tests the layer with three inputs: Q, K, V.
key = tf.keras.Input(shape=(2, 8))
output = test_layer(query, value=value, key=key, attention_mask=mask_tensor,
reuse_attention_scores=reuse_attention_scores)
model = tf.keras.Model(
[query, value, key, mask_tensor, reuse_attention_scores], output)
masked_output_data = model.predict(
[from_data, to_data, to_data, mask_data, reuse_scores])
unmasked_output_data = model.predict(
[from_data, to_data, to_data, null_mask_data, reuse_scores])
# If all attention scores are reused (reuse_attention == -1), the mask is never
# applied, so masked and unmasked outputs match; otherwise the masked output
# should differ from the unmasked one.
if reuse_attention == -1:
self.assertAllEqual(masked_output_data, unmasked_output_data)
else:
self.assertNotAllClose(masked_output_data, unmasked_output_data)
if reuse_attention > 0:
self.assertLen(test_layer._output_dense, 2)
if use_bias:
if reuse_attention == 0:
self.assertLen(test_layer._query_dense.trainable_variables, 2)
self.assertLen(test_layer._output_dense[0].trainable_variables, 2)
if len(test_layer._output_dense) == 2:
self.assertLen(test_layer._output_dense[1].trainable_variables, 1)
else:
if reuse_attention == 0:
self.assertLen(test_layer._query_dense.trainable_variables, 1)
self.assertLen(test_layer._output_dense[0].trainable_variables, 1)
if len(test_layer._output_dense) == 2:
self.assertLen(test_layer._output_dense[1].trainable_variables, 1)
def test_initializer(self):
"""Test with a specified initializer."""
test_layer = attention.ReuseMultiHeadAttention(
num_heads=12,
key_dim=64,
kernel_initializer=tf.keras.initializers.TruncatedNormal(stddev=0.02))
# Create a 3-dimensional input (the first dimension is implicit).
query = tf.keras.Input(shape=(40, 80))
output = test_layer(query, query)
self.assertEqual(output.shape.as_list(), [None, 40, 80])
def test_masked_attention_with_scores(self):
"""Test with a mask tensor."""
test_layer = attention.ReuseMultiHeadAttention(
num_heads=2, key_dim=2)
# Create a 3-dimensional input (the first dimension is implicit).
batch_size = 3
query = tf.keras.Input(shape=(4, 8))
value = tf.keras.Input(shape=(2, 8))
mask_tensor = tf.keras.Input(shape=(4, 2))
output = test_layer(query=query, value=value, attention_mask=mask_tensor)
# Create a model containing the test layer.
model = tf.keras.Model([query, value, mask_tensor], output)
# Generate data for the input (non-mask) tensors.
from_data = 10 * np.random.random_sample((batch_size, 4, 8))
to_data = 10 * np.random.random_sample((batch_size, 2, 8))
# Invoke the data with a random set of mask data. This should mask at least
# one element.
mask_data = np.random.randint(2, size=(batch_size, 4, 2))
masked_output_data = model.predict([from_data, to_data, mask_data])
# Invoke the same data, but with a null mask (where no elements are masked).
null_mask_data = np.ones((batch_size, 4, 2))
unmasked_output_data = model.predict([from_data, to_data, null_mask_data])
# Because one input is masked and the other is not, the outputs should not be
# the same.
self.assertNotAllClose(masked_output_data, unmasked_output_data)
# Create a model containing attention scores.
output, scores = test_layer(
query=query, value=value, attention_mask=mask_tensor,
return_attention_scores=True)
model = tf.keras.Model([query, value, mask_tensor], [output, scores])
masked_output_data_score, masked_score = model.predict(
[from_data, to_data, mask_data])
unmasked_output_data_score, unmasked_score = model.predict(
[from_data, to_data, null_mask_data])
self.assertNotAllClose(masked_output_data_score, unmasked_output_data_score)
self.assertAllClose(masked_output_data, masked_output_data_score)
self.assertAllClose(unmasked_output_data, unmasked_output_data_score)
self.assertNotAllClose(masked_score, unmasked_score)
@parameterized.named_parameters(
("4d_inputs_1freebatch_mask2", [3, 4], [3, 2], [4, 2],
(2,)), ("4d_inputs_1freebatch_mask3", [3, 4], [3, 2], [3, 4, 2], (2,)),
("4d_inputs_1freebatch_mask4", [3, 4], [3, 2], [3, 2, 4, 2],
(2,)), ("4D_inputs_2D_attention", [3, 4], [3, 2], [3, 4, 3, 2], (1, 2)),
("5D_inputs_2D_attention", [5, 3, 4], [5, 3, 2], [3, 4, 3, 2], (2, 3)),
("5D_inputs_2D_attention_fullmask", [5, 3, 4], [5, 3, 2], [5, 3, 4, 3, 2],
(2, 3)))
def test_high_dim_attention(self, q_dims, v_dims, mask_dims, attention_axes):
"""Test with a mask tensor."""
test_layer = attention.ReuseMultiHeadAttention(
num_heads=2, key_dim=2, attention_axes=attention_axes)
batch_size, hidden_size = 3, 8
# Generate data for the input (non-mask) tensors.
query_shape = [batch_size] + q_dims + [hidden_size]
value_shape = [batch_size] + v_dims + [hidden_size]
mask_shape = [batch_size] + mask_dims
query = 10 * np.random.random_sample(query_shape)
value = 10 * np.random.random_sample(value_shape)
# Invoke the data with a random set of mask data. This should mask at least
# one element.
mask_data = np.random.randint(2, size=mask_shape).astype("bool")
# Invoke the same data, but with a null mask (where no elements are masked).
null_mask_data = np.ones(mask_shape)
# Because one input is masked and the other is not, the outputs should not be
# the same.
query_tensor = tf.keras.Input(query_shape[1:], name="query")
value_tensor = tf.keras.Input(value_shape[1:], name="value")
mask_tensor = tf.keras.Input(mask_shape[1:], name="mask")
output = test_layer(query=query_tensor, value=value_tensor,
attention_mask=mask_tensor)
model = tf.keras.Model([query_tensor, value_tensor, mask_tensor], output)
self.assertNotAllClose(
model.predict([query, value, mask_data]),
model.predict([query, value, null_mask_data]))
def test_dropout(self):
test_layer = attention.ReuseMultiHeadAttention(
num_heads=2, key_dim=2, dropout=0.5)
# Generate data for the input (non-mask) tensors.
from_data = tf.keras.backend.ones(shape=(32, 4, 8))
to_data = tf.keras.backend.ones(shape=(32, 2, 8))
train_out = test_layer(from_data, to_data, None, None, None, True)
test_out = test_layer(from_data, to_data, None, None, None, False)
# Output should be close when not in training mode,
# and should not be close when enabling dropout in training mode.
self.assertNotAllClose(
tf.keras.backend.eval(train_out),
tf.keras.backend.eval(test_out))
def test_non_masked_self_attention_with_reuse(self):
"""Test with one input (self-attenntion) and no mask tensor."""
test_layer = attention.ReuseMultiHeadAttention(
num_heads=12, key_dim=64, reuse_attention=True)
# Create a 3-dimensional input (the first dimension is implicit).
query = tf.keras.Input(shape=(40, 80))
reuse_scores = tf.keras.Input(shape=(12, 40, 40))
output = test_layer(query, query, reuse_attention_scores=reuse_scores)
self.assertEqual(output.shape.as_list(), [None, 40, 80])
@parameterized.named_parameters(
("no_reuse_with_pe_max_seq_length_20", False, 20),
("reuse_all_with_pe_max_seq_length_20", True, 20),
("reuse_partial_with_pe_max_seq_length_20", 5, 20),
("no_reuse_with_pe_max_seq_length_40", False, 40),
("reuse_all_with_pe_max_seq_length_40", True, 40),
("reuse_partial_with_pe_max_seq_length_40", 5, 40))
def test_non_masked_self_attention_with_relative_pe(self, reuse_attention,
pe_max_seq_length):
"""Test with one input (self-attenntion) and no mask tensor."""
test_layer = attention.ReuseMultiHeadAttention(
num_heads=12, key_dim=64, reuse_attention=reuse_attention,
use_relative_pe=True, pe_max_seq_length=pe_max_seq_length)
# Create a 3-dimensional input (the first dimension is implicit).
query = tf.keras.Input(shape=(40, 80))
reuse_scores = tf.keras.Input(shape=(12, 40, 40))
output = test_layer(query, query, reuse_attention_scores=reuse_scores)
self.assertEqual(output.shape.as_list(), [None, 40, 80])
query = tf.keras.Input(shape=(30, 80))
reuse_scores = tf.keras.Input(shape=(12, 30, 30))
output = test_layer(query, query, reuse_attention_scores=reuse_scores)
self.assertEqual(output.shape.as_list(), [None, 30, 80])
query = tf.keras.Input(shape=(30, 80))
key = tf.keras.Input(shape=(20, 80))
reuse_scores = tf.keras.Input(shape=(12, 30, 20))
output = test_layer(query, key, reuse_attention_scores=reuse_scores)
self.assertEqual(output.shape.as_list(), [None, 30, 80])
query = tf.keras.Input(shape=(50, 80))
key = tf.keras.Input(shape=(60, 80))
reuse_scores = tf.keras.Input(shape=(12, 50, 60))
output = test_layer(query, key, reuse_attention_scores=reuse_scores)
self.assertEqual(output.shape.as_list(), [None, 50, 80])
if __name__ == "__main__":
tf.test.main()
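
The relative-position tests above boil down to the following sketch: a single layer built with a fixed `pe_max_seq_length` is applied to queries and keys of different lengths. Shapes and arguments are taken from the tests; nothing beyond what they assert is implied.

# Condensed from the relative-position tests above; illustrative only.
import tensorflow as tf
from official.nlp.modeling.layers import reuse_attention

layer = reuse_attention.ReuseMultiHeadAttention(
    num_heads=12, key_dim=64, reuse_attention=True,
    use_relative_pe=True, pe_max_seq_length=20)

query = tf.keras.Input(shape=(40, 80))
scores = tf.keras.Input(shape=(12, 40, 40))
out_self = layer(query, query, reuse_attention_scores=scores)  # [None, 40, 80]

long_query = tf.keras.Input(shape=(50, 80))
key = tf.keras.Input(shape=(60, 80))
cross_scores = tf.keras.Input(shape=(12, 50, 60))
out_cross = layer(
    long_query, key, reuse_attention_scores=cross_scores)  # [None, 50, 80]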
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Keras-based TransformerEncoder block layer."""
import tensorflow as tf
from official.nlp.modeling.layers import reuse_attention as attention
class ReuseTransformer(tf.keras.layers.Layer):
"""Transformer layer.
This layer implements the ReuseTransformer Encoder from
"Leveraging redundancy in attention with Reuse Transformers".
(https://arxiv.org/abs/2110.06821)
"""
def __init__(self,
num_attention_heads,
inner_dim,
inner_activation,
head_size=None,
output_range=None,
kernel_initializer="glorot_uniform",
bias_initializer="zeros",
kernel_regularizer=None,
bias_regularizer=None,
activity_regularizer=None,
kernel_constraint=None,
bias_constraint=None,
use_bias=True,
norm_first=False,
norm_epsilon=1e-12,
output_dropout=0.0,
attention_dropout=0.0,
inner_dropout=0.0,
attention_initializer=None,
attention_axes=None,
reuse_attention=0,
use_relative_pe=False,
pe_max_seq_length=512,
layer_idx=None,
**kwargs):
"""Initializes `ReuseTransformer`.
Args:
num_attention_heads: Number of attention heads.
inner_dim: The output dimension of the first Dense layer in a two-layer
feedforward network.
inner_activation: The activation for the first Dense layer in a two-layer
feedforward network.
head_size: Projection size of heads.
output_range: the sequence output range, [0, output_range) for slicing the
target sequence. `None` means the target sequence is not sliced.
kernel_initializer: Initializer for dense layer kernels.
bias_initializer: Initializer for dense layer biases.
kernel_regularizer: Regularizer for dense layer kernels.
bias_regularizer: Regularizer for dense layer biases.
activity_regularizer: Regularizer for dense layer activity.
kernel_constraint: Constraint for dense layer kernels.
bias_constraint: Constraint for dense layer biases.
use_bias: Whether to enable use_bias in attention layer. If set False,
use_bias in attention layer is disabled.
norm_first: Whether to normalize inputs to attention and intermediate
dense layers. If set False, output of attention and intermediate dense
layers is normalized.
norm_epsilon: Epsilon value to initialize normalization layers.
output_dropout: Dropout probability for the post-attention and output
dropout.
attention_dropout: Dropout probability for within the attention layer.
inner_dropout: Dropout probability for the first Dense layer in a
two-layer feedforward network.
attention_initializer: Initializer for kernels of attention layers. If set
`None`, attention layers use kernel_initializer as initializer for
kernel.
attention_axes: axes over which the attention is applied. `None` means
attention over all axes, but batch, heads, and features.
reuse_attention: An integer specifying the number of heads whose attention
scores are reused; -1 reuses all heads and 0 disables reuse.
use_relative_pe: whether to use relative position bias.
pe_max_seq_length: used to set the size of the relative position encodings.
layer_idx: the index of this layer.
**kwargs: keyword arguments.
"""
super().__init__(**kwargs)
self._num_heads = num_attention_heads
self._inner_dim = inner_dim
self._inner_activation = inner_activation
self._head_size = head_size
self._attention_dropout = attention_dropout
self._attention_dropout_rate = attention_dropout
self._output_dropout = output_dropout
self._output_dropout_rate = output_dropout
self._output_range = output_range
self._kernel_initializer = tf.keras.initializers.get(kernel_initializer)
self._bias_initializer = tf.keras.initializers.get(bias_initializer)
self._kernel_regularizer = tf.keras.regularizers.get(kernel_regularizer)
self._bias_regularizer = tf.keras.regularizers.get(bias_regularizer)
self._activity_regularizer = tf.keras.regularizers.get(activity_regularizer)
self._kernel_constraint = tf.keras.constraints.get(kernel_constraint)
self._bias_constraint = tf.keras.constraints.get(bias_constraint)
self._use_bias = use_bias
self._norm_first = norm_first
self._norm_epsilon = norm_epsilon
self._inner_dropout = inner_dropout
self._reuse_attention = reuse_attention
self._use_relative_pe = use_relative_pe
self._pe_max_seq_length = pe_max_seq_length
self._layer_idx = layer_idx
# Special handling for the first layer.
# Consider taking a list to configure each layer by its index.
if self._layer_idx is not None and self._layer_idx == 0:
self._reuse_attention = 0
if attention_initializer:
self._attention_initializer = tf.keras.initializers.get(
attention_initializer)
else:
self._attention_initializer = self._kernel_initializer
self._attention_axes = attention_axes
def build(self, input_shape):
if isinstance(input_shape, tf.TensorShape):
input_tensor_shape = input_shape
elif isinstance(input_shape, (list, tuple)):
input_tensor_shape = tf.TensorShape(input_shape[0])
else:
raise ValueError(
"The type of input shape argument is not supported, got: %s" %
type(input_shape))
einsum_equation = "abc,cd->abd"
if len(input_tensor_shape.as_list()) > 3:
einsum_equation = "...bc,cd->...bd"
hidden_size = input_tensor_shape[-1]
if self._head_size is None:
if hidden_size % self._num_heads != 0:
raise ValueError(
"The input size (%d) is not a multiple of the number of attention "
"heads (%d)" % (hidden_size, self._num_heads))
self._attention_head_size = int(hidden_size // self._num_heads)
else:
self._attention_head_size = self._head_size
common_kwargs = dict(
bias_initializer=self._bias_initializer,
kernel_regularizer=self._kernel_regularizer,
bias_regularizer=self._bias_regularizer,
activity_regularizer=self._activity_regularizer,
kernel_constraint=self._kernel_constraint,
bias_constraint=self._bias_constraint)
self._attention_layer = attention.ReuseMultiHeadAttention(
num_heads=self._num_heads,
key_dim=self._attention_head_size,
dropout=self._attention_dropout,
use_bias=self._use_bias,
kernel_initializer=self._attention_initializer,
attention_axes=self._attention_axes,
reuse_attention=self._reuse_attention,
use_relative_pe=self._use_relative_pe,
pe_max_seq_length=self._pe_max_seq_length,
name="self_attention",
**common_kwargs)
self._attention_dropout = tf.keras.layers.Dropout(
rate=self._output_dropout)
# Use float32 in layernorm for numeric stability.
# It is probably safe in mixed_float16, but we haven't validated this yet.
self._attention_layer_norm = (
tf.keras.layers.LayerNormalization(
name="self_attention_layer_norm",
axis=-1,
epsilon=self._norm_epsilon,
dtype=tf.float32))
self._intermediate_dense = tf.keras.layers.experimental.EinsumDense(
einsum_equation,
output_shape=(None, self._inner_dim),
bias_axes="d",
kernel_initializer=self._kernel_initializer,
name="intermediate",
**common_kwargs)
policy = tf.keras.mixed_precision.global_policy()
if policy.name == "mixed_bfloat16":
# bfloat16 causes BERT with the LAMB optimizer to not converge
# as well, so we use float32.
# TODO(b/154538392): Investigate this.
policy = tf.float32
self._intermediate_activation_layer = tf.keras.layers.Activation(
self._inner_activation, dtype=policy)
self._inner_dropout_layer = tf.keras.layers.Dropout(
rate=self._inner_dropout)
self._output_dense = tf.keras.layers.experimental.EinsumDense(
einsum_equation,
output_shape=(None, hidden_size),
bias_axes="d",
name="output",
kernel_initializer=self._kernel_initializer,
**common_kwargs)
self._output_dropout = tf.keras.layers.Dropout(rate=self._output_dropout)
# Use float32 in layernorm for numeric stability.
self._output_layer_norm = tf.keras.layers.LayerNormalization(
name="output_layer_norm",
axis=-1,
epsilon=self._norm_epsilon,
dtype=tf.float32)
super(ReuseTransformer, self).build(input_shape)
def get_config(self):
config = {
"num_attention_heads":
self._num_heads,
"inner_dim":
self._inner_dim,
"inner_activation":
self._inner_activation,
"head_size":
self._head_size,
"output_dropout":
self._output_dropout_rate,
"attention_dropout":
self._attention_dropout_rate,
"output_range":
self._output_range,
"reuse_attention":
self._reuse_attention,
"use_relative_pe": self._use_relative_pe,
"pe_max_seq_length": self._pe_max_seq_length,
"kernel_initializer":
tf.keras.initializers.serialize(self._kernel_initializer),
"bias_initializer":
tf.keras.initializers.serialize(self._bias_initializer),
"kernel_regularizer":
tf.keras.regularizers.serialize(self._kernel_regularizer),
"bias_regularizer":
tf.keras.regularizers.serialize(self._bias_regularizer),
"activity_regularizer":
tf.keras.regularizers.serialize(self._activity_regularizer),
"kernel_constraint":
tf.keras.constraints.serialize(self._kernel_constraint),
"bias_constraint":
tf.keras.constraints.serialize(self._bias_constraint),
"use_bias":
self._use_bias,
"norm_first":
self._norm_first,
"norm_epsilon":
self._norm_epsilon,
"inner_dropout":
self._inner_dropout,
"attention_initializer":
tf.keras.initializers.serialize(self._attention_initializer),
"attention_axes": self._attention_axes,
}
base_config = super(ReuseTransformer, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
def call(self, inputs):
"""Transformer self-attention encoder block call.
Args:
inputs: a single tensor or a list of tensors.
`input tensor` as the single sequence of embeddings.
[`input tensor`, `attention mask`] to have the additional attention
mask.
[`input tensor`, `attention mask`, `attention scores`] to additionally
pass attention scores to be reused. `attention scores` must not be None
when `reuse_attention != 0`, otherwise a ValueError is raised.
Returns:
An output tensor with the same dimensions as the input/query tensor, and
the attention scores from the self-attention layer.
"""
if isinstance(inputs, (list, tuple)):
if len(inputs) == 2:
input_tensor, attention_mask = inputs
reuse_attention_scores = None
elif len(inputs) == 3:
input_tensor, attention_mask, reuse_attention_scores = inputs
else:
raise ValueError("Unexpected inputs to %s with length at %d" %
(self.__class__, len(inputs)))
else:
input_tensor, attention_mask, reuse_attention_scores = (inputs, None,
None)
key_value = None
if self._reuse_attention != 0 and reuse_attention_scores is None:
raise ValueError(
"reuse_attention_scores cannot be None when reuse_attention != 0.")
if self._output_range:
if self._norm_first:
source_tensor = input_tensor[:, 0:self._output_range, :]
input_tensor = self._attention_layer_norm(input_tensor)
if key_value is not None:
key_value = self._attention_layer_norm(key_value)
target_tensor = input_tensor[:, 0:self._output_range, :]
if attention_mask is not None:
attention_mask = attention_mask[:, 0:self._output_range, :]
if reuse_attention_scores is not None:
reuse_attention_scores = reuse_attention_scores[:, :,
0:self._output_range, :]
else:
if self._norm_first:
source_tensor = input_tensor
input_tensor = self._attention_layer_norm(input_tensor)
if key_value is not None:
key_value = self._attention_layer_norm(key_value)
target_tensor = input_tensor
if key_value is None:
key_value = input_tensor
attention_output = self._attention_layer(
query=target_tensor, value=key_value, attention_mask=attention_mask,
reuse_attention_scores=reuse_attention_scores,
return_attention_scores=True)
attention_output, attention_scores = attention_output
attention_output = self._attention_dropout(attention_output)
if self._norm_first:
attention_output = source_tensor + attention_output
else:
attention_output = self._attention_layer_norm(target_tensor +
attention_output)
if self._norm_first:
source_attention_output = attention_output
attention_output = self._output_layer_norm(attention_output)
inner_output = self._intermediate_dense(attention_output)
inner_output = self._intermediate_activation_layer(inner_output)
inner_output = self._inner_dropout_layer(inner_output)
layer_output = self._output_dense(inner_output)
layer_output = self._output_dropout(layer_output)
if self._norm_first:
return source_attention_output + layer_output, attention_scores
# During mixed precision training, layer norm output is always fp32 for now.
# Casts fp32 for the subsequent add.
layer_output = tf.cast(layer_output, tf.float32)
layer_output = self._output_layer_norm(layer_output + attention_output)
return layer_output, attention_scores
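
To make the call contract above concrete, here is a minimal sketch of chaining two `ReuseTransformer` blocks so that the second block reuses the attention scores produced by the first. Constructor arguments follow this file and the tests below; the specific hyperparameters are illustrative and not part of this commit.

# Minimal sketch of attention-score reuse across two ReuseTransformer blocks.
import tensorflow as tf
from official.nlp.modeling.layers import reuse_transformer

first = reuse_transformer.ReuseTransformer(
    num_attention_heads=10, inner_dim=2048, inner_activation="relu",
    layer_idx=0)                      # layer 0 always computes its own scores.
second = reuse_transformer.ReuseTransformer(
    num_attention_heads=10, inner_dim=2048, inner_activation="relu",
    reuse_attention=-1, layer_idx=1)  # reuse the incoming scores for all heads.

seq_length, width = 21, 80
inputs = tf.keras.Input(shape=(seq_length, width))
mask = tf.keras.Input(shape=(seq_length, seq_length))

# Each block returns (output, attention_scores).
hidden, scores = first([inputs, mask])
# The third element of the input list carries the scores to be reused.
outputs, _ = second([hidden, mask, scores])
model = tf.keras.Model([inputs, mask], outputs)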
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for Keras-based transformer block layer."""
from absl.testing import parameterized
import numpy as np
import tensorflow as tf
from official.nlp.modeling.layers import reuse_transformer
@parameterized.named_parameters(
('base', reuse_transformer.ReuseTransformer))
class ReuseTransformerLayerTest(tf.test.TestCase, parameterized.TestCase):
def tearDown(self):
super(ReuseTransformerLayerTest, self).tearDown()
tf.keras.mixed_precision.set_global_policy('float32')
def test_layer_creation(self, transformer_cls):
test_layer = transformer_cls(
num_attention_heads=10, inner_dim=2048, inner_activation='relu')
sequence_length = 21
width = 80
# Create a 3-dimensional input (the first dimension is implicit).
data_tensor = tf.keras.Input(shape=(sequence_length, width))
output_tensor, _ = test_layer(data_tensor)
# By default the output of a transformer layer has the same shape as its input.
self.assertEqual(data_tensor.shape.as_list(), output_tensor.shape.as_list())
def test_layer_creation_with_mask(self, transformer_cls):
test_layer = transformer_cls(
num_attention_heads=10, inner_dim=2048, inner_activation='relu')
sequence_length = 21
width = 80
# Create a 3-dimensional input (the first dimension is implicit).
data_tensor = tf.keras.Input(shape=(sequence_length, width))
# Create a 2-dimensional input (the first dimension is implicit).
mask_tensor = tf.keras.Input(shape=(sequence_length, sequence_length))
output_tensor, _ = test_layer([data_tensor, mask_tensor])
# By default the output of a transformer layer has the same shape as its input.
self.assertEqual(data_tensor.shape.as_list(), output_tensor.shape.as_list())
def test_layer_invocation(self, transformer_cls):
test_layer = transformer_cls(
num_attention_heads=10, inner_dim=2048, inner_activation='relu')
sequence_length = 21
width = 80
# Create a 3-dimensional input (the first dimension is implicit).
data_tensor = tf.keras.Input(shape=(sequence_length, width))
output_tensor = test_layer(data_tensor)
# Create a model from the test layer.
model = tf.keras.Model(data_tensor, output_tensor)
# Invoke the model on test data. We can't validate the output data itself
# (the NN is too complex) but this will rule out structural runtime errors.
batch_size = 6
input_data = 10 * np.random.random_sample(
(batch_size, sequence_length, width))
_ = model.predict(input_data)
def test_layer_invocation_with_mask(self, transformer_cls):
test_layer = transformer_cls(
num_attention_heads=10, inner_dim=2048, inner_activation='relu')
sequence_length = 21
width = 80
# Create a 3-dimensional input (the first dimension is implicit).
data_tensor = tf.keras.Input(shape=(sequence_length, width))
# Create a 2-dimensional input (the first dimension is implicit).
mask_tensor = tf.keras.Input(shape=(sequence_length, sequence_length))
output_tensor = test_layer([data_tensor, mask_tensor])
# Create a model from the test layer.
model = tf.keras.Model([data_tensor, mask_tensor], output_tensor)
# Invoke the model on test data. We can't validate the output data itself
# (the NN is too complex) but this will rule out structural runtime errors.
batch_size = 6
input_data = 10 * np.random.random_sample(
(batch_size, sequence_length, width))
# The attention mask should be of shape (batch, from_seq_len, to_seq_len),
# which here is (batch, sequence_length, sequence_length)
mask_data = np.random.randint(
2, size=(batch_size, sequence_length, sequence_length))
_ = model.predict([input_data, mask_data])
def test_layer_output_range(self, transformer_cls):
test_layer = transformer_cls(
num_attention_heads=10, inner_dim=2048, inner_activation='relu')
sequence_length = 21
width = 80
batch_size = 6
input_data = 10 * np.random.random_sample(
(batch_size, sequence_length, width))
mask_data = np.random.randint(
2, size=(batch_size, sequence_length, sequence_length))
output_tensor, _ = test_layer([input_data, mask_data])
# The layer only attends to the first token and outputs the first token
# embedding.
new_layer = transformer_cls(
num_attention_heads=10,
inner_dim=2048,
inner_activation='relu',
output_range=1)
_ = new_layer([input_data, mask_data])
new_layer.set_weights(test_layer.get_weights())
new_output_tensor, _ = new_layer([input_data, mask_data])
self.assertAllClose(
new_output_tensor, output_tensor[:, 0:1, :], atol=5e-5, rtol=0.003)
def test_layer_output_range_with_relative_pe(self, transformer_cls):
test_layer = transformer_cls(
num_attention_heads=10, inner_dim=2048, inner_activation='relu',
use_relative_pe=True)
sequence_length = 21
width = 80
batch_size = 6
input_data = 10 * np.random.random_sample(
(batch_size, sequence_length, width))
mask_data = np.random.randint(
2, size=(batch_size, sequence_length, sequence_length))
output_tensor, _ = test_layer([input_data, mask_data])
# The layer only attends to the first token and outputs the first token
# embedding.
new_layer = transformer_cls(
num_attention_heads=10,
inner_dim=2048,
inner_activation='relu',
output_range=1,
use_relative_pe=True)
_ = new_layer([input_data, mask_data])
new_layer.set_weights(test_layer.get_weights())
new_output_tensor, _ = new_layer([input_data, mask_data])
self.assertAllClose(
new_output_tensor, output_tensor[:, 0:1, :], atol=5e-5, rtol=0.003)
def test_layer_output_range_without_mask(self, transformer_cls):
test_layer = transformer_cls(
num_attention_heads=10, inner_dim=2048,
inner_activation='relu', norm_first=True)
sequence_length = 21
width = 80
batch_size = 6
input_data = 10 * np.random.random_sample(
(batch_size, sequence_length, width))
output_tensor, _ = test_layer(input_data)
# The layer only attends to the first token and outputs the first token
# embedding.
new_layer = transformer_cls(
num_attention_heads=10,
inner_dim=2048,
inner_activation='relu',
output_range=1,
norm_first=True)
_ = new_layer(input_data)
new_layer.set_weights(test_layer.get_weights())
new_output_tensor, _ = new_layer(input_data)
self.assertAllClose(
new_output_tensor, output_tensor[:, 0:1, :], atol=5e-5, rtol=0.003)
def test_layer_output_range_with_pre_norm(self, transformer_cls):
test_layer = transformer_cls(
num_attention_heads=10, inner_dim=2048,
inner_activation='relu', norm_first=True)
sequence_length = 21
width = 80
batch_size = 6
input_data = 10 * np.random.random_sample(
(batch_size, sequence_length, width))
mask_data = np.random.randint(
2, size=(batch_size, sequence_length, sequence_length))
output_tensor, _ = test_layer([input_data, mask_data])
# The layer only attends to the first token and outputs the first token
# embedding.
new_layer = transformer_cls(
num_attention_heads=10,
inner_dim=2048,
inner_activation='relu',
output_range=1,
norm_first=True)
_ = new_layer([input_data, mask_data])
new_layer.set_weights(test_layer.get_weights())
new_output_tensor, _ = new_layer([input_data, mask_data])
self.assertAllClose(
new_output_tensor, output_tensor[:, 0:1, :], atol=5e-5, rtol=0.003)
def test_layer_invocation_with_float16_dtype(self, transformer_cls):
tf.keras.mixed_precision.set_global_policy('mixed_float16')
test_layer = transformer_cls(
num_attention_heads=10, inner_dim=2048, inner_activation='relu')
sequence_length = 21
width = 80
# Create a 3-dimensional input (the first dimension is implicit).
data_tensor = tf.keras.Input(shape=(sequence_length, width))
# Create a 2-dimensional input (the first dimension is implicit).
mask_tensor = tf.keras.Input(shape=(sequence_length, sequence_length))
output_tensor = test_layer([data_tensor, mask_tensor])
# Create a model from the test layer.
model = tf.keras.Model([data_tensor, mask_tensor], output_tensor)
# Invoke the model on test data. We can't validate the output data itself
# (the NN is too complex) but this will rule out structural runtime errors.
batch_size = 6
input_data = (10 * np.random.random_sample(
(batch_size, sequence_length, width)))
# The attention mask should be of shape (batch, from_seq_len, to_seq_len),
# which here is (batch, sequence_length, sequence_length)
mask_data = np.random.randint(
2, size=(batch_size, sequence_length, sequence_length))
_ = model.predict([input_data, mask_data])
def test_transform_with_initializer(self, transformer_cls):
test_layer = transformer_cls(
num_attention_heads=10,
inner_dim=2048,
inner_activation='relu',
kernel_initializer=tf.keras.initializers.TruncatedNormal(stddev=0.02))
sequence_length = 21
width = 80
# Create a 3-dimensional input (the first dimension is implicit).
data_tensor = tf.keras.Input(shape=(sequence_length, width))
output, _ = test_layer(data_tensor)
# By default the output of a transformer layer has the same shape as its input.
self.assertEqual(data_tensor.shape.as_list(), output.shape.as_list())
def test_dynamic_layer_sequence(self, transformer_cls):
test_layer = transformer_cls(
num_attention_heads=10,
inner_dim=2048,
inner_activation='relu',
kernel_initializer=tf.keras.initializers.TruncatedNormal(stddev=0.02))
# Create a 3-dimensional input (the first dimension is implicit).
width = 30
input_tensor = tf.keras.Input(shape=(None, width))
output_tensor, _ = test_layer(input_tensor)
model = tf.keras.Model(input_tensor, output_tensor)
input_length = 17
input_data = np.ones((1, input_length, width))
output_data = model.predict(input_data)
self.assertAllEqual([1, input_length, width], output_data.shape)
class ReuseTransformerArgumentTest(tf.test.TestCase, parameterized.TestCase):
def test_use_bias_norm_first(self):
num_attention_heads = 2
hidden_size = 16
encoder_block = reuse_transformer.ReuseTransformer(
num_attention_heads=num_attention_heads,
inner_dim=32,
inner_activation='relu',
output_dropout=0.1,
attention_dropout=0.1,
use_bias=False,
norm_first=True,
norm_epsilon=1e-6,
inner_dropout=0.1,
attention_initializer=tf.keras.initializers.RandomUniform(
minval=0., maxval=1.))
# Forward path.
dummy_tensor = tf.zeros([2, 4, 16], dtype=tf.float32)
dummy_mask = tf.zeros([2, 4, 4], dtype=tf.float32)
inputs = [dummy_tensor, dummy_mask]
output, _ = encoder_block(inputs)
self.assertEqual(output.shape, (2, 4, hidden_size))
def test_get_config(self):
num_attention_heads = 2
encoder_block = reuse_transformer.ReuseTransformer(
num_attention_heads=num_attention_heads,
inner_dim=32,
inner_activation='relu',
output_dropout=0.1,
attention_dropout=0.1,
use_bias=False,
norm_first=True,
norm_epsilon=1e-6,
inner_dropout=0.1,
attention_initializer=tf.keras.initializers.RandomUniform(
minval=0., maxval=1.))
encoder_block_config = encoder_block.get_config()
new_encoder_block = reuse_transformer.ReuseTransformer.from_config(
encoder_block_config)
self.assertEqual(encoder_block_config, new_encoder_block.get_config())
@parameterized.parameters({'attention_axes': None}, {'attention_axes': [1]},
{'attention_axes': [2]}, {'attention_axes': [1, 2]})
def test_several_attention_axes(self, attention_axes):
test_layer = reuse_transformer.ReuseTransformer(
inner_dim=32,
inner_activation='relu',
output_dropout=0.1,
attention_dropout=0.1,
use_bias=False,
norm_first=True,
norm_epsilon=1e-6,
inner_dropout=0.1,
num_attention_heads=10,
attention_axes=attention_axes)
num_rows = 21
num_cols = 13
width = 80
# Create a 3-dimensional input (the first dimension is implicit).
data_tensor = tf.keras.Input(shape=(num_rows, num_cols, width))
output_tensor, _ = test_layer(data_tensor)
# By default the output of a transformer layer has the same shape as its input.
self.assertEqual(data_tensor.shape.as_list(), output_tensor.shape.as_list())
@parameterized.named_parameters(
('plain', False, False, False),
('plain_returnscore', False, True, False),
('plain_with_relative_pe', False, False, True),
('reuse_all', True, False, False),
('reuse_all_returnscore', True, True, False),
('reuse_all_with_relative_pe', True, False, True),
('reuse_5', 5, False, False),
('reuse_5_returnscore', 5, True, False),
('reuse_5_with_relative_pe', 5, False, True),)
def test_layer_invocation_with_mask(self, reuse_attention,
return_attention_scores, use_relative_pe):
test_layer = reuse_transformer.ReuseTransformer(
num_attention_heads=10,
inner_dim=2048,
inner_activation='relu',
reuse_attention=reuse_attention,
use_relative_pe=use_relative_pe)
sequence_length = 21
width = 80
# Create a 3-dimensional input (the first dimension is implicit).
data_tensor = tf.keras.Input(shape=(sequence_length, width))
# Create a 2-dimensional input (the first dimension is implicit).
mask_tensor = tf.keras.Input(shape=(sequence_length, sequence_length))
return_scores_tensor = tf.keras.Input(shape=(1,))
reuse_attention_scores = tf.keras.Input(
shape=(10, sequence_length, sequence_length))
output_tensor, _ = test_layer(
[data_tensor, mask_tensor, reuse_attention_scores])
# Create a model from the test layer.
model = tf.keras.Model(
([data_tensor, mask_tensor, reuse_attention_scores],
return_scores_tensor), output_tensor)
# Invoke the model on test data. We can't validate the output data itself
# (the NN is too complex) but this will rule out structural runtime errors.
batch_size = 6
input_data = 10 * np.random.random_sample(
(batch_size, sequence_length, width))
# The attention mask should be of shape (batch, from_seq_len, to_seq_len),
# which here is (batch, sequence_length, sequence_length)
mask_data = np.random.randint(
2, size=(batch_size, sequence_length, sequence_length))
reuse_scores = np.random.rand(
batch_size, 10, sequence_length, sequence_length)
_ = model.predict([input_data, mask_data, reuse_scores],
return_attention_scores)
@parameterized.named_parameters(
('without_relative_pe_with_pe_max_seq_length_10', False, 10),
('with_relative_pe_with_pe_max_seq_length_10', True, 10),
('without_relative_pe_with_pe_max_seq_length_100', False, 100),
('with_relative_pe_with_pe_max_seq_length_100', True, 100))
def test_layer_invocation_with_float16_with_relative_pe(
self, use_relative_pe, pe_max_seq_length):
tf.keras.mixed_precision.set_global_policy('mixed_float16')
test_layer = reuse_transformer.ReuseTransformer(
num_attention_heads=10, inner_dim=2048, inner_activation='relu',
use_relative_pe=use_relative_pe, pe_max_seq_length=pe_max_seq_length)
sequence_length = 21
width = 80
# Create a 3-dimensional input (the first dimension is implicit).
data_tensor = tf.keras.Input(shape=(sequence_length, width))
# Create a 2-dimensional input (the first dimension is implicit).
mask_tensor = tf.keras.Input(shape=(sequence_length, sequence_length))
output_tensor = test_layer([data_tensor, mask_tensor])
# Create a model from the test layer.
model = tf.keras.Model([data_tensor, mask_tensor], output_tensor)
# Invoke the model on test data. We can't validate the output data itself
# (the NN is too complex) but this will rule out structural runtime errors.
batch_size = 6
input_data = (10 * np.random.random_sample(
(batch_size, sequence_length, width)))
# The attention mask should be of shape (batch, from_seq_len, to_seq_len),
# which here is (batch, sequence_length, sequence_length)
mask_data = np.random.randint(
2, size=(batch_size, sequence_length, sequence_length))
_ = model.predict([input_data, mask_data])
if __name__ == '__main__':
tf.test.main()