Unverified Commit 29c9f985 authored by Katherine Wu, committed by GitHub

Transformer multi gpu, remove multi_gpu flag, distribution helper functions (#4457)

* Add DistributionStrategy to transformer model

* add num_gpu flag

* Calculate per device batch size for transformer

* remove reference to flags_core

* Add synthetic data option to transformer

* fix typo

* add import back in

* Use hierarchical copy

* address PR comments

* lint

* fix spaces

* group train op together to fix single GPU error

* Fix translate bug (sorted_keys is a dict, not a list)

* Change params to a default dict (translate.py was throwing errors because params didn't have the TPU parameters.)

* Address PR comments. Removed multi gpu flag + more

* fix lint

* fix more lints

* add todo for Synthetic dataset

* Update docs
parent e7957b7f
......@@ -24,6 +24,7 @@ import tensorflow as tf # pylint: disable=g-bad-import-order
from official.mnist import dataset
from official.utils.flags import core as flags_core
from official.utils.logs import hooks_helper
from official.utils.misc import distribution_utils
from official.utils.misc import model_helpers
......@@ -87,7 +88,7 @@ def create_model(data_format):
def define_mnist_flags():
flags_core.define_base(multi_gpu=True, num_gpu=False)
flags_core.define_base()
flags_core.define_image()
flags.adopt_module_key_flags(flags_core)
flags_core.set_defaults(data_dir='/tmp/mnist_data',
......@@ -152,53 +153,29 @@ def model_fn(features, labels, mode, params):
})
def validate_batch_size_for_multi_gpu(batch_size):
"""For multi-gpu, batch-size must be a multiple of the number of GPUs.
Note that this should eventually be handled by replicate_model_fn
directly. Multi-GPU support is currently experimental, however,
so doing the work here until that feature is in place.
Args:
batch_size: the number of examples processed in each training batch.
Raises:
ValueError: if no GPUs are found, or selected batch_size is invalid.
"""
from tensorflow.python.client import device_lib # pylint: disable=g-import-not-at-top
local_device_protos = device_lib.list_local_devices()
num_gpus = sum([1 for d in local_device_protos if d.device_type == 'GPU'])
if not num_gpus:
raise ValueError('Multi-GPU mode was specified, but no GPUs '
'were found. To use CPU, run without --multi_gpu.')
remainder = batch_size % num_gpus
if remainder:
err = ('When running with multiple GPUs, batch size '
'must be a multiple of the number of available GPUs. '
'Found {} GPUs with a batch size of {}; try --batch_size={} instead.'
).format(num_gpus, batch_size, batch_size - remainder)
raise ValueError(err)
def run_mnist(flags_obj):
"""Run MNIST training and eval loop.
Args:
flags_obj: An object containing parsed flag values.
"""
model_function = model_fn
if flags_obj.multi_gpu:
validate_batch_size_for_multi_gpu(flags_obj.batch_size)
# Get number of GPUs as defined by the --num_gpus flags and the number of
# GPUs available on the machine.
num_gpus = flags_core.get_num_gpus(flags_obj)
multi_gpu = num_gpus > 1
if multi_gpu:
# Validate that the batch size can be split into devices.
distribution_utils.per_device_batch_size(flags_obj.batch_size, num_gpus)
# There are two steps required if using multi-GPU: (1) wrap the model_fn,
# and (2) wrap the optimizer. The first happens here, and (2) happens
# in the model_fn itself when the optimizer is defined.
model_function = tf.contrib.estimator.replicate_model_fn(
model_fn, loss_reduction=tf.losses.Reduction.MEAN)
model_fn, loss_reduction=tf.losses.Reduction.MEAN,
devices=["/device:GPU:%d" % d for d in range(num_gpus)])
data_format = flags_obj.data_format
if data_format is None:
......@@ -209,7 +186,7 @@ def run_mnist(flags_obj):
model_dir=flags_obj.model_dir,
params={
'data_format': data_format,
'multi_gpu': flags_obj.multi_gpu
'multi_gpu': multi_gpu
})
# Set up training and evaluation input functions.
......
......@@ -38,6 +38,7 @@ from official.recommendation import neumf_model
from official.utils.flags import core as flags_core
from official.utils.logs import hooks_helper
from official.utils.logs import logger
from official.utils.misc import distribution_utils
from official.utils.misc import model_helpers
_TOP_K = 10 # Top-k list for evaluation
......@@ -85,7 +86,8 @@ def evaluate_model(estimator, batch_size, num_gpus, ncf_dataset):
# Define prediction input function
def pred_input_fn():
return dataset.input_fn(
False, per_device_batch_size(batch_size, num_gpus), ncf_dataset)
False, distribution_utils.per_device_batch_size(batch_size, num_gpus),
ncf_dataset)
# Get predictions
predictions = estimator.predict(input_fn=pred_input_fn)
......@@ -165,37 +167,6 @@ def convert_keras_to_estimator(keras_model, num_gpus, model_dir):
return estimator
def per_device_batch_size(batch_size, num_gpus):
"""For multi-gpu, batch-size must be a multiple of the number of GPUs.
Note that this should eventually be handled by DistributionStrategies
directly. Multi-GPU support is currently experimental, however,
so doing the work here until that feature is in place.
Args:
batch_size: Global batch size to be divided among devices. This should be
equal to num_gpus times the single-GPU batch_size for multi-gpu training.
num_gpus: How many GPUs are used with DistributionStrategies.
Returns:
Batch size per device.
Raises:
ValueError: if batch_size is not divisible by number of devices
"""
if num_gpus <= 1:
return batch_size
remainder = batch_size % num_gpus
if remainder:
err = ("When running with multiple GPUs, batch size "
"must be a multiple of the number of available GPUs. Found {} "
"GPUs with a batch size of {}; try --batch_size={} instead."
).format(num_gpus, batch_size, batch_size - remainder)
raise ValueError(err)
return int(batch_size / num_gpus)
def main(_):
with logger.benchmark_context(FLAGS):
run_ncf(FLAGS)
......@@ -252,7 +223,8 @@ def run_ncf(_):
# Training and evaluation cycle
def train_input_fn():
return dataset.input_fn(
True, per_device_batch_size(FLAGS.batch_size, num_gpus),
True,
distribution_utils.per_device_batch_size(FLAGS.batch_size, num_gpus),
ncf_dataset, FLAGS.epochs_between_evals)
total_training_cycle = FLAGS.train_epochs // FLAGS.epochs_between_evals
......@@ -295,7 +267,8 @@ def define_ncf_flags():
intra_op=False,
synthetic_data=False,
max_train_steps=False,
dtype=False
dtype=False,
all_reduce_alg=False
)
flags_core.define_benchmark()
......
......@@ -34,6 +34,7 @@ from official.utils.flags import core as flags_core
from official.utils.export import export
from official.utils.logs import hooks_helper
from official.utils.logs import logger
from official.utils.misc import distribution_utils
from official.utils.misc import model_helpers
# pylint: enable=g-bad-import-order
......@@ -124,9 +125,11 @@ def get_synth_input_fn(height, width, num_channels, num_classes):
that can be used for iteration.
"""
def input_fn(is_training, data_dir, batch_size, *args, **kwargs): # pylint: disable=unused-argument
images = tf.zeros((batch_size, height, width, num_channels), tf.float32)
labels = tf.zeros((batch_size), tf.int32)
return tf.data.Dataset.from_tensors((images, labels)).repeat()
return model_helpers.generate_synthetic_data(
input_shape=tf.TensorShape([batch_size, height, width, num_channels]),
input_dtype=tf.float32,
label_shape=tf.TensorShape([batch_size]),
label_dtype=tf.int32)
return input_fn
......@@ -318,37 +321,6 @@ def resnet_model_fn(features, labels, mode, model_class,
eval_metric_ops=metrics)
def per_device_batch_size(batch_size, num_gpus):
"""For multi-gpu, batch-size must be a multiple of the number of GPUs.
Note that this should eventually be handled by DistributionStrategies
directly. Multi-GPU support is currently experimental, however,
so doing the work here until that feature is in place.
Args:
batch_size: Global batch size to be divided among devices. This should be
equal to num_gpus times the single-GPU batch_size for multi-gpu training.
num_gpus: How many GPUs are used with DistributionStrategies.
Returns:
Batch size per device.
Raises:
ValueError: if batch_size is not divisible by number of devices
"""
if num_gpus <= 1:
return batch_size
remainder = batch_size % num_gpus
if remainder:
err = ('When running with multiple GPUs, batch size '
'must be a multiple of the number of available GPUs. Found {} '
'GPUs with a batch size of {}; try --batch_size={} instead.'
).format(num_gpus, batch_size, batch_size - remainder)
raise ValueError(err)
return int(batch_size / num_gpus)
def resnet_main(
flags_obj, model_function, input_function, dataset_name, shape=None):
"""Shared main loop for ResNet Models.
......@@ -379,17 +351,11 @@ def resnet_main(
intra_op_parallelism_threads=flags_obj.intra_op_parallelism_threads,
allow_soft_placement=True)
if flags_core.get_num_gpus(flags_obj) == 0:
distribution = tf.contrib.distribute.OneDeviceStrategy('device:CPU:0')
elif flags_core.get_num_gpus(flags_obj) == 1:
distribution = tf.contrib.distribute.OneDeviceStrategy('device:GPU:0')
else:
distribution = tf.contrib.distribute.MirroredStrategy(
num_gpus=flags_core.get_num_gpus(flags_obj)
)
distribution_strategy = distribution_utils.get_distribution_strategy(
flags_core.get_num_gpus(flags_obj), flags_obj.all_reduce_alg)
run_config = tf.estimator.RunConfig(train_distribute=distribution,
session_config=session_config)
run_config = tf.estimator.RunConfig(
train_distribute=distribution_strategy, session_config=session_config)
classifier = tf.estimator.Estimator(
model_fn=model_function, model_dir=flags_obj.model_dir, config=run_config,
......@@ -411,7 +377,7 @@ def resnet_main(
'train_epochs': flags_obj.train_epochs,
}
if flags_obj.use_synthetic_data:
dataset_name = dataset_name + "-synthetic"
dataset_name = dataset_name + '-synthetic'
benchmark_logger = logger.get_benchmark_logger()
benchmark_logger.log_run_info('resnet', dataset_name, run_params,
......@@ -424,7 +390,7 @@ def resnet_main(
def input_fn_train():
return input_function(
is_training=True, data_dir=flags_obj.data_dir,
batch_size=per_device_batch_size(
batch_size=distribution_utils.per_device_batch_size(
flags_obj.batch_size, flags_core.get_num_gpus(flags_obj)),
num_epochs=flags_obj.epochs_between_evals,
num_gpus=flags_core.get_num_gpus(flags_obj))
......@@ -432,7 +398,7 @@ def resnet_main(
def input_fn_eval():
return input_function(
is_training=False, data_dir=flags_obj.data_dir,
batch_size=per_device_batch_size(
batch_size=distribution_utils.per_device_batch_size(
flags_obj.batch_size, flags_core.get_num_gpus(flags_obj)),
num_epochs=1)
......
......@@ -14,8 +14,12 @@
# ==============================================================================
"""Defines Transformer model parameters."""
from collections import defaultdict
BASE_PARAMS = defaultdict(
lambda: None, # Set default value to None.
BASE_PARAMS = dict(
# Input params
default_batch_size=2048, # Maximum number of tokens per batch of examples.
default_batch_size_tpu=32768,
......@@ -56,8 +60,8 @@ BASE_PARAMS = dict(
allow_ffn_pad=True,
)
BIG_PARAMS = dict(BASE_PARAMS)
BIG_PARAMS.update(dict(
BIG_PARAMS = BASE_PARAMS.copy()
BIG_PARAMS.update(
default_batch_size=4096,
# default batch size is smaller than for BASE_PARAMS due to memory limits.
......@@ -66,13 +70,27 @@ BIG_PARAMS.update(dict(
hidden_size=1024,
filter_size=4096,
num_heads=16,
))
)
# Parameters for running the model in multi gpu. These should not change the
# params that modify the model shape (such as the hidden_size or num_heads).
BASE_MULTI_GPU_PARAMS = BASE_PARAMS.copy()
BASE_MULTI_GPU_PARAMS.update(
learning_rate_warmup_steps=8000
)
BIG_MULTI_GPU_PARAMS = BIG_PARAMS.copy()
BIG_MULTI_GPU_PARAMS.update(
layer_postprocess_dropout=0.3,
learning_rate_warmup_steps=8000
)
TINY_PARAMS = dict(BASE_PARAMS)
TINY_PARAMS.update(dict(
# Parameters for testing the model
TINY_PARAMS = BASE_PARAMS.copy()
TINY_PARAMS.update(
default_batch_size=1024,
default_batch_size_tpu=1024,
hidden_size=32,
num_heads=4,
filter_size=256,
))
)
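The switch from a plain dict to defaultdict(lambda: None) above is what lets translate.py run without the TPU-only keys. A minimal illustration (hypothetical keys, not the repo's actual param set):

from collections import defaultdict

# Keys never set by translate.py (e.g. TPU-only settings) now return None
# instead of raising KeyError.
params = defaultdict(lambda: None, hidden_size=512, num_heads=8)
assert params["hidden_size"] == 512
assert params["use_tpu"] is None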
......@@ -46,14 +46,16 @@ from official.utils.export import export
from official.utils.flags import core as flags_core
from official.utils.logs import hooks_helper
from official.utils.logs import logger
from official.utils.misc import distribution_utils
from official.utils.misc import model_helpers
PARAMS_MAP = {
"tiny": model_params.TINY_PARAMS,
"base": model_params.BASE_PARAMS,
"big": model_params.BIG_PARAMS,
}
DEFAULT_TRAIN_EPOCHS = 10
INF = int(1e9)
BLEU_DIR = "bleu"
......@@ -185,9 +187,12 @@ def get_train_op_and_metrics(loss, params):
tvars = tf.trainable_variables()
gradients = optimizer.compute_gradients(
loss, tvars, colocate_gradients_with_ops=True)
train_op = optimizer.apply_gradients(
minimize_op = optimizer.apply_gradients(
gradients, global_step=global_step, name="train")
update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
train_op = tf.group(minimize_op, update_ops)
metrics = {"learning_rate": learning_rate}
if not params["use_tpu"]:
......@@ -353,14 +358,15 @@ def run_loop(
def define_transformer_flags():
"""Add flags and flag validators for running transformer_main."""
# Add common flags (data_dir, model_dir, train_epochs, etc.).
flags_core.define_base(multi_gpu=False, num_gpu=False)
flags_core.define_base()
flags_core.define_performance(
num_parallel_calls=True,
inter_op=False,
intra_op=False,
synthetic_data=False,
synthetic_data=True,
max_train_steps=False,
dtype=False
dtype=False,
all_reduce_alg=True
)
flags_core.define_benchmark()
flags_core.define_device(tpu=True)
......@@ -373,7 +379,7 @@ def define_transformer_flags():
# Add transformer-specific flags
flags.DEFINE_enum(
name="param_set", short_name="mp", default="big",
enum_values=["base", "big", "tiny"],
enum_values=PARAMS_MAP.keys(),
help=flags_core.help_wrap(
"Parameter set to use when creating and training the model. The "
"parameters define the input shape (batch size and max length), "
......@@ -423,7 +429,6 @@ def define_transformer_flags():
"Path to subtoken vocabulary file. If data_download.py was used to "
"download and encode the training data, look in the data_dir to find "
"the vocab file."))
flags.mark_flag_as_required("vocab_file")
flags_core.set_defaults(data_dir="/tmp/translate_ende",
model_dir="/tmp/transformer_model",
......@@ -452,6 +457,7 @@ def define_transformer_flags():
@flags.validator("vocab_file", "File set by --vocab_file does not exist.")
def _check_vocab_file(vocab_file):
"""Ensure that vocab file exists."""
if vocab_file:
return tf.gfile.Exists(vocab_file)
flags_core.require_cloud_storage(["data_dir", "model_dir"])
......@@ -469,8 +475,11 @@ def construct_estimator(flags_obj, params, schedule_manager):
An estimator object to be used for training and eval.
"""
if not params["use_tpu"]:
distribution_strategy = distribution_utils.get_distribution_strategy(
flags_core.get_num_gpus(flags_obj), flags_obj.all_reduce_alg)
return tf.estimator.Estimator(
model_fn=model_fn, model_dir=flags_obj.model_dir, params=params)
model_fn=model_fn, model_dir=flags_obj.model_dir, params=params,
config=tf.estimator.RunConfig(train_distribute=distribution_strategy))
tpu_cluster_resolver = tf.contrib.cluster_resolver.TPUClusterResolver(
tpu=flags_obj.tpu,
......@@ -506,20 +515,34 @@ def run_transformer(flags_obj):
Args:
flags_obj: Object containing parsed flag values.
"""
num_gpus = flags_core.get_num_gpus(flags_obj)
# Add flag-defined parameters to params object
params = PARAMS_MAP[flags_obj.param_set]
if num_gpus > 1:
if flags_obj.param_set == "big":
params = model_params.BIG_MULTI_GPU_PARAMS
elif flags_obj.param_set == "base":
params = model_params.BASE_MULTI_GPU_PARAMS
params["data_dir"] = flags_obj.data_dir
params["model_dir"] = flags_obj.model_dir
params["num_parallel_calls"] = flags_obj.num_parallel_calls
params["tpu"] = flags_obj.tpu
params["use_tpu"] = bool(flags_obj.tpu) # was a tpu specified.
params["batch_size"] = flags_obj.batch_size or (
params["default_batch_size_tpu"] if params["use_tpu"]
else params["default_batch_size"])
params["static_batch"] = flags_obj.static_batch or params["use_tpu"]
params["allow_ffn_pad"] = not params["use_tpu"]
params["use_synthetic_data"] = flags_obj.use_synthetic_data
# Set batch size parameter, which depends on TPU and distribution settings.
params["batch_size"] = (
flags_obj.batch_size or params["default_batch_size_tpu"])
if not params["use_tpu"]:
params["batch_size"] = distribution_utils.per_device_batch_size(
params["batch_size"], num_gpus)
schedule_manager = schedule.Manager(
train_steps=flags_obj.train_steps,
steps_between_evals=flags_obj.steps_between_evals,
......
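For a sense of how the batch-size block above plays out, a hand-worked sketch assuming 4 GPUs, no --batch_size override, and the base default_batch_size of 2048 (illustrative numbers; the real values come from the flags and the selected param set):

num_gpus = 4                                  # from --num_gpus
default_batch_size = 2048                     # BASE_PARAMS["default_batch_size"]
flag_batch_size = None                        # --batch_size not passed

global_batch = flag_batch_size or default_batch_size    # 2048 tokens across all devices
per_device_batch = global_batch // num_gpus              # 512 tokens per GPU
assert global_batch % num_gpus == 0  # per_device_batch_size raises ValueError otherwise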
......@@ -21,7 +21,6 @@ from __future__ import print_function
import os
# pylint: disable=g-bad-import-order
from six.moves import xrange # pylint: disable=redefined-builtin
from absl import app as absl_app
from absl import flags
import tensorflow as tf
......@@ -54,10 +53,10 @@ def _get_sorted_inputs(filename):
input_lens = [(i, len(line.split())) for i, line in enumerate(inputs)]
sorted_input_lens = sorted(input_lens, key=lambda x: x[1], reverse=True)
sorted_inputs = []
sorted_keys = {}
sorted_inputs = [None] * len(sorted_input_lens)
sorted_keys = [0] * len(sorted_input_lens)
for i, (index, _) in enumerate(sorted_input_lens):
sorted_inputs.append(inputs[index])
sorted_inputs[i] = inputs[index]
sorted_keys[index] = i
return sorted_inputs, sorted_keys
......@@ -132,8 +131,8 @@ def translate_file(
"file.")
tf.logging.info("Writing to file %s" % output_file)
with tf.gfile.Open(output_file, "w") as f:
for index, key in enumerate(sorted_keys):
f.write("%s\n" % translations[key])
for i in sorted_keys:
f.write("%s\n" % translations[i])
def translate_text(estimator, subtokenizer, txt):
......
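To make the sorted_keys fix above concrete, a tiny worked example with made-up inputs: sorted_keys[original_index] holds the position of that line in the length-sorted list, so iterating sorted_keys in original order writes translations back in the input file's order.

# Hypothetical three-line input file, with 3, 1, and 2 tokens per line.
inputs = ["a b c", "d", "e f"]

# _get_sorted_inputs sorts longest-first, i.e. original indices [0, 2, 1].
sorted_inputs = ["a b c", "e f", "d"]
sorted_keys = [0, 2, 1]   # sorted_keys[original_index] == position in sorted_inputs

for original_index, line in enumerate(inputs):
    assert sorted_inputs[sorted_keys[original_index]] == line

# translate_file writes translations (produced in sorted order) as
#   for i in sorted_keys: f.write(translations[i])
# which restores the original line order in the output file.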
......@@ -51,10 +51,13 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import math
import os
import tensorflow as tf
from official.utils.misc import model_helpers
# Use the number of training files as the shuffle buffer.
_FILE_SHUFFLE_BUFFER = 100
# Buffer size for reading records from a TFRecord file. Each training file is
......@@ -247,13 +250,28 @@ def _read_and_batch_from_files(
dataset = dataset.repeat(repeat)
# Prefetch the next element to improve speed of input pipeline.
dataset = dataset.prefetch(1)
dataset = dataset.prefetch(buffer_size=tf.contrib.data.AUTOTUNE)
return dataset
def _generate_synthetic_data(params):
"""Create synthetic data based on the parameter batch size."""
batch = length = int(math.sqrt(params["batch_size"]))
return model_helpers.generate_synthetic_data(
input_shape=tf.TensorShape([batch, length]),
input_value=1,
input_dtype=tf.int32,
label_shape=tf.TensorShape([batch, length]),
label_value=1,
label_dtype=tf.int32,
)
def train_input_fn(params):
"""Load and return dataset of batched examples for use during training."""
file_pattern = os.path.join(params.get("data_dir", ""), "*train*")
file_pattern = os.path.join(params["data_dir"] or "", "*train*")
if params["use_synthetic_data"]:
return _generate_synthetic_data(params)
return _read_and_batch_from_files(
file_pattern, params["batch_size"], params["max_length"],
params["num_parallel_calls"], shuffle=True,
......@@ -262,7 +280,9 @@ def train_input_fn(params):
def eval_input_fn(params):
"""Load and return dataset of batched examples for use during evaluation."""
file_pattern = os.path.join(params.get("data_dir", ""), "*dev*")
file_pattern = os.path.join(params["data_dir"] or "", "*dev*")
if params["use_synthetic_data"]:
return _generate_synthetic_data(params)
return _read_and_batch_from_files(
file_pattern, params["batch_size"], params["max_length"],
params["num_parallel_calls"], shuffle=False, repeat=1,
......
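A rough, hand-rolled equivalent of the synthetic-data path above, assuming batch_size=1024 (e.g. TINY_PARAMS): batch and length both become int(sqrt(1024)) = 32, and the dataset simply repeats a constant pair of [32, 32] int32 tensors.

import math
import tensorflow as tf

batch_size = 1024                               # params["batch_size"], illustrative
batch = length = int(math.sqrt(batch_size))     # 32

# Constant value 1 for both inputs and labels, mirroring _generate_synthetic_data.
inputs = tf.ones([batch, length], dtype=tf.int32)
labels = tf.ones([batch, length], dtype=tf.int32)
synthetic_dataset = tf.data.Dataset.from_tensors((inputs, labels)).repeat()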
......@@ -27,7 +27,7 @@ from official.utils.logs import hooks_helper
def define_base(data_dir=True, model_dir=True, train_epochs=True,
epochs_between_evals=True, stop_threshold=True, batch_size=True,
multi_gpu=False, num_gpu=True, hooks=True, export_dir=True):
num_gpu=True, hooks=True, export_dir=True):
"""Register base flags.
Args:
......@@ -38,7 +38,6 @@ def define_base(data_dir=True, model_dir=True, train_epochs=True,
stop_threshold: Create a flag to specify a threshold accuracy or other
eval metric which should trigger the end of training.
batch_size: Create a flag to specify the batch size.
multi_gpu: Create a flag to allow the use of all available GPUs.
num_gpu: Create a flag to specify the number of GPUs used.
hooks: Create a flag to specify hooks for logging.
export_dir: Create a flag to specify where a SavedModel should be exported.
......@@ -84,17 +83,13 @@ def define_base(data_dir=True, model_dir=True, train_epochs=True,
if batch_size:
flags.DEFINE_integer(
name="batch_size", short_name="bs", default=32,
help=help_wrap("Batch size for training and evaluation."))
help=help_wrap("Batch size for training and evaluation. When using "
"multiple gpus, this is the global batch size for "
"all devices. For example, if the batch size is 32 "
"and there are 4 GPUs, each GPU will get 8 examples on "
"each step."))
key_flags.append("batch_size")
assert not (multi_gpu and num_gpu)
if multi_gpu:
flags.DEFINE_bool(
name="multi_gpu", default=False,
help=help_wrap("If set, run across all available GPUs."))
key_flags.append("multi_gpu")
if num_gpu:
flags.DEFINE_integer(
name="num_gpus", short_name="ng",
......
......@@ -44,7 +44,8 @@ def get_loss_scale(flags_obj):
def define_performance(num_parallel_calls=True, inter_op=True, intra_op=True,
synthetic_data=True, max_train_steps=True, dtype=True):
synthetic_data=True, max_train_steps=True, dtype=True,
all_reduce_alg=True):
"""Register flags for specifying performance tuning arguments.
Args:
......@@ -129,4 +130,12 @@ def define_performance(num_parallel_calls=True, inter_op=True, intra_op=True,
return loss_scale > 0
if all_reduce_alg:
flags.DEFINE_string(
name="all_reduce_alg", short_name="ara", default=None,
help=help_wrap("Defines the algorithm to use for performing all-reduce."
"See tf.contrib.distribute.AllReduceCrossTowerOps for "
"more details and available options."))
return key_flags
......@@ -71,7 +71,7 @@ define_base = register_key_flags_in_core(_base.define_base)
# Remove options not relevant for Eager from define_base().
define_base_eager = register_key_flags_in_core(functools.partial(
_base.define_base, epochs_between_evals=False, stop_threshold=False,
multi_gpu=False, hooks=False))
hooks=False))
define_benchmark = register_key_flags_in_core(_benchmark.define_benchmark)
define_device = register_key_flags_in_core(_device.define_device)
define_image = register_key_flags_in_core(_misc.define_image)
......
......@@ -22,7 +22,7 @@ from official.utils.flags import core as flags_core # pylint: disable=g-bad-imp
def define_flags():
flags_core.define_base(multi_gpu=True, num_gpu=False)
flags_core.define_base(num_gpu=False)
flags_core.define_performance()
flags_core.define_image()
flags_core.define_benchmark()
......@@ -75,9 +75,8 @@ class BaseTester(unittest.TestCase):
"""Test to ensure boolean flags trigger as expected.
"""
flags_core.parse_flags([__file__, "--multi_gpu", "--use_synthetic_data"])
flags_core.parse_flags([__file__, "--use_synthetic_data"])
assert flags.FLAGS.multi_gpu
assert flags.FLAGS.use_synthetic_data
def test_parse_dtype_info(self):
......
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Helper functions for running models in a distributed setting."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tensorflow as tf
def get_distribution_strategy(num_gpus, all_reduce_alg=None):
"""Return a DistributionStrategy for running the model.
Args:
num_gpus: Number of GPUs to run this model.
all_reduce_alg: Specify which algorithm to use when performing all-reduce.
See tf.contrib.distribute.AllReduceCrossTowerOps for available algorithms.
If None, DistributionStrategy will choose based on device topology.
Returns:
tf.contrib.distribute.DistributionStrategy object.
"""
if num_gpus == 0:
return tf.contrib.distribute.OneDeviceStrategy("device:CPU:0")
elif num_gpus == 1:
return tf.contrib.distribute.OneDeviceStrategy("device:GPU:0")
else:
if all_reduce_alg:
return tf.contrib.distribute.MirroredStrategy(
num_gpus=num_gpus,
cross_tower_ops=tf.contrib.distribute.AllReduceCrossTowerOps(
all_reduce_alg, num_packs=num_gpus))
else:
return tf.contrib.distribute.MirroredStrategy(num_gpus=num_gpus)
def per_device_batch_size(batch_size, num_gpus):
"""For multi-gpu, batch-size must be a multiple of the number of GPUs.
Note that this should eventually be handled by DistributionStrategies
directly. Multi-GPU support is currently experimental, however,
so doing the work here until that feature is in place.
Args:
batch_size: Global batch size to be divided among devices. This should be
equal to num_gpus times the single-GPU batch_size for multi-gpu training.
num_gpus: How many GPUs are used with DistributionStrategies.
Returns:
Batch size per device.
Raises:
ValueError: if batch_size is not divisible by number of devices
"""
if num_gpus <= 1:
return batch_size
remainder = batch_size % num_gpus
if remainder:
err = ("When running with multiple GPUs, batch size "
"must be a multiple of the number of available GPUs. Found {} "
"GPUs with a batch size of {}; try --batch_size={} instead."
).format(num_gpus, batch_size, batch_size - remainder)
raise ValueError(err)
return int(batch_size / num_gpus)
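For reference, a minimal sketch (assuming the TF 1.x contrib APIs used in this change, and a model_fn/input_fn defined elsewhere) of how the model mains wire these two helpers into an Estimator:

import tensorflow as tf
from official.utils.misc import distribution_utils

num_gpus = 2
global_batch_size = 64

# MirroredStrategy across 2 GPUs (OneDeviceStrategy is returned for 0 or 1 GPUs).
strategy = distribution_utils.get_distribution_strategy(num_gpus, all_reduce_alg=None)
run_config = tf.estimator.RunConfig(train_distribute=strategy)

# The input_fn should batch with the per-device size, so each tower gets 32 examples.
per_device_batch = distribution_utils.per_device_batch_size(global_batch_size, num_gpus)

# estimator = tf.estimator.Estimator(model_fn=my_model_fn, config=run_config)
# estimator.train(input_fn=lambda: my_input_fn(batch_size=per_device_batch))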
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
""" Tests for distribution util functions."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tensorflow as tf # pylint: disable=g-bad-import-order
from official.utils.misc import distribution_utils
class GetDistributionStrategyTest(tf.test.TestCase):
"""Tests for get_distribution_strategy."""
def test_one_device_strategy_cpu(self):
ds = distribution_utils.get_distribution_strategy(0)
self.assertTrue(ds.is_single_tower)
self.assertEquals(ds.num_towers, 1)
self.assertEquals(len(ds.worker_devices), 1)
self.assertIn('CPU', ds.worker_devices[0])
def test_one_device_strategy_gpu(self):
ds = distribution_utils.get_distribution_strategy(1)
self.assertTrue(ds.is_single_tower)
self.assertEquals(ds.num_towers, 1)
self.assertEquals(len(ds.worker_devices), 1)
self.assertIn('GPU', ds.worker_devices[0])
def test_mirrored_strategy(self):
ds = distribution_utils.get_distribution_strategy(5)
self.assertFalse(ds.is_single_tower)
self.assertEquals(ds.num_towers, 5)
self.assertEquals(len(ds.worker_devices), 5)
for device in ds.worker_devices:
self.assertIn('GPU', device)
class PerDeviceBatchSizeTest(tf.test.TestCase):
"""Tests for per_device_batch_size."""
def test_batch_size(self):
self.assertEquals(
distribution_utils.per_device_batch_size(147, num_gpus=0), 147)
self.assertEquals(
distribution_utils.per_device_batch_size(147, num_gpus=1), 147)
self.assertEquals(
distribution_utils.per_device_batch_size(147, num_gpus=7), 21)
def test_batch_size_with_remainder(self):
with self.assertRaises(ValueError):
distribution_utils.per_device_batch_size(147, num_gpus=5)
if __name__ == "__main__":
tf.test.main()
......@@ -21,6 +21,7 @@ from __future__ import print_function
import numbers
import tensorflow as tf
from tensorflow.python.util import nest
def past_stop_threshold(stop_threshold, eval_metric):
......@@ -53,3 +54,33 @@ def past_stop_threshold(stop_threshold, eval_metric):
return True
return False
def generate_synthetic_data(
input_shape, input_value=0, input_dtype=None, label_shape=None,
label_value=0, label_dtype=None):
"""Create a repeating dataset with constant values.
Args:
input_shape: a tf.TensorShape object or nested tf.TensorShapes. The shape of
the input data.
input_value: Value of each input element.
input_dtype: Input dtype. If None, will be inferred by the input value.
label_shape: a tf.TensorShape object or nested tf.TensorShapes. The shape of
the label data.
label_value: Value of each label element.
label_dtype: Label dtype. If None, will be inferred by the label value.
Returns:
Dataset of tensors or tuples of tensors (if label_shape is set).
"""
# TODO(kathywu): Replace with SyntheticDataset once it is in contrib.
element = input_element = nest.map_structure(
lambda s: tf.constant(input_value, input_dtype, s), input_shape)
if label_shape:
label_element = nest.map_structure(
lambda s: tf.constant(label_value, label_dtype, s), label_shape)
element = (input_element, label_element)
return tf.data.Dataset.from_tensors(element).repeat()
......@@ -65,5 +65,57 @@ class PastStopThresholdTest(tf.test.TestCase):
model_helpers.past_stop_threshold(tf.constant(4), None)
class SyntheticDataTest(tf.test.TestCase):
"""Tests for generate_synthetic_data."""
def test_generate_synthetic_data(self):
input_element, label_element = model_helpers.generate_synthetic_data(
input_shape=tf.TensorShape([5]),
input_value=123,
input_dtype=tf.float32,
label_shape=tf.TensorShape([]),
label_value=456,
label_dtype=tf.int32).make_one_shot_iterator().get_next()
with self.test_session() as sess:
for n in range(5):
inp, lab = sess.run((input_element, label_element))
self.assertAllClose(inp, [123., 123., 123., 123., 123.])
self.assertEquals(lab, 456)
def test_generate_only_input_data(self):
d = model_helpers.generate_synthetic_data(
input_shape=tf.TensorShape([4]),
input_value=43.5,
input_dtype=tf.float32)
element = d.make_one_shot_iterator().get_next()
self.assertFalse(isinstance(element, tuple))
with self.test_session() as sess:
inp = sess.run(element)
self.assertAllClose(inp, [43.5, 43.5, 43.5, 43.5])
def test_generate_nested_data(self):
d = model_helpers.generate_synthetic_data(
input_shape={'a': tf.TensorShape([2]),
'b': {'c': tf.TensorShape([3]), 'd': tf.TensorShape([])}},
input_value=1.1)
element = d.make_one_shot_iterator().get_next()
self.assertIn('a', element)
self.assertIn('b', element)
self.assertEquals(len(element['b']), 2)
self.assertIn('c', element['b'])
self.assertIn('d', element['b'])
self.assertNotIn('c', element)
with self.test_session() as sess:
inp = sess.run(element)
self.assertAllClose(inp['a'], [1.1, 1.1])
self.assertAllClose(inp['b']['c'], [1.1, 1.1, 1.1])
self.assertAllClose(inp['b']['d'], 1.1)
if __name__ == "__main__":
tf.test.main()