Commit 26301e74 authored by Toby Boyd

refactor method and flag names.

parent eff33131
@@ -104,6 +104,8 @@ def process_record_dataset(dataset,
   # Defines a specific size thread pool for tf.data operations.
   if datasets_num_private_threads:
+    tf.logging.info('datasets_num_private_threads: %s',
+                    datasets_num_private_threads)
     dataset = threadpool.override_threadpool(
         dataset,
         threadpool.PrivateThreadPool(
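For context, `threadpool.override_threadpool` is the TF 1.x contrib API (`tensorflow.contrib.data.python.ops.threadpool`) that pins a `tf.data` pipeline to its own thread pool instead of the process-wide one. A minimal sketch, assuming TensorFlow 1.x with contrib available; the dataset and thread count are placeholders:

```python
import tensorflow as tf
from tensorflow.contrib.data.python.ops import threadpool

# Placeholder pipeline; the real code builds this from TFRecords.
dataset = tf.data.Dataset.range(1000).map(lambda x: tf.cast(x, tf.float32))

num_private_threads = 4  # hypothetical; normally flags_obj.datasets_num_private_threads
dataset = threadpool.override_threadpool(
    dataset,
    threadpool.PrivateThreadPool(
        num_private_threads,
        display_name='input_pipeline_thread_pool'))
```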
@@ -178,8 +180,8 @@ def image_bytes_serving_input_fn(image_shape, dtype=tf.float32):
       images, {'image_bytes': image_bytes_list})

-def set_environment_vars(flags_obj):
-  """Adjust flags and set env_vars for performance.
+def override_flags_and_set_envars_for_gpu_private_threads(flags_obj):
+  """Override flags and set env_vars for performance.

   These settings exist to test the difference between using stock settings
   and manual tuning. It also shows some of the ENV_VARS that can be tweaked to
@@ -193,46 +195,29 @@ def set_environment_vars(flags_obj):
   Args:
     flags_obj: Current flags, which will be adjusted possibly overriding
       what has been set by the user on the command-line.
-
-  Returns:
-    tf.ConfigProto: session_config proto to add to the session.
   """
-  if flags_obj.tf_gpu_thread_mode in ['gpu_private']:
-    cpu_count = multiprocessing.cpu_count()
-    print('Logical CPU cores:', cpu_count)
-
-    # Sets up thread pool for each GPU for op scheduling.
-    per_gpu_thread_count = 1
-    total_gpu_thread_count = per_gpu_thread_count * flags_obj.num_gpus
-    os.environ['TF_GPU_THREAD_MODE'] = flags_obj.tf_gpu_thread_mode
-    os.environ['TF_GPU_THREAD_COUNT'] = str(per_gpu_thread_count)
-    print('TF_GPU_THREAD_COUNT:', os.environ['TF_GPU_THREAD_COUNT'])
-
-    # Reduces general thread pool by number of threads used for GPU pool.
-    main_thread_count = cpu_count - total_gpu_thread_count
-    flags_obj.inter_op_parallelism_threads = main_thread_count
-
-    # Sets thread count for tf.data. Logical cores minus threads assigned to
-    # the private GPU pool, along with 2 threads per GPU for event monitoring
-    # and sending / receiving tensors.
-    num_monitoring_threads = 2 * flags_obj.num_gpus
-    num_private_threads = (cpu_count - total_gpu_thread_count
-                           - num_monitoring_threads)
-    flags_obj.datasets_num_private_threads = num_private_threads
-
-  print('inter_op_parallelism_threads:', flags_obj.inter_op_parallelism_threads)
-  print('intra_op_parallelism_threads:', flags_obj.intra_op_parallelism_threads)
-  print('datasets_num_private_threads:', flags_obj.datasets_num_private_threads)
-
-  # Create session config based on values of inter_op_parallelism_threads and
-  # intra_op_parallelism_threads. Note that we default to having
-  # allow_soft_placement = True, which is required for multi-GPU and not
-  # harmful for other modes.
-  session_config = tf.ConfigProto(
-      inter_op_parallelism_threads=flags_obj.inter_op_parallelism_threads,
-      intra_op_parallelism_threads=flags_obj.intra_op_parallelism_threads,
-      allow_soft_placement=True)
-  return session_config
+  cpu_count = multiprocessing.cpu_count()
+  tf.logging.info('Logical CPU cores: %s', cpu_count)
+
+  # Sets up thread pool for each GPU for op scheduling.
+  per_gpu_thread_count = 1
+  total_gpu_thread_count = per_gpu_thread_count * flags_obj.num_gpus
+  os.environ['TF_GPU_THREAD_MODE'] = flags_obj.tf_gpu_thread_mode
+  os.environ['TF_GPU_THREAD_COUNT'] = str(per_gpu_thread_count)
+  tf.logging.info('TF_GPU_THREAD_COUNT: %s', os.environ['TF_GPU_THREAD_COUNT'])
+  tf.logging.info('TF_GPU_THREAD_MODE: %s', os.environ['TF_GPU_THREAD_MODE'])
+
+  # Reduces general thread pool by number of threads used for GPU pool.
+  main_thread_count = cpu_count - total_gpu_thread_count
+  flags_obj.inter_op_parallelism_threads = main_thread_count
+
+  # Sets thread count for tf.data. Logical cores minus threads assigned to
+  # the private GPU pool, along with 2 threads per GPU for event monitoring
+  # and sending / receiving tensors.
+  num_monitoring_threads = 2 * flags_obj.num_gpus
+  num_private_threads = (cpu_count - total_gpu_thread_count
+                         - num_monitoring_threads)
+  flags_obj.datasets_num_private_threads = num_private_threads
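A worked example of the thread budget this function computes, for a hypothetical host with 48 logical cores and 8 GPUs:

```python
cpu_count = 48  # what multiprocessing.cpu_count() would report on this host
num_gpus = 8

per_gpu_thread_count = 1
total_gpu_thread_count = per_gpu_thread_count * num_gpus           # 8 GPU scheduling threads
inter_op_parallelism_threads = cpu_count - total_gpu_thread_count  # 40 general threads

num_monitoring_threads = 2 * num_gpus                              # 16 monitoring threads
datasets_num_private_threads = (cpu_count - total_gpu_thread_count
                                - num_monitoring_threads)          # 24 tf.data threads

# Every logical core is accounted for exactly once.
assert (total_gpu_thread_count + num_monitoring_threads
        + datasets_num_private_threads) == cpu_count
```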
################################################################################
@@ -472,7 +457,18 @@ def resnet_main(
   model_helpers.apply_clean(flags.FLAGS)

-  session_config = set_environment_vars(flags_obj)
+  # Ensures flag override logic is only executed if explicitly triggered.
+  if flags_obj.tf_gpu_thread_mode:
+    override_flags_and_set_envars_for_gpu_private_threads(flags_obj)
+
+  # Create session config based on values of inter_op_parallelism_threads and
+  # intra_op_parallelism_threads. Note that we default to having
+  # allow_soft_placement = True, which is required for multi-GPU and not
+  # harmful for other modes.
+  session_config = tf.ConfigProto(
+      inter_op_parallelism_threads=flags_obj.inter_op_parallelism_threads,
+      intra_op_parallelism_threads=flags_obj.intra_op_parallelism_threads,
+      allow_soft_placement=True)

   distribution_strategy = distribution_utils.get_distribution_strategy(
       flags_core.get_num_gpus(flags_obj), flags_obj.all_reduce_alg)
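The ordering here matters: the `TF_GPU_THREAD_*` environment variables are consumed when TensorFlow initializes its GPU devices, so the override has to run before the first session (or Estimator) is created. A minimal sketch of that sequence, with hypothetical thread counts:

```python
import os
import tensorflow as tf

os.environ['TF_GPU_THREAD_MODE'] = 'gpu_private'  # per-GPU scheduling pools
os.environ['TF_GPU_THREAD_COUNT'] = '1'           # threads per GPU pool

session_config = tf.ConfigProto(
    inter_op_parallelism_threads=40,  # hypothetical; see the budget above
    intra_op_parallelism_threads=0,   # 0 lets TensorFlow choose
    allow_soft_placement=True)

with tf.Session(config=session_config) as sess:
    pass  # GPU devices are created here and pick up the env vars
```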
@@ -534,7 +530,7 @@ def resnet_main(
         num_epochs=num_epochs,
         dtype=flags_core.get_tf_dtype(flags_obj),
         datasets_num_private_threads=flags_obj.datasets_num_private_threads,
-        num_parallel_batches=flags_obj.num_parallel_calls)
+        num_parallel_batches=flags_obj.datasets_num_parallel_batches)

   def input_fn_eval():
     return input_function(
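The renamed flag feeds the `num_parallel_batches` argument of the input pipeline's fused map-and-batch stage, which in this codebase's TF 1.x vintage is `tf.contrib.data.map_and_batch`. A sketch with placeholder data and a hypothetical flag value:

```python
import tensorflow as tf

batch_size = 128
datasets_num_parallel_batches = 2  # hypothetical flag value

dataset = tf.data.Dataset.range(100000)
# Fuses the map and batch steps; num_parallel_batches controls how many
# output batches are produced in parallel.
dataset = dataset.apply(tf.contrib.data.map_and_batch(
    map_func=lambda x: tf.cast(x, tf.float32) / 255.0,
    batch_size=batch_size,
    num_parallel_batches=datasets_num_parallel_batches))
```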
@@ -601,7 +597,10 @@ def resnet_main(
 def define_resnet_flags(resnet_size_choices=None):
   """Add flags and validators for ResNet."""
   flags_core.define_base()
-  flags_core.define_performance()
+  flags_core.define_performance(num_parallel_calls=False,
+                                tf_gpu_thread_mode=True,
+                                datasets_num_private_threads=True,
+                                datasets_num_parallel_batches=True)
   flags_core.define_image()
   flags_core.define_benchmark()
   flags.adopt_module_key_flags(flags_core)
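The keyword arguments to `define_performance` act as opt-in switches: a flag is only registered when the caller asks for it, which is why ResNet now passes the toggles explicitly instead of relying on defaults. A stripped-down sketch of the pattern (function name is illustrative, not the module's real helper):

```python
from absl import flags

def define_example_performance(tf_gpu_thread_mode=False,
                               datasets_num_parallel_batches=False):
  """Registers only the performance flags the caller opts into."""
  key_flags = []
  if tf_gpu_thread_mode:
    flags.DEFINE_string('tf_gpu_thread_mode', None,
                        'Whether and how the GPU uses its own threadpool.')
    key_flags.append('tf_gpu_thread_mode')
  if datasets_num_parallel_batches:
    flags.DEFINE_integer('datasets_num_parallel_batches', None,
                         'Batches to process in parallel in map_and_batch.')
    key_flags.append('datasets_num_parallel_batches')
  return key_flags
```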
...
@@ -45,8 +45,9 @@ def get_loss_scale(flags_obj):
 def define_performance(num_parallel_calls=True, inter_op=True, intra_op=True,
                        synthetic_data=True, max_train_steps=True, dtype=True,
-                       all_reduce_alg=True, tf_gpu_thread_mode=True,
-                       datasets_num_private_threads=True):
+                       all_reduce_alg=True, tf_gpu_thread_mode=False,
+                       datasets_num_private_threads=False,
+                       datasets_num_parallel_batches=False):
   """Register flags for specifying performance tuning arguments.

   Args:
@@ -60,6 +61,9 @@ def define_performance(num_parallel_calls=True, inter_op=True, intra_op=True,
     all_reduce_alg: If set, forces a specific algorithm for multi-gpu.
     tf_gpu_thread_mode: gpu_private triggers use of a private thread pool.
     datasets_num_private_threads: Number of private threads for datasets.
+    datasets_num_parallel_batches: Determines how many batches to process in
+      parallel when using map and batch from tf.data.

   Returns:
     A list of flags for core.py to mark as key flags.
   """
@@ -68,7 +72,7 @@ def define_performance(num_parallel_calls=True, inter_op=True, intra_op=True,
   if num_parallel_calls:
     flags.DEFINE_integer(
         name="num_parallel_calls", short_name="npc",
-        default=1,
+        default=multiprocessing.cpu_count(),
         help=help_wrap("The number of records that are processed in parallel "
                        "during input processing. This can be optimized per "
                        "data set but for generally homogeneous data sets, "
@@ -142,18 +146,27 @@ def define_performance(num_parallel_calls=True, inter_op=True, intra_op=True,
   if tf_gpu_thread_mode:
     flags.DEFINE_string(
-        name="tf_gpu_thread_mode", short_name="gt_mode", default="global",
+        name="tf_gpu_thread_mode", short_name="gt_mode", default=None,
         help=help_wrap(
             "Whether and how the GPU device uses its own threadpool.")
     )

   if datasets_num_private_threads:
     flags.DEFINE_integer(
-        name="datasets_num_private_threads", short_name="dataset_thread_count",
+        name="datasets_num_private_threads",
         default=None,
         help=help_wrap(
             "Number of threads for a private threadpool created for all "
             "datasets computation.")
     )

+  if datasets_num_parallel_batches:
+    flags.DEFINE_integer(
+        name="datasets_num_parallel_batches",
+        default=None,
+        help=help_wrap(
+            "Determines how many batches to process in parallel when using "
+            "map and batch from tf.data.")
+    )
+
   return key_flags
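Put together, a run opting into the private GPU thread pools would set the renamed flags and leave the dataset thread counts unset so the override computes them. A hypothetical smoke test of the wiring, assuming the flags above have been registered (e.g. via define_resnet_flags()):

```python
from absl import flags

FLAGS = flags.FLAGS
# Parse a synthetic argv; first element is the program name.
FLAGS(['prog',
       '--tf_gpu_thread_mode=gpu_private',
       '--datasets_num_parallel_batches=2'])

assert FLAGS.tf_gpu_thread_mode == 'gpu_private'
assert FLAGS.datasets_num_parallel_batches == 2
# None until the gpu_private override fills it in at runtime.
print('datasets_num_private_threads:', FLAGS.datasets_num_private_threads)
```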