Unverified Commit 413f15ba authored by Toby Boyd, committed by GitHub

Merge pull request #5518 from tfboyd/add_perf_args

Add perf args
parents fdf78b86 b98409cb
......@@ -109,8 +109,9 @@ def preprocess_image(image, is_training):
return image
def input_fn(is_training, data_dir, batch_size, num_epochs=1, num_gpus=None,
dtype=tf.float32):
def input_fn(is_training, data_dir, batch_size, num_epochs=1,
dtype=tf.float32, datasets_num_private_threads=None,
num_parallel_batches=1):
"""Input function which provides batches for train or eval.
Args:
......@@ -118,8 +119,9 @@ def input_fn(is_training, data_dir, batch_size, num_epochs=1, num_gpus=None,
data_dir: The directory containing the input data.
batch_size: The number of samples per batch.
num_epochs: The number of epochs to repeat the dataset.
num_gpus: The number of gpus used for training.
dtype: Data type to use for images/features.
datasets_num_private_threads: Number of private threads for tf.data.
num_parallel_batches: Number of parallel batches for tf.data.
Returns:
A dataset that can be used for iteration.
......@@ -134,9 +136,9 @@ def input_fn(is_training, data_dir, batch_size, num_epochs=1, num_gpus=None,
shuffle_buffer=_NUM_IMAGES['train'],
parse_record_fn=parse_record,
num_epochs=num_epochs,
num_gpus=num_gpus,
examples_per_epoch=_NUM_IMAGES['train'] if is_training else None,
dtype=dtype
dtype=dtype,
datasets_num_private_threads=datasets_num_private_threads,
num_parallel_batches=num_parallel_batches
)
......
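For context, a minimal sketch of how the updated CIFAR-10 `input_fn` could be called with the new performance arguments; the data path and thread counts below are illustrative values only, not part of this change.

dataset = input_fn(
    is_training=True,
    data_dir='/tmp/cifar10_data',      # illustrative path
    batch_size=128,
    num_epochs=1,
    dtype=tf.float32,
    datasets_num_private_threads=4,    # size of the dedicated tf.data thread pool
    num_parallel_batches=2)            # parallelism for map_and_batch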
......@@ -158,8 +158,9 @@ def parse_record(raw_record, is_training, dtype):
return image, label
def input_fn(is_training, data_dir, batch_size, num_epochs=1, num_gpus=None,
dtype=tf.float32):
def input_fn(is_training, data_dir, batch_size, num_epochs=1,
dtype=tf.float32, datasets_num_private_threads=None,
num_parallel_batches=1):
"""Input function which provides batches for train or eval.
Args:
......@@ -167,8 +168,9 @@ def input_fn(is_training, data_dir, batch_size, num_epochs=1, num_gpus=None,
data_dir: The directory containing the input data.
batch_size: The number of samples per batch.
num_epochs: The number of epochs to repeat the dataset.
num_gpus: The number of gpus used for training.
dtype: Data type to use for images/features.
datasets_num_private_threads: Number of private threads for tf.data.
num_parallel_batches: Number of parallel batches for tf.data.
Returns:
A dataset that can be used for iteration.
......@@ -195,9 +197,9 @@ def input_fn(is_training, data_dir, batch_size, num_epochs=1, num_gpus=None,
shuffle_buffer=_SHUFFLE_BUFFER,
parse_record_fn=parse_record,
num_epochs=num_epochs,
num_gpus=num_gpus,
examples_per_epoch=_NUM_IMAGES['train'] if is_training else None,
dtype=dtype
dtype=dtype,
datasets_num_private_threads=datasets_num_private_threads,
num_parallel_batches=num_parallel_batches
)
......
......@@ -25,11 +25,13 @@ from __future__ import print_function
import functools
import math
import multiprocessing
import os
# pylint: disable=g-bad-import-order
from absl import flags
import tensorflow as tf
from tensorflow.contrib.data.python.ops import threadpool
from official.resnet import resnet_model
from official.utils.flags import core as flags_core
......@@ -39,15 +41,20 @@ from official.utils.logs import logger
from official.resnet import imagenet_preprocessing
from official.utils.misc import distribution_utils
from official.utils.misc import model_helpers
# pylint: enable=g-bad-import-order
################################################################################
# Functions for input processing.
################################################################################
def process_record_dataset(dataset, is_training, batch_size, shuffle_buffer,
parse_record_fn, num_epochs=1, num_gpus=None,
examples_per_epoch=None, dtype=tf.float32):
def process_record_dataset(dataset,
is_training,
batch_size,
shuffle_buffer,
parse_record_fn,
num_epochs=1,
dtype=tf.float32,
datasets_num_private_threads=None,
num_parallel_batches=1):
"""Given a Dataset with raw records, return an iterator over the records.
Args:
......@@ -60,9 +67,10 @@ def process_record_dataset(dataset, is_training, batch_size, shuffle_buffer,
parse_record_fn: A function that takes a raw record and returns the
corresponding (image, label) pair.
num_epochs: The number of epochs to repeat the dataset.
num_gpus: The number of gpus used for training.
examples_per_epoch: The number of examples in an epoch.
dtype: Data type to use for images/features.
datasets_num_private_threads: Number of threads for a private
threadpool created for all datasets computation.
num_parallel_batches: Number of parallel batches for tf.data.
Returns:
Dataset of (image, label) pairs ready for iteration.
......@@ -83,7 +91,7 @@ def process_record_dataset(dataset, is_training, batch_size, shuffle_buffer,
tf.contrib.data.map_and_batch(
lambda value: parse_record_fn(value, is_training, dtype),
batch_size=batch_size,
num_parallel_calls=1,
num_parallel_batches=num_parallel_batches,
drop_remainder=False))
# Operations between the final prefetch and the get_next call to the iterator
......@@ -94,6 +102,16 @@ def process_record_dataset(dataset, is_training, batch_size, shuffle_buffer,
# on how many devices are present.
dataset = dataset.prefetch(buffer_size=tf.contrib.data.AUTOTUNE)
# Defines a specific size thread pool for tf.data operations.
if datasets_num_private_threads:
tf.logging.info('datasets_num_private_threads: %s',
datasets_num_private_threads)
dataset = threadpool.override_threadpool(
dataset,
threadpool.PrivateThreadPool(
datasets_num_private_threads,
display_name='input_pipeline_thread_pool'))
return dataset
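For reference, the two knobs added here can be exercised in isolation; a minimal standalone sketch using synthetic data (batch size and thread count chosen arbitrarily for illustration), relying only on the contrib APIs already imported above:

import tensorflow as tf
from tensorflow.contrib.data.python.ops import threadpool

def _parse(value, is_training=False, dtype=tf.float32):
  # Stand-in for parse_record_fn: casts the synthetic record to `dtype`.
  return tf.cast(value, dtype)

dataset = tf.data.Dataset.range(1024)
dataset = dataset.apply(
    tf.contrib.data.map_and_batch(
        lambda value: _parse(value, is_training=False, dtype=tf.float32),
        batch_size=32,
        num_parallel_batches=2,  # was previously hard-coded as num_parallel_calls=1
        drop_remainder=False))
dataset = dataset.prefetch(buffer_size=tf.contrib.data.AUTOTUNE)
# Route all tf.data work for this dataset onto a dedicated 4-thread pool.
dataset = threadpool.override_threadpool(
    dataset,
    threadpool.PrivateThreadPool(
        4, display_name='input_pipeline_thread_pool'))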
......@@ -162,6 +180,45 @@ def image_bytes_serving_input_fn(image_shape, dtype=tf.float32):
images, {'image_bytes': image_bytes_list})
def override_flags_and_set_envars_for_gpu_thread_pool(flags_obj):
"""Override flags and set env_vars for performance.
These settings exist to test the difference between using stock settings
and manual tuning. They also show some of the environment variables that can
be tweaked to squeeze a few extra examples per second. These settings default
to the current platform of interest, which changes over time.
On systems with a small number of CPU cores, e.g. under 8 logical cores,
setting up a GPU thread pool with `tf_gpu_thread_mode=gpu_private` may perform
poorly.
Args:
flags_obj: Current flags, which will be adjusted, possibly overriding what
has been set by the user on the command line.
"""
cpu_count = multiprocessing.cpu_count()
tf.logging.info('Logical CPU cores: %s', cpu_count)
# Sets up thread pool for each GPU for op scheduling.
per_gpu_thread_count = 1
total_gpu_thread_count = per_gpu_thread_count * flags_obj.num_gpus
os.environ['TF_GPU_THREAD_MODE'] = flags_obj.tf_gpu_thread_mode
os.environ['TF_GPU_THREAD_COUNT'] = str(per_gpu_thread_count)
tf.logging.info('TF_GPU_THREAD_COUNT: %s', os.environ['TF_GPU_THREAD_COUNT'])
tf.logging.info('TF_GPU_THREAD_MODE: %s', os.environ['TF_GPU_THREAD_MODE'])
# Reduces general thread pool by number of threads used for GPU pool.
main_thread_count = cpu_count - total_gpu_thread_count
flags_obj.inter_op_parallelism_threads = main_thread_count
# Sets thread count for tf.data: logical cores minus the threads assigned to
# the private GPU pool, minus 2 threads per GPU for event monitoring and
# sending / receiving tensors.
num_monitoring_threads = 2 * flags_obj.num_gpus
flags_obj.datasets_num_private_threads = (cpu_count - total_gpu_thread_count
- num_monitoring_threads)
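To make the arithmetic above concrete, here is the same computation on a hypothetical machine with 80 logical cores and 8 GPUs (the machine shape is illustrative only; nothing in this change assumes it):

cpu_count = 80   # hypothetical logical core count
num_gpus = 8     # hypothetical GPU count
per_gpu_thread_count = 1

total_gpu_thread_count = per_gpu_thread_count * num_gpus            # 8 threads for GPU op scheduling
inter_op_parallelism_threads = cpu_count - total_gpu_thread_count   # 72 threads for the main pool
num_monitoring_threads = 2 * num_gpus                               # 16 threads for monitoring and send/recv
datasets_num_private_threads = (cpu_count - total_gpu_thread_count
                                - num_monitoring_threads)           # 56 threads left for tf.data

On a small machine (e.g. 8 logical cores with 8 GPUs) the same formula leaves no threads for tf.data, which is why the docstring warns against `gpu_private` mode on low-core-count systems.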
################################################################################
# Functions for running training/eval/validation loops for the model.
################################################################################
......@@ -399,13 +456,12 @@ def resnet_main(
model_helpers.apply_clean(flags.FLAGS)
# Using the Winograd non-fused algorithms provides a small performance boost.
os.environ['TF_ENABLE_WINOGRAD_NONFUSED'] = '1'
# Ensures flag override logic is only executed if explicitly triggered.
if flags_obj.tf_gpu_thread_mode:
override_flags_and_set_envars_for_gpu_thread_pool(flags_obj)
# Create session config based on values of inter_op_parallelism_threads and
# intra_op_parallelism_threads. Note that we default to having
# allow_soft_placement = True, which is required for multi-GPU and not
# harmful for other modes.
# Creates session config. allow_soft_placement = True is required for
# multi-GPU and is not harmful for other modes.
session_config = tf.ConfigProto(
inter_op_parallelism_threads=flags_obj.inter_op_parallelism_threads,
intra_op_parallelism_threads=flags_obj.intra_op_parallelism_threads,
......@@ -414,10 +470,14 @@ def resnet_main(
distribution_strategy = distribution_utils.get_distribution_strategy(
flags_core.get_num_gpus(flags_obj), flags_obj.all_reduce_alg)
# Creates a `RunConfig` that checkpoints every 24 hours, which essentially
# results in checkpoints determined only by `epochs_between_evals`.
run_config = tf.estimator.RunConfig(
train_distribute=distribution_strategy, session_config=session_config)
train_distribute=distribution_strategy,
session_config=session_config,
save_checkpoints_secs=60*60*24)
# initialize our model with all but the dense layer from pretrained resnet
# Initializes model with all but the dense layer from pretrained ResNet.
if flags_obj.pretrained_model_checkpoint_path is not None:
warm_start_settings = tf.estimator.WarmStartSettings(
flags_obj.pretrained_model_checkpoint_path,
......@@ -459,16 +519,19 @@ def resnet_main(
def input_fn_train(num_epochs):
return input_function(
is_training=True, data_dir=flags_obj.data_dir,
is_training=True,
data_dir=flags_obj.data_dir,
batch_size=distribution_utils.per_device_batch_size(
flags_obj.batch_size, flags_core.get_num_gpus(flags_obj)),
num_epochs=num_epochs,
num_gpus=flags_core.get_num_gpus(flags_obj),
dtype=flags_core.get_tf_dtype(flags_obj))
dtype=flags_core.get_tf_dtype(flags_obj),
datasets_num_private_threads=flags_obj.datasets_num_private_threads,
num_parallel_batches=flags_obj.datasets_num_parallel_batches)
def input_fn_eval():
return input_function(
is_training=False, data_dir=flags_obj.data_dir,
is_training=False,
data_dir=flags_obj.data_dir,
batch_size=distribution_utils.per_device_batch_size(
flags_obj.batch_size, flags_core.get_num_gpus(flags_obj)),
num_epochs=1,
......@@ -530,7 +593,10 @@ def resnet_main(
def define_resnet_flags(resnet_size_choices=None):
"""Add flags and validators for ResNet."""
flags_core.define_base()
flags_core.define_performance(num_parallel_calls=False)
flags_core.define_performance(num_parallel_calls=False,
tf_gpu_thread_mode=True,
datasets_num_private_threads=True,
datasets_num_parallel_batches=True)
flags_core.define_image()
flags_core.define_benchmark()
flags.adopt_module_key_flags(flags_core)
......@@ -554,7 +620,7 @@ def define_resnet_flags(resnet_size_choices=None):
help=flags_core.help_wrap('Skip training and only perform evaluation on '
'the latest checkpoint.'))
flags.DEFINE_boolean(
name="image_bytes_as_serving_input", default=False,
name='image_bytes_as_serving_input', default=False,
help=flags_core.help_wrap(
'If True, exports savedmodel with serving signature that accepts '
'JPEG image bytes instead of a fixed size [HxWxC] tensor that '
......
......@@ -45,7 +45,9 @@ def get_loss_scale(flags_obj):
def define_performance(num_parallel_calls=True, inter_op=True, intra_op=True,
synthetic_data=True, max_train_steps=True, dtype=True,
all_reduce_alg=True):
all_reduce_alg=True, tf_gpu_thread_mode=False,
datasets_num_private_threads=False,
datasets_num_parallel_batches=False):
"""Register flags for specifying performance tuning arguments.
Args:
......@@ -56,6 +58,11 @@ def define_performance(num_parallel_calls=True, inter_op=True, intra_op=True,
max_train_steps: Create a flag to allow specification of the maximum number
of training steps
dtype: Create flags for specifying dtype.
all_reduce_alg: If set, forces a specific algorithm for multi-GPU.
tf_gpu_thread_mode: gpu_private triggers use of a private thread pool.
datasets_num_private_threads: Number of private threads for datasets.
datasets_num_parallel_batches: Determines how many batches to process in
parallel when using map and batch from tf.data.
Returns:
A list of flags for core.py to marks as key flags.
......@@ -137,5 +144,29 @@ def define_performance(num_parallel_calls=True, inter_op=True, intra_op=True,
"See tf.contrib.distribute.AllReduceCrossTowerOps for "
"more details and available options."))
if tf_gpu_thread_mode:
flags.DEFINE_string(
name="tf_gpu_thread_mode", short_name="gt_mode", default=None,
help=help_wrap(
"Whether and how the GPU device uses its own threadpool.")
)
if datasets_num_private_threads:
flags.DEFINE_integer(
name="datasets_num_private_threads",
default=None,
help=help_wrap(
"Number of threads for a private threadpool created for all"
"datasets computation..")
)
if datasets_num_parallel_batches:
flags.DEFINE_integer(
name="datasets_num_parallel_batches",
default=None,
help=help_wrap(
"Determines how many batches to process in parallel when using "
"map and batch from tf.data.")
)
return key_flags
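A short sketch of how the gated flags are meant to be consumed, assuming `official.utils.flags.core` re-exports `define_performance` as it is used in resnet_run_loop.py above; the argv values are illustrative only:

from absl import flags
from official.utils.flags import core as flags_core

flags_core.define_performance(
    num_parallel_calls=False,
    tf_gpu_thread_mode=True,
    datasets_num_private_threads=True,
    datasets_num_parallel_batches=True)

FLAGS = flags.FLAGS
FLAGS([
    'prog',
    '--tf_gpu_thread_mode=gpu_private',    # opt in to the per-GPU private pool
    '--datasets_num_private_threads=56',   # illustrative value
    '--datasets_num_parallel_batches=2',   # illustrative value
])
assert FLAGS.tf_gpu_thread_mode == 'gpu_private'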
......@@ -42,7 +42,7 @@ def get_distribution_strategy(num_gpus, all_reduce_alg=None):
return tf.contrib.distribute.MirroredStrategy(
num_gpus=num_gpus,
cross_tower_ops=tf.contrib.distribute.AllReduceCrossTowerOps(
all_reduce_alg, num_packs=num_gpus))
all_reduce_alg, num_packs=2))
else:
return tf.contrib.distribute.MirroredStrategy(num_gpus=num_gpus)
......