Commit ff6c3b1e authored by Ayush Dubey, committed by A. Unique TensorFlower

Add multi-worker benchmarks to Keras ResNet model.

Also add `worker_hosts` and `task_index` flags. These flags enable running the
model over multiple hosts by passing the cluster information via the command line.

Setting `TF_CONFIG` will continue to work.
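
For example, each host can be launched with the same `worker_hosts` value and its own `task_index`; this carries the same information that `TF_CONFIG` would. A minimal sketch of the equivalence, where the hosts, ports, and script path are placeholders:

# Hypothetical two-worker run; hosts, ports, and the script path are illustrative only.
#   host A:  python <keras_resnet_main>.py --distribution_strategy=multi_worker_mirrored \
#              --worker_hosts=10.0.0.1:5000,10.0.0.2:5000 --task_index=0
#   host B:  same command with --task_index=1
# Equivalent TF_CONFIG-based setup, which continues to work:
import json
import os

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {'worker': ['10.0.0.1:5000', '10.0.0.2:5000']},
    'task': {'type': 'worker', 'index': 0},  # 1 on the second host
})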

PiperOrigin-RevId: 263825245
parent 24d9f295
@@ -733,6 +733,7 @@ def define_resnet_flags(resnet_size_choices=None, dynamic_loss_scale=False,
       tf_data_experimental_slack=True)
   flags_core.define_image()
   flags_core.define_benchmark()
+  flags_core.define_distribution()
   flags.adopt_module_key_flags(flags_core)
   flags.DEFINE_enum(
@@ -768,16 +769,6 @@ def define_resnet_flags(resnet_size_choices=None, dynamic_loss_scale=False,
           'If True, uses `tf.estimator.train_and_evaluate` for the training '
           'and evaluation loop, instead of separate calls to `classifier.train '
           'and `classifier.evaluate`, which is the default behavior.'))
-  flags.DEFINE_string(
-      name='worker_hosts', default=None,
-      help=flags_core.help_wrap(
-          'Comma-separated list of worker ip:port pairs for running '
-          'multi-worker models with DistributionStrategy. The user would '
-          'start the program on each host with identical value for this flag.'))
-  flags.DEFINE_integer(
-      name='task_index', default=-1,
-      help=flags_core.help_wrap('If multi-worker training, the task_index of '
-                                'this worker.'))
   flags.DEFINE_bool(
       name='enable_lars', default=False,
       help=flags_core.help_wrap(
...
@@ -262,6 +262,7 @@ def define_keras_flags(dynamic_loss_scale=True):
       force_v2_in_keras_compile=True)
   flags_core.define_image()
   flags_core.define_benchmark()
+  flags_core.define_distribution()
   flags.adopt_module_key_flags(flags_core)
   flags.DEFINE_boolean(name='enable_eager', default=False, help='Enable eager?')
...
@@ -896,5 +896,96 @@ class TrivialKerasBenchmarkReal(keras_benchmark.KerasBenchmark):
         log_steps=FLAGS.log_steps)
+
+
+class Resnet50MultiWorkerKerasBenchmark(Resnet50KerasBenchmarkBase):
+  """Resnet50 distributed benchmark tests with multiple workers."""
+
+  def __init__(self, output_dir=None, default_flags=None):
+    super(Resnet50MultiWorkerKerasBenchmark, self).__init__(
+        output_dir=output_dir, default_flags=default_flags)
+
+  def _benchmark_common(self, eager, num_workers, all_reduce_alg):
+    """Common to all benchmarks in this class."""
+    self._setup()
+
+    num_gpus = 8
+    FLAGS.num_gpus = num_gpus
+    FLAGS.dtype = 'fp16'
+    FLAGS.enable_eager = eager
+    FLAGS.enable_xla = False
+    FLAGS.distribution_strategy = 'multi_worker_mirrored'
+    FLAGS.use_tensor_lr = True
+    FLAGS.tf_gpu_thread_mode = 'gpu_private'
+    FLAGS.model_dir = self._get_model_dir(
+        'benchmark_graph_8_gpu_{}_worker_fp16_{}_tweaked'.format(
+            num_workers, all_reduce_alg))
+    FLAGS.batch_size = 256 * num_gpus * num_workers
+    FLAGS.all_reduce_alg = all_reduce_alg
+
+    self._run_and_report_benchmark()
+
+  def benchmark_graph_8_gpu_1_worker_fp16_ring_tweaked(self):
+    """Legacy graph, 8 GPUs per worker, 1 worker, fp16, ring all-reduce."""
+    self._benchmark_common(eager=False, num_workers=1, all_reduce_alg='ring')
+
+  def benchmark_graph_8_gpu_1_worker_fp16_nccl_tweaked(self):
+    """Legacy graph, 8 GPUs per worker, 1 worker, fp16, nccl all-reduce."""
+    self._benchmark_common(eager=False, num_workers=1, all_reduce_alg='nccl')
+
+  def benchmark_graph_8_gpu_2_workers_fp16_ring_tweaked(self):
+    """Legacy graph, 8 GPUs per worker, 2 workers, fp16, ring all-reduce."""
+    self._benchmark_common(eager=False, num_workers=2, all_reduce_alg='ring')
+
+  def benchmark_graph_8_gpu_2_workers_fp16_nccl_tweaked(self):
+    """Legacy graph, 8 GPUs per worker, 2 workers, fp16, nccl all-reduce."""
+    self._benchmark_common(eager=False, num_workers=2, all_reduce_alg='nccl')
+
+  def benchmark_graph_8_gpu_8_workers_fp16_ring_tweaked(self):
+    """Legacy graph, 8 GPUs per worker, 8 workers, fp16, ring all-reduce."""
+    self._benchmark_common(eager=False, num_workers=8, all_reduce_alg='ring')
+
+  def benchmark_graph_8_gpu_8_workers_fp16_nccl_tweaked(self):
+    """Legacy graph, 8 GPUs per worker, 8 workers, fp16, nccl all-reduce."""
+    self._benchmark_common(eager=False, num_workers=8, all_reduce_alg='nccl')
+
+  def benchmark_eager_8_gpu_1_worker_fp16_ring_tweaked(self):
+    """Eager, 8 GPUs per worker, 1 worker, fp16, ring all-reduce."""
+    self._benchmark_common(eager=True, num_workers=1, all_reduce_alg='ring')
+
+  def benchmark_eager_8_gpu_1_worker_fp16_nccl_tweaked(self):
+    """Eager, 8 GPUs per worker, 1 worker, fp16, nccl all-reduce."""
+    self._benchmark_common(eager=True, num_workers=1, all_reduce_alg='nccl')
+
+  def benchmark_eager_8_gpu_2_workers_fp16_ring_tweaked(self):
+    """Eager, 8 GPUs per worker, 2 workers, fp16, ring all-reduce."""
+    self._benchmark_common(eager=True, num_workers=2, all_reduce_alg='ring')
+
+  def benchmark_eager_8_gpu_2_workers_fp16_nccl_tweaked(self):
+    """Eager, 8 GPUs per worker, 2 workers, fp16, nccl all-reduce."""
+    self._benchmark_common(eager=True, num_workers=2, all_reduce_alg='nccl')
+
+  def benchmark_eager_8_gpu_8_workers_fp16_ring_tweaked(self):
+    """Eager, 8 GPUs per worker, 8 workers, fp16, ring all-reduce."""
+    self._benchmark_common(eager=True, num_workers=8, all_reduce_alg='ring')
+
+  def benchmark_eager_8_gpu_8_workers_fp16_nccl_tweaked(self):
+    """Eager, 8 GPUs per worker, 8 workers, fp16, nccl all-reduce."""
+    self._benchmark_common(eager=True, num_workers=8, all_reduce_alg='nccl')
+
+
+class Resnet50MultiWorkerKerasBenchmarkSynth(Resnet50KerasBenchmarkBase):
+  """Resnet50 multi-worker synthetic benchmark tests."""
+
+  def __init__(self, output_dir=None, root_data_dir=None, **kwargs):
+    def_flags = {}
+    def_flags['skip_eval'] = True
+    def_flags['report_accuracy_metrics'] = False
+    def_flags['use_synthetic_data'] = True
+    def_flags['train_steps'] = 110
+    def_flags['log_steps'] = 10
+
+    super(Resnet50MultiWorkerKerasBenchmarkSynth, self).__init__(
+        output_dir=output_dir, default_flags=def_flags)
+
 if __name__ == '__main__':
   tf.test.main()
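
As a quick sanity check on the scaling in the benchmarks above: with a per-GPU batch of 256 and 8 GPUs per worker, the global batch size set via `FLAGS.batch_size = 256 * num_gpus * num_workers` grows linearly with the worker count:

# Global batch sizes implied by the flag settings in _benchmark_common above.
per_gpu_batch, num_gpus = 256, 8
for num_workers in (1, 2, 8):
  print('%d worker(s): global batch %d'
        % (num_workers, per_gpu_batch * num_gpus * num_workers))
# 1 worker(s): global batch 2048
# 2 worker(s): global batch 4096
# 8 worker(s): global batch 16384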
@@ -105,10 +105,14 @@ def run(flags_obj):
                  if tf.test.is_built_with_cuda() else 'channels_last')
   tf.keras.backend.set_image_data_format(data_format)
 
+  # Configures cluster spec for distribution strategy.
+  num_workers = distribution_utils.configure_cluster(flags_obj.worker_hosts,
+                                                     flags_obj.task_index)
+
   strategy = distribution_utils.get_distribution_strategy(
       distribution_strategy=flags_obj.distribution_strategy,
       num_gpus=flags_obj.num_gpus,
-      num_workers=distribution_utils.configure_cluster(),
+      num_workers=num_workers,
       all_reduce_alg=flags_obj.all_reduce_alg,
       num_packs=flags_obj.num_packs)
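
The call sites above suggest that `distribution_utils.configure_cluster` turns the `worker_hosts`/`task_index` flags into a cluster spec (for example by populating `TF_CONFIG`) and returns the number of workers for the strategy. A rough sketch of a helper with that shape, under those assumptions rather than the actual `distribution_utils` implementation:

import json
import os


def configure_cluster(worker_hosts=None, task_index=-1):
  """Illustrative only: build TF_CONFIG from flags and return the worker count."""
  if worker_hosts:
    workers = worker_hosts.split(',')
    if len(workers) > 1 and task_index < 0:
      raise ValueError('Must specify task_index when there are multiple workers.')
    os.environ['TF_CONFIG'] = json.dumps({
        'cluster': {'worker': workers},
        'task': {'type': 'worker', 'index': max(task_index, 0)},
    })
    return len(workers)
  # No flags given: respect any TF_CONFIG already set in the environment.
  tf_config = json.loads(os.environ.get('TF_CONFIG', '{}'))
  return len(tf_config.get('cluster', {}).get('worker', [])) or 1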
...
+# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+"""Flags related to distributed execution."""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+from absl import flags
+import tensorflow as tf
+
+from official.utils.flags._conventions import help_wrap
+
+
+def define_distribution(worker_hosts=True, task_index=True):
+  """Register distributed execution flags.
+
+  Args:
+    worker_hosts: Create a flag for specifying comma-separated list of workers.
+    task_index: Create a flag for specifying index of task.
+
+  Returns:
+    A list of flags for core.py to mark as key flags.
+  """
+  key_flags = []
+
+  if worker_hosts:
+    flags.DEFINE_string(
+        name='worker_hosts', default=None,
+        help=help_wrap(
+            'Comma-separated list of worker ip:port pairs for running '
+            'multi-worker models with DistributionStrategy. The user would '
+            'start the program on each host with identical value for this '
+            'flag.'))
+
+  if task_index:
+    flags.DEFINE_integer(
+        name='task_index', default=-1,
+        help=help_wrap('If multi-worker training, the task_index of this '
+                       'worker.'))
+
+  return key_flags
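
Programs that register flags through `flags_core` (as the ResNet and Keras flag definitions in this change do) can then read these like any other absl flag. A minimal sketch:

from absl import app
from absl import flags

from official.utils.flags import core as flags_core

FLAGS = flags.FLAGS


def main(_):
  # worker_hosts / task_index are available alongside the other core flags.
  print('worker_hosts:', FLAGS.worker_hosts)
  print('task_index:', FLAGS.task_index)


if __name__ == '__main__':
  flags_core.define_distribution()
  app.run(main)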
@@ -32,6 +32,7 @@ from official.utils.flags import _base
 from official.utils.flags import _benchmark
 from official.utils.flags import _conventions
 from official.utils.flags import _device
+from official.utils.flags import _distribution
 from official.utils.flags import _misc
 from official.utils.flags import _performance
@@ -77,6 +78,8 @@ define_benchmark = register_key_flags_in_core(_benchmark.define_benchmark)
 define_device = register_key_flags_in_core(_device.define_device)
 define_image = register_key_flags_in_core(_misc.define_image)
 define_performance = register_key_flags_in_core(_performance.define_performance)
+define_distribution = register_key_flags_in_core(
+    _distribution.define_distribution)
 help_wrap = _conventions.help_wrap
...