Commit 06f22a59 authored by Zongwei Zhou, committed by A. Unique TensorFlower

Add Resnet50 benchmark suite that reads training data from remote storage

PiperOrigin-RevId: 277082247
parent b62439d7
@@ -78,7 +78,7 @@ class Resnet50KerasAccuracy(keras_benchmark.KerasBenchmark):
FLAGS.datasets_num_private_threads = 14
FLAGS.use_tensor_lr = True
self._run_and_report_benchmark()
def benchmark_8_gpu_amp(self):
"""Test Keras model with eager, dist_strat and 8 GPUs with automatic mixed precision."""
self._setup()
@@ -95,7 +95,7 @@ class Resnet50KerasAccuracy(keras_benchmark.KerasBenchmark):
FLAGS.datasets_num_private_threads = 14
FLAGS.use_tensor_lr = True
self._run_and_report_benchmark()
def benchmark_8_gpu_fp16(self):
"""Test Keras model with eager, dist_strat, 8 GPUs, and fp16."""
self._setup()
@@ -201,13 +201,14 @@ class Resnet50KerasBenchmarkBase(keras_benchmark.KerasBenchmark):
flag_methods=flag_methods,
default_flags=default_flags)
def _run_and_report_benchmark(self):
def _run_and_report_benchmark(self, skip_steps=None):
start_time_sec = time.time()
stats = resnet_imagenet_main.run(FLAGS)
wall_time_sec = time.time() - start_time_sec
# Number of logged step time entries that are excluded in performance
# report. We keep results from last 100 batches in this case.
warmup = (FLAGS.train_steps - 100) // FLAGS.log_steps
# report. We keep results from the last 100 batches, or skip steps based on
# the input skip_steps.
warmup = (skip_steps or (FLAGS.train_steps - 100)) // FLAGS.log_steps
super(Resnet50KerasBenchmarkBase, self)._report_benchmark(
stats,
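
For reference, a minimal sketch (not part of the diff) of how this warmup value maps to dropped log entries; the concrete numbers below are illustrative assumptions, with log_steps, train_steps, and skip_steps playing the same roles as the flags above:

def warmup_entries(train_steps, log_steps, skip_steps=None):
    # Number of logged step-time entries to drop from the performance report:
    # everything before the last 100 batches by default, or everything before
    # `skip_steps` when the caller provides it.
    return (skip_steps or (train_steps - 100)) // log_steps

# Illustrative values only.
print(warmup_entries(train_steps=110, log_steps=100))                   # 0: keep all entries
print(warmup_entries(train_steps=700, log_steps=100))                   # 6: keep only the last entry
print(warmup_entries(train_steps=1250, log_steps=100, skip_steps=600))  # 6: driven by skip_steps
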
@@ -513,7 +514,7 @@ class Resnet50KerasBenchmarkBase(keras_benchmark.KerasBenchmark):
FLAGS.model_dir = self._get_model_dir('benchmark_8_gpu_amp')
FLAGS.batch_size = 256 * 8 # 8 GPUs
self._run_and_report_benchmark()
def benchmark_8_gpu_tweaked(self):
"""Test Keras model with manual config tuning and 8 GPUs."""
self._setup()
@@ -552,7 +553,7 @@ class Resnet50KerasBenchmarkBase(keras_benchmark.KerasBenchmark):
FLAGS.model_dir = self._get_model_dir('benchmark_xla_8_gpu_amp')
FLAGS.batch_size = 256 * 8 # 8 GPUs
self._run_and_report_benchmark()
def benchmark_xla_8_gpu_tweaked(self):
"""Test Keras model with manual config tuning, 8 GPUs, and XLA."""
self._setup()
@@ -845,6 +846,29 @@ class Resnet50KerasBenchmarkReal(Resnet50KerasBenchmarkBase):
output_dir=output_dir, default_flags=def_flags)
class Resnet50KerasBenchmarkRemoteData(Resnet50KerasBenchmarkBase):
"""Resnet50 real data (stored in remote storage) benchmark tests."""
def __init__(self, output_dir=None, root_data_dir=None, **kwargs):
def_flags = {}
def_flags['skip_eval'] = True
def_flags['report_accuracy_metrics'] = False
def_flags['data_dir'] = os.path.join(root_data_dir, 'imagenet')
# Defining multiple epochs overrides the train_steps setting in benchmarks.
def_flags['train_epochs'] = 2
# Cache dataset so performance is stable after the first epoch.
def_flags['training_dataset_cache'] = True
def_flags['log_steps'] = 100
super(Resnet50KerasBenchmarkRemoteData, self).__init__(
output_dir=output_dir, default_flags=def_flags)
def _run_and_report_benchmark(self):
# Skip the first epoch for performance measurement.
super(Resnet50KerasBenchmarkRemoteData,
self)._run_and_report_benchmark(skip_steps=600)
class TrivialKerasBenchmarkReal(keras_benchmark.KerasBenchmark):
"""Trivial model with real data benchmark tests."""
......
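
As a rough sanity check on the skip_steps=600 value used above, the sketch below estimates how many steps the first (uncached) epoch takes; the ImageNet train-split size and the 256-per-GPU, 8-GPU batch size are assumptions drawn from the surrounding benchmark code rather than from this diff:

# Assumed values, not defined in this diff.
NUM_TRAIN_IMAGES = 1281167   # ImageNet train split
BATCH_SIZE = 256 * 8         # per-GPU batch of 256 on 8 GPUs

steps_per_epoch = NUM_TRAIN_IMAGES // BATCH_SIZE
print(steps_per_epoch)  # 625: skipping ~600 steps discards roughly the first
                        # epoch, during which the remote data is still being
                        # read and cached.
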
@@ -63,7 +63,8 @@ def define_performance(num_parallel_calls=False, inter_op=False, intra_op=False,
dynamic_loss_scale=False, fp16_implementation=False,
loss_scale=False,
tf_data_experimental_slack=False, enable_xla=False,
force_v2_in_keras_compile=False):
force_v2_in_keras_compile=False,
training_dataset_cache=False):
"""Register flags for specifying performance tuning arguments.
Args:
@@ -92,6 +93,9 @@ def define_performance(num_parallel_calls=False, inter_op=False, intra_op=False,
force_v2_in_keras_compile: Forces the use of the run_distributed path even if
not using a `strategy`. This is not the same as
`tf.distribute.OneDeviceStrategy`.
training_dataset_cache: Whether to cache the training dataset on workers.
Typically used to improve training performance when training data is in
remote storage and can fit into worker memory.
Returns:
A list of flags for core.py to mark as key flags.
@@ -262,6 +266,16 @@ def define_performance(num_parallel_calls=False, inter_op=False, intra_op=False,
"map and batch from tf.data.")
)
if training_dataset_cache:
flags.DEFINE_boolean(
name="training_dataset_cache",
default=False,
help=help_wrap(
"Determines whether to cache the training dataset on workers. "
"Typically used to improve training performance when training "
"data is in remote storage and can fit into worker memory.")
)
if tf_data_experimental_slack:
flags.DEFINE_boolean(
name="tf_data_experimental_slack",
......
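
A small, self-contained sketch of the opt-in flag-definition pattern shown above, written directly against absl.flags (help_wrap and the rest of the flags module are omitted); the flag name matches the one added in the diff, everything else is illustrative:

from absl import app, flags

FLAGS = flags.FLAGS

def define_performance(training_dataset_cache=False):
    # The flag is only registered when the caller opts in, mirroring the
    # conditional DEFINE_boolean block in the diff above.
    if training_dataset_cache:
        flags.DEFINE_boolean(
            name="training_dataset_cache",
            default=False,
            help="Cache the training dataset on workers; useful when the "
                 "training data lives in remote storage and fits in memory.")

def main(_):
    print("training_dataset_cache =", FLAGS.training_dataset_cache)

if __name__ == "__main__":
    define_performance(training_dataset_cache=True)
    app.run(main)
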
@@ -298,7 +298,8 @@ def define_keras_flags(dynamic_loss_scale=True):
fp16_implementation=True,
tf_data_experimental_slack=True,
enable_xla=True,
force_v2_in_keras_compile=True)
force_v2_in_keras_compile=True,
training_dataset_cache=True)
flags_core.define_image()
flags_core.define_benchmark()
flags_core.define_distribution()
@@ -327,8 +328,8 @@ def define_keras_flags(dynamic_loss_scale=True):
flags.DEFINE_integer(
name='train_steps', default=None,
help='The number of steps to run for training. If it is larger than '
'# batches per epoch, then use # batches per epoch. When this flag is '
'set, only one epoch is going to run for training.')
'# batches per epoch, then use # batches per epoch. This flag will be '
'ignored if train_epochs is larger than 1.')
flags.DEFINE_string(
name='profile_steps', default=None,
help='Save profiling data to model dir at given range of steps. The '
......
@@ -255,7 +255,8 @@ def input_fn(is_training,
parse_record_fn=parse_record,
input_context=None,
drop_remainder=False,
tf_data_experimental_slack=False):
tf_data_experimental_slack=False,
training_dataset_cache=False):
"""Input function which provides batches for train or eval.
Args:
@@ -272,6 +273,9 @@ def input_fn(is_training,
batches. If True, the batch dimension will be static.
tf_data_experimental_slack: Whether to enable tf.data's
`experimental_slack` option.
training_dataset_cache: Whether to cache the training dataset on workers.
Typically used to improve training performance when training data is in
remote storage and can fit into worker memory.
Returns:
A dataset that can be used for iteration.
@@ -299,6 +303,11 @@ def input_fn(is_training,
cycle_length=10,
num_parallel_calls=tf.data.experimental.AUTOTUNE)
if is_training and training_dataset_cache:
# Improve training performance when training data is in remote storage and
# can fit into worker memory.
dataset = dataset.cache()
return process_record_dataset(
dataset=dataset,
is_training=is_training,
......
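
To make the new caching branch concrete, here is a small self-contained tf.data sketch that uses an in-memory range dataset as a stand-in for the TFRecord files read from remote storage; it illustrates where the conditional cache() sits relative to shuffling and batching and is not the actual input_fn:

import tensorflow as tf

def make_dataset(is_training, training_dataset_cache=False):
    # Stand-in for the interleaved TFRecord dataset read from remote storage.
    dataset = tf.data.Dataset.range(1000)

    if is_training and training_dataset_cache:
        # Cache right after the (expensive) remote read so that every epoch
        # after the first is served from worker memory.
        dataset = dataset.cache()

    if is_training:
        dataset = dataset.shuffle(buffer_size=100)
    dataset = dataset.batch(32)
    return dataset.prefetch(tf.data.experimental.AUTOTUNE)

ds = make_dataset(is_training=True, training_dataset_cache=True)
for batch in ds.take(1):
    print(batch.shape)  # (32,)
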
@@ -128,6 +128,7 @@ def run(flags_obj):
dtype=dtype,
drop_remainder=drop_remainder,
tf_data_experimental_slack=flags_obj.tf_data_experimental_slack,
training_dataset_cache=flags_obj.training_dataset_cache,
)
eval_input_dataset = None
@@ -198,7 +199,8 @@ def run(flags_obj):
imagenet_preprocessing.NUM_IMAGES['train'] // flags_obj.batch_size)
train_epochs = flags_obj.train_epochs
if flags_obj.train_steps:
# If multiple epochs, ignore the train_steps flag.
if train_epochs <= 1 and flags_obj.train_steps:
train_steps = min(flags_obj.train_steps, train_steps)
train_epochs = 1
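
A short sketch of the epoch/step interaction changed above; resolve_schedule is a hypothetical helper and steps_per_epoch is an arbitrary illustrative value, but the branch mirrors the logic in the diff:

def resolve_schedule(train_epochs, train_steps_flag, steps_per_epoch):
    # The train_steps flag only takes effect for single-epoch runs; when
    # train_epochs > 1 (e.g. the remote-data benchmark's train_epochs=2),
    # the flag is ignored and full epochs are run.
    train_steps = steps_per_epoch
    if train_epochs <= 1 and train_steps_flag:
        train_steps = min(train_steps_flag, steps_per_epoch)
        train_epochs = 1
    return train_epochs, train_steps

print(resolve_schedule(1, 110, 625))  # (1, 110): train_steps honored
print(resolve_schedule(2, 110, 625))  # (2, 625): train_steps ignored
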
@@ -254,7 +256,7 @@ def run(flags_obj):
def define_imagenet_keras_flags():
common.define_keras_flags()
flags_core.set_defaults(train_epochs=90)
flags_core.set_defaults()
flags.adopt_module_key_flags(common)
......