Commit 2894bb53 authored by Toby Boyd

Add an option to run with performance-tuned args.

parent b88da6ee
@@ -158,8 +158,9 @@ def parse_record(raw_record, is_training, dtype):
   return image, label


-def input_fn(is_training, data_dir, batch_size, num_epochs=1, num_gpus=None,
-             dtype=tf.float32):
+def input_fn(is_training, data_dir, batch_size, num_epochs=1,
+             dtype=tf.float32, datasets_num_private_threads=None,
+             num_parallel_batches=1):
   """Input function which provides batches for train or eval.

   Args:
@@ -167,8 +168,9 @@ def input_fn(is_training, data_dir, batch_size, num_epochs=1, num_gpus=None,
     data_dir: The directory containing the input data.
     batch_size: The number of samples per batch.
     num_epochs: The number of epochs to repeat the dataset.
-    num_gpus: The number of gpus used for training.
     dtype: Data type to use for images/features
+    datasets_num_private_threads: Number of private threads for tf.data.
+    num_parallel_batches: Number of parallel batches for tf.data.

   Returns:
     A dataset that can be used for iteration.
@@ -195,9 +197,9 @@ def input_fn(is_training, data_dir, batch_size, num_epochs=1, num_gpus=None,
       shuffle_buffer=_SHUFFLE_BUFFER,
       parse_record_fn=parse_record,
       num_epochs=num_epochs,
-      num_gpus=num_gpus,
-      examples_per_epoch=_NUM_IMAGES['train'] if is_training else None,
-      dtype=dtype
+      dtype=dtype,
+      datasets_num_private_threads=datasets_num_private_threads,
+      num_parallel_batches=num_parallel_batches
   )
...
@@ -30,6 +30,8 @@ import os
 # pylint: disable=g-bad-import-order
 from absl import flags
 import tensorflow as tf
+from tensorflow.contrib.data.python.ops import threadpool
+import multiprocessing

 from official.resnet import resnet_model
 from official.utils.flags import core as flags_core
@@ -39,15 +41,20 @@ from official.utils.logs import logger
 from official.resnet import imagenet_preprocessing
 from official.utils.misc import distribution_utils
 from official.utils.misc import model_helpers
 # pylint: enable=g-bad-import-order


 ################################################################################
 # Functions for input processing.
 ################################################################################
-def process_record_dataset(dataset, is_training, batch_size, shuffle_buffer,
-                           parse_record_fn, num_epochs=1, num_gpus=None,
-                           examples_per_epoch=None, dtype=tf.float32):
+def process_record_dataset(dataset,
+                           is_training,
+                           batch_size,
+                           shuffle_buffer,
+                           parse_record_fn,
+                           num_epochs=1,
+                           dtype=tf.float32,
+                           datasets_num_private_threads=None,
+                           num_parallel_batches=1):
   """Given a Dataset with raw records, return an iterator over the records.

   Args:
@@ -60,9 +67,10 @@ def process_record_dataset(dataset, is_training, batch_size, shuffle_buffer,
     parse_record_fn: A function that takes a raw record and returns the
       corresponding (image, label) pair.
     num_epochs: The number of epochs to repeat the dataset.
-    num_gpus: The number of gpus used for training.
-    examples_per_epoch: The number of examples in an epoch.
     dtype: Data type to use for images/features.
+    datasets_num_private_threads: Number of threads for a private
+      threadpool created for all datasets computation.
+    num_parallel_batches: Number of parallel batches for tf.data.

   Returns:
     Dataset of (image, label) pairs ready for iteration.
@@ -83,7 +91,7 @@ def process_record_dataset(dataset, is_training, batch_size, shuffle_buffer,
       tf.contrib.data.map_and_batch(
           lambda value: parse_record_fn(value, is_training, dtype),
           batch_size=batch_size,
-          num_parallel_calls=1,
+          num_parallel_calls=num_parallel_batches,
           drop_remainder=False))

   # Operations between the final prefetch and the get_next call to the iterator
@@ -94,6 +102,14 @@ def process_record_dataset(dataset, is_training, batch_size, shuffle_buffer,
   # on how many devices are present.
   dataset = dataset.prefetch(buffer_size=tf.contrib.data.AUTOTUNE)

+  # Defines a specific size thread pool for tf.data operations.
+  if datasets_num_private_threads:
+    dataset = threadpool.override_threadpool(
+        dataset,
+        threadpool.PrivateThreadPool(
+            datasets_num_private_threads,
+            display_name='input_pipeline_thread_pool'))
+
   return dataset
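For readers skimming the diff, here is a minimal, self-contained sketch (not part of the commit) of how the two knobs introduced above behave on an arbitrary tf.data pipeline. It reuses only APIs already present in this branch (`tf.contrib.data.map_and_batch`, `tf.contrib.data.AUTOTUNE`, and the contrib `threadpool` module); the function name and toy dataset are illustrative assumptions.

```python
import tensorflow as tf
from tensorflow.contrib.data.python.ops import threadpool  # TF 1.x contrib, as used in this branch


def tuned_pipeline(batch_size, num_parallel_batches=1,
                   datasets_num_private_threads=None):
  """Toy pipeline mirroring the tuning knobs of process_record_dataset."""
  dataset = tf.data.Dataset.range(10000)

  # Parallelize the fused map+batch step, matching num_parallel_calls above.
  dataset = dataset.apply(
      tf.contrib.data.map_and_batch(
          lambda value: tf.cast(value, tf.float32),  # stand-in for parse_record_fn
          batch_size=batch_size,
          num_parallel_calls=num_parallel_batches,
          drop_remainder=False))
  dataset = dataset.prefetch(buffer_size=tf.contrib.data.AUTOTUNE)

  # Optionally run all tf.data work on its own thread pool so it does not
  # compete with threads reserved for GPU op scheduling.
  if datasets_num_private_threads:
    dataset = threadpool.override_threadpool(
        dataset,
        threadpool.PrivateThreadPool(
            datasets_num_private_threads,
            display_name='input_pipeline_thread_pool'))
  return dataset
```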
@@ -162,6 +178,63 @@ def image_bytes_serving_input_fn(image_shape, dtype=tf.float32):
       images, {'image_bytes': image_bytes_list})


+def set_environment_vars(flags_obj):
+  """Adjust flags and set env_vars for performance.
+
+  These settings exist to test the difference between using stock settings
+  and manual tuning. It also shows some of the ENV_VARS that can be tweaked
+  to squeeze a few extra examples per second. These settings are defaulted
+  to the current platform of interest, which changes over time.
+
+  On systems with small numbers of cpu cores, e.g. under 8 logical cores,
+  setting up a private thread pool for GPUs with
+  `tf_gpu_thread_mode=gpu_private` may perform poorly.
+
+  Args:
+    flags_obj: Current flags, which will be adjusted, possibly overriding
+      what has been set by the user on the command line.
+
+  Returns:
+    tf.ConfigProto: session_config proto to add to the session.
+  """
+  if flags_obj.tf_gpu_thread_mode in ['gpu_private']:
+    cpu_count = multiprocessing.cpu_count()
+    print('Logical CPU cores:', cpu_count)
+
+    # Sets up thread pool for each GPU for op scheduling.
+    per_gpu_thread_count = 1
+    total_gpu_thread_count = per_gpu_thread_count * flags_obj.num_gpus
+    os.environ['TF_GPU_THREAD_MODE'] = flags_obj.tf_gpu_thread_mode
+    os.environ['TF_GPU_THREAD_COUNT'] = str(per_gpu_thread_count)
+    print('TF_GPU_THREAD_COUNT:', os.environ['TF_GPU_THREAD_COUNT'])
+
+    # Reduces general thread pool by number of threads used for GPU pool.
+    main_thread_count = cpu_count - total_gpu_thread_count
+    flags_obj.inter_op_parallelism_threads = main_thread_count
+
+    # Sets thread count for tf.data: logical cores minus threads assigned to
+    # the private GPU pool, minus 2 threads per GPU for event monitoring and
+    # sending / receiving tensors.
+    num_monitoring_threads = 2 * flags_obj.num_gpus
+    num_private_threads = (cpu_count - total_gpu_thread_count
+                           - num_monitoring_threads)
+    flags_obj.datasets_num_private_threads = num_private_threads
+
+  print('inter_op_parallelism_threads:', flags_obj.inter_op_parallelism_threads)
+  print('intra_op_parallelism_threads:', flags_obj.intra_op_parallelism_threads)
+  print('datasets_num_private_threads:', flags_obj.datasets_num_private_threads)
+
+  # Create session config based on values of inter_op_parallelism_threads
+  # and intra_op_parallelism_threads. Note that we default to having
+  # allow_soft_placement = True, which is required for multi-GPU and not
+  # harmful for other modes.
+  session_config = tf.ConfigProto(
+      inter_op_parallelism_threads=flags_obj.inter_op_parallelism_threads,
+      intra_op_parallelism_threads=flags_obj.intra_op_parallelism_threads,
+      allow_soft_placement=True)
+
+  return session_config
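To make the thread bookkeeping above concrete, here is a small worked example under assumed hardware; the 8 GPUs and 96 logical cores below are hypothetical values, not taken from the commit.

```python
# Hypothetical machine: 8 GPUs, 96 logical CPU cores, tf_gpu_thread_mode=gpu_private.
cpu_count = 96
num_gpus = 8
per_gpu_thread_count = 1

total_gpu_thread_count = per_gpu_thread_count * num_gpus            # 8 threads reserved for GPU op scheduling
inter_op_parallelism_threads = cpu_count - total_gpu_thread_count   # 96 - 8 = 88
num_monitoring_threads = 2 * num_gpus                               # 16 threads for monitoring / tensor transfer
datasets_num_private_threads = (cpu_count - total_gpu_thread_count
                                - num_monitoring_threads)           # 96 - 8 - 16 = 72

assert (inter_op_parallelism_threads, datasets_num_private_threads) == (88, 72)
```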
 ################################################################################
 # Functions for running training/eval/validation loops for the model.
 ################################################################################
@@ -358,25 +431,25 @@ def resnet_model_fn(features, labels, mode, model_class,
     train_op = None

   accuracy = tf.metrics.accuracy(labels, predictions['classes'])
-  accuracy_top_5 = tf.metrics.mean(tf.nn.in_top_k(predictions=logits,
-                                                  targets=labels,
-                                                  k=5,
-                                                  name='top_5_op'))
-  metrics = {'accuracy': accuracy,
-             'accuracy_top_5': accuracy_top_5}
+  #accuracy_top_5 = tf.metrics.mean(tf.nn.in_top_k(predictions=logits,
+  #                                                targets=labels,
+  #                                                k=5,
+  #                                                name='top_5_op'))
+  metrics = {'accuracy': accuracy}
+  # 'accuracy_top_5': accuracy_top_5}

   # Create a tensor named train_accuracy for logging purposes
   tf.identity(accuracy[1], name='train_accuracy')
-  tf.identity(accuracy_top_5[1], name='train_accuracy_top_5')
+  #tf.identity(accuracy_top_5[1], name='train_accuracy_top_5')
   tf.summary.scalar('train_accuracy', accuracy[1])
-  tf.summary.scalar('train_accuracy_top_5', accuracy_top_5[1])
+  #tf.summary.scalar('train_accuracy_top_5', accuracy_top_5[1])

   return tf.estimator.EstimatorSpec(
       mode=mode,
       predictions=predictions,
       loss=loss,
-      train_op=train_op,
-      eval_metric_ops=metrics)
+      train_op=train_op)
+      #eval_metric_ops=metrics)


 def resnet_main(
@@ -399,23 +472,18 @@ def resnet_main(
   model_helpers.apply_clean(flags.FLAGS)

-  # Using the Winograd non-fused algorithms provides a small performance boost.
-  os.environ['TF_ENABLE_WINOGRAD_NONFUSED'] = '1'
-
-  # Create session config based on values of inter_op_parallelism_threads and
-  # intra_op_parallelism_threads. Note that we default to having
-  # allow_soft_placement = True, which is required for multi-GPU and not
-  # harmful for other modes.
-  session_config = tf.ConfigProto(
-      inter_op_parallelism_threads=flags_obj.inter_op_parallelism_threads,
-      intra_op_parallelism_threads=flags_obj.intra_op_parallelism_threads,
-      allow_soft_placement=True)
+  session_config = set_environment_vars(flags_obj)

   distribution_strategy = distribution_utils.get_distribution_strategy(
       flags_core.get_num_gpus(flags_obj), flags_obj.all_reduce_alg)

+  # Creates a `RunConfig` that checkpoints every 24 hours which essentially
+  # results in checkpoints at the end of each training loop as determined by
+  # `epochs_between_evals`. Doing it more often is a needless small cost.
   run_config = tf.estimator.RunConfig(
-      train_distribute=distribution_strategy, session_config=session_config)
+      train_distribute=distribution_strategy,
+      session_config=session_config,
+      save_checkpoints_secs=60*60*24)

   # initialize our model with all but the dense layer from pretrained resnet
   if flags_obj.pretrained_model_checkpoint_path is not None:
@@ -459,16 +527,19 @@ def resnet_main(
   def input_fn_train(num_epochs):
     return input_function(
-        is_training=True, data_dir=flags_obj.data_dir,
+        is_training=True,
+        data_dir=flags_obj.data_dir,
         batch_size=distribution_utils.per_device_batch_size(
             flags_obj.batch_size, flags_core.get_num_gpus(flags_obj)),
         num_epochs=num_epochs,
-        num_gpus=flags_core.get_num_gpus(flags_obj),
-        dtype=flags_core.get_tf_dtype(flags_obj))
+        dtype=flags_core.get_tf_dtype(flags_obj),
+        datasets_num_private_threads=flags_obj.datasets_num_private_threads,
+        num_parallel_batches=flags_obj.num_parallel_calls)

   def input_fn_eval():
     return input_function(
-        is_training=False, data_dir=flags_obj.data_dir,
+        is_training=False,
+        data_dir=flags_obj.data_dir,
         batch_size=distribution_utils.per_device_batch_size(
             flags_obj.batch_size, flags_core.get_num_gpus(flags_obj)),
         num_epochs=1,
@@ -530,7 +601,7 @@ def resnet_main(
 def define_resnet_flags(resnet_size_choices=None):
   """Add flags and validators for ResNet."""
   flags_core.define_base()
-  flags_core.define_performance(num_parallel_calls=False)
+  flags_core.define_performance()
   flags_core.define_image()
   flags_core.define_benchmark()
   flags.adopt_module_key_flags(flags_core)
@@ -554,7 +625,7 @@ def define_resnet_flags(resnet_size_choices=None):
       help=flags_core.help_wrap('Skip training and only perform evaluation on '
                                 'the latest checkpoint.'))
   flags.DEFINE_boolean(
-      name="image_bytes_as_serving_input", default=False,
+      name='image_bytes_as_serving_input', default=False,
       help=flags_core.help_wrap(
           'If True exports savedmodel with serving signature that accepts '
           'JPEG image bytes instead of a fixed size [HxWxC] tensor that '
...
@@ -45,7 +45,8 @@ def get_loss_scale(flags_obj):
 def define_performance(num_parallel_calls=True, inter_op=True, intra_op=True,
                        synthetic_data=True, max_train_steps=True, dtype=True,
-                       all_reduce_alg=True):
+                       all_reduce_alg=True, tf_gpu_thread_mode=True,
+                       datasets_num_private_threads=True):
   """Register flags for specifying performance tuning arguments.

   Args:
@@ -56,7 +57,9 @@ def define_performance(num_parallel_calls=True, inter_op=True, intra_op=True,
     max_train_steps: Create a flags to allow specification of maximum number
       of training steps
     dtype: Create flags for specifying dtype.
     all_reduce_alg: If set forces a specific algorithm for multi-gpu.
+    tf_gpu_thread_mode: gpu_private triggers use of a private thread pool.
+    datasets_num_private_threads: Number of private threads for datasets.

   Returns:
     A list of flags for core.py to marks as key flags.
   """
@@ -65,7 +68,7 @@ def define_performance(num_parallel_calls=True, inter_op=True, intra_op=True,
   if num_parallel_calls:
     flags.DEFINE_integer(
         name="num_parallel_calls", short_name="npc",
-        default=multiprocessing.cpu_count(),
+        default=1,
         help=help_wrap("The number of records that are processed in parallel "
                        "during input processing. This can be optimized per "
                        "data set but for generally homogeneous data sets, "
@@ -137,5 +140,20 @@ def define_performance(num_parallel_calls=True, inter_op=True, intra_op=True,
                        "See tf.contrib.distribute.AllReduceCrossTowerOps for "
                        "more details and available options."))

+  if tf_gpu_thread_mode:
+    flags.DEFINE_string(
+        name="tf_gpu_thread_mode", short_name="gt_mode", default="global",
+        help=help_wrap(
+            "Whether and how the GPU device uses its own threadpool.")
+    )
+
+  if datasets_num_private_threads:
+    flags.DEFINE_integer(
+        name="datasets_num_private_threads", short_name="dataset_thread_count",
+        default=None,
+        help=help_wrap(
+            "Number of threads for a private threadpool created for all "
+            "datasets computation.")
+    )
+
   return key_flags
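As a usage illustration (not part of the commit), the new flags registered above could be exercised from a small driver script. The wiring below is an assumption, but the flag names and the `flags_core.define_performance()` call match the diff; the script name and flag values are hypothetical.

```python
from absl import app
from absl import flags

from official.utils.flags import core as flags_core


def main(_):
  flags_obj = flags.FLAGS
  print('tf_gpu_thread_mode:', flags_obj.tf_gpu_thread_mode)
  print('datasets_num_private_threads:', flags_obj.datasets_num_private_threads)
  print('num_parallel_calls:', flags_obj.num_parallel_calls)


if __name__ == '__main__':
  flags_core.define_performance()  # registers the flags added in this commit
  app.run(main)

# Hypothetical invocation:
#   python driver.py --tf_gpu_thread_mode=gpu_private \
#       --datasets_num_private_threads=32 --num_parallel_calls=2
```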