Commit f1d35b4e authored by Hongkun Yu's avatar Hongkun Yu Committed by A. Unique TensorFlower
Browse files

Release keras bert:

- Update classifier example.
- Add new converted checkpoints.
- Update benchmark,

PiperOrigin-RevId: 279762797
parent 0351cb87
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for masked LM loss."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import tensorflow as tf
from tensorflow.python.keras import keras_parameterized # pylint: disable=g-direct-tensorflow-import
from official.nlp.modeling import networks
from official.nlp.modeling.losses import weighted_sparse_categorical_crossentropy
@keras_parameterized.run_all_keras_modes
class ClassificationLossTest(keras_parameterized.TestCase):
def create_lm_model(self,
vocab_size,
sequence_length,
hidden_size,
num_predictions,
output="predictions"):
# First, create a transformer stack that we can use to get the LM's
# vocabulary weight.
xformer_stack = networks.TransformerEncoder(
vocab_size=vocab_size,
num_layers=1,
sequence_length=sequence_length,
hidden_size=hidden_size,
num_attention_heads=4,
)
word_ids = tf.keras.Input(shape=(sequence_length,), dtype=tf.int32)
mask = tf.keras.Input(shape=(sequence_length,), dtype=tf.int32)
type_ids = tf.keras.Input(shape=(sequence_length,), dtype=tf.int32)
lm_outputs, _ = xformer_stack([word_ids, mask, type_ids])
# Create a maskedLM from the transformer stack.
test_network = networks.MaskedLM(
num_predictions=num_predictions,
input_width=lm_outputs.shape[-1],
source_network=xformer_stack,
output=output)
# Create a model from the masked LM layer.
lm_input_tensor = tf.keras.Input(shape=(sequence_length, hidden_size))
masked_lm_positions = tf.keras.Input(
shape=(num_predictions,), dtype=tf.int32)
output = test_network([lm_input_tensor, masked_lm_positions])
return tf.keras.Model([lm_input_tensor, masked_lm_positions], output)
def create_classification_model(self, input_width, num_classes):
test_object = networks.Classification(
input_width=input_width, num_classes=num_classes)
# Create a 2-dimensional input (the first dimension is implicit).
pooled_data = tf.keras.Input(shape=(input_width,), dtype=tf.float32)
output = test_object(pooled_data)
return tf.keras.Model(pooled_data, output)
def test_per_example_loss_3d_input(self):
"""Test per-example loss with a 3-dimensional input, from a masked LM."""
vocab_size = 100
sequence_length = 32
hidden_size = 64
num_predictions = 21
model = self.create_lm_model(
vocab_size=vocab_size,
sequence_length=sequence_length,
hidden_size=hidden_size,
num_predictions=num_predictions)
# Get the output of the masked LM.
batch_size = 3
lm_input_data = 10 * np.random.random_sample(
(batch_size, sequence_length, hidden_size))
masked_position_data = np.random.randint(
2, size=(batch_size, num_predictions))
output_data = model.predict([lm_input_data, masked_position_data])
# Calculate per-example loss.
labels = np.random.randint(vocab_size, size=(batch_size, num_predictions))
per_example_loss_data = weighted_sparse_categorical_crossentropy.per_example_loss(
predictions=output_data, labels=labels)
# Per-example loss data should have one value per prediction, and those
# values shouldn't be zero in this case (as we're using random data).
expected_shape = [batch_size, num_predictions]
self.assertEqual(expected_shape, per_example_loss_data.shape.as_list())
self.assertNotAllClose(
tf.zeros_like(per_example_loss_data), per_example_loss_data)
def test_per_example_loss_2d_input(self):
"""Test per-example loss with a 2-d input, from a classifier."""
input_width = 512
num_classes = 10
model = self.create_classification_model(input_width, num_classes)
# Invoke the network as part of a Model.
batch_size = 3
input_data = 10 * np.random.random_sample((batch_size, input_width))
output_data = model.predict(input_data)
# Calculate per example loss.
labels = np.random.randint(num_classes, size=(batch_size))
per_example_loss_data = weighted_sparse_categorical_crossentropy.per_example_loss(
predictions=output_data, labels=labels)
# Per-example loss data should have one value per batch item, and those
# values shouldn't be zero in this case (as we're using random data).
self.assertEqual([batch_size], per_example_loss_data.shape.as_list())
self.assertNotAllClose(
tf.zeros_like(per_example_loss_data), per_example_loss_data)
def test_per_example_loss_weights_3d_input(self):
"""Test weighted per-example loss with a 3-d input, from a masked LM."""
vocab_size = 100
sequence_length = 32
hidden_size = 64
num_predictions = 21
model = self.create_lm_model(
vocab_size=vocab_size,
sequence_length=sequence_length,
hidden_size=hidden_size,
num_predictions=num_predictions)
# Get the output of the masked LM.
batch_size = 3
lm_input_data = 10 * np.random.random_sample(
(batch_size, sequence_length, hidden_size))
masked_position_data = np.random.randint(
2, size=(batch_size, num_predictions))
output_data = model.predict([lm_input_data, masked_position_data])
# Calculate per-example loss with weights.
labels = np.random.randint(vocab_size, size=(batch_size, num_predictions))
weights = np.random.randint(2, size=(batch_size, num_predictions))
per_example_loss_data = weighted_sparse_categorical_crossentropy.per_example_loss(
predictions=output_data, labels=labels, weights=weights)
# Weighted per-example loss data should be equivalent to multiplying the
# loss tensor by the weights tensor.
expected_weighted_loss = per_example_loss_data * weights
self.assertAllClose(expected_weighted_loss, per_example_loss_data)
def test_per_example_loss_weights_2d_input(self):
"""Test weighted per-example loss with a 2-d input, from a classifier."""
input_width = 512
num_classes = 10
model = self.create_classification_model(input_width, num_classes)
# Invoke the network as part of a Model.
batch_size = 3
input_data = 10 * np.random.random_sample((batch_size, input_width))
output_data = model.predict(input_data)
# Calculate per-example loss with weights.
labels = np.random.randint(num_classes, size=(batch_size))
weights = np.random.randint(2, size=(batch_size))
per_example_loss_data = weighted_sparse_categorical_crossentropy.per_example_loss(
predictions=output_data, labels=labels, weights=weights)
# Weighted per-example loss data should be equivalent to multiplying the
# loss tensor by the weights tensor.
expected_weighted_loss = per_example_loss_data * weights
self.assertAllClose(expected_weighted_loss, per_example_loss_data)
def test_loss_3d_input(self):
"""Test overall loss with a 3-dimensional input, from a masked LM."""
vocab_size = 100
sequence_length = 32
hidden_size = 64
num_predictions = 21
model = self.create_lm_model(
vocab_size=vocab_size,
sequence_length=sequence_length,
hidden_size=hidden_size,
num_predictions=num_predictions)
# Get the output of the masked LM.
batch_size = 3
lm_input_data = 10 * np.random.random_sample(
(batch_size, sequence_length, hidden_size))
masked_position_data = np.random.randint(
2, size=(batch_size, num_predictions))
output_data = model.predict([lm_input_data, masked_position_data])
# Calculate loss.
labels = np.random.randint(vocab_size, size=(batch_size, num_predictions))
weights = np.random.randint(2, size=(batch_size, num_predictions))
per_example_loss_data = weighted_sparse_categorical_crossentropy.loss(
predictions=output_data, labels=labels, weights=weights)
# Total loss data should have one value, and that value shouldn't be zero
# in this case (as we're using random data).
expected_shape = [] # Scalar
self.assertEqual(expected_shape, per_example_loss_data.shape.as_list())
self.assertNotAllClose(
tf.zeros_like(per_example_loss_data), per_example_loss_data)
def test_loss_2d_input(self):
"""Test overall loss with a 2-d input, from a classifier."""
input_width = 512
num_classes = 10
model = self.create_classification_model(input_width, num_classes)
# Invoke the network as part of a Model.
batch_size = 3
input_data = 10 * np.random.random_sample((batch_size, input_width))
output_data = model.predict(input_data)
# Calculate per example loss.
labels = np.random.randint(num_classes, size=(batch_size))
loss_data = weighted_sparse_categorical_crossentropy.loss(
predictions=output_data, labels=labels)
# Loss data should have one value only, and that value shouldn't be zero in
# this case (as we're using random data).
self.assertNotAllClose(0, loss_data)
def test_loss_weights_3d_input(self):
"""Test masked loss with a 3-dimensional input, from a masked LM."""
vocab_size = 100
sequence_length = 32
hidden_size = 64
num_predictions = 21
model = self.create_lm_model(
vocab_size=vocab_size,
sequence_length=sequence_length,
hidden_size=hidden_size,
num_predictions=num_predictions)
# Get the output of the masked LM.
batch_size = 3
lm_input_data = 10 * np.random.random_sample(
(batch_size, sequence_length, hidden_size))
masked_position_data = np.random.randint(
2, size=(batch_size, num_predictions))
output_data = model.predict([lm_input_data, masked_position_data])
# Calculate a fully masked weight tensor. This should give a loss of zero.
labels = np.random.randint(vocab_size, size=(batch_size, num_predictions))
null_weights = np.zeros((batch_size, num_predictions))
weighted_loss_data = weighted_sparse_categorical_crossentropy.loss(
predictions=output_data, labels=labels, weights=null_weights)
# Because the tensor is fully masked, the loss should be 0.
self.assertAllClose(0, weighted_loss_data)
def test_loss_weights_2d_input(self):
"""Test masked loss with a 2-d input, from a classifier."""
input_width = 512
num_classes = 10
model = self.create_classification_model(input_width, num_classes)
# Invoke the network as part of a Model.
batch_size = 3
input_data = 10 * np.random.random_sample((batch_size, input_width))
output_data = model.predict(input_data)
# Calculate a fully masked weight tensor. This should give a loss of zero.
labels = np.random.randint(num_classes, size=(batch_size))
null_weights = np.zeros((batch_size))
weighted_loss_data = weighted_sparse_categorical_crossentropy.loss(
predictions=output_data, labels=labels, weights=null_weights)
# Because the tensor is fully masked, the loss should be 0.
self.assertAllClose(0, weighted_loss_data)
def test_mismatched_predictions_and_labels_ranks_squeezes(self):
"""Test that the loss asserts when rank(predictions)-1 != rank(labels)."""
batch_size = 3
output_data = np.random.random_sample((batch_size, 10))
labels = np.random.randint(10, size=(batch_size, 1))
# All that this test tests is that the squeeze is successful.
_ = weighted_sparse_categorical_crossentropy.per_example_loss(
predictions=output_data, labels=labels)
def test_mismatched_weights_and_labels_ranks_fail(self):
"""Test that the loss asserts when rank(predictions) != rank(labels)."""
batch_size = 3
output_data = np.random.random_sample((batch_size, 10, 15))
labels = np.random.randint(10, size=(batch_size, 10))
weights = np.random.randint(2, size=(batch_size))
with self.assertRaisesRegex(RuntimeError, ".*of the same rank.*"):
_ = weighted_sparse_categorical_crossentropy.per_example_loss(
predictions=output_data, labels=labels, weights=weights)
with self.assertRaisesRegex(RuntimeError, ".*of the same rank.*"):
_ = weighted_sparse_categorical_crossentropy.loss(
predictions=output_data, labels=labels, weights=weights)
def test_tf_tensor_inputs(self):
"""Test that tf.Tensors can be used as inputs to the loss function."""
batch_size = 3
output_data = tf.convert_to_tensor(
np.random.random_sample((batch_size, 10, 15)))
labels = tf.convert_to_tensor(np.random.randint(10, size=(batch_size, 10)))
weights = tf.convert_to_tensor(np.random.randint(2, size=(batch_size, 10)))
# We're not trying to validate numerical correctness, just ensure that
# we can in fact pass tensors to these functions without causing runtime
# errors from the shape checking code.
_ = weighted_sparse_categorical_crossentropy.per_example_loss(
predictions=output_data, labels=labels, weights=weights)
_ = weighted_sparse_categorical_crossentropy.loss(
predictions=output_data, labels=labels, weights=weights)
def test_legacy_lm_loss_compatibility(self):
"""Test to validate computational correctness during refactors."""
# This is the empirical output of a masked LM with the following parameters:
# batch_size = 3
# vocab_size = 5
# sequence_length = 4
# num_predictions = 2
output_data = np.array(
[[[-2.5286622, -1.0963473, -1.4925185, -2.4451098, -1.2923571],
[-2.7117882, -1.1205841, -4.02187, -0.9966936, -1.5119683]],
[[-2.5379114, -0.82479054, -2.287932, -1.3747153, -2.053741],
[-2.5379114, -0.82479054, -2.287932, -1.3747153, -2.053741]],
[[-2.7760355, -1.8219438, -3.0924666, -1.0779881, -0.9407509],
[-2.7760355, -1.8219438, -3.0924666, -1.0779881, -0.9407509]]])
labels = np.array([[4, 0], [2, 2], [2, 1]])
# Validate that per_example loss calculations are the same.
per_example_loss_data = weighted_sparse_categorical_crossentropy.per_example_loss(
predictions=output_data, labels=labels)
expected_per_example_loss_data = [[1.2923571, 2.7117882],
[2.287932, 2.287932],
[3.0924666, 1.8219438]]
self.assertAllClose(expected_per_example_loss_data, per_example_loss_data)
# Validate that overall loss calculations are the same.
weights = np.array([[1, 0], [0, 0], [0, 0]])
loss_data = weighted_sparse_categorical_crossentropy.loss(
predictions=output_data, labels=labels, weights=weights)
expected_loss_data = 1.2923441
self.assertAllClose(expected_loss_data, loss_data)
def test_legacy_classification_loss_compatibility(self):
"""Test to validate computational correctness during refactors."""
# This is the empirical output of a classifier with the following params:
# batch_size = 2
# num_classes = 3
output_data = np.array([[-1.6094601e-03, -1.0966038e+01, -6.4434357e+00],
[-1.6975292e-03, -6.4009643e+00, -1.0226612e+01]])
labels = np.array([2, 1])
# Validate that per_example loss calculations are the same.
per_example_loss_data = weighted_sparse_categorical_crossentropy.per_example_loss(
predictions=output_data, labels=labels)
expected_per_example_loss_data = [6.4434357, 6.4009643]
self.assertAllClose(expected_per_example_loss_data, per_example_loss_data)
# Validate that overall loss calculations are the same.
weights = None
loss_data = weighted_sparse_categorical_crossentropy.loss(
predictions=output_data, labels=labels, weights=weights)
expected_loss_data = 6.4222
self.assertAllClose(expected_loss_data, loss_data)
if __name__ == "__main__":
tf.test.main()
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Networks package definition."""
from official.nlp.modeling.networks.classification import Classification
from official.nlp.modeling.networks.masked_lm import MaskedLM
from official.nlp.modeling.networks.span_labeling import SpanLabeling
from official.nlp.modeling.networks.transformer_encoder import TransformerEncoder
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Trainer network for BERT-style models."""
from __future__ import absolute_import
from __future__ import division
# from __future__ import google_type_annotations
from __future__ import print_function
import tensorflow as tf
from official.nlp.modeling import networks
@tf.keras.utils.register_keras_serializable(package='Text')
class BertClassifier(tf.keras.Model):
"""Classifier model based on a BERT-style transformer-based encoder.
This is an implementation of the network structure surrounding a transformer
encoder as described in "BERT: Pre-training of Deep Bidirectional Transformers
for Language Understanding" (https://arxiv.org/abs/1810.04805).
The BertClassifier allows a user to pass in a transformer stack, and
instantiates a classification network based on the passed `num_classes`
argument.
Attributes:
network: A transformer network. This network should output a sequence output
and a classification output. Furthermore, it should expose its embedding
table via a "get_embedding_table" method.
num_classes: Number of classes to predict from the classification network.
initializer: The initializer (if any) to use in the classification networks.
Defaults to a Glorot uniform initializer.
output: The output style for this network. Can be either 'logits' or
'predictions'.
"""
def __init__(self,
network,
num_classes,
initializer='glorot_uniform',
output='logits',
dropout_rate=0.1,
**kwargs):
self._self_setattr_tracking = False
self._config = {
'network': network,
'num_classes': num_classes,
'initializer': initializer,
'output': output,
}
# We want to use the inputs of the passed network as the inputs to this
# Model. To do this, we need to keep a handle to the network inputs for use
# when we construct the Model object at the end of init.
inputs = network.inputs
# Because we have a copy of inputs to create this Model object, we can
# invoke the Network object with its own input tensors to start the Model.
_, cls_output = network(inputs)
cls_output = tf.keras.layers.Dropout(rate=dropout_rate)(cls_output)
self.classifier = networks.Classification(
input_width=cls_output.shape[-1],
num_classes=num_classes,
initializer=initializer,
output=output,
name='classification')
predictions = self.classifier(cls_output)
super(BertClassifier, self).__init__(
inputs=inputs, outputs=predictions, **kwargs)
def get_config(self):
return self._config
@classmethod
def from_config(cls, config, custom_objects=None):
return cls(**config)
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for BERT trainer network."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tensorflow as tf
from tensorflow.python.keras import keras_parameterized # pylint: disable=g-direct-tensorflow-import
from official.nlp.modeling import networks
from official.nlp.modeling.networks import bert_classifier
# This decorator runs the test in V1, V2-Eager, and V2-Functional mode. It
# guarantees forward compatibility of this code for the V2 switchover.
@keras_parameterized.run_all_keras_modes
class BertClassifierTest(keras_parameterized.TestCase):
def test_bert_trainer(self):
"""Validate that the Keras object can be created."""
# Build a transformer network to use within the BERT trainer.
vocab_size = 100
sequence_length = 512
test_network = networks.TransformerEncoder(
vocab_size=vocab_size, num_layers=2, sequence_length=sequence_length)
# Create a BERT trainer with the created network.
num_classes = 3
bert_trainer_model = bert_classifier.BertClassifier(
test_network,
num_classes=num_classes)
# Create a set of 2-dimensional inputs (the first dimension is implicit).
word_ids = tf.keras.Input(shape=(sequence_length,), dtype=tf.int32)
mask = tf.keras.Input(shape=(sequence_length,), dtype=tf.int32)
type_ids = tf.keras.Input(shape=(sequence_length,), dtype=tf.int32)
# Invoke the trainer model on the inputs. This causes the layer to be built.
cls_outs = bert_trainer_model([word_ids, mask, type_ids])
# Validate that the outputs are of the expected shape.
expected_classification_shape = [None, num_classes]
self.assertAllEqual(expected_classification_shape, cls_outs.shape.as_list())
def test_bert_trainer_tensor_call(self):
"""Validate that the Keras object can be invoked."""
# Build a transformer network to use within the BERT trainer. (Here, we use
# a short sequence_length for convenience.)
test_network = networks.TransformerEncoder(
vocab_size=100, num_layers=2, sequence_length=2)
# Create a BERT trainer with the created network.
bert_trainer_model = bert_classifier.BertClassifier(
test_network, num_classes=2)
# Create a set of 2-dimensional data tensors to feed into the model.
word_ids = tf.constant([[1, 1], [2, 2]], dtype=tf.int32)
mask = tf.constant([[1, 1], [1, 0]], dtype=tf.int32)
type_ids = tf.constant([[1, 1], [2, 2]], dtype=tf.int32)
# Invoke the trainer model on the tensors. In Eager mode, this does the
# actual calculation. (We can't validate the outputs, since the network is
# too complex: this simply ensures we're not hitting runtime errors.)
_ = bert_trainer_model([word_ids, mask, type_ids])
def test_serialize_deserialize(self):
"""Validate that the BERT trainer can be serialized and deserialized."""
# Build a transformer network to use within the BERT trainer. (Here, we use
# a short sequence_length for convenience.)
test_network = networks.TransformerEncoder(
vocab_size=100, num_layers=2, sequence_length=5)
# Create a BERT trainer with the created network. (Note that all the args
# are different, so we can catch any serialization mismatches.)
bert_trainer_model = bert_classifier.BertClassifier(
test_network, num_classes=4, initializer='zeros', output='predictions')
# Create another BERT trainer via serialization and deserialization.
config = bert_trainer_model.get_config()
new_bert_trainer_model = bert_classifier.BertClassifier.from_config(config)
# Validate that the config can be forced to JSON.
_ = new_bert_trainer_model.to_json()
# If the serialization was successful, the new config should match the old.
self.assertAllEqual(bert_trainer_model.get_config(),
new_bert_trainer_model.get_config())
if __name__ == '__main__':
tf.test.main()
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Trainer network for BERT-style models."""
from __future__ import absolute_import
from __future__ import division
# from __future__ import google_type_annotations
from __future__ import print_function
import copy
import tensorflow as tf
from official.nlp.modeling import networks
@tf.keras.utils.register_keras_serializable(package='Text')
class BertPretrainer(tf.keras.Model):
"""BERT network training model.
This is an implementation of the network structure surrounding a transformer
encoder as described in "BERT: Pre-training of Deep Bidirectional Transformers
for Language Understanding" (https://arxiv.org/abs/1810.04805).
The BertTrainer allows a user to pass in a transformer stack, and instantiates
the masked language model and classification networks that are used to create
the training objectives.
Attributes:
network: A transformer network. This network should output a sequence output
and a classification output. Furthermore, it should expose its embedding
table via a "get_embedding_table" method.
num_classes: Number of classes to predict from the classification network.
num_token_predictions: Number of tokens to predict from the masked LM.
activation: The activation (if any) to use in the masked LM and
classification networks. If None, no activation will be used.
initializer: The initializer (if any) to use in the masked LM and
classification networks. Defaults to a Glorot uniform initializer.
output: The output style for this network. Can be either 'logits' or
'predictions'.
"""
def __init__(self,
network,
num_classes,
num_token_predictions,
activation=None,
output_activation=None,
initializer='glorot_uniform',
output='logits',
**kwargs):
self._self_setattr_tracking = False
self._config = {
'network': network,
'num_classes': num_classes,
'num_token_predictions': num_token_predictions,
'activation': activation,
'output_activation': output_activation,
'initializer': initializer,
'output': output,
}
# We want to use the inputs of the passed network as the inputs to this
# Model. To do this, we need to keep a copy of the network inputs for use
# when we construct the Model object at the end of init. (We keep a copy
# because we'll be adding another tensor to the copy later.)
network_inputs = network.inputs
inputs = copy.copy(network_inputs)
# Because we have a copy of inputs to create this Model object, we can
# invoke the Network object with its own input tensors to start the Model.
# Note that, because of how deferred construction happens, we can't use
# the copy of the list here - by the time the network is invoked, the list
# object contains the additional input added below.
sequence_output, cls_output = network(network_inputs)
sequence_output_length = sequence_output.shape.as_list()[1]
if sequence_output_length < num_token_predictions:
raise ValueError(
"The passed network's output length is %s, which is less than the "
'requested num_token_predictions %s.' %
(sequence_output_length, num_token_predictions))
masked_lm_positions = tf.keras.layers.Input(
shape=(num_token_predictions,),
name='masked_lm_positions',
dtype=tf.int32)
inputs.append(masked_lm_positions)
self.masked_lm = networks.MaskedLM(
num_predictions=num_token_predictions,
input_width=sequence_output.shape[-1],
source_network=network,
activation=activation,
initializer=initializer,
output=output,
name='masked_lm')
lm_outputs = self.masked_lm([sequence_output, masked_lm_positions])
self.classification = networks.Classification(
input_width=cls_output.shape[-1],
num_classes=num_classes,
initializer=initializer,
output=output,
name='classification')
sentence_outputs = self.classification(cls_output)
super(BertPretrainer, self).__init__(
inputs=inputs, outputs=[lm_outputs, sentence_outputs], **kwargs)
def get_config(self):
return self._config
@classmethod
def from_config(cls, config, custom_objects=None):
return cls(**config)
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for BERT trainer network."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tensorflow as tf
from tensorflow.python.keras import keras_parameterized # pylint: disable=g-direct-tensorflow-import
from official.nlp.modeling import networks
from official.nlp.modeling.networks import bert_pretrainer
# This decorator runs the test in V1, V2-Eager, and V2-Functional mode. It
# guarantees forward compatibility of this code for the V2 switchover.
@keras_parameterized.run_all_keras_modes
class BertPretrainerTest(keras_parameterized.TestCase):
def test_bert_trainer(self):
"""Validate that the Keras object can be created."""
# Build a transformer network to use within the BERT trainer.
vocab_size = 100
sequence_length = 512
test_network = networks.TransformerEncoder(
vocab_size=vocab_size, num_layers=2, sequence_length=sequence_length)
# Create a BERT trainer with the created network.
num_classes = 3
num_token_predictions = 2
bert_trainer_model = bert_pretrainer.BertPretrainer(
test_network,
num_classes=num_classes,
num_token_predictions=num_token_predictions)
# Create a set of 2-dimensional inputs (the first dimension is implicit).
word_ids = tf.keras.Input(shape=(sequence_length,), dtype=tf.int32)
mask = tf.keras.Input(shape=(sequence_length,), dtype=tf.int32)
type_ids = tf.keras.Input(shape=(sequence_length,), dtype=tf.int32)
lm_mask = tf.keras.Input(shape=(sequence_length,), dtype=tf.int32)
# Invoke the trainer model on the inputs. This causes the layer to be built.
lm_outs, cls_outs = bert_trainer_model([word_ids, mask, type_ids, lm_mask])
# Validate that the outputs are of the expected shape.
expected_lm_shape = [None, num_token_predictions, vocab_size]
expected_classification_shape = [None, num_classes]
self.assertAllEqual(expected_lm_shape, lm_outs.shape.as_list())
self.assertAllEqual(expected_classification_shape, cls_outs.shape.as_list())
def test_bert_trainer_tensor_call(self):
"""Validate that the Keras object can be invoked."""
# Build a transformer network to use within the BERT trainer. (Here, we use
# a short sequence_length for convenience.)
test_network = networks.TransformerEncoder(
vocab_size=100, num_layers=2, sequence_length=2)
# Create a BERT trainer with the created network.
bert_trainer_model = bert_pretrainer.BertPretrainer(
test_network, num_classes=2, num_token_predictions=2)
# Create a set of 2-dimensional data tensors to feed into the model.
word_ids = tf.constant([[1, 1], [2, 2]], dtype=tf.int32)
mask = tf.constant([[1, 1], [1, 0]], dtype=tf.int32)
type_ids = tf.constant([[1, 1], [2, 2]], dtype=tf.int32)
lm_mask = tf.constant([[1, 1], [1, 0]], dtype=tf.int32)
# Invoke the trainer model on the tensors. In Eager mode, this does the
# actual calculation. (We can't validate the outputs, since the network is
# too complex: this simply ensures we're not hitting runtime errors.)
_, _ = bert_trainer_model([word_ids, mask, type_ids, lm_mask])
def test_serialize_deserialize(self):
"""Validate that the BERT trainer can be serialized and deserialized."""
# Build a transformer network to use within the BERT trainer. (Here, we use
# a short sequence_length for convenience.)
test_network = networks.TransformerEncoder(
vocab_size=100, num_layers=2, sequence_length=5)
# Create a BERT trainer with the created network. (Note that all the args
# are different, so we can catch any serialization mismatches.)
bert_trainer_model = bert_pretrainer.BertPretrainer(
test_network, num_classes=4, num_token_predictions=3)
# Create another BERT trainer via serialization and deserialization.
config = bert_trainer_model.get_config()
new_bert_trainer_model = bert_pretrainer.BertPretrainer.from_config(config)
# Validate that the config can be forced to JSON.
_ = new_bert_trainer_model.to_json()
# If the serialization was successful, the new config should match the old.
self.assertAllEqual(bert_trainer_model.get_config(),
new_bert_trainer_model.get_config())
if __name__ == '__main__':
tf.test.main()
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Trainer network for BERT-style models."""
from __future__ import absolute_import
from __future__ import division
# from __future__ import google_type_annotations
from __future__ import print_function
import tensorflow as tf
from official.nlp.modeling import networks
@tf.keras.utils.register_keras_serializable(package='Text')
class BertSpanLabeler(tf.keras.Model):
"""Span labeler model based on a BERT-style transformer-based encoder.
This is an implementation of the network structure surrounding a transformer
encoder as described in "BERT: Pre-training of Deep Bidirectional Transformers
for Language Understanding" (https://arxiv.org/abs/1810.04805).
The BertSpanLabeler allows a user to pass in a transformer stack, and
instantiates a span labeling network based on a single dense layer.
Attributes:
network: A transformer network. This network should output a sequence output
and a classification output. Furthermore, it should expose its embedding
table via a "get_embedding_table" method.
initializer: The initializer (if any) to use in the span labeling network.
Defaults to a Glorot uniform initializer.
output: The output style for this network. Can be either 'logits' or
'predictions'.
"""
def __init__(self,
network,
initializer='glorot_uniform',
output='logits',
**kwargs):
self._self_setattr_tracking = False
self._config = {
'network': network,
'initializer': initializer,
'output': output,
}
# We want to use the inputs of the passed network as the inputs to this
# Model. To do this, we need to keep a handle to the network inputs for use
# when we construct the Model object at the end of init.
inputs = network.inputs
# Because we have a copy of inputs to create this Model object, we can
# invoke the Network object with its own input tensors to start the Model.
sequence_output, _ = network(inputs)
# This is an instance variable for ease of access to the underlying task
# network.
self.span_labeling = networks.SpanLabeling(
input_width=sequence_output.shape[-1],
initializer=initializer,
output=output,
name='span_labeling')
start_logits, end_logits = self.span_labeling(sequence_output)
# Use identity layers wrapped in lambdas to explicitly name the output
# tensors. This allows us to use string-keyed dicts in Keras fit/predict/
# evaluate calls.
start_logits = tf.keras.layers.Lambda(
tf.identity, name='start_positions')(
start_logits)
end_logits = tf.keras.layers.Lambda(
tf.identity, name='end_positions')(
end_logits)
logits = [start_logits, end_logits]
super(BertSpanLabeler, self).__init__(
inputs=inputs, outputs=logits, **kwargs)
def get_config(self):
return self._config
@classmethod
def from_config(cls, config, custom_objects=None):
return cls(**config)
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for BERT trainer network."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tensorflow as tf
from tensorflow.python.keras import keras_parameterized # pylint: disable=g-direct-tensorflow-import
from official.nlp.modeling import networks
from official.nlp.modeling.networks import bert_span_labeler
# This decorator runs the test in V1, V2-Eager, and V2-Functional mode. It
# guarantees forward compatibility of this code for the V2 switchover.
@keras_parameterized.run_all_keras_modes
class BertSpanLabelerTest(keras_parameterized.TestCase):
def test_bert_trainer(self):
"""Validate that the Keras object can be created."""
# Build a transformer network to use within the BERT trainer.
vocab_size = 100
sequence_length = 512
test_network = networks.TransformerEncoder(
vocab_size=vocab_size, num_layers=2, sequence_length=sequence_length)
# Create a BERT trainer with the created network.
bert_trainer_model = bert_span_labeler.BertSpanLabeler(test_network)
# Create a set of 2-dimensional inputs (the first dimension is implicit).
word_ids = tf.keras.Input(shape=(sequence_length,), dtype=tf.int32)
mask = tf.keras.Input(shape=(sequence_length,), dtype=tf.int32)
type_ids = tf.keras.Input(shape=(sequence_length,), dtype=tf.int32)
# Invoke the trainer model on the inputs. This causes the layer to be built.
cls_outs = bert_trainer_model([word_ids, mask, type_ids])
# Validate that there are 2 outputs are of the expected shape.
self.assertEqual(2, len(cls_outs))
expected_shape = [None, sequence_length]
for out in cls_outs:
self.assertAllEqual(expected_shape, out.shape.as_list())
def test_bert_trainer_named_compilation(self):
"""Validate compilation using explicit output names."""
# Build a transformer network to use within the BERT trainer.
vocab_size = 100
sequence_length = 512
test_network = networks.TransformerEncoder(
vocab_size=vocab_size, num_layers=2, sequence_length=sequence_length)
# Create a BERT trainer with the created network.
bert_trainer_model = bert_span_labeler.BertSpanLabeler(test_network)
# Attempt to compile the model using a string-keyed dict of output names to
# loss functions. This will validate that the outputs are named as we
# expect.
bert_trainer_model.compile(
optimizer='sgd',
loss={
'start_positions': 'mse',
'end_positions': 'mse'
})
def test_bert_trainer_tensor_call(self):
"""Validate that the Keras object can be invoked."""
# Build a transformer network to use within the BERT trainer. (Here, we use
# a short sequence_length for convenience.)
test_network = networks.TransformerEncoder(
vocab_size=100, num_layers=2, sequence_length=2)
# Create a BERT trainer with the created network.
bert_trainer_model = bert_span_labeler.BertSpanLabeler(test_network)
# Create a set of 2-dimensional data tensors to feed into the model.
word_ids = tf.constant([[1, 1], [2, 2]], dtype=tf.int32)
mask = tf.constant([[1, 1], [1, 0]], dtype=tf.int32)
type_ids = tf.constant([[1, 1], [2, 2]], dtype=tf.int32)
# Invoke the trainer model on the tensors. In Eager mode, this does the
# actual calculation. (We can't validate the outputs, since the network is
# too complex: this simply ensures we're not hitting runtime errors.)
_ = bert_trainer_model([word_ids, mask, type_ids])
def test_serialize_deserialize(self):
"""Validate that the BERT trainer can be serialized and deserialized."""
# Build a transformer network to use within the BERT trainer. (Here, we use
# a short sequence_length for convenience.)
test_network = networks.TransformerEncoder(
vocab_size=100, num_layers=2, sequence_length=5)
# Create a BERT trainer with the created network. (Note that all the args
# are different, so we can catch any serialization mismatches.)
bert_trainer_model = bert_span_labeler.BertSpanLabeler(test_network)
# Create another BERT trainer via serialization and deserialization.
config = bert_trainer_model.get_config()
new_bert_trainer_model = bert_span_labeler.BertSpanLabeler.from_config(
config)
# Validate that the config can be forced to JSON.
_ = new_bert_trainer_model.to_json()
# If the serialization was successful, the new config should match the old.
self.assertAllEqual(bert_trainer_model.get_config(),
new_bert_trainer_model.get_config())
if __name__ == '__main__':
tf.test.main()
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Classification network."""
from __future__ import absolute_import
from __future__ import division
# from __future__ import google_type_annotations
from __future__ import print_function
import tensorflow as tf
# pylint: disable=g-direct-tensorflow-import
from tensorflow.python.keras.engine import network
@tf.keras.utils.register_keras_serializable(package='Text')
class Classification(network.Network):
"""Classification network head for BERT modeling.
This network implements a simple classifier head based on a dense layer.
Attributes:
input_width: The innermost dimension of the input tensor to this network.
num_classes: The number of classes that this network should classify to.
activation: The activation, if any, for the dense layer in this network.
initializer: The intializer for the dense layer in this network. Defaults to
a Glorot uniform initializer.
output: The output style for this network. Can be either 'logits' or
'predictions'.
"""
def __init__(self,
input_width,
num_classes,
initializer='glorot_uniform',
output='logits',
**kwargs):
self._self_setattr_tracking = False
self._config_dict = {
'input_width': input_width,
'num_classes': num_classes,
'initializer': initializer,
'output': output,
}
cls_output = tf.keras.layers.Input(
shape=(input_width,), name='cls_output', dtype=tf.float32)
self.logits = tf.keras.layers.Dense(
num_classes,
activation=None,
kernel_initializer=initializer,
name='predictions/transform/logits')(
cls_output)
predictions = tf.keras.layers.Activation(tf.nn.log_softmax)(self.logits)
if output == 'logits':
output_tensors = self.logits
elif output == 'predictions':
output_tensors = predictions
else:
raise ValueError(
('Unknown `output` value "%s". `output` can be either "logits" or '
'"predictions"') % output)
super(Classification, self).__init__(
inputs=[cls_output], outputs=output_tensors, **kwargs)
def get_config(self):
return self._config_dict
@classmethod
def from_config(cls, config, custom_objects=None):
return cls(**config)
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for classification network."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import tensorflow as tf
from tensorflow.python.keras import keras_parameterized # pylint: disable=g-direct-tensorflow-import
from official.nlp.modeling.networks import classification
# This decorator runs the test in V1, V2-Eager, and V2-Functional mode. It
# guarantees forward compatibility of this code for the V2 switchover.
@keras_parameterized.run_all_keras_modes
class ClassificationTest(keras_parameterized.TestCase):
def test_network_creation(self):
"""Validate that the Keras object can be created."""
input_width = 512
num_classes = 10
test_object = classification.Classification(
input_width=input_width, num_classes=num_classes)
# Create a 2-dimensional input (the first dimension is implicit).
cls_data = tf.keras.Input(shape=(input_width,), dtype=tf.float32)
output = test_object(cls_data)
# Validate that the outputs are of the expected shape.
expected_output_shape = [None, num_classes]
self.assertEqual(expected_output_shape, output.shape.as_list())
def test_network_invocation(self):
"""Validate that the Keras object can be invoked."""
input_width = 512
num_classes = 10
test_object = classification.Classification(
input_width=input_width, num_classes=num_classes, output='predictions')
# Create a 2-dimensional input (the first dimension is implicit).
cls_data = tf.keras.Input(shape=(input_width,), dtype=tf.float32)
output = test_object(cls_data)
# Invoke the network as part of a Model.
model = tf.keras.Model(cls_data, output)
input_data = 10 * np.random.random_sample((3, input_width))
_ = model.predict(input_data)
def test_network_invocation_with_internal_logits(self):
"""Validate that the logit outputs are correct."""
input_width = 512
num_classes = 10
test_object = classification.Classification(
input_width=input_width, num_classes=num_classes, output='predictions')
# Create a 2-dimensional input (the first dimension is implicit).
cls_data = tf.keras.Input(shape=(input_width,), dtype=tf.float32)
output = test_object(cls_data)
model = tf.keras.Model(cls_data, output)
logits_model = tf.keras.Model(test_object.inputs, test_object.logits)
batch_size = 3
input_data = 10 * np.random.random_sample((batch_size, input_width))
outputs = model.predict(input_data)
logits = logits_model.predict(input_data)
# Ensure that the tensor shapes are correct.
expected_output_shape = (batch_size, num_classes)
self.assertEqual(expected_output_shape, outputs.shape)
self.assertEqual(expected_output_shape, logits.shape)
# Ensure that the logits, when softmaxed, create the outputs.
input_tensor = tf.keras.Input(expected_output_shape[1:])
output_tensor = tf.keras.layers.Activation(tf.nn.log_softmax)(input_tensor)
softmax_model = tf.keras.Model(input_tensor, output_tensor)
calculated_softmax = softmax_model.predict(logits)
self.assertAllClose(outputs, calculated_softmax)
def test_network_invocation_with_internal_and_external_logits(self):
"""Validate that the logit outputs are correct."""
input_width = 512
num_classes = 10
test_object = classification.Classification(
input_width=input_width, num_classes=num_classes, output='logits')
# Create a 2-dimensional input (the first dimension is implicit).
cls_data = tf.keras.Input(shape=(input_width,), dtype=tf.float32)
output = test_object(cls_data)
model = tf.keras.Model(cls_data, output)
logits_model = tf.keras.Model(test_object.inputs, test_object.logits)
batch_size = 3
input_data = 10 * np.random.random_sample((batch_size, input_width))
outputs = model.predict(input_data)
logits = logits_model.predict(input_data)
# Ensure that the tensor shapes are correct.
expected_output_shape = (batch_size, num_classes)
self.assertEqual(expected_output_shape, outputs.shape)
self.assertEqual(expected_output_shape, logits.shape)
self.assertAllClose(outputs, logits)
def test_network_invocation_with_logit_output(self):
"""Validate that the logit outputs are correct."""
input_width = 512
num_classes = 10
test_object = classification.Classification(
input_width=input_width, num_classes=num_classes, output='predictions')
logit_object = classification.Classification(
input_width=input_width, num_classes=num_classes, output='logits')
logit_object.set_weights(test_object.get_weights())
# Create a 2-dimensional input (the first dimension is implicit).
cls_data = tf.keras.Input(shape=(input_width,), dtype=tf.float32)
output = test_object(cls_data)
logit_output = logit_object(cls_data)
model = tf.keras.Model(cls_data, output)
logits_model = tf.keras.Model(cls_data, logit_output)
batch_size = 3
input_data = 10 * np.random.random_sample((batch_size, input_width))
outputs = model.predict(input_data)
logits = logits_model.predict(input_data)
# Ensure that the tensor shapes are correct.
expected_output_shape = (batch_size, num_classes)
self.assertEqual(expected_output_shape, outputs.shape)
self.assertEqual(expected_output_shape, logits.shape)
# Ensure that the logits, when softmaxed, create the outputs.
input_tensor = tf.keras.Input(expected_output_shape[1:])
output_tensor = tf.keras.layers.Activation(tf.nn.log_softmax)(input_tensor)
softmax_model = tf.keras.Model(input_tensor, output_tensor)
calculated_softmax = softmax_model.predict(logits)
self.assertAllClose(outputs, calculated_softmax)
def test_serialize_deserialize(self):
# Create a network object that sets all of its config options.
network = classification.Classification(
input_width=128,
num_classes=10,
initializer='zeros',
output='predictions')
# Create another network object from the first object's config.
new_network = classification.Classification.from_config(
network.get_config())
# Validate that the config can be forced to JSON.
_ = new_network.to_json()
# If the serialization was successful, the new config should match the old.
self.assertAllEqual(network.get_config(), new_network.get_config())
def test_unknown_output_type_fails(self):
with self.assertRaisesRegex(ValueError, 'Unknown `output` value "bad".*'):
_ = classification.Classification(
input_width=128, num_classes=10, output='bad')
if __name__ == '__main__':
tf.test.main()
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Masked language model network."""
from __future__ import absolute_import
from __future__ import division
# from __future__ import google_type_annotations
from __future__ import print_function
import tensorflow as tf
from tensorflow.python.keras.engine import network # pylint: disable=g-direct-tensorflow-import
from official.modeling import tf_utils
@tf.keras.utils.register_keras_serializable(package='Text')
class MaskedLM(network.Network):
"""Masked language model network head for BERT modeling.
This network implements a masked language model based on the provided network.
It assumes that the network being passed has a "get_embedding_table()" method.
Attributes:
input_width: The innermost dimension of the input tensor to this network.
num_predictions: The number of predictions to make per sequence.
source_network: The network with the embedding layer to use for the
embedding layer.
activation: The activation, if any, for the dense layer in this network.
initializer: The intializer for the dense layer in this network. Defaults to
a Glorot uniform initializer.
output: The output style for this network. Can be either 'logits' or
'predictions'.
"""
def __init__(self,
input_width,
num_predictions,
source_network,
activation=None,
initializer='glorot_uniform',
output='logits',
**kwargs):
embedding_table = source_network.get_embedding_table()
vocab_size, hidden_size = embedding_table.shape
sequence_data = tf.keras.layers.Input(
shape=(None, input_width), name='sequence_data', dtype=tf.float32)
masked_lm_positions = tf.keras.layers.Input(
shape=(num_predictions,), name='masked_lm_positions', dtype=tf.int32)
masked_lm_input = tf.keras.layers.Lambda(
lambda x: self._gather_indexes(x[0], x[1]))(
[sequence_data, masked_lm_positions])
lm_data = (
tf.keras.layers.Dense(
hidden_size,
activation=activation,
kernel_initializer=initializer,
name='cls/predictions/transform/dense')(masked_lm_input))
lm_data = tf.keras.layers.LayerNormalization(
axis=-1, epsilon=1e-12, name='cls/predictions/transform/LayerNorm')(
lm_data)
lm_data = tf.keras.layers.Lambda(
lambda x: tf.matmul(x, embedding_table, transpose_b=True))(
lm_data)
logits = Bias(
initializer=tf.keras.initializers.Zeros(),
name='cls/predictions/output_bias')(
lm_data)
# We can't use the standard Keras reshape layer here, since it expects
# the input and output batch size to be the same.
reshape_layer = tf.keras.layers.Lambda(
lambda x: tf.reshape(x, [-1, num_predictions, vocab_size]))
self.logits = reshape_layer(logits)
predictions = tf.keras.layers.Activation(tf.nn.log_softmax)(self.logits)
if output == 'logits':
output_tensors = self.logits
elif output == 'predictions':
output_tensors = predictions
else:
raise ValueError(
('Unknown `output` value "%s". `output` can be either "logits" or '
'"predictions"') % output)
super(MaskedLM, self).__init__(
inputs=[sequence_data, masked_lm_positions],
outputs=output_tensors,
**kwargs)
def get_config(self):
raise NotImplementedError('MaskedLM cannot be directly serialized at this '
'time. Please use it only in Layers or '
'functionally subclassed Models/Networks.')
def _gather_indexes(self, sequence_tensor, positions):
"""Gathers the vectors at the specific positions.
Args:
sequence_tensor: Sequence output of `BertModel` layer of shape
(`batch_size`, `seq_length`, num_hidden) where num_hidden is number of
hidden units of `BertModel` layer.
positions: Positions ids of tokens in sequence to mask for pretraining
of with dimension (batch_size, num_predictions) where
`num_predictions` is maximum number of tokens to mask out and predict
per each sequence.
Returns:
Masked out sequence tensor of shape (batch_size * num_predictions,
num_hidden).
"""
sequence_shape = tf_utils.get_shape_list(
sequence_tensor, name='sequence_output_tensor')
batch_size, seq_length, width = sequence_shape
flat_offsets = tf.keras.backend.reshape(
tf.range(0, batch_size, dtype=tf.int32) * seq_length, [-1, 1])
flat_positions = tf.keras.backend.reshape(positions + flat_offsets, [-1])
flat_sequence_tensor = tf.keras.backend.reshape(
sequence_tensor, [batch_size * seq_length, width])
output_tensor = tf.gather(flat_sequence_tensor, flat_positions)
return output_tensor
@tf.keras.utils.register_keras_serializable(package='Text')
# Temporary until we can create a Dense layer that ties the embedding.
class Bias(tf.keras.layers.Layer):
"""Adds a bias term to an input."""
def __init__(self,
initializer='zeros',
regularizer=None,
constraint=None,
activation=None,
**kwargs):
super(Bias, self).__init__(**kwargs)
self._initializer = tf.keras.initializers.get(initializer)
self._regularizer = tf.keras.regularizers.get(regularizer)
self._constraint = tf.keras.constraints.get(constraint)
self._activation = tf.keras.activations.get(activation)
def build(self, input_shape):
input_shape = tf.TensorShape(input_shape)
self._bias = self.add_weight(
'bias',
shape=input_shape[1:],
initializer=self._initializer,
regularizer=self._regularizer,
constraint=self._constraint,
dtype=self._dtype,
trainable=True)
super(Bias, self).build(input_shape)
def compute_output_shape(self, input_shape):
return input_shape
def get_config(self):
config = {
'activation': tf.keras.activations.serialize(self._activation),
'initializer': tf.keras.initializers.serialize(self._initializer),
'regularizer': tf.keras.regularizers.serialize(self._regularizer),
'constraint': tf.keras.constraints.serialize(self._constraint)
}
base_config = super(Bias, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
def call(self, inputs):
outputs = tf.nn.bias_add(inputs, self._bias)
if self._activation is not None:
return self._activation(outputs) # pylint: disable=not-callable
else:
return outputs
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for masked language model network."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import tensorflow as tf
from tensorflow.python.keras import keras_parameterized # pylint: disable=g-direct-tensorflow-import
from official.nlp.modeling.networks import masked_lm
from official.nlp.modeling.networks import transformer_encoder
# This decorator runs the test in V1, V2-Eager, and V2-Functional mode. It
# guarantees forward compatibility of this code for the V2 switchover.
@keras_parameterized.run_all_keras_modes
class MaskedLMTest(keras_parameterized.TestCase):
def create_network(self,
vocab_size,
sequence_length,
hidden_size,
num_predictions,
output='predictions',
xformer_stack=None):
# First, create a transformer stack that we can use to get the LM's
# vocabulary weight.
if xformer_stack is None:
xformer_stack = transformer_encoder.TransformerEncoder(
vocab_size=vocab_size,
num_layers=1,
sequence_length=sequence_length,
hidden_size=hidden_size,
num_attention_heads=4,
)
word_ids = tf.keras.Input(shape=(sequence_length,), dtype=tf.int32)
mask = tf.keras.Input(shape=(sequence_length,), dtype=tf.int32)
type_ids = tf.keras.Input(shape=(sequence_length,), dtype=tf.int32)
lm_outputs, _ = xformer_stack([word_ids, mask, type_ids])
# Create a maskedLM from the transformer stack.
test_network = masked_lm.MaskedLM(
num_predictions=num_predictions,
input_width=lm_outputs.shape[-1],
source_network=xformer_stack,
output=output)
return test_network
def test_network_creation(self):
vocab_size = 100
sequence_length = 32
hidden_size = 64
num_predictions = 21
test_network = self.create_network(
vocab_size=vocab_size,
sequence_length=sequence_length,
hidden_size=hidden_size,
num_predictions=num_predictions)
# Make sure that the output tensor of the masked LM is the right shape.
lm_input_tensor = tf.keras.Input(shape=(sequence_length, hidden_size))
masked_lm_positions = tf.keras.Input(
shape=(num_predictions,), dtype=tf.int32)
output = test_network([lm_input_tensor, masked_lm_positions])
expected_output_shape = [None, num_predictions, vocab_size]
self.assertEqual(expected_output_shape, output.shape.as_list())
def test_network_invocation_with_internal_logits(self):
vocab_size = 100
sequence_length = 32
hidden_size = 64
num_predictions = 21
test_network = self.create_network(
vocab_size=vocab_size,
sequence_length=sequence_length,
hidden_size=hidden_size,
num_predictions=num_predictions)
# Create a model from the masked LM layer.
lm_input_tensor = tf.keras.Input(shape=(sequence_length, hidden_size))
masked_lm_positions = tf.keras.Input(
shape=(num_predictions,), dtype=tf.int32)
output = test_network([lm_input_tensor, masked_lm_positions])
model = tf.keras.Model([lm_input_tensor, masked_lm_positions], output)
logits_model = tf.keras.Model(test_network.inputs, test_network.logits)
# Invoke the masked LM on some fake data to make sure there are no runtime
# errors in the code.
batch_size = 3
lm_input_data = 10 * np.random.random_sample(
(batch_size, sequence_length, hidden_size))
masked_position_data = np.random.randint(
2, size=(batch_size, num_predictions))
outputs = model.predict([lm_input_data, masked_position_data])
logits = logits_model.predict([lm_input_data, masked_position_data])
# Ensure that the tensor shapes are correct.
expected_output_shape = (batch_size, num_predictions, vocab_size)
self.assertEqual(expected_output_shape, outputs.shape)
self.assertEqual(expected_output_shape, logits.shape)
# Ensure that the logits, when softmaxed, create the outputs.
input_tensor = tf.keras.Input(expected_output_shape[1:])
output_tensor = tf.keras.layers.Activation(tf.nn.log_softmax)(input_tensor)
softmax_model = tf.keras.Model(input_tensor, output_tensor)
calculated_softmax = softmax_model.predict(logits)
self.assertAllClose(outputs, calculated_softmax)
def test_network_invocation_with_external_logits(self):
vocab_size = 100
sequence_length = 32
hidden_size = 64
num_predictions = 21
xformer_stack = transformer_encoder.TransformerEncoder(
vocab_size=vocab_size,
num_layers=1,
sequence_length=sequence_length,
hidden_size=hidden_size,
num_attention_heads=4,
)
test_network = self.create_network(
vocab_size=vocab_size,
sequence_length=sequence_length,
hidden_size=hidden_size,
num_predictions=num_predictions,
xformer_stack=xformer_stack,
output='predictions')
logit_network = self.create_network(
vocab_size=vocab_size,
sequence_length=sequence_length,
hidden_size=hidden_size,
num_predictions=num_predictions,
xformer_stack=xformer_stack,
output='logits')
logit_network.set_weights(test_network.get_weights())
# Create a model from the masked LM layer.
lm_input_tensor = tf.keras.Input(shape=(sequence_length, hidden_size))
masked_lm_positions = tf.keras.Input(
shape=(num_predictions,), dtype=tf.int32)
output = test_network([lm_input_tensor, masked_lm_positions])
logit_output = logit_network([lm_input_tensor, masked_lm_positions])
model = tf.keras.Model([lm_input_tensor, masked_lm_positions], output)
logits_model = tf.keras.Model(([lm_input_tensor, masked_lm_positions]),
logit_output)
# Invoke the masked LM on some fake data to make sure there are no runtime
# errors in the code.
batch_size = 3
lm_input_data = 10 * np.random.random_sample(
(batch_size, sequence_length, hidden_size))
masked_position_data = np.random.randint(
2, size=(batch_size, num_predictions))
outputs = model.predict([lm_input_data, masked_position_data])
logits = logits_model.predict([lm_input_data, masked_position_data])
# Ensure that the tensor shapes are correct.
expected_output_shape = (batch_size, num_predictions, vocab_size)
self.assertEqual(expected_output_shape, outputs.shape)
self.assertEqual(expected_output_shape, logits.shape)
# Ensure that the logits, when softmaxed, create the outputs.
input_tensor = tf.keras.Input(expected_output_shape[1:])
output_tensor = tf.keras.layers.Activation(tf.nn.log_softmax)(input_tensor)
softmax_model = tf.keras.Model(input_tensor, output_tensor)
calculated_softmax = softmax_model.predict(logits)
self.assertAllClose(outputs, calculated_softmax)
def test_network_invocation(self):
vocab_size = 100
sequence_length = 32
hidden_size = 64
num_predictions = 21
test_network = self.create_network(
vocab_size=vocab_size,
sequence_length=sequence_length,
hidden_size=hidden_size,
num_predictions=num_predictions)
# Create a model from the masked LM layer.
lm_input_tensor = tf.keras.Input(shape=(sequence_length, hidden_size))
masked_lm_positions = tf.keras.Input(
shape=(num_predictions,), dtype=tf.int32)
output = test_network([lm_input_tensor, masked_lm_positions])
model = tf.keras.Model([lm_input_tensor, masked_lm_positions], output)
# Invoke the masked LM on some fake data to make sure there are no runtime
# errors in the code.
batch_size = 3
lm_input_data = 10 * np.random.random_sample(
(batch_size, sequence_length, hidden_size))
masked_position_data = np.random.randint(
2, size=(batch_size, num_predictions))
_ = model.predict([lm_input_data, masked_position_data])
def test_unknown_output_type_fails(self):
with self.assertRaisesRegex(ValueError, 'Unknown `output` value "bad".*'):
_ = self.create_network(
vocab_size=8,
sequence_length=8,
hidden_size=8,
num_predictions=8,
output='bad')
if __name__ == '__main__':
tf.test.main()
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Span labeling network."""
from __future__ import absolute_import
from __future__ import division
# from __future__ import google_type_annotations
from __future__ import print_function
import tensorflow as tf
# pylint: disable=g-direct-tensorflow-import
from tensorflow.python.keras.engine import network
@tf.keras.utils.register_keras_serializable(package='Text')
class SpanLabeling(network.Network):
"""Span labeling network head for BERT modeling.
This network implements a simple single-span labeler based on a dense layer.
Attributes:
input_width: The innermost dimension of the input tensor to this network.
activation: The activation, if any, for the dense layer in this network.
initializer: The intializer for the dense layer in this network. Defaults to
a Glorot uniform initializer.
output: The output style for this network. Can be either 'logits' or
'predictions'.
"""
def __init__(self,
input_width,
activation=None,
initializer='glorot_uniform',
output='logits',
**kwargs):
self._self_setattr_tracking = False
self._config = {
'input_width': input_width,
'activation': activation,
'initializer': initializer,
'output': output,
}
sequence_data = tf.keras.layers.Input(
shape=(None, input_width), name='sequence_data', dtype=tf.float32)
time_distributed_dense = tf.keras.layers.TimeDistributed(
tf.keras.layers.Dense(
2, # This layer predicts start location and end location.
activation=activation,
kernel_initializer=initializer,
name='predictions/transform/logits'))
intermediate_logits = time_distributed_dense(sequence_data)
self.start_logits, self.end_logits = (
tf.keras.layers.Lambda(self._split_output_tensor)(intermediate_logits))
start_predictions = tf.keras.layers.Activation(tf.nn.log_softmax)(
self.start_logits)
end_predictions = tf.keras.layers.Activation(tf.nn.log_softmax)(
self.end_logits)
if output == 'logits':
output_tensors = [self.start_logits, self.end_logits]
elif output == 'predictions':
output_tensors = [start_predictions, end_predictions]
else:
raise ValueError(
('Unknown `output` value "%s". `output` can be either "logits" or '
'"predictions"') % output)
super(SpanLabeling, self).__init__(
inputs=[sequence_data], outputs=output_tensors, **kwargs)
def _split_output_tensor(self, tensor):
transposed_tensor = tf.transpose(tensor, [2, 0, 1])
return tf.unstack(transposed_tensor)
def get_config(self):
return self._config
@classmethod
def from_config(cls, config, custom_objects=None):
return cls(**config)
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for span_labeling network."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import tensorflow as tf
from tensorflow.python.keras import keras_parameterized # pylint: disable=g-direct-tensorflow-import
from official.nlp.modeling.networks import span_labeling
# This decorator runs the test in V1, V2-Eager, and V2-Functional mode. It
# guarantees forward compatibility of this code for the V2 switchover.
@keras_parameterized.run_all_keras_modes
class SpanLabelingTest(keras_parameterized.TestCase):
def test_network_creation(self):
"""Validate that the Keras object can be created."""
sequence_length = 15
input_width = 512
test_network = span_labeling.SpanLabeling(
input_width=input_width, output='predictions')
# Create a 3-dimensional input (the first dimension is implicit).
sequence_data = tf.keras.Input(
shape=(sequence_length, input_width), dtype=tf.float32)
start_outputs, end_outputs = test_network(sequence_data)
# Validate that the outputs are of the expected shape.
expected_output_shape = [None, sequence_length]
self.assertEqual(expected_output_shape, start_outputs.shape.as_list())
self.assertEqual(expected_output_shape, end_outputs.shape.as_list())
def test_network_invocation(self):
"""Validate that the Keras object can be invoked."""
sequence_length = 15
input_width = 512
test_network = span_labeling.SpanLabeling(input_width=input_width)
# Create a 3-dimensional input (the first dimension is implicit).
sequence_data = tf.keras.Input(
shape=(sequence_length, input_width), dtype=tf.float32)
outputs = test_network(sequence_data)
model = tf.keras.Model(sequence_data, outputs)
# Invoke the network as part of a Model.
batch_size = 3
input_data = 10 * np.random.random_sample(
(batch_size, sequence_length, input_width))
start_outputs, end_outputs = model.predict(input_data)
# Validate that the outputs are of the expected shape.
expected_output_shape = (batch_size, sequence_length)
self.assertEqual(expected_output_shape, start_outputs.shape)
self.assertEqual(expected_output_shape, end_outputs.shape)
def test_network_invocation_with_internal_logit_output(self):
"""Validate that the logit outputs are correct."""
sequence_length = 15
input_width = 512
test_network = span_labeling.SpanLabeling(
input_width=input_width, output='predictions')
# Create a 3-dimensional input (the first dimension is implicit).
sequence_data = tf.keras.Input(
shape=(sequence_length, input_width), dtype=tf.float32)
output = test_network(sequence_data)
model = tf.keras.Model(sequence_data, output)
logit_model = tf.keras.Model(
test_network.inputs,
[test_network.start_logits, test_network.end_logits])
batch_size = 3
input_data = 10 * np.random.random_sample(
(batch_size, sequence_length, input_width))
start_outputs, end_outputs = model.predict(input_data)
start_logits, end_logits = logit_model.predict(input_data)
# Ensure that the tensor shapes are correct.
expected_output_shape = (batch_size, sequence_length)
self.assertEqual(expected_output_shape, start_outputs.shape)
self.assertEqual(expected_output_shape, end_outputs.shape)
self.assertEqual(expected_output_shape, start_logits.shape)
self.assertEqual(expected_output_shape, end_logits.shape)
# Ensure that the logits, when softmaxed, create the outputs.
input_tensor = tf.keras.Input(expected_output_shape[1:])
output_tensor = tf.keras.layers.Activation(tf.nn.log_softmax)(input_tensor)
softmax_model = tf.keras.Model(input_tensor, output_tensor)
start_softmax = softmax_model.predict(start_logits)
self.assertAllClose(start_outputs, start_softmax)
end_softmax = softmax_model.predict(end_logits)
self.assertAllClose(end_outputs, end_softmax)
def test_network_invocation_with_external_logit_output(self):
"""Validate that the logit outputs are correct."""
sequence_length = 15
input_width = 512
test_network = span_labeling.SpanLabeling(
input_width=input_width, output='predictions')
logit_network = span_labeling.SpanLabeling(
input_width=input_width, output='logits')
logit_network.set_weights(test_network.get_weights())
# Create a 3-dimensional input (the first dimension is implicit).
sequence_data = tf.keras.Input(
shape=(sequence_length, input_width), dtype=tf.float32)
output = test_network(sequence_data)
logit_output = logit_network(sequence_data)
model = tf.keras.Model(sequence_data, output)
logit_model = tf.keras.Model(sequence_data, logit_output)
batch_size = 3
input_data = 10 * np.random.random_sample(
(batch_size, sequence_length, input_width))
start_outputs, end_outputs = model.predict(input_data)
start_logits, end_logits = logit_model.predict(input_data)
# Ensure that the tensor shapes are correct.
expected_output_shape = (batch_size, sequence_length)
self.assertEqual(expected_output_shape, start_outputs.shape)
self.assertEqual(expected_output_shape, end_outputs.shape)
self.assertEqual(expected_output_shape, start_logits.shape)
self.assertEqual(expected_output_shape, end_logits.shape)
# Ensure that the logits, when softmaxed, create the outputs.
input_tensor = tf.keras.Input(expected_output_shape[1:])
output_tensor = tf.keras.layers.Activation(tf.nn.log_softmax)(input_tensor)
softmax_model = tf.keras.Model(input_tensor, output_tensor)
start_softmax = softmax_model.predict(start_logits)
self.assertAllClose(start_outputs, start_softmax)
end_softmax = softmax_model.predict(end_logits)
self.assertAllClose(end_outputs, end_softmax)
def test_serialize_deserialize(self):
# Create a network object that sets all of its config options.
network = span_labeling.SpanLabeling(
input_width=128,
activation='relu',
initializer='zeros',
output='predictions')
# Create another network object from the first object's config.
new_network = span_labeling.SpanLabeling.from_config(network.get_config())
# Validate that the config can be forced to JSON.
_ = new_network.to_json()
# If the serialization was successful, the new config should match the old.
self.assertAllEqual(network.get_config(), new_network.get_config())
def test_unknown_output_type_fails(self):
with self.assertRaisesRegex(ValueError, 'Unknown `output` value "bad".*'):
_ = span_labeling.SpanLabeling(input_width=10, output='bad')
if __name__ == '__main__':
tf.test.main()
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Transformer-based text encoder network."""
from __future__ import absolute_import
from __future__ import division
# from __future__ import google_type_annotations
from __future__ import print_function
import tensorflow as tf
from tensorflow.python.keras.engine import network # pylint: disable=g-direct-tensorflow-import
from official.modeling import activations
from official.nlp import bert_modeling
from official.nlp.modeling import layers
@tf.keras.utils.register_keras_serializable(package='Text')
class TransformerEncoder(network.Network):
"""Bi-directional Transformer-based encoder network.
This network implements a bi-directional Transformer-based encoder as
described in "BERT: Pre-training of Deep Bidirectional Transformers for
Language Understanding" (https://arxiv.org/abs/1810.04805). It includes the
embedding lookups and transformer layers, but not the masked language model
or classification task networks.
The default values for this object are taken from the BERT-Base implementation
in "BERT: Pre-training of Deep Bidirectional Transformers for Language
Understanding".
Attributes:
vocab_size: The size of the token vocabulary.
hidden_size: The size of the transformer hidden layers.
num_layers: The number of transformer layers.
num_attention_heads: The number of attention heads for each transformer. The
hidden size must be divisible by the number of attention heads.
sequence_length: The sequence length that this encoder expects. If None, the
sequence length is dynamic; if an integer, the encoder will require
sequences padded to this length.
max_sequence_length: The maximum sequence length that this encoder can
consume. If None, max_sequence_length uses the value from sequence length.
This determines the variable shape for positional embeddings.
type_vocab_size: The number of types that the 'type_ids' input can take.
intermediate_size: The intermediate size for the transformer layers.
activation: The activation to use for the transformer layers.
dropout_rate: The dropout rate to use for the transformer layers.
attention_dropout_rate: The dropout rate to use for the attention layers
within the transformer layers.
initializer: The initialzer to use for all weights in this encoder.
float_dtype: The dtype of this encoder. Can be 'float32' or 'float16'.
"""
def __init__(self,
vocab_size,
hidden_size=768,
num_layers=12,
num_attention_heads=12,
sequence_length=512,
max_sequence_length=None,
type_vocab_size=16,
intermediate_size=3072,
activation=activations.gelu,
dropout_rate=0.1,
attention_dropout_rate=0.1,
initializer=tf.keras.initializers.TruncatedNormal(stddev=0.02),
float_dtype='float32',
**kwargs):
activation = tf.keras.activations.get(activation)
initializer = tf.keras.initializers.get(initializer)
if not max_sequence_length:
max_sequence_length = sequence_length
self._self_setattr_tracking = False
self._config_dict = {
'vocab_size': vocab_size,
'hidden_size': hidden_size,
'num_layers': num_layers,
'num_attention_heads': num_attention_heads,
'sequence_length': sequence_length,
'max_sequence_length': max_sequence_length,
'type_vocab_size': type_vocab_size,
'intermediate_size': intermediate_size,
'activation': tf.keras.activations.serialize(activation),
'dropout_rate': dropout_rate,
'attention_dropout_rate': attention_dropout_rate,
'initializer': tf.keras.initializers.serialize(initializer),
'float_dtype': float_dtype,
}
word_ids = tf.keras.layers.Input(
shape=(sequence_length,), dtype=tf.int32, name='input_word_ids')
mask = tf.keras.layers.Input(
shape=(sequence_length,), dtype=tf.int32, name='input_mask')
type_ids = tf.keras.layers.Input(
shape=(sequence_length,), dtype=tf.int32, name='input_type_ids')
self._embedding_layer = layers.OnDeviceEmbedding(
vocab_size=vocab_size,
embedding_width=hidden_size,
initializer=initializer,
dtype=float_dtype,
name='word_embeddings')
word_embeddings = self._embedding_layer(word_ids)
# Always uses dynamic slicing for simplicity.
self._position_embedding_layer = layers.PositionEmbedding(
initializer=initializer,
use_dynamic_slicing=True,
max_sequence_length=max_sequence_length,
dtype=float_dtype)
position_embeddings = self._position_embedding_layer(word_embeddings)
type_embeddings = (
layers.OnDeviceEmbedding(
vocab_size=type_vocab_size,
embedding_width=hidden_size,
initializer=initializer,
use_one_hot=True,
dtype=float_dtype,
name='type_embeddings')(type_ids))
embeddings = tf.keras.layers.Add()(
[word_embeddings, position_embeddings, type_embeddings])
embeddings = (
tf.keras.layers.LayerNormalization(
name='embeddings/layer_norm',
axis=-1,
epsilon=1e-12,
dtype=float_dtype)(embeddings))
embeddings = (
tf.keras.layers.Dropout(rate=dropout_rate,
dtype=tf.float32)(embeddings))
if float_dtype == 'float16':
embeddings = tf.cast(embeddings, tf.float16)
data = embeddings
attention_mask = MakeAttentionMaskLayer()([data, mask])
for i in range(num_layers):
layer = layers.Transformer(
num_attention_heads=num_attention_heads,
intermediate_size=intermediate_size,
intermediate_activation=activation,
dropout_rate=dropout_rate,
attention_dropout_rate=attention_dropout_rate,
kernel_initializer=initializer,
dtype=float_dtype,
name='transformer/layer_%d' % i)
data = layer([data, attention_mask])
first_token_tensor = (
tf.keras.layers.Lambda(lambda x: tf.squeeze(x[:, 0:1, :], axis=1))(data)
)
cls_output = tf.keras.layers.Dense(
units=hidden_size,
activation='tanh',
kernel_initializer=initializer,
dtype=float_dtype,
name='pooler_transform')(
first_token_tensor)
super(TransformerEncoder, self).__init__(
inputs=[word_ids, mask, type_ids],
outputs=[data, cls_output],
**kwargs)
def get_embedding_table(self):
return self._embedding_layer.embeddings
def get_config(self):
return self._config_dict
@classmethod
def from_config(cls, config, custom_objects=None):
return cls(**config)
@tf.keras.utils.register_keras_serializable(package='Text')
class MakeAttentionMaskLayer(tf.keras.layers.Layer):
def call(self, inputs):
return bert_modeling.create_attention_mask_from_input_mask(
inputs[0], inputs[1])
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for transformer-based text encoder network."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import tensorflow as tf
from tensorflow.python.keras import keras_parameterized # pylint: disable=g-direct-tensorflow-import
from official.nlp.modeling.networks import transformer_encoder
# This decorator runs the test in V1, V2-Eager, and V2-Functional mode. It
# guarantees forward compatibility of this code for the V2 switchover.
@keras_parameterized.run_all_keras_modes
class TransformerEncoderTest(keras_parameterized.TestCase):
def test_network_creation(self):
hidden_size = 32
sequence_length = 21
# Create a small TransformerEncoder for testing.
test_network = transformer_encoder.TransformerEncoder(
vocab_size=100,
hidden_size=hidden_size,
sequence_length=sequence_length,
num_attention_heads=2,
num_layers=3)
# Create the inputs (note that the first dimension is implicit).
word_ids = tf.keras.Input(shape=(sequence_length,), dtype=tf.int32)
mask = tf.keras.Input(shape=(sequence_length,), dtype=tf.int32)
type_ids = tf.keras.Input(shape=(sequence_length,), dtype=tf.int32)
data, pooled = test_network([word_ids, mask, type_ids])
expected_data_shape = [None, sequence_length, hidden_size]
expected_pooled_shape = [None, hidden_size]
self.assertAllEqual(expected_data_shape, data.shape.as_list())
self.assertAllEqual(expected_pooled_shape, pooled.shape.as_list())
# The default output dtype is float32.
self.assertAllEqual(tf.float32, data.dtype)
self.assertAllEqual(tf.float32, pooled.dtype)
def test_network_creation_with_float16_dtype(self):
hidden_size = 32
sequence_length = 21
# Create a small TransformerEncoder for testing.
test_network = transformer_encoder.TransformerEncoder(
vocab_size=100,
hidden_size=hidden_size,
sequence_length=sequence_length,
num_attention_heads=2,
num_layers=3,
float_dtype="float16")
# Create the inputs (note that the first dimension is implicit).
word_ids = tf.keras.Input(shape=(sequence_length,), dtype=tf.int32)
mask = tf.keras.Input(shape=(sequence_length,), dtype=tf.int32)
type_ids = tf.keras.Input(shape=(sequence_length,), dtype=tf.int32)
data, pooled = test_network([word_ids, mask, type_ids])
expected_data_shape = [None, sequence_length, hidden_size]
expected_pooled_shape = [None, hidden_size]
self.assertAllEqual(expected_data_shape, data.shape.as_list())
self.assertAllEqual(expected_pooled_shape, pooled.shape.as_list())
# If float_dtype is set to float16, the output should always be float16.
self.assertAllEqual(tf.float16, data.dtype)
self.assertAllEqual(tf.float16, pooled.dtype)
def test_network_invocation(self):
hidden_size = 32
sequence_length = 21
vocab_size = 57
num_types = 7
# Create a small TransformerEncoder for testing.
test_network = transformer_encoder.TransformerEncoder(
vocab_size=vocab_size,
hidden_size=hidden_size,
sequence_length=sequence_length,
num_attention_heads=2,
num_layers=3,
type_vocab_size=num_types)
self.assertTrue(
test_network._position_embedding_layer._use_dynamic_slicing)
# Create the inputs (note that the first dimension is implicit).
word_ids = tf.keras.Input(shape=(sequence_length,), dtype=tf.int32)
mask = tf.keras.Input(shape=(sequence_length,), dtype=tf.int32)
type_ids = tf.keras.Input(shape=(sequence_length,), dtype=tf.int32)
data, pooled = test_network([word_ids, mask, type_ids])
# Create a model based off of this network:
model = tf.keras.Model([word_ids, mask, type_ids], [data, pooled])
# Invoke the model. We can't validate the output data here (the model is too
# complex) but this will catch structural runtime errors.
batch_size = 3
word_id_data = np.random.randint(
vocab_size, size=(batch_size, sequence_length))
mask_data = np.random.randint(2, size=(batch_size, sequence_length))
type_id_data = np.random.randint(
num_types, size=(batch_size, sequence_length))
_ = model.predict([word_id_data, mask_data, type_id_data])
# Creates a TransformerEncoder with max_sequence_length != sequence_length
max_sequence_length = 128
test_network = transformer_encoder.TransformerEncoder(
vocab_size=vocab_size,
hidden_size=hidden_size,
sequence_length=sequence_length,
max_sequence_length=max_sequence_length,
num_attention_heads=2,
num_layers=3,
type_vocab_size=num_types)
self.assertTrue(test_network._position_embedding_layer._use_dynamic_slicing)
model = tf.keras.Model([word_ids, mask, type_ids], [data, pooled])
_ = model.predict([word_id_data, mask_data, type_id_data])
def test_serialize_deserialize(self):
# Create a network object that sets all of its config options.
kwargs = dict(
vocab_size=100,
hidden_size=32,
num_layers=3,
num_attention_heads=2,
sequence_length=21,
max_sequence_length=21,
type_vocab_size=12,
intermediate_size=1223,
activation="relu",
dropout_rate=0.05,
attention_dropout_rate=0.22,
initializer="glorot_uniform",
float_dtype="float16")
network = transformer_encoder.TransformerEncoder(**kwargs)
expected_config = dict(kwargs)
expected_config["activation"] = tf.keras.activations.serialize(
tf.keras.activations.get(expected_config["activation"]))
expected_config["initializer"] = tf.keras.initializers.serialize(
tf.keras.initializers.get(expected_config["initializer"]))
self.assertEqual(network.get_config(), expected_config)
# Create another network object from the first object's config.
new_network = transformer_encoder.TransformerEncoder.from_config(
network.get_config())
# Validate that the config can be forced to JSON.
_ = new_network.to_json()
# If the serialization was successful, the new config should match the old.
self.assertAllEqual(network.get_config(), new_network.get_config())
if __name__ == "__main__":
tf.test.main()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment