Unverified commit 6aa6bac5, authored by Rachel Lim and committed by GitHub

Adds keras imagenet benchmarks which use tf.data's `experimental_slack` option. (#6744)

* Added `_slack` versions of the benchmarks, which set
FLAGS.tf_data_experimental_slack = True. Renamed
`data_prefetch_with_slack` to `data_delay_prefetch` (haoyu's change)
to make the two names more distinct.

* Added the flag to the resnet input pipeline and surfaced it through
keras_imagenet_main.py
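
For readers unfamiliar with the option: `experimental_slack` is not a dataset transformation but a pipeline-level option, set through `tf.data.Options` and attached with `with_options`. A minimal, self-contained sketch of the pattern this change applies inside `process_record_dataset` (the toy pipeline is illustrative only; assumes TF >= 1.14):

```python
import tensorflow as tf

# Toy pipeline standing in for the ImageNet one; like the benchmark
# pipelines, it ends in prefetch(buffer_size=tf.data.experimental.AUTOTUNE).
dataset = tf.data.Dataset.range(1024).batch(32)
dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

# experimental_slack asks tf.data to introduce "slack" in the pipeline's
# final prefetch, rather than wrapping the dataset in a new transformation.
options = tf.data.Options()
options.experimental_slack = True
dataset = dataset.with_options(options)

print(dataset.options().experimental_slack)  # True
```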
parent aecf5d02
@@ -168,7 +168,9 @@ def input_fn(is_training,
              num_parallel_batches=1,
              parse_record_fn=parse_record,
              input_context=None,
-             drop_remainder=False):
+             drop_remainder=False,
+             tf_data_experimental_slack=False,
+             ):
   """Input function which provides batches for train or eval.

   Args:
@@ -184,6 +186,8 @@ def input_fn(is_training,
       `tf.distribute.Strategy`.
     drop_remainder: A boolean indicating whether to drop the remainder of the
       batches. If True, the batch dimension will be static.
+    tf_data_experimental_slack: Whether to enable tf.data's
+      `experimental_slack` option.

   Returns:
     A dataset that can be used for iteration.
@@ -221,7 +225,8 @@ def input_fn(is_training,
       dtype=dtype,
       datasets_num_private_threads=datasets_num_private_threads,
       num_parallel_batches=num_parallel_batches,
-      drop_remainder=drop_remainder
+      drop_remainder=drop_remainder,
+      tf_data_experimental_slack=tf_data_experimental_slack,
   )
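
A hypothetical call site for the patched `input_fn` (not part of this diff; `data_dir` and `batch_size` are assumed from parts of the signature the hunk does not show):

```python
# Illustrative only: passes the new keyword through to the input pipeline.
train_dataset = input_fn(
    is_training=True,
    data_dir='/path/to/imagenet',   # assumed parameter, elided in the hunk
    batch_size=256,                 # assumed parameter, elided in the hunk
    parse_record_fn=parse_record,
    drop_remainder=True,
    tf_data_experimental_slack=True,  # new in this change
)
```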
@@ -358,7 +358,7 @@ def define_keras_flags():
       'Note that the profiler has a non-trivial performance overhead, and the '
       'output file can be gigantic if profiling many steps.')
   flags.DEFINE_boolean(
-      name='data_prefetch_with_slack', default=False,
+      name='data_delay_prefetch', default=False,
       help='Add a small delay in tf.data prefetch to prioritize memory copy of '
       'other tensors over the data minibatch for the (T+1)th step. It should '
      'help improve performance using EagerIterator and function. The codepath '
@@ -429,7 +429,7 @@ def is_v2_0():
   return tf.__version__.startswith('2')


-def data_prefetch_with_slack():
+def data_delay_prefetch():
   """Use unstable code for perf tuning purposes."""
   if not FLAGS.use_synthetic_data:
     _monkey_patch_org_create_device_dataset()
@@ -281,7 +281,22 @@ class Resnet50KerasBenchmarkBase(keras_benchmark.KerasBenchmark):
     FLAGS.batch_size = 256
     FLAGS.use_tensor_lr = True
     FLAGS.tf_gpu_thread_mode = 'gpu_private'
-    FLAGS.data_prefetch_with_slack = True
+    FLAGS.data_delay_prefetch = True
     self._run_and_report_benchmark()
+
+  def benchmark_xla_1_gpu_fp16_slack(self):
+    """Test Keras model with XLA, 1 GPU, fp16, and tf.data's
+    experimental_slack functionality."""
+    self._setup()
+    FLAGS.num_gpus = 1
+    FLAGS.enable_eager = True
+    FLAGS.enable_xla = True
+    FLAGS.distribution_strategy = 'default'
+    FLAGS.model_dir = self._get_model_dir('benchmark_xla_1_gpu_fp16_slack')
+    FLAGS.dtype = 'fp16'
+    FLAGS.batch_size = 256
+    FLAGS.tf_data_experimental_slack = True
+    self._run_and_report_benchmark()

   def benchmark_xla_1_gpu_fp16_dynamic(self):
@@ -364,6 +379,23 @@ class Resnet50KerasBenchmarkBase(keras_benchmark.KerasBenchmark):
     FLAGS.tf_gpu_thread_mode = 'gpu_private'
     self._run_and_report_benchmark()
+
+  def benchmark_graph_xla_1_gpu_fp16_slack(self):
+    """Test Keras model in legacy graph mode with 1 GPU, fp16, XLA, and
+    tf.data's experimental_slack functionality.
+    """
+    self._setup()
+    FLAGS.num_gpus = 1
+    FLAGS.enable_eager = False
+    FLAGS.enable_xla = True
+    FLAGS.distribution_strategy = 'default'
+    FLAGS.model_dir = self._get_model_dir(
+        'benchmark_graph_xla_1_gpu_fp16_slack')
+    FLAGS.dtype = 'fp16'
+    FLAGS.batch_size = 256
+    FLAGS.tf_data_experimental_slack = True
+    self._run_and_report_benchmark()

   def benchmark_8_gpu(self):
     """Test Keras model with 8 GPUs."""
     self._setup()
@@ -398,7 +430,19 @@ class Resnet50KerasBenchmarkBase(keras_benchmark.KerasBenchmark):
     FLAGS.batch_size = 128 * 8  # 8 GPUs
     FLAGS.use_tensor_lr = True
     FLAGS.datasets_num_private_threads = 14
-    FLAGS.data_prefetch_with_slack = True
+    FLAGS.data_delay_prefetch = True
     self._run_and_report_benchmark()
+
+  def benchmark_8_gpu_slack(self):
+    """Test Keras model with tf.data's experimental_slack and 8 GPUs."""
+    self._setup()
+    FLAGS.num_gpus = 8
+    FLAGS.enable_eager = True
+    FLAGS.distribution_strategy = 'default'
+    FLAGS.model_dir = self._get_model_dir('benchmark_8_gpu_slack')
+    FLAGS.batch_size = 128 * 8  # 8 GPUs
+    FLAGS.tf_data_experimental_slack = True
+    self._run_and_report_benchmark()

   def benchmark_xla_8_gpu(self):
@@ -426,7 +470,7 @@ class Resnet50KerasBenchmarkBase(keras_benchmark.KerasBenchmark):
     self._run_and_report_benchmark()

   def benchmark_8_gpu_fp16_tweaked(self):
-    """Test Keras model with 8 GPUs and fp16."""
+    """Test Keras model with 8 GPUs, fp16, and manual config tuning."""
     self._setup()
     FLAGS.num_gpus = 8
@@ -437,11 +481,13 @@ class Resnet50KerasBenchmarkBase(keras_benchmark.KerasBenchmark):
     FLAGS.batch_size = 256 * 8  # 8 GPUs
     FLAGS.use_tensor_lr = True
     FLAGS.tf_gpu_thread_mode = 'gpu_private'
-    FLAGS.data_prefetch_with_slack = True
+    FLAGS.data_delay_prefetch = True
     self._run_and_report_benchmark()

   def benchmark_8_gpu_fp16_dynamic_tweaked(self):
-    """Test Keras model with 8 GPUs, fp16, and dynamic loss scaling."""
+    """Test Keras model with 8 GPUs, fp16, dynamic loss scaling, and manual
+    config tuning.
+    """
     self._setup()
     FLAGS.num_gpus = 8
@@ -454,7 +500,7 @@ class Resnet50KerasBenchmarkBase(keras_benchmark.KerasBenchmark):
     FLAGS.loss_scale = 'dynamic'
     FLAGS.use_tensor_lr = True
     FLAGS.tf_gpu_thread_mode = 'gpu_private'
-    FLAGS.data_prefetch_with_slack = True
+    FLAGS.data_delay_prefetch = True
     self._run_and_report_benchmark()

   def benchmark_xla_8_gpu_fp16(self):
@@ -483,7 +529,23 @@ class Resnet50KerasBenchmarkBase(keras_benchmark.KerasBenchmark):
     FLAGS.batch_size = 256 * 8  # 8 GPUs
     FLAGS.use_tensor_lr = True
     # FLAGS.tf_gpu_thread_mode = 'gpu_private'
-    FLAGS.data_prefetch_with_slack = True
+    FLAGS.data_delay_prefetch = True
     self._run_and_report_benchmark()
+
+  def benchmark_xla_8_gpu_fp16_slack(self):
+    """Test Keras model with tf.data's experimental_slack functionality, XLA,
+    8 GPUs and fp16.
+    """
+    self._setup()
+    FLAGS.num_gpus = 8
+    FLAGS.dtype = 'fp16'
+    FLAGS.enable_eager = True
+    FLAGS.enable_xla = True
+    FLAGS.distribution_strategy = 'default'
+    FLAGS.model_dir = self._get_model_dir('benchmark_xla_8_gpu_fp16_slack')
+    FLAGS.batch_size = 256 * 8  # 8 GPUs
+    FLAGS.tf_data_experimental_slack = True
+    self._run_and_report_benchmark()

   def benchmark_xla_8_gpu_fp16_dynamic_tweaked(self):
@@ -501,7 +563,7 @@ class Resnet50KerasBenchmarkBase(keras_benchmark.KerasBenchmark):
     FLAGS.loss_scale = 'dynamic'
     FLAGS.use_tensor_lr = True
     FLAGS.tf_gpu_thread_mode = 'gpu_private'
-    FLAGS.data_prefetch_with_slack = True
+    FLAGS.data_delay_prefetch = True
     self._run_and_report_benchmark()

   def benchmark_xla_8_gpu_fp16_tensorboard_tweaked(self):
@@ -518,7 +580,7 @@ class Resnet50KerasBenchmarkBase(keras_benchmark.KerasBenchmark):
     FLAGS.batch_size = 256 * 8  # 8 GPUs
     FLAGS.use_tensor_lr = True
     FLAGS.tf_gpu_thread_mode = 'gpu_private'
-    FLAGS.data_prefetch_with_slack = True
+    FLAGS.data_delay_prefetch = True
     FLAGS.enable_tensorboard = True
     self._run_and_report_benchmark()
@@ -604,6 +666,23 @@ class Resnet50KerasBenchmarkBase(keras_benchmark.KerasBenchmark):
     FLAGS.tf_gpu_thread_mode = 'gpu_private'
     self._run_and_report_benchmark()
+
+  def benchmark_graph_xla_8_gpu_fp16_slack(self):
+    """Test Keras model in legacy graph mode with tf.data's experimental_slack
+    functionality, XLA, 8 GPUs and fp16.
+    """
+    self._setup()
+    FLAGS.num_gpus = 8
+    FLAGS.dtype = 'fp16'
+    FLAGS.enable_eager = False
+    FLAGS.enable_xla = True
+    FLAGS.distribution_strategy = 'default'
+    FLAGS.model_dir = self._get_model_dir(
+        'benchmark_graph_xla_8_gpu_fp16_slack')
+    FLAGS.batch_size = 256 * 8  # 8 GPUs
+    FLAGS.tf_data_experimental_slack = True
+    self._run_and_report_benchmark()

   def benchmark_graph_8_gpu_fp16_dynamic_tweaked(self):
     """Test graph Keras with config tuning, 8 GPUs and dynamic fp16."""
     self._setup()
@@ -760,7 +839,20 @@ class TrivialKerasBenchmarkReal(keras_benchmark.KerasBenchmark):
     FLAGS.model_dir = self._get_model_dir('benchmark_8_gpu_tweaked')
     FLAGS.batch_size = 256 * 8
     FLAGS.tf_gpu_thread_mode = 'gpu_private'
-    FLAGS.data_prefetch_with_slack = True
+    FLAGS.data_delay_prefetch = True
     self._run_and_report_benchmark()
+
+  def benchmark_8_gpu_slack(self):
+    """Test trivial Keras model (input pipeline) with tf.data's
+    experimental_slack and 8 GPUs.
+    """
+    self._setup()
+    FLAGS.num_gpus = 8
+    FLAGS.enable_eager = True
+    FLAGS.model_dir = self._get_model_dir('benchmark_8_gpu_slack')
+    FLAGS.batch_size = 256 * 8
+    FLAGS.tf_data_experimental_slack = True
+    self._run_and_report_benchmark()

   def benchmark_graph_8_gpu(self):
@@ -107,8 +107,8 @@ def run(flags_obj):
   # Execute flag override logic for better model performance
   if flags_obj.tf_gpu_thread_mode:
     keras_common.set_gpu_thread_mode_and_count(flags_obj)
-  if flags_obj.data_prefetch_with_slack:
-    keras_common.data_prefetch_with_slack()
+  if flags_obj.data_delay_prefetch:
+    keras_common.data_delay_prefetch()
   keras_common.set_cudnn_batchnorm_mode()

   dtype = flags_core.get_tf_dtype(flags_obj)
@@ -157,7 +157,9 @@ def run(flags_obj):
       parse_record_fn=parse_record_keras,
       datasets_num_private_threads=flags_obj.datasets_num_private_threads,
       dtype=dtype,
-      drop_remainder=drop_remainder)
+      drop_remainder=drop_remainder,
+      tf_data_experimental_slack=flags_obj.tf_data_experimental_slack,
+  )

   eval_input_dataset = None
   if not flags_obj.skip_eval:
@@ -28,6 +28,7 @@ import math
 import multiprocessing
 import os

 # pylint: disable=g-bad-import-order
+from absl import flags
 import tensorflow as tf
@@ -54,7 +55,9 @@ def process_record_dataset(dataset,
                            dtype=tf.float32,
                            datasets_num_private_threads=None,
                            num_parallel_batches=1,
-                           drop_remainder=False):
+                           drop_remainder=False,
+                           tf_data_experimental_slack=False,
+                           ):
   """Given a Dataset with raw records, return an iterator over the records.

   Args:
@@ -73,6 +76,8 @@ def process_record_dataset(dataset,
     num_parallel_batches: Number of parallel batches for tf.data.
     drop_remainder: A boolean indicating whether to drop the remainder of the
       batches. If True, the batch dimension will be static.
+    tf_data_experimental_slack: Whether to enable tf.data's
+      `experimental_slack` option.

   Returns:
     Dataset of (image, label) pairs ready for iteration.
@@ -115,6 +120,11 @@ def process_record_dataset(dataset,
   # on how many devices are present.
   dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

+  if tf_data_experimental_slack:
+    options = tf.data.Options()
+    options.experimental_slack = True
+    dataset = dataset.with_options(options)
+
   return dataset
@@ -723,7 +733,9 @@ def define_resnet_flags(resnet_size_choices=None, dynamic_loss_scale=False,
       datasets_num_parallel_batches=True,
       dynamic_loss_scale=dynamic_loss_scale,
       fp16_implementation=fp16_implementation,
-      loss_scale=True)
+      loss_scale=True,
+      tf_data_experimental_slack=True,
+  )
   flags_core.define_image()
   flags_core.define_benchmark()
   flags.adopt_module_key_flags(flags_core)
@@ -56,7 +56,8 @@ def define_performance(num_parallel_calls=True, inter_op=True, intra_op=True,
                        datasets_num_private_threads=False,
                        datasets_num_parallel_batches=False,
                        dynamic_loss_scale=False, fp16_implementation=False,
-                       loss_scale=False):
+                       loss_scale=False,
+                       tf_data_experimental_slack=False):
   """Register flags for specifying performance tuning arguments.

   Args:
@@ -79,6 +80,8 @@ def define_performance(num_parallel_calls=True, inter_op=True, intra_op=True,
     fp16_implementation: Create fp16_implementation flag.
     loss_scale: Controls the loss scaling, normally for mixed-precision
       training. Can only be turned on if dtype is also True.
+    tf_data_experimental_slack: Whether to create the
+      `tf_data_experimental_slack` flag, which enables tf.data's
+      `experimental_slack` option.

   Returns:
     A list of flags for core.py to mark as key flags.
@@ -255,4 +258,12 @@ def define_performance(num_parallel_calls=True, inter_op=True, intra_op=True,
             "map and batch from tf.data.")
     )

+  if tf_data_experimental_slack:
+    flags.DEFINE_boolean(
+        name="tf_data_experimental_slack",
+        default=False,
+        help=help_wrap(
+            "Whether to enable tf.data's `experimental_slack` option.")
+    )
+
   return key_flags
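
For reference, the same flag definition works as a standalone absl script; this sketch swaps `help_wrap` for a plain string, and the file name is illustrative, not part of the patch:

```python
# standalone_slack_flag.py -- illustrative, not part of the patch.
from absl import app, flags

flags.DEFINE_boolean(
    name="tf_data_experimental_slack",
    default=False,
    help="Whether to enable tf.data's `experimental_slack` option.")

FLAGS = flags.FLAGS


def main(_):
  # In the real pipeline this value is threaded from keras_imagenet_main.py
  # through input_fn down to process_record_dataset.
  print("experimental_slack requested:", FLAGS.tf_data_experimental_slack)


if __name__ == "__main__":
  app.run(main)
```

Invoked as `python standalone_slack_flag.py --tf_data_experimental_slack=true`.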