Unverified commit 7653185e authored by Ayushman Kumar, committed by GitHub

Merge pull request #2 from tensorflow/master

Updated
parents 43178d7f cf01596c
# Docker image for running examples in Tensorflow models.
# base_image depends on whether we are running on GPUs or non-GPUs
FROM ubuntu:latest
RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates \
build-essential \
git \
python \
python-pip \
python-setuptools
RUN pip install tf-nightly
# Checkout tensorflow/models at HEAD
RUN git clone https://github.com/tensorflow/models.git /tensorflow_models
# Docker image for running examples in Tensorflow models.
# base_image depends on whether we are running on GPUs or non-GPUs
FROM nvidia/cuda:9.0-cudnn7-runtime-ubuntu16.04
RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates \
build-essential \
git \
python \
python-pip \
python-setuptools
RUN pip install tf-nightly-gpu
# Checkout tensorflow/models at HEAD
RUN git clone https://github.com/tensorflow/models.git /tensorflow_models
......@@ -16,6 +16,7 @@
Loads a SavedModel and records memory usage.
"""
import functools
import time
from absl import flags
......@@ -31,24 +32,31 @@ class TfHubMemoryUsageBenchmark(PerfZeroBenchmark):
"""A benchmark measuring memory usage for a given TF Hub SavedModel."""
def __init__(self,
hub_model_handle_list=None,
output_dir=None,
default_flags=None,
root_data_dir=None,
**kwargs):
super(TfHubMemoryUsageBenchmark, self).__init__(
output_dir=output_dir, default_flags=default_flags, **kwargs)
def benchmark_memory_usage(self):
if hub_model_handle_list:
for hub_model_handle in hub_model_handle_list.split(';'):
setattr(
self, 'benchmark_' + hub_model_handle,
functools.partial(self.benchmark_memory_usage, hub_model_handle))
def benchmark_memory_usage(
self, hub_model_handle='https://tfhub.dev/google/nnlm-en-dim128/1'):
start_time_sec = time.time()
self.load_model()
self.load_model(hub_model_handle)
wall_time_sec = time.time() - start_time_sec
metrics = []
self.report_benchmark(iters=-1, wall_time=wall_time_sec, metrics=metrics)
def load_model(self):
def load_model(self, hub_model_handle):
"""Loads a TF Hub module."""
hub.load('https://tfhub.dev/google/nnlm-en-dim128/1')
hub.load(hub_model_handle)
if __name__ == '__main__':
......
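For illustration, a minimal self-contained sketch of the registration pattern used in the constructor above, which turns a ';'-separated handle list into one zero-argument benchmark method per model (the second handle is a made-up placeholder):
import functools


class HandleBenchmarkSketch(object):
  """Stand-in for TfHubMemoryUsageBenchmark; only the registration is shown."""

  def benchmark_memory_usage(
      self, hub_model_handle='https://tfhub.dev/google/nnlm-en-dim128/1'):
    print('would load and time', hub_model_handle)


bench = HandleBenchmarkSketch()
handle_list = ('https://tfhub.dev/google/nnlm-en-dim128/1;'
               'https://example.com/placeholder-model/1')
for handle in handle_list.split(';'):
  # Bind the handle so each registered benchmark takes no arguments.
  setattr(bench, 'benchmark_' + handle,
          functools.partial(bench.benchmark_memory_usage, handle))

getattr(bench, 'benchmark_https://example.com/placeholder-model/1')()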
......@@ -23,6 +23,7 @@ import os
from absl import logging
import tensorflow as tf
from official.staging.training import grad_utils
from official.utils.misc import distribution_utils
_SUMMARY_TXT = 'training_summary.txt'
......@@ -94,7 +95,10 @@ def run_customized_training_loop(
init_checkpoint=None,
custom_callbacks=None,
run_eagerly=False,
sub_model_export_name=None):
sub_model_export_name=None,
explicit_allreduce=False,
pre_allreduce_callbacks=None,
post_allreduce_callbacks=None):
"""Run BERT pretrain model training using low-level API.
Arguments:
......@@ -136,6 +140,23 @@ def run_customized_training_loop(
file is {sub_model_export_name}_step_{step}.ckpt and the last
checkpoint's name is {sub_model_export_name}.ckpt;
if None, `sub_model` will not be exported as checkpoint.
explicit_allreduce: Whether to explicitly perform gradient allreduce,
instead of relying on implicit allreduce in optimizer.apply_gradients().
Default is False. For now, if training with FP16 mixed precision,
explicit allreduce will aggregate gradients in FP16 format. For TPU and
GPU training with FP32, explicit allreduce will aggregate gradients in
FP32 format.
pre_allreduce_callbacks: A list of callback functions that take gradient
and model variable pairs as input, manipulate them, and return new
gradient and variable pairs. The callback functions will be
invoked in the list order and before gradients are allreduced.
Default is no callbacks. Only used when explicit_allreduce=True.
post_allreduce_callbacks: A list of callback functions that take
gradient and model variable pairs as input, manipulate them, and
return new gradient and variable pairs. The callback
functions will be invoked in the list order and right before gradients
are applied to variables for updates. Default is no callbacks. Only used
when explicit_allreduce=True.
Returns:
Trained model.
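To make the callback contract above concrete, a minimal sketch of a pre-allreduce callback exercised on dummy values; in real use it would be passed via pre_allreduce_callbacks=[...] together with explicit_allreduce=True, as run_squad.py does further down in this change:
import tensorflow as tf


def clip_pre_allreduce_callback(grads_and_vars):
  """Takes (gradient, variable) pairs; returns new (gradient, variable) pairs."""
  grads, variables = zip(*grads_and_vars)
  clipped_grads, _ = tf.clip_by_global_norm(grads, clip_norm=1.0)
  return zip(clipped_grads, variables)


# Exercise the contract on dummy values, outside any training loop.
var = tf.Variable([1.0, 2.0])
grad = tf.constant([30.0, 40.0])
for clipped_grad, v in clip_pre_allreduce_callback([(grad, var)]):
  print(v.name, clipped_grad.numpy())  # gradient global norm is now <= 1.0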
......@@ -199,8 +220,6 @@ def run_customized_training_loop(
'sub_model is None.' % sub_model_export_name)
optimizer = model.optimizer
use_float16 = isinstance(
optimizer, tf.keras.mixed_precision.experimental.LossScaleOptimizer)
if init_checkpoint:
logging.info(
......@@ -242,10 +261,16 @@ def run_customized_training_loop(
with tf.GradientTape() as tape:
model_outputs = model(inputs, training=True)
loss = loss_fn(labels, model_outputs)
if use_float16:
if explicit_allreduce:
grad_utils.minimize_using_explicit_allreduce(tape, optimizer, loss,
training_vars,
pre_allreduce_callbacks,
post_allreduce_callbacks)
else:
if isinstance(optimizer,
tf.keras.mixed_precision.experimental.LossScaleOptimizer):
with tape:
scaled_loss = optimizer.get_scaled_loss(loss)
if use_float16:
scaled_grads = tape.gradient(scaled_loss, training_vars)
grads = optimizer.get_unscaled_gradients(scaled_grads)
else:
......
......@@ -25,10 +25,8 @@ from official.modeling import tf_utils
from official.nlp.albert import configs as albert_configs
from official.nlp.bert import configs
from official.nlp.modeling import losses
from official.nlp.modeling import models
from official.nlp.modeling import networks
from official.nlp.modeling.networks import bert_classifier
from official.nlp.modeling.networks import bert_pretrainer
from official.nlp.modeling.networks import bert_span_labeler
class BertPretrainLossAndMetricLayer(tf.keras.layers.Layer):
......@@ -159,7 +157,7 @@ def pretrain_model(bert_config,
if initializer is None:
initializer = tf.keras.initializers.TruncatedNormal(
stddev=bert_config.initializer_range)
pretrainer_model = bert_pretrainer.BertPretrainer(
pretrainer_model = models.BertPretrainer(
network=transformer_encoder,
num_classes=2, # The next sentence prediction label has two classes.
num_token_predictions=max_predictions_per_seq,
......@@ -211,7 +209,7 @@ def squad_model(bert_config,
stddev=bert_config.initializer_range)
if not hub_module_url:
bert_encoder = get_transformer_encoder(bert_config, max_seq_length)
return bert_span_labeler.BertSpanLabeler(
return models.BertSpanLabeler(
network=bert_encoder, initializer=initializer), bert_encoder
input_word_ids = tf.keras.layers.Input(
......@@ -231,7 +229,7 @@ def squad_model(bert_config,
},
outputs=[sequence_output, pooled_output],
name='core_model')
return bert_span_labeler.BertSpanLabeler(
return models.BertSpanLabeler(
network=bert_encoder, initializer=initializer), bert_encoder
......@@ -268,7 +266,7 @@ def classifier_model(bert_config,
if not hub_module_url:
bert_encoder = get_transformer_encoder(bert_config, max_seq_length)
return bert_classifier.BertClassifier(
return models.BertClassifier(
bert_encoder,
num_classes=num_labels,
dropout_rate=bert_config.hidden_dropout_prob,
......
......@@ -87,7 +87,7 @@ def create_pretrain_dataset(input_patterns,
if input_pipeline_context and input_pipeline_context.num_input_pipelines > 1:
dataset = dataset.shard(input_pipeline_context.num_input_pipelines,
input_pipeline_context.input_pipeline_id)
if is_training:
dataset = dataset.repeat()
# We set shuffle buffer to exactly match total number of
......@@ -132,7 +132,7 @@ def create_pretrain_dataset(input_patterns,
if is_training:
dataset = dataset.shuffle(100)
dataset = dataset.batch(batch_size, drop_remainder=True)
dataset = dataset.batch(batch_size, drop_remainder=is_training)
dataset = dataset.prefetch(1024)
return dataset
......
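The batching change above ties drop_remainder to is_training, so evaluation no longer silently drops the final partial batch. A small standalone illustration of the difference:
import tensorflow as tf

ds = tf.data.Dataset.range(10)
# Training path: only full batches are kept (two batches of four).
print([b.numpy().tolist() for b in ds.batch(4, drop_remainder=True)])
# Eval path: the trailing partial batch of two is kept as well.
print([b.numpy().tolist() for b in ds.batch(4, drop_remainder=False)])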
......@@ -239,22 +239,74 @@ def run_keras_compile_fit(model_dir,
return bert_model
def get_predictions_and_labels(strategy, trained_model, eval_input_fn,
eval_steps):
"""Obtains predictions of trained model on evaluation data.
Note that the list of labels is returned along with the predictions because
their order changes when the dataset is distributed over TPU pods.
Args:
strategy: Distribution strategy.
trained_model: Trained model with preloaded weights.
eval_input_fn: Input function for evaluation data.
eval_steps: Number of evaluation steps.
Returns:
predictions: List of predictions.
labels: List of gold labels corresponding to predictions.
"""
@tf.function
def test_step(iterator):
"""Computes predictions on distributed devices."""
def _test_step_fn(inputs):
"""Replicated predictions."""
inputs, labels = inputs
model_outputs = trained_model(inputs, training=False)
return model_outputs, labels
outputs, labels = strategy.experimental_run_v2(
_test_step_fn, args=(next(iterator),))
# outputs: current batch logits as a tuple of shard logits
outputs = tf.nest.map_structure(strategy.experimental_local_results,
outputs)
labels = tf.nest.map_structure(strategy.experimental_local_results, labels)
return outputs, labels
def _run_evaluation(test_iterator):
"""Runs evaluation steps."""
preds, golds = list(), list()
for _ in range(eval_steps):
logits, labels = test_step(test_iterator)
for cur_logits, cur_labels in zip(logits, labels):
preds.extend(tf.math.argmax(cur_logits, axis=1).numpy())
golds.extend(cur_labels.numpy().tolist())
return preds, golds
test_iter = iter(
strategy.experimental_distribute_datasets_from_function(eval_input_fn))
predictions, labels = _run_evaluation(test_iter)
return predictions, labels
def export_classifier(model_export_path, input_meta_data,
restore_model_using_load_weights,
bert_config, model_dir):
restore_model_using_load_weights, bert_config, model_dir):
"""Exports a trained model as a `SavedModel` for inference.
Args:
model_export_path: a string specifying the path to the SavedModel directory.
input_meta_data: dictionary containing meta data about input and model.
restore_model_using_load_weights: Whether to use checkpoint.restore() API
for custom checkpoint or to use model.load_weights() API.
There are 2 different ways to save checkpoints. One is using
tf.train.Checkpoint and another is using Keras model.save_weights().
Custom training loop implementation uses tf.train.Checkpoint API
and Keras ModelCheckpoint callback internally uses model.save_weights()
API. Since these two APIs cannot be used together, model loading logic
must take into account how the model checkpoint was saved.
for custom checkpoint or to use model.load_weights() API. There are 2
different ways to save checkpoints. One is using tf.train.Checkpoint and
another is using Keras model.save_weights(). Custom training loop
implementation uses tf.train.Checkpoint API and Keras ModelCheckpoint
callback internally uses model.save_weights() API. Since these two APIs
cannot be used together, model loading logic must take into account how
model checkpoint was saved.
bert_config: Bert configuration file to define core bert layers.
model_dir: The directory where the model weights and training/evaluation
summaries are stored.
......
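The restore_model_using_load_weights argument documented above distinguishes two checkpoint formats that cannot be mixed. A minimal sketch of the two save/restore styles, using a toy Keras model and temporary paths:
import os

import tensorflow as tf

model = tf.keras.Sequential([tf.keras.layers.Dense(4, input_shape=(8,))])

# Style 1: tf.train.Checkpoint, as written by the custom training loop
# (restore_model_using_load_weights=False path).
os.makedirs('/tmp/ckpt_style1', exist_ok=True)
checkpoint = tf.train.Checkpoint(model=model)
saved_path = checkpoint.save('/tmp/ckpt_style1/ckpt')
checkpoint.restore(saved_path)

# Style 2: Keras weight files, as written by the ModelCheckpoint callback
# (restore_model_using_load_weights=True path).
os.makedirs('/tmp/ckpt_style2', exist_ok=True)
model.save_weights('/tmp/ckpt_style2/weights')
model.load_weights('/tmp/ckpt_style2/weights')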
......@@ -269,6 +269,16 @@ def train_squad(strategy,
loss_factor=1.0 /
strategy.num_replicas_in_sync if FLAGS.scale_loss else 1.0)
# When all_reduce_sum_gradients = False, apply_gradients() no longer
# implicitly allreduces gradients; users manually allreduce gradients and
# pass the allreduced grads_and_vars. For now, clip_by_global_norm
# is moved to before the users' manual allreduce to keep the math
# unchanged.
def clip_by_global_norm_callback(grads_and_vars):
grads, variables = zip(*grads_and_vars)
(clipped_grads, _) = tf.clip_by_global_norm(grads, clip_norm=1.0)
return zip(clipped_grads, variables)
model_training_utils.run_customized_training_loop(
strategy=strategy,
model_fn=_get_squad_model,
......@@ -280,7 +290,9 @@ def train_squad(strategy,
train_input_fn=train_input_fn,
init_checkpoint=FLAGS.init_checkpoint,
run_eagerly=run_eagerly,
custom_callbacks=custom_callbacks)
custom_callbacks=custom_callbacks,
explicit_allreduce=True,
pre_allreduce_callbacks=[clip_by_global_norm_callback])
def predict_squad(strategy, input_meta_data, tokenizer, bert_config, squad_lib):
......
# Copyright 2020 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Models package definition."""
from official.nlp.modeling.models.bert_classifier import BertClassifier
from official.nlp.modeling.models.bert_pretrainer import BertPretrainer
from official.nlp.modeling.models.bert_span_labeler import BertSpanLabeler
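With this package in place, the task heads are imported from official.nlp.modeling.models rather than the individual networks modules, which is what the bert_models.py and test changes above switch to. A short sketch of the new import surface:
from official.nlp.modeling import models

# Previously these came from individual networks modules, e.g.
#   from official.nlp.modeling.networks import bert_classifier
# Construction mirrors bert_models.py above (encoder built elsewhere):
#   models.BertClassifier(bert_encoder, num_classes=num_labels,
#                         dropout_rate=0.1, initializer=initializer)
print([cls.__name__ for cls in (models.BertClassifier,
                                models.BertPretrainer,
                                models.BertSpanLabeler)])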
......@@ -22,7 +22,7 @@ import tensorflow as tf
from tensorflow.python.keras import keras_parameterized # pylint: disable=g-direct-tensorflow-import
from official.nlp.modeling import networks
from official.nlp.modeling.networks import bert_classifier
from official.nlp.modeling.models import bert_classifier
# This decorator runs the test in V1, V2-Eager, and V2-Functional mode. It
......
......@@ -22,7 +22,7 @@ import tensorflow as tf
from tensorflow.python.keras import keras_parameterized # pylint: disable=g-direct-tensorflow-import
from official.nlp.modeling import networks
from official.nlp.modeling.networks import bert_pretrainer
from official.nlp.modeling.models import bert_pretrainer
# This decorator runs the test in V1, V2-Eager, and V2-Functional mode. It
......
......@@ -22,7 +22,7 @@ import tensorflow as tf
from tensorflow.python.keras import keras_parameterized # pylint: disable=g-direct-tensorflow-import
from official.nlp.modeling import networks
from official.nlp.modeling.networks import bert_span_labeler
from official.nlp.modeling.models import bert_span_labeler
# This decorator runs the test in V1, V2-Eager, and V2-Functional mode. It
......
......@@ -142,6 +142,12 @@ class AdamWeightDecay(tf.keras.optimizers.Adam):
name=None,
all_reduce_sum_gradients=True):
grads, tvars = list(zip(*grads_and_vars))
if all_reduce_sum_gradients:
# When all_reduce_sum_gradients = False, apply_gradients() no longer
# implicitly allreduces gradients; users manually allreduce gradients and
# pass the allreduced grads_and_vars. For now, clip_by_global_norm
# is moved to before the explicit allreduce to keep the math
# the same as the TF 1 and pre-TF 2.2 implementation.
(grads, _) = tf.clip_by_global_norm(grads, clip_norm=1.0)
return super(AdamWeightDecay, self).apply_gradients(
zip(grads, tvars),
......
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Some gradient util functions to help users writing custom training loop."""
from __future__ import absolute_import
from __future__ import division
# from __future__ import google_type_annotations
from __future__ import print_function
from absl import logging
import tensorflow.compat.v2 as tf
def _filter_grads(grads_and_vars):
"""Filter out iterable with grad equal to None."""
grads_and_vars = tuple(grads_and_vars)
if not grads_and_vars:
return grads_and_vars
filtered = []
vars_with_empty_grads = []
for grad, var in grads_and_vars:
if grad is None:
vars_with_empty_grads.append(var)
else:
filtered.append((grad, var))
filtered = tuple(filtered)
if not filtered:
raise ValueError("No gradients provided for any variable: %s." %
([v.name for _, v in grads_and_vars],))
if vars_with_empty_grads:
logging.warning(
("Gradients do not exist for variables %s when minimizing the loss."),
([v.name for v in vars_with_empty_grads]))
return filtered
def _filter_and_allreduce_gradients(grads_and_vars,
allreduce_precision="float32"):
"""Filter None grads and then allreduce gradients in specified precision.
This util function is used when users intend to explicitly allreduce
gradients and customize gradient operations before and after allreduce.
The allreduced gradients are then passed to optimizer.apply_gradients(
all_reduce_sum_gradients=False).
Arguments:
grads_and_vars: gradients and variables pairs.
allreduce_precision: Whether to allreduce gradients in float32 or float16.
Returns:
pairs of allreduced non-None gradients and variables.
"""
filtered_grads_and_vars = _filter_grads(grads_and_vars)
(grads, variables) = zip(*filtered_grads_and_vars)
if allreduce_precision == "float16":
grads = [tf.cast(grad, "float16") for grad in grads]
allreduced_grads = tf.distribute.get_replica_context().all_reduce(
tf.distribute.ReduceOp.SUM, grads)
if allreduce_precision == "float16":
allreduced_grads = [tf.cast(grad, "float32") for grad in allreduced_grads]
return allreduced_grads, variables
def _run_callbacks(callbacks, grads_and_vars):
for callback in callbacks:
grads_and_vars = callback(grads_and_vars)
return grads_and_vars
def minimize_using_explicit_allreduce(tape,
optimizer,
loss,
trainable_variables,
pre_allreduce_callbacks=None,
post_allreduce_callbacks=None):
"""Minimizes loss for one step by updating `trainable_variables`.
Minimizes loss for one step by updating `trainable_variables`.
This explicitly performs gradient allreduce, instead of relying on implicit
allreduce in optimizer.apply_gradients(). If training using FP16 mixed
precision, explicit allreduce will aggregate gradients in FP16 format.
For TPU and GPU training using FP32, explicit allreduce will aggregate
gradients in FP32 format.
Arguments:
tape: An instance of `tf.GradientTape`.
optimizer: An instance of `tf.keras.optimizers.Optimizer`.
loss: the loss tensor.
trainable_variables: A list of model Variables.
pre_allreduce_callbacks: A list of callback functions that take gradient
and model variable pairs as input, manipulate them, and return new
gradient and variable pairs. The callback functions will be
invoked in the list order and before gradients are allreduced.
Default is no callbacks.
post_allreduce_callbacks: A list of callback functions that take
gradient and model variable pairs as input, manipulate them, and
return new gradient and variable pairs. The callback
functions will be invoked in the list order and right before gradients
are applied to variables for updates. Default is no callbacks.
"""
if isinstance(optimizer,
tf.keras.mixed_precision.experimental.LossScaleOptimizer):
# FP16 GPU code path
with tape:
scaled_loss = optimizer.get_scaled_loss(loss)
scaled_grads = tape.gradient(scaled_loss, trainable_variables)
grads_and_vars = zip(scaled_grads, trainable_variables)
if pre_allreduce_callbacks:
grads_and_vars = _run_callbacks(pre_allreduce_callbacks, grads_and_vars)
(allreduced_scaled_grads,
filtered_training_vars) = _filter_and_allreduce_gradients(
grads_and_vars, allreduce_precision="float16")
allreduced_unscaled_grads = optimizer.get_unscaled_gradients(
allreduced_scaled_grads)
grads_and_vars = zip(allreduced_unscaled_grads, filtered_training_vars)
else:
# TPU or FP32 GPU code path
grads = tape.gradient(loss, trainable_variables)
grads_and_vars = zip(grads, trainable_variables)
if pre_allreduce_callbacks:
grads_and_vars = _run_callbacks(pre_allreduce_callbacks, grads_and_vars)
(allreduced_grads,
filtered_training_vars) = _filter_and_allreduce_gradients(
grads_and_vars, allreduce_precision="float32")
grads_and_vars = zip(allreduced_grads, filtered_training_vars)
if post_allreduce_callbacks:
grads_and_vars = _run_callbacks(post_allreduce_callbacks, grads_and_vars)
optimizer.apply_gradients(grads_and_vars, all_reduce_sum_gradients=False)
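A minimal usage sketch for minimize_using_explicit_allreduce, assuming it is called from a replica context (for example via strategy.experimental_run_v2 inside a tf.function); model, optimizer and loss_fn are placeholders supplied by the caller:
import tensorflow as tf

from official.staging.training import grad_utils


def train_step(inputs, labels):
  # Assumed to run in a replica context; `model`, `optimizer` and `loss_fn`
  # are placeholders provided by the surrounding training code.
  with tf.GradientTape() as tape:
    outputs = model(inputs, training=True)
    loss = loss_fn(labels, outputs)
  # The helper filters out None grads, allreduces them (FP16 when a
  # LossScaleOptimizer is used, FP32 otherwise) and finally calls
  # optimizer.apply_gradients(..., all_reduce_sum_gradients=False).
  grad_utils.minimize_using_explicit_allreduce(
      tape, optimizer, loss, model.trainable_variables)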
......@@ -21,6 +21,7 @@ from __future__ import print_function
import tensorflow.compat.v2 as tf
from official.modeling import performance
from official.staging.training import grad_utils
from official.staging.training import standard_runnable
from official.staging.training import utils
from official.utils.flags import core as flags_core
......@@ -170,17 +171,8 @@ class ResnetRunnable(standard_runnable.StandardTrainable,
else:
loss += (tf.reduce_sum(self.model.losses) / num_replicas)
# Scale the loss
if self.flags_obj.dtype == 'fp16':
loss = self.optimizer.get_scaled_loss(loss)
grads = tape.gradient(loss, self.model.trainable_variables)
# Unscale the grads
if self.flags_obj.dtype == 'fp16':
grads = self.optimizer.get_unscaled_gradients(grads)
self.optimizer.apply_gradients(zip(grads, self.model.trainable_variables))
grad_utils.minimize_using_explicit_allreduce(
tape, self.optimizer, loss, self.model.trainable_variables)
self.train_loss.update_state(loss)
self.train_accuracy.update_state(labels, logits)
......