Unverified Commit 65636099 authored by anj-s, committed by GitHub

Add benchmarks for custom training loops + tf.distribute (#6980)

* first version of ctl

* fix indent

* remove monkey patching for core

* add dtype arg

* fix dtype arg

* add logging lib

* remove compat.v1.logging

* add datetime import

* fix FLAGS import

* add constant vals

* move to using as tf import

* move to using as tf import

* remove steps per epoch = 1

* test train and test for one step

* test train and test for one step

* test train and test for one step

* test train and test for the entire dataset

* use an iterator for test

* pass tensors instead of an iterator

* add stats dict

* fix list declaration

* fix list declaration

* fix elapsed time calc

* print lr at epoch boundary alone

* Use regular tf import instead of compat

* remove tensorboard chkpts

* add correct logging import

* add correct logging import

* add benchmark configs

* add tests and configs

* add tests and configs

* add keras flags import

* add keras flags import

* fix eval ds creation cond

* return numpy value of train_loss

* return numpy value of loss and acc values

* add option for full eager mode

* fix lint errors

* add ctl flags

* add ctl import

* add the xla flag

* enable v2 behavior in unit tests

* rename dataset var

* add synthetic dataset without monkey patching

* add ctl local constants

* add ctl local constants

* change to using v2 imports

* change to using v2 imports

* change to using v2 imports

* change to using keras synthetic input fn

* remove enable_eager flag from benchmarks

* remove enable_eager flag from benchmarks

* remove enable_eager flag from benchmarks

* add option for no dist_strat

* add lambda for flags

* remove no_func benchmarks due to OOM error

* remove README

* remove unused comments

* remove unchanged file

* remove unchanged file

* remove unused drop_remainder_arg

* use keras.common lr function

* address PR comments

* remove reference to deleted file

* .

* fix lint errors

* .
parent d3610769
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Common util functions and classes used by CTL."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from absl import flags
FLAGS = flags.FLAGS
def define_ctl_flags():
"""Define flags for CTL."""
flags.DEFINE_boolean(name='use_tf_function', default=True,
help='Wrap the train and test step inside a '
'tf.function.')
flags.DEFINE_boolean(name='skip_eval', default=False, help='Skip evaluation?')
flags.DEFINE_integer(
name='train_steps', default=None,
help='The number of steps to run for training. If it is larger than '
'# batches per epoch, then use # batches per epoch. When this flag is '
'set, only one epoch is going to run for training.')
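A minimal sketch of how these flag definitions are consumed by an absl entry point (this mirrors the __main__ block of ctl_imagenet_main.py further down; the body of main here is illustrative only):

from absl import app
from absl import flags
from official.resnet.ctl import ctl_common

def main(_):
  # app.run parses the command line before invoking main.
  print('use_tf_function:', flags.FLAGS.use_tf_function)
  print('train_steps:', flags.FLAGS.train_steps)

if __name__ == '__main__':
  ctl_common.define_ctl_flags()
  app.run(main)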
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Executes CTL benchmarks and accuracy tests."""
from __future__ import print_function
import os
import time
# pylint: disable=g-bad-import-order
from absl import flags
import tensorflow as tf
from official.resnet import imagenet_main
from official.resnet.ctl import ctl_imagenet_main
from official.resnet.ctl import ctl_common
from official.utils.testing.perfzero_benchmark import PerfZeroBenchmark
MIN_TOP_1_ACCURACY = 0.76
MAX_TOP_1_ACCURACY = 0.77
FLAGS = flags.FLAGS
class CtlBenchmark(PerfZeroBenchmark):
"""Base benchmark class with methods to simplify testing."""
def __init__(self, output_dir=None, default_flags=None, flag_methods=None):
self.output_dir = output_dir
self.default_flags = default_flags or {}
self.flag_methods = flag_methods or {}
super(CtlBenchmark, self).__init__(
output_dir=self.output_dir,
default_flags=self.default_flags,
flag_methods=self.flag_methods)
def _report_benchmark(self,
stats,
wall_time_sec,
top_1_max=None,
top_1_min=None,
total_batch_size=None,
log_steps=None,
warmup=1):
"""Report benchmark results by writing to local protobuf file.
Args:
stats: dict returned from keras models with known entries.
      wall_time_sec: the duration of the benchmark execution in seconds.
top_1_max: highest passing level for top_1 accuracy.
top_1_min: lowest passing level for top_1 accuracy.
total_batch_size: Global batch-size.
      log_steps: interval, in steps, between entries in stats['step_timestamp_log'].
warmup: number of entries in stats['step_timestamp_log'] to ignore.
"""
metrics = []
if 'eval_acc' in stats:
metrics.append({'name': 'accuracy_top_1',
'value': stats['eval_acc'],
'min_value': top_1_min,
'max_value': top_1_max})
metrics.append({'name': 'eval_loss',
'value': stats['eval_loss']})
metrics.append({'name': 'top_1_train_accuracy',
'value': stats['train_acc']})
metrics.append({'name': 'train_loss',
'value': stats['train_loss']})
if (warmup and 'step_timestamp_log' in stats and
len(stats['step_timestamp_log']) > warmup):
      # The first entry in time_log is the start of step 1; the remaining
      # entries mark the end of each recorded step.
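      # Throughput is therefore computed over (len(time_log) - warmup - 1)
      # intervals of log_steps batches each, timed from the end of the warmup
      # entries to the last recorded entry.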
time_log = stats['step_timestamp_log']
elapsed = time_log[-1].timestamp - time_log[warmup].timestamp
num_examples = (
total_batch_size * log_steps * (len(time_log) - warmup - 1))
examples_per_sec = num_examples / elapsed
metrics.append({'name': 'exp_per_second',
'value': examples_per_sec})
if 'avg_exp_per_second' in stats:
metrics.append({'name': 'avg_exp_per_second',
'value': stats['avg_exp_per_second']})
self.report_benchmark(iters=-1, wall_time=wall_time_sec, metrics=metrics)
class Resnet50CtlAccuracy(CtlBenchmark):
"""Benchmark accuracy tests for ResNet50 in CTL."""
def __init__(self, output_dir=None, root_data_dir=None, **kwargs):
"""A benchmark class.
Args:
      output_dir: directory where outputs, e.g. log files, are written.
      root_data_dir: directory under which to look for the dataset.
**kwargs: arbitrary named arguments. This is needed to make the
constructor forward compatible in case PerfZero provides more
named arguments before updating the constructor.
"""
flag_methods = [
ctl_common.define_ctl_flags,
lambda: imagenet_main.define_imagenet_flags()
]
self.data_dir = os.path.join(root_data_dir, 'imagenet')
super(Resnet50CtlAccuracy, self).__init__(
output_dir=output_dir, flag_methods=flag_methods)
def benchmark_8_gpu(self):
"""Test Keras model with eager, dist_strat and 8 GPUs."""
self._setup()
FLAGS.num_gpus = 8
FLAGS.data_dir = self.data_dir
FLAGS.batch_size = 128 * 8
FLAGS.train_epochs = 90
FLAGS.epochs_between_evals = 10
FLAGS.model_dir = self._get_model_dir('benchmark_8_gpu')
FLAGS.dtype = 'fp32'
# Add some thread tunings to improve performance.
FLAGS.datasets_num_private_threads = 14
self._run_and_report_benchmark()
def _run_and_report_benchmark(self):
start_time_sec = time.time()
stats = ctl_imagenet_main.run(flags.FLAGS)
wall_time_sec = time.time() - start_time_sec
super(Resnet50CtlAccuracy, self)._report_benchmark(
stats,
wall_time_sec,
top_1_min=MIN_TOP_1_ACCURACY,
top_1_max=MAX_TOP_1_ACCURACY,
total_batch_size=FLAGS.batch_size,
log_steps=100)
def _get_model_dir(self, folder_name):
return os.path.join(self.output_dir, folder_name)
class Resnet50CtlBenchmarkBase(CtlBenchmark):
"""Resnet50 benchmarks."""
def __init__(self, output_dir=None, default_flags=None):
flag_methods = [
ctl_common.define_ctl_flags,
lambda: imagenet_main.define_imagenet_flags()
]
super(Resnet50CtlBenchmarkBase, self).__init__(
output_dir=output_dir,
flag_methods=flag_methods,
default_flags=default_flags)
def _run_and_report_benchmark(self):
start_time_sec = time.time()
stats = ctl_imagenet_main.run(FLAGS)
wall_time_sec = time.time() - start_time_sec
    # Number of logged step-time entries excluded from the performance
    # report. We keep results from the last 100 batches in this case.
warmup = (FLAGS.train_steps - 100) // FLAGS.log_steps
super(Resnet50CtlBenchmarkBase, self)._report_benchmark(
stats,
wall_time_sec,
total_batch_size=FLAGS.batch_size,
log_steps=FLAGS.log_steps,
warmup=warmup)
def benchmark_1_gpu_no_dist_strat(self):
"""Test Keras model with 1 GPU, no distribution strategy."""
self._setup()
FLAGS.num_gpus = 1
FLAGS.distribution_strategy = 'off'
FLAGS.model_dir = self._get_model_dir('benchmark_1_gpu_no_dist_strat')
FLAGS.batch_size = 128
self._run_and_report_benchmark()
def benchmark_1_gpu(self):
"""Test Keras model with 1 GPU."""
self._setup()
FLAGS.num_gpus = 1
FLAGS.distribution_strategy = 'default'
FLAGS.model_dir = self._get_model_dir('benchmark_1_gpu')
FLAGS.batch_size = 128
self._run_and_report_benchmark()
def benchmark_8_gpu(self):
"""Test Keras model with 8 GPUs."""
self._setup()
FLAGS.num_gpus = 8
FLAGS.distribution_strategy = 'default'
FLAGS.model_dir = self._get_model_dir('benchmark_8_gpu')
FLAGS.batch_size = 128 * 8 # 8 GPUs
self._run_and_report_benchmark()
def fill_report_object(self, stats):
super(Resnet50CtlBenchmarkBase, self).fill_report_object(
stats,
total_batch_size=FLAGS.batch_size,
log_steps=FLAGS.log_steps)
class Resnet50CtlBenchmarkSynth(Resnet50CtlBenchmarkBase):
"""Resnet50 synthetic benchmark tests."""
def __init__(self, output_dir=None, root_data_dir=None, **kwargs):
def_flags = {}
def_flags['skip_eval'] = True
def_flags['use_synthetic_data'] = True
def_flags['train_steps'] = 110
def_flags['log_steps'] = 10
super(Resnet50CtlBenchmarkSynth, self).__init__(
output_dir=output_dir, default_flags=def_flags)
class Resnet50CtlBenchmarkReal(Resnet50CtlBenchmarkBase):
"""Resnet50 real data benchmark tests."""
def __init__(self, output_dir=None, root_data_dir=None, **kwargs):
def_flags = {}
def_flags['skip_eval'] = True
def_flags['data_dir'] = os.path.join(root_data_dir, 'imagenet')
def_flags['train_steps'] = 110
def_flags['log_steps'] = 10
super(Resnet50CtlBenchmarkReal, self).__init__(
output_dir=output_dir, default_flags=def_flags)
if __name__ == '__main__':
tf.test.main()
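These benchmark classes are normally discovered and driven by the PerfZero harness; a rough sketch of invoking one directly might look like the following (resnet_ctl_benchmark is a hypothetical module name for this file, and the output directory is illustrative):

import tempfile

import resnet_ctl_benchmark  # hypothetical import path for this file

bench = resnet_ctl_benchmark.Resnet50CtlBenchmarkSynth(
    output_dir=tempfile.mkdtemp())
# Runs 110 synthetic-data steps on one GPU and reports wall time and
# exp_per_second via report_benchmark.
bench.benchmark_1_gpu_no_dist_strat()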
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Runs a ResNet model on the ImageNet dataset using custom training loops."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import time
from absl import app as absl_app
from absl import flags
from absl import logging
import tensorflow as tf
from official.resnet import imagenet_main
from official.resnet.keras import keras_common
from official.resnet.keras import keras_imagenet_main
from official.resnet.keras import resnet_model
from official.utils.flags import core as flags_core
from official.utils.logs import logger
from official.utils.misc import distribution_utils
from official.utils.misc import model_helpers
from official.resnet.ctl import ctl_common
from official.utils.misc import keras_utils
def parse_record_keras(raw_record, is_training, dtype):
"""Adjust the shape of label."""
image, label = imagenet_main.parse_record(raw_record, is_training, dtype)
# Subtract one so that labels are in [0, 1000), and cast to float32 for
# Keras model.
label = tf.cast(tf.cast(tf.reshape(label, shape=[1]), dtype=tf.int32) - 1,
dtype=tf.float32)
return image, label
def build_stats(train_result, eval_result, time_callback):
"""Normalizes and returns dictionary of stats.
Args:
train_result: The final loss at training time.
eval_result: Output of the eval step. Assumes first value is eval_loss and
second value is accuracy_top_1.
time_callback: Time tracking callback instance.
Returns:
Dictionary of normalized results.
"""
stats = {}
if eval_result:
stats["eval_loss"] = eval_result[0]
stats["eval_acc"] = eval_result[1]
stats['train_loss'] = train_result[0]
stats['train_acc'] = train_result[1]
if time_callback:
timestamp_log = time_callback.timestamp_log
stats["step_timestamp_log"] = timestamp_log
stats["train_finish_time"] = time_callback.train_finish_time
if len(timestamp_log) > 1:
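      # Average throughput over the whole run: (len(timestamp_log) - 1)
      # intervals of log_steps batches, each holding batch_size examples.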
stats["avg_exp_per_second"] = (
time_callback.batch_size * time_callback.log_steps *
(len(time_callback.timestamp_log) - 1) /
(timestamp_log[-1].timestamp - timestamp_log[0].timestamp))
return stats
def get_input_dataset(flags_obj, strategy):
"""Returns the test and train input datasets."""
dtype = flags_core.get_tf_dtype(flags_obj)
if flags_obj.use_synthetic_data:
input_fn = keras_common.get_synth_input_fn(
height=imagenet_main.DEFAULT_IMAGE_SIZE,
width=imagenet_main.DEFAULT_IMAGE_SIZE,
num_channels=imagenet_main.NUM_CHANNELS,
num_classes=imagenet_main.NUM_CLASSES,
dtype=dtype,
drop_remainder=True)
else:
input_fn = imagenet_main.input_fn
train_ds = input_fn(
is_training=True,
data_dir=flags_obj.data_dir,
batch_size=flags_obj.batch_size,
parse_record_fn=parse_record_keras,
datasets_num_private_threads=flags_obj.datasets_num_private_threads,
dtype=dtype)
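  # When a strategy is active, let it distribute the dataset so that each
  # replica receives its per-replica share of the global batch.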
if strategy:
train_ds = strategy.experimental_distribute_dataset(train_ds)
test_ds = None
if not flags_obj.skip_eval:
test_ds = input_fn(
is_training=False,
data_dir=flags_obj.data_dir,
batch_size=flags_obj.batch_size,
parse_record_fn=parse_record_keras,
dtype=dtype)
if strategy:
test_ds = strategy.experimental_distribute_dataset(test_ds)
return train_ds, test_ds
def get_num_train_iterations(flags_obj):
"""Returns the number of training stesps, train and test epochs."""
train_steps = imagenet_main.NUM_IMAGES['train'] // flags_obj.batch_size
train_epochs = flags_obj.train_epochs
if flags_obj.train_steps:
train_steps = min(flags_obj.train_steps, train_steps)
train_epochs = 1
eval_steps = imagenet_main.NUM_IMAGES['validation'] // flags_obj.batch_size
return train_steps, train_epochs, eval_steps
def run(flags_obj):
"""Run ResNet ImageNet training and eval loop using custom training loops.
Args:
flags_obj: An object containing parsed flag values.
Raises:
    ValueError: If fp16 is passed, as it is not currently supported.
Returns:
Dictionary of training and eval stats.
"""
dtype = flags_core.get_tf_dtype(flags_obj)
# TODO(anj-s): Set data_format without using Keras.
data_format = flags_obj.data_format
if data_format is None:
data_format = ('channels_first'
if tf.test.is_built_with_cuda() else 'channels_last')
tf.keras.backend.set_image_data_format(data_format)
strategy = distribution_utils.get_distribution_strategy(
distribution_strategy=flags_obj.distribution_strategy,
num_gpus=flags_obj.num_gpus,
num_workers=distribution_utils.configure_cluster(),
all_reduce_alg=flags_obj.all_reduce_alg,
num_packs=flags_obj.num_packs)
train_ds, test_ds = get_input_dataset(flags_obj, strategy)
train_steps, train_epochs, eval_steps = get_num_train_iterations(flags_obj)
time_callback = keras_utils.TimeHistory(flags_obj.batch_size,
flags_obj.log_steps)
strategy_scope = distribution_utils.get_strategy_scope(strategy)
with strategy_scope:
model = resnet_model.resnet50(num_classes=imagenet_main.NUM_CLASSES,
dtype=dtype, batch_size=flags_obj.batch_size)
optimizer = tf.keras.optimizers.SGD(
learning_rate=keras_common.BASE_LEARNING_RATE, momentum=0.9,
nesterov=True)
training_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
'training_accuracy', dtype=tf.float32)
test_loss = tf.keras.metrics.Mean('test_loss', dtype=tf.float32)
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
'test_accuracy', dtype=tf.float32)
def train_step(train_ds_inputs):
"""Training StepFn."""
def step_fn(inputs):
"""Per-Replica StepFn."""
images, labels = inputs
with tf.GradientTape() as tape:
logits = model(images, training=True)
prediction_loss = tf.keras.losses.sparse_categorical_crossentropy(
labels, logits)
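          # Scale by the global batch size rather than the per-replica size so
          # that summing the per-replica losses reproduces the global mean;
          # model (regularization) losses are identical on every replica, so
          # they are divided by the replica count instead.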
          loss1 = tf.reduce_sum(prediction_loss) * (1.0 / flags_obj.batch_size)
loss2 = (tf.reduce_sum(model.losses) /
tf.distribute.get_strategy().num_replicas_in_sync)
loss = loss1 + loss2
grads = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(grads, model.trainable_variables))
training_accuracy.update_state(labels, logits)
return loss
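      # Under a strategy, run step_fn on each replica and sum the pre-scaled
      # per-replica losses; the scaling above makes this sum the global mean.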
if strategy:
per_replica_losses = strategy.experimental_run_v2(
step_fn, args=(train_ds_inputs,))
return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses,
axis=None)
else:
return step_fn(train_ds_inputs)
def test_step(test_ds_inputs):
"""Evaluation StepFn."""
def step_fn(inputs):
images, labels = inputs
logits = model(images, training=False)
loss = tf.keras.losses.sparse_categorical_crossentropy(labels,
logits)
        loss = tf.reduce_sum(loss) * (1.0 / flags_obj.batch_size)
test_loss.update_state(loss)
test_accuracy.update_state(labels, logits)
if strategy:
strategy.experimental_run_v2(step_fn, args=(test_ds_inputs,))
else:
step_fn(test_ds_inputs)
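    # Optionally trace the train and test steps into graphs with tf.function;
    # leaving them unwrapped runs fully eagerly, which is slower but easier
    # to debug.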
if flags_obj.use_tf_function:
train_step = tf.function(train_step)
test_step = tf.function(test_step)
time_callback.on_train_begin()
for epoch in range(train_epochs):
train_iter = iter(train_ds)
total_loss = 0.0
training_accuracy.reset_states()
for step in range(train_steps):
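        # Recompute the learning rate each step so the warmup/decay schedule
        # takes effect.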
optimizer.lr = keras_imagenet_main.learning_rate_schedule(
epoch, step, train_steps, flags_obj.batch_size)
        time_callback.on_batch_begin(step + epoch * train_steps)
        total_loss += train_step(next(train_iter))
        time_callback.on_batch_end(step + epoch * train_steps)
train_loss = total_loss / train_steps
logging.info('Training loss: %s, accuracy: %s%% at epoch: %d',
train_loss.numpy(),
training_accuracy.result().numpy(),
epoch)
if (not flags_obj.skip_eval and
(epoch + 1) % flags_obj.epochs_between_evals == 0):
test_loss.reset_states()
test_accuracy.reset_states()
test_iter = iter(test_ds)
for _ in range(eval_steps):
test_step(next(test_iter))
logging.info('Test loss: %s, accuracy: %s%% at epoch: %d',
test_loss.result().numpy(),
test_accuracy.result().numpy(),
epoch)
time_callback.on_train_end()
eval_result = None
train_result = None
if not flags_obj.skip_eval:
eval_result = [test_loss.result().numpy(),
test_accuracy.result().numpy()]
train_result = [train_loss.numpy(),
training_accuracy.result().numpy()]
stats = build_stats(train_result, eval_result, time_callback)
return stats
def main(_):
model_helpers.apply_clean(flags.FLAGS)
with logger.benchmark_context(flags.FLAGS):
return run(flags.FLAGS)
if __name__ == '__main__':
logging.set_verbosity(logging.INFO)
imagenet_main.define_imagenet_flags()
ctl_common.define_ctl_flags()
absl_app.run(main)
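A minimal programmatic smoke test, roughly mirroring what the unit tests below do (synthetic data; the flag values here are illustrative):

from absl import flags
from official.resnet import imagenet_main
from official.resnet.ctl import ctl_common
from official.resnet.ctl import ctl_imagenet_main

imagenet_main.define_imagenet_flags()
ctl_common.define_ctl_flags()
# Parse a tiny synthetic-data configuration; argv[0] is the program name.
flags.FLAGS(['ctl', '--use_synthetic_data', '--batch_size=4', '--train_steps=4',
             '--skip_eval', '--distribution_strategy=off'])
stats = ctl_imagenet_main.run(flags.FLAGS)
print(sorted(stats))  # Available keys depend on the flags (eval skipped here).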
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Test the ResNet model with ImageNet data using CTL."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from tempfile import mkdtemp
import tensorflow as tf
from official.resnet import imagenet_main
from official.resnet.ctl import ctl_imagenet_main
from official.resnet.ctl import ctl_common
from official.utils.testing import integration
from official.resnet.keras import keras_common
# pylint: disable=ungrouped-imports
from tensorflow.python.eager import context
from tensorflow.python.platform import googletest
class CtlImagenetTest(googletest.TestCase):
"""Unit tests for Keras ResNet with ImageNet using CTL."""
_extra_flags = [
'-batch_size', '4',
'-train_steps', '4',
'-use_synthetic_data', 'true'
]
_tempdir = None
def get_temp_dir(self):
if not self._tempdir:
self._tempdir = mkdtemp(dir=googletest.GetTempDir())
return self._tempdir
@classmethod
def setUpClass(cls): # pylint: disable=invalid-name
super(CtlImagenetTest, cls).setUpClass()
imagenet_main.define_imagenet_flags()
ctl_common.define_ctl_flags()
def setUp(self):
super(CtlImagenetTest, self).setUp()
imagenet_main.NUM_IMAGES['validation'] = 4
def tearDown(self):
super(CtlImagenetTest, self).tearDown()
tf.io.gfile.rmtree(self.get_temp_dir())
def test_end_to_end_no_dist_strat(self):
"""Test Keras model with 1 GPU, no distribution strategy."""
extra_flags = [
"-distribution_strategy", "off",
"-model_dir", "ctl_imagenet_no_dist_strat",
"-data_format", "channels_last",
]
extra_flags = extra_flags + self._extra_flags
integration.run_synthetic(
main=ctl_imagenet_main.run,
tmp_root=self.get_temp_dir(),
extra_flags=extra_flags
)
def test_end_to_end_2_gpu(self):
"""Test Keras model with 2 GPUs."""
num_gpus = "2"
if context.num_gpus() < 2:
num_gpus = "0"
extra_flags = [
"-num_gpus", num_gpus,
"-distribution_strategy", "default",
"-model_dir", "ctl_imagenet_2_gpu",
"-data_format", "channels_last",
]
extra_flags = extra_flags + self._extra_flags
integration.run_synthetic(
main=ctl_imagenet_main.run,
tmp_root=self.get_temp_dir(),
extra_flags=extra_flags
)
if __name__ == '__main__':
if not keras_common.is_v2_0():
tf.enable_v2_behavior()
googletest.main()