Unverified Commit 79b57a3f authored by Yuefeng Zhou, committed by GitHub

Add a flag to specify distribution strategies. (#6185)

* Add a flag to specify distribution strategies.

* Fix a small error.

* Address comments.

* Address comments.

* Fix typos.
parent f788046c
......@@ -88,6 +88,7 @@ def run_keras_model_benchmark(_):
# Use distribution strategy
if FLAGS.dist_strat:
distribution = distribution_utils.get_distribution_strategy(
distribution_strategy=FLAGS.distribution_strategy,
num_gpus=num_gpus)
elif num_gpus > 1:
# Run with multi_gpu_model
......
......@@ -165,7 +165,9 @@ def run_mnist(flags_obj):
allow_soft_placement=True)
distribution_strategy = distribution_utils.get_distribution_strategy(
flags_core.get_num_gpus(flags_obj), flags_obj.all_reduce_alg)
distribution_strategy=flags_obj.distribution_strategy,
num_gpus=flags_core.get_num_gpus(flags_obj),
all_reduce_alg=flags_obj.all_reduce_alg)
run_config = tf.estimator.RunConfig(
train_distribute=distribution_strategy, session_config=session_config)
......
......@@ -93,7 +93,7 @@ class Resnet56KerasAccuracy(keras_benchmark.KerasBenchmark):
def benchmark_graph_1_gpu_no_dist_strat(self):
"""Test keras based model with Keras fit but not distribution strategies."""
self._setup()
FLAGS.turn_off_distribution_strategy = True
FLAGS.distribution_strategy = 'off'
FLAGS.num_gpus = 1
FLAGS.data_dir = DATA_DIR
FLAGS.batch_size = 128
......@@ -144,7 +144,7 @@ class Resnet56KerasBenchmarkBase(keras_benchmark.KerasBenchmark):
self._setup()
FLAGS.num_gpus = 1
FLAGS.enable_eager = True
FLAGS.turn_off_distribution_strategy = True
FLAGS.distribution_strategy = 'off'
FLAGS.model_dir = self._get_model_dir('benchmark_1_gpu_no_dist_strat')
FLAGS.batch_size = 128
self._run_and_report_benchmark()
......@@ -153,7 +153,7 @@ class Resnet56KerasBenchmarkBase(keras_benchmark.KerasBenchmark):
self._setup()
FLAGS.num_gpus = 1
FLAGS.enable_eager = False
FLAGS.turn_off_distribution_strategy = True
FLAGS.distribution_strategy = 'off'
FLAGS.model_dir = self._get_model_dir('benchmark_graph_1_gpu_no_dist_strat')
FLAGS.batch_size = 128
self._run_and_report_benchmark()
......@@ -162,7 +162,7 @@ class Resnet56KerasBenchmarkBase(keras_benchmark.KerasBenchmark):
self._setup()
FLAGS.num_gpus = 1
FLAGS.enable_eager = True
FLAGS.turn_off_distribution_strategy = False
FLAGS.distribution_strategy = 'default'
FLAGS.model_dir = self._get_model_dir('benchmark_1_gpu')
FLAGS.batch_size = 128
self._run_and_report_benchmark()
......@@ -171,7 +171,7 @@ class Resnet56KerasBenchmarkBase(keras_benchmark.KerasBenchmark):
self._setup()
FLAGS.num_gpus = 1
FLAGS.enable_eager = False
FLAGS.turn_off_distribution_strategy = False
FLAGS.distribution_strategy = 'default'
FLAGS.model_dir = self._get_model_dir('benchmark_graph_1_gpu')
FLAGS.batch_size = 128
self._run_and_report_benchmark()
......@@ -180,7 +180,7 @@ class Resnet56KerasBenchmarkBase(keras_benchmark.KerasBenchmark):
self._setup()
FLAGS.num_gpus = 2
FLAGS.enable_eager = True
FLAGS.turn_off_distribution_strategy = False
FLAGS.distribution_strategy = 'default'
FLAGS.model_dir = self._get_model_dir('benchmark_2_gpu')
FLAGS.batch_size = 128 * 2 # 2 GPUs
self._run_and_report_benchmark()
......@@ -189,7 +189,7 @@ class Resnet56KerasBenchmarkBase(keras_benchmark.KerasBenchmark):
self._setup()
FLAGS.num_gpus = 2
FLAGS.enable_eager = False
FLAGS.turn_off_distribution_strategy = False
FLAGS.distribution_strategy = 'default'
FLAGS.model_dir = self._get_model_dir('benchmark_graph_2_gpu')
FLAGS.batch_size = 128 * 2 # 2 GPUs
self._run_and_report_benchmark()
......
......@@ -138,8 +138,8 @@ def run(flags_obj):
parse_record_fn=parse_record_keras)
strategy = distribution_utils.get_distribution_strategy(
num_gpus=flags_obj.num_gpus,
turn_off_distribution_strategy=flags_obj.turn_off_distribution_strategy)
distribution_strategy=flags_obj.distribution_strategy,
num_gpus=flags_obj.num_gpus)
strategy_scope = keras_common.get_strategy_scope(strategy)
......
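For context, here is a minimal sketch of how the returned strategy is typically consumed by the Keras path above. It assumes `get_strategy_scope` returns `strategy.scope()` when a strategy is in use and a no-op context when `--distribution_strategy=off` leaves `strategy` as None; the exact body of the helper in `keras_common` may differ.

import contextlib

import tensorflow as tf

def get_strategy_scope(strategy):
  # Assumed behavior: enter the strategy's variable-creation scope when a
  # strategy object was returned, otherwise use a no-op context so the same
  # model-building code also runs without Distribution Strategy.
  return strategy.scope() if strategy else contextlib.suppress()

strategy = tf.distribute.MirroredStrategy()  # or None when the flag is 'off'
with get_strategy_scope(strategy):
  model = tf.keras.Sequential([tf.keras.layers.Dense(10)])
  model.compile(optimizer='sgd', loss='mse')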
......@@ -112,7 +112,7 @@ class Resnet50KerasBenchmarkBase(keras_benchmark.KerasBenchmark):
FLAGS.num_gpus = 1
FLAGS.enable_eager = True
FLAGS.turn_off_distribution_strategy = True
FLAGS.distribution_strategy = 'off'
FLAGS.model_dir = self._get_model_dir('benchmark_1_gpu_no_dist_strat')
FLAGS.batch_size = 128
self._run_and_report_benchmark()
......@@ -122,7 +122,7 @@ class Resnet50KerasBenchmarkBase(keras_benchmark.KerasBenchmark):
FLAGS.num_gpus = 1
FLAGS.enable_eager = False
FLAGS.turn_off_distribution_strategy = True
FLAGS.distribution_strategy = 'off'
FLAGS.model_dir = self._get_model_dir('benchmark_graph_1_gpu_no_dist_strat')
FLAGS.batch_size = 128
self._run_and_report_benchmark()
......@@ -132,7 +132,7 @@ class Resnet50KerasBenchmarkBase(keras_benchmark.KerasBenchmark):
FLAGS.num_gpus = 1
FLAGS.enable_eager = True
FLAGS.turn_off_distribution_strategy = False
FLAGS.distribution_strategy = 'default'
FLAGS.model_dir = self._get_model_dir('benchmark_1_gpu')
FLAGS.batch_size = 128
self._run_and_report_benchmark()
......@@ -142,7 +142,7 @@ class Resnet50KerasBenchmarkBase(keras_benchmark.KerasBenchmark):
FLAGS.num_gpus = 1
FLAGS.enable_eager = False
FLAGS.turn_off_distribution_strategy = False
FLAGS.distribution_strategy = 'default'
FLAGS.model_dir = self._get_model_dir('benchmark_graph_1_gpu')
FLAGS.batch_size = 128
self._run_and_report_benchmark()
......@@ -152,7 +152,7 @@ class Resnet50KerasBenchmarkBase(keras_benchmark.KerasBenchmark):
FLAGS.num_gpus = 8
FLAGS.enable_eager = True
FLAGS.turn_off_distribution_strategy = False
FLAGS.distribution_strategy = 'default'
FLAGS.model_dir = self._get_model_dir('benchmark_8_gpu')
FLAGS.batch_size = 128 * 8 # 8 GPUs
self._run_and_report_benchmark()
......@@ -162,7 +162,7 @@ class Resnet50KerasBenchmarkBase(keras_benchmark.KerasBenchmark):
FLAGS.num_gpus = 8
FLAGS.enable_eager = False
FLAGS.turn_off_distribution_strategy = False
FLAGS.distribution_strategy = 'default'
FLAGS.model_dir = self._get_model_dir('benchmark_graph_8_gpu')
FLAGS.batch_size = 128 * 8 # 8 GPUs
self._run_and_report_benchmark()
......
......@@ -127,8 +127,8 @@ def run(flags_obj):
parse_record_fn=parse_record_keras)
strategy = distribution_utils.get_distribution_strategy(
num_gpus=flags_obj.num_gpus,
turn_off_distribution_strategy=flags_obj.turn_off_distribution_strategy)
distribution_strategy=flags_obj.distribution_strategy,
num_gpus=flags_obj.num_gpus)
strategy_scope = keras_common.get_strategy_scope(strategy)
......
......@@ -471,7 +471,9 @@ def resnet_main(
allow_soft_placement=True)
distribution_strategy = distribution_utils.get_distribution_strategy(
flags_core.get_num_gpus(flags_obj), flags_obj.all_reduce_alg)
distribution_strategy=flags_obj.distribution_strategy,
num_gpus=flags_core.get_num_gpus(flags_obj),
all_reduce_alg=flags_obj.all_reduce_alg)
# Creates a `RunConfig` that checkpoints every 24 hours which essentially
# results in checkpoints determined only by `epochs_between_evals`.
......@@ -632,10 +634,6 @@ def define_resnet_flags(resnet_size_choices=None):
'the expense of image resize/cropping being done as part of model '
'inference. Note, this flag only applies to ImageNet and cannot '
'be used for CIFAR.'))
flags.DEFINE_boolean(
name='turn_off_distribution_strategy', default=False,
help=flags_core.help_wrap('Set to True to not use distribution '
'strategies.'))
choice_kwargs = dict(
name='resnet_size', short_name='rs', default='50',
help=flags_core.help_wrap('The size of the ResNet model to use.'))
......
......@@ -495,7 +495,9 @@ def construct_estimator(flags_obj, params, schedule_manager):
"""
if not params["use_tpu"]:
distribution_strategy = distribution_utils.get_distribution_strategy(
flags_core.get_num_gpus(flags_obj), flags_obj.all_reduce_alg)
distribution_strategy=flags_obj.distribution_strategy,
num_gpus=flags_core.get_num_gpus(flags_obj),
all_reduce_alg=flags_obj.all_reduce_alg)
return tf.estimator.Estimator(
model_fn=model_fn, model_dir=flags_obj.model_dir, params=params,
config=tf.estimator.RunConfig(train_distribute=distribution_strategy))
......
......@@ -27,7 +27,8 @@ from official.utils.logs import hooks_helper
def define_base(data_dir=True, model_dir=True, clean=True, train_epochs=True,
epochs_between_evals=True, stop_threshold=True, batch_size=True,
num_gpu=True, hooks=True, export_dir=True):
num_gpu=True, hooks=True, export_dir=True,
distribution_strategy=True):
"""Register base flags.
Args:
......@@ -41,7 +42,8 @@ def define_base(data_dir=True, model_dir=True, clean=True, train_epochs=True,
num_gpu: Create a flag to specify the number of GPUs used.
hooks: Create a flag to specify hooks for logging.
export_dir: Create a flag to specify where a SavedModel should be exported.
distribution_strategy: Create a flag to specify which Distribution Strategy
to use.
Returns:
A list of flags for core.py to mark as key flags.
"""
......@@ -128,6 +130,18 @@ def define_base(data_dir=True, model_dir=True, clean=True, train_epochs=True,
)
key_flags.append("export_dir")
if distribution_strategy:
flags.DEFINE_string(
name="distribution_strategy", short_name="ds", default="default",
help=help_wrap("The Distribution Strategy to use for training. "
"Accepted values are 'off', 'default', 'one_device', "
"'mirrored', 'parameter_server', 'collective', "
"case insensitive. 'off' means not to use "
"Distribution Strategy; 'default' means to choose "
"from `MirroredStrategy` or `OneDeviceStrategy` "
"according to the number of GPUs.")
)
return key_flags
......
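A minimal usage sketch of the new flag, assuming the flags are registered through `flags_core.define_base` as in the rest of the repo; the invocation shown in the comment is illustrative.

from absl import app
from absl import flags

from official.utils.flags import core as flags_core

flags_core.define_base(distribution_strategy=True)  # also defines --num_gpus etc.

def main(_):
  flags_obj = flags.FLAGS
  # e.g. invoked with: --distribution_strategy=mirrored --num_gpus=2
  # The short name --ds also works, and the value is lower-cased later by
  # get_distribution_strategy, so it is effectively case insensitive.
  print(flags_obj.distribution_strategy, flags_obj.num_gpus)

if __name__ == '__main__':
  app.run(main)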
......@@ -23,43 +23,55 @@ import string
import tensorflow as tf
def get_distribution_strategy(num_gpus,
all_reduce_alg=None,
turn_off_distribution_strategy=False):
def get_distribution_strategy(distribution_strategy="default",
num_gpus=0,
all_reduce_alg=None):
"""Return a DistributionStrategy for running the model.
Args:
distribution_strategy: a string specifying which distribution strategy to use.
Accepted values are 'off', 'default', 'one_device', 'mirrored',
'parameter_server', 'collective', case insensitive. 'off' means not to use
Distribution Strategy; 'default' means to choose from `MirroredStrategy`
or `OneDeviceStrategy` according to the number of GPUs.
num_gpus: Number of GPUs to run this model.
all_reduce_alg: Specify which algorithm to use when performing all-reduce.
See tf.contrib.distribute.AllReduceCrossDeviceOps for available
algorithms. If None, DistributionStrategy will choose based on device
topology.
turn_off_distribution_strategy: when set to True, do not use any
distribution strategy. Note that when it is True, and num_gpus is
larger than 1, it will raise a ValueError.
all_reduce_alg: Optional. Specify which algorithm to use when performing
all-reduce. See tf.contrib.distribute.AllReduceCrossDeviceOps for
available algorithms. If None, DistributionStrategy will choose based on
device topology.
Returns:
tf.contrib.distribute.DistibutionStrategy object.
tf.distribute.DistributionStrategy object.
Raises:
ValueError: if turn_off_distribution_strategy is True and num_gpus is
larger than 1
ValueError: if `distribution_strategy` is 'off' or 'one_device' and
`num_gpus` is larger than 1; or `num_gpus` is negative.
"""
if num_gpus == 0:
if turn_off_distribution_strategy:
return None
if num_gpus < 0:
raise ValueError("`num_gpus` can not be negative.")
distribution_strategy = distribution_strategy.lower()
if distribution_strategy == "off":
if num_gpus > 1:
raise ValueError("When {} GPUs are specified, distribution_strategy flag "
"cannot be set to 'off'.".format(num_gpus))
return None
if (distribution_strategy == "one_device" or
(distribution_strategy == "default" and num_gpus <= 1)):
if num_gpus == 0:
return tf.contrib.distribute.OneDeviceStrategy("device:CPU:0")
else:
return tf.contrib.distribute.OneDeviceStrategy('device:CPU:0')
elif num_gpus == 1:
if turn_off_distribution_strategy:
return None
if num_gpus > 1:
raise ValueError("`OneDeviceStrategy` can not be used for more than "
"one device.")
return tf.contrib.distribute.OneDeviceStrategy("device:GPU:0")
if distribution_strategy in ("mirrored", "default"):
if num_gpus == 0:
assert distribution_strategy == "mirrored"
devices = ["device:CPU:0"]
else:
return tf.contrib.distribute.OneDeviceStrategy('device:GPU:0')
elif turn_off_distribution_strategy:
raise ValueError('When {} GPUs are specified, '
'turn_off_distribution_strategy flag cannot be set to'
'True.'.format(num_gpus))
else: # num_gpus > 1 and not turn_off_distribution_strategy
devices = ['device:GPU:%d' % i for i in range(num_gpus)]
devices = ["device:GPU:%d" % i for i in range(num_gpus)]
if all_reduce_alg:
return tf.distribute.MirroredStrategy(
devices=devices,
......@@ -68,6 +80,17 @@ def get_distribution_strategy(num_gpus,
else:
return tf.distribute.MirroredStrategy(devices=devices)
if distribution_strategy == "collective":
return tf.contrib.distribute.CollectiveAllReduceStrategy(
num_gpus_per_worker=num_gpus)
if distribution_strategy == "parameter_server":
return tf.contrib.distribute.ParameterServerStrategy(
num_gpus_per_worker=num_gpus)
raise ValueError(
"Unrecognized Distribution Strategy: %r" % distribution_strategy)
def per_device_batch_size(batch_size, num_gpus):
"""For multi-gpu, batch-size must be a multiple of the number of GPUs.
......
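A short usage sketch of the new signature, following the branches in the code above; the comments state the return values that code implies, and the calls themselves are illustrative.

from official.utils.misc import distribution_utils

# CPU-only calls, safe on any machine:
one_device = distribution_utils.get_distribution_strategy()              # 'default' + 0 GPUs -> OneDeviceStrategy('device:CPU:0')
no_strategy = distribution_utils.get_distribution_strategy('off')        # -> None; run without Distribution Strategy
cpu_mirrored = distribution_utils.get_distribution_strategy('mirrored')  # 0 GPUs -> MirroredStrategy over ['device:CPU:0']

# Illustrative GPU calls (need the corresponding hardware):
# distribution_utils.get_distribution_strategy(num_gpus=1)                # -> OneDeviceStrategy('device:GPU:0')
# distribution_utils.get_distribution_strategy(num_gpus=4)                # -> MirroredStrategy over 4 GPUs
# distribution_utils.get_distribution_strategy('collective', num_gpus=2)  # -> CollectiveAllReduceStrategy
# distribution_utils.get_distribution_strategy('off', num_gpus=2)         # raises ValueError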
......@@ -26,24 +26,30 @@ from official.utils.misc import distribution_utils
class GetDistributionStrategyTest(tf.test.TestCase):
"""Tests for get_distribution_strategy."""
def test_one_device_strategy_cpu(self):
ds = distribution_utils.get_distribution_strategy(0)
ds = distribution_utils.get_distribution_strategy(num_gpus=0)
self.assertEquals(ds.num_replicas_in_sync, 1)
self.assertEquals(len(ds.extended.worker_devices), 1)
self.assertIn('CPU', ds.extended.worker_devices[0])
def test_one_device_strategy_gpu(self):
ds = distribution_utils.get_distribution_strategy(1)
ds = distribution_utils.get_distribution_strategy(num_gpus=1)
self.assertEquals(ds.num_replicas_in_sync, 1)
self.assertEquals(len(ds.extended.worker_devices), 1)
self.assertIn('GPU', ds.extended.worker_devices[0])
def test_mirrored_strategy(self):
ds = distribution_utils.get_distribution_strategy(5)
ds = distribution_utils.get_distribution_strategy(num_gpus=5)
self.assertEquals(ds.num_replicas_in_sync, 5)
self.assertEquals(len(ds.extended.worker_devices), 5)
for device in ds.extended.worker_devices:
self.assertIn('GPU', device)
def test_override_strategy(self):
ds = distribution_utils.get_distribution_strategy(
distribution_strategy='collective', num_gpus=2)
self.assertTrue(
isinstance(ds, tf.contrib.distribute.CollectiveAllReduceStrategy))
class PerDeviceBatchSizeTest(tf.test.TestCase):
"""Tests for per_device_batch_size."""
......