Commit 7bea23de authored by Marianne Linhares Monteiro, committed by GitHub

Adding hook for images/sec and other small changes

parent f24c44d5
@@ -36,6 +36,10 @@ import os
import numpy as np
from six.moves import xrange # pylint: disable=redefined-builtin
import tensorflow as tf
from tensorflow.python.platform import tf_logging as logging
from tensorflow.python.training import basic_session_run_hooks
from tensorflow.python.training import session_run_hook
from tensorflow.python.training import training_util
import cifar10
import cifar10_model
@@ -50,7 +54,7 @@ tf.flags.DEFINE_string('data_dir', '',
tf.flags.DEFINE_string('model_dir', '',
'The directory where the model will be stored.')
tf.flags.DEFINE_boolean('is_cpu_ps', False,
tf.flags.DEFINE_boolean('is_cpu_ps', True,
'If using CPU as the parameter server.')
tf.flags.DEFINE_integer('num_gpus', 1,
@@ -58,7 +62,7 @@ tf.flags.DEFINE_integer('num_gpus', 1,
tf.flags.DEFINE_integer('num_layers', 44, 'The number of layers of the model.')
tf.flags.DEFINE_integer('train_steps', 10000,
tf.flags.DEFINE_integer('train_steps', 80000,
'The number of steps to use for training.')
tf.flags.DEFINE_integer('train_batch_size', 128, 'Batch size for training.')
@@ -75,14 +79,16 @@ tf.flags.DEFINE_boolean('use_distortion_for_training', True,
tf.flags.DEFINE_boolean('run_experiment', False,
'If True will run an experiment, '
'otherwise will run training and evaluation '
'using the estimator interface')
'using the estimator interface. '
'Experiments perform training on several workers in parallel'
', in other words experiments know how to invoke train and'
' eval in a sensible fashion for distributed training.')
tf.flags.DEFINE_boolean('sync', False,
'If true when running in a distributed environment '
'will run in sync mode.')
tf.flags.DEFINE_integer('num_workers', 1,
'Number of workers')
tf.flags.DEFINE_integer('num_workers', 1, 'Number of workers')
# Perf flags
tf.flags.DEFINE_integer('num_intra_threads', 1,
@@ -110,16 +116,80 @@ tf.flags.DEFINE_boolean('log_device_placement', False,
'Whether to log device placement.')
# TODO(jamesqin): Replace with fix in b/62239022
class ParamServerDeviceSetter(object):
"""Helper class to assign variables on the least loaded ps-device."""
class ExamplesPerSecondHook(session_run_hook.SessionRunHook):
"""Hook to print out examples per second
Total time is tracked and then divided by the total number of steps
to get the average step time and then batch_size is used to determine
the running average of examples per second. The examples per second for the
most recent interval is also logged.
"""
def __init__(self, worker_device, ps_devices):
"""Initializer for ParamServerDeviceSetter.
def __init__(
self,
batch_size,
every_n_steps=100,
every_n_secs=None):
"""Initializer for ExamplesPerSecondHook.
Args:
batch_size: Total batch size used to calculate examples/second from
global time.
every_n_steps: Log stats every n steps.
every_n_secs: Log stats every n seconds.
"""
if (every_n_steps is None) == (every_n_secs is None):
raise ValueError(
'exactly one of every_n_steps and every_n_secs should be provided.')
self._timer = basic_session_run_hooks.SecondOrStepTimer(
every_steps=every_n_steps, every_secs=every_n_secs)
self._step_train_time = 0
self._total_steps = 0
self._batch_size = batch_size
def begin(self):
self._global_step_tensor = training_util.get_global_step()
if self._global_step_tensor is None:
raise RuntimeError(
'Global step should be created to use ExamplesPerSecondHook.')
def before_run(self, run_context): # pylint: disable=unused-argument
return basic_session_run_hooks.SessionRunArgs(self._global_step_tensor)
def after_run(self, run_context, run_values):
_ = run_context
global_step = run_values.results
if self._timer.should_trigger_for_step(global_step):
elapsed_time, elapsed_steps = self._timer.update_last_triggered_step(
global_step)
if elapsed_time is not None:
steps_per_sec = elapsed_steps / elapsed_time
self._step_train_time += elapsed_time
self._total_steps += elapsed_steps
average_examples_per_sec = self._batch_size * (
self._total_steps / self._step_train_time)
current_examples_per_sec = steps_per_sec * self._batch_size
# Average examples/sec followed by current examples/sec
logging.info('%s: %g (%g), step = %g', 'Average examples/sec',
average_examples_per_sec, current_examples_per_sec,
self._total_steps)
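
For intuition, the two numbers the hook logs fall out of simple arithmetic: the running average uses cumulative steps and time, while the value in parentheses uses only the most recent interval. A minimal standalone sketch of the same computation (illustrative only, not part of the commit; the timings are made up):

    # Illustrative sketch of ExamplesPerSecondHook's arithmetic; timings are invented.
    batch_size = 128
    intervals = [(10, 2.0), (10, 1.6)]  # (elapsed_steps, elapsed_time) per trigger

    total_steps, total_time = 0, 0.0
    for elapsed_steps, elapsed_time in intervals:
        total_steps += elapsed_steps
        total_time += elapsed_time
        current = batch_size * elapsed_steps / elapsed_time  # most recent interval
        average = batch_size * total_steps / total_time      # running average
        print('Average examples/sec: %g (%g), step = %d'
              % (average, current, total_steps))

With these numbers the second trigger logs a running average of about 711 examples/sec and a current rate of 800 examples/sec.
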
class GpuParamServerDeviceSetter(object):
"""Used with tf.device() to place variables on the least loaded GPU.
A common use for this class is to pass a list of GPU devices, e.g.
['gpu:0', 'gpu:1', 'gpu:2'], as ps_devices. When each variable is placed,
it will be placed on the least loaded GPU. All other Ops, which will be the
computation Ops, will be placed on the worker_device.
"""
def __init__(self, worker_device, ps_devices):
"""Initializer for GpuParamServerDeviceSetter.
Args:
worker_device: the device to use for computer ops.
ps_devices: a list of devices to use for Variable ops. Each variable is
worker_device: the device to use for computation Ops.
ps_devices: a list of devices to use for Variable Ops. Each variable is
assigned to the least loaded device.
"""
self.ps_devices = ps_devices
@@ -131,7 +201,8 @@ class ParamServerDeviceSetter(object):
return op.device
if op.type not in ['Variable', 'VariableV2', 'VarHandleOp']:
return self.worker_device
# Gets the least loaded ps_device
device_index, _ = min(enumerate(self.ps_sizes), key=operator.itemgetter(1))
device_name = self.ps_devices[device_index]
var_size = op.outputs[0].get_shape().num_elements()
@@ -139,15 +210,16 @@ class ParamServerDeviceSetter(object):
return device_name
def _create_device_setter(is_cpu_ps, worker):
def _create_device_setter(is_cpu_ps, worker, num_gpus):
"""Create device setter object."""
if is_cpu_ps:
# tf.train.replica_device_setter supports placing variables on the CPU, all
# on one GPU, or on ps_servers defined in a cluster_spec.
return tf.train.replica_device_setter(
worker_device=worker, ps_device='/cpu:0', ps_tasks=1)
else:
gpus = ['/gpu:%d' % i for i in range(FLAGS.num_gpus)]
return ParamServerDeviceSetter(worker, gpus)
gpus = ['/gpu:%d' % i for i in range(num_gpus)]
return GpuParamServerDeviceSetter(worker, gpus)
def _resnet_model_fn(features, labels, mode):
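
Both branches return an object that tf.device() can consume, since tf.device accepts a device function mapping each Op to a device string. A rough usage sketch (illustrative, not part of the commit; it assumes the setter object is callable, as its use with tf.device in the tower loop below suggests):

    import tensorflow as tf

    # Variables land on the least loaded of the two GPUs; everything else
    # stays on the worker device '/gpu:0'.
    setter = GpuParamServerDeviceSetter('/gpu:0', ['/gpu:0', '/gpu:1'])
    with tf.device(setter):
        w = tf.get_variable('w', shape=[1024, 1024])
        y = tf.matmul(tf.ones([8, 1024]), w)
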
@@ -181,7 +253,7 @@ def _resnet_model_fn(features, labels, mode):
if num_gpus != 0:
for i in range(num_gpus):
worker = '/gpu:%d' % i
device_setter = _create_device_setter(is_cpu_ps, worker)
device_setter = _create_device_setter(is_cpu_ps, worker, FLAGS.num_gpus)
with tf.variable_scope('resnet', reuse=bool(i != 0)):
with tf.name_scope('tower_%d' % i) as name_scope:
with tf.device(device_setter):
@@ -210,7 +282,7 @@ def _resnet_model_fn(features, labels, mode):
ps_device = '/cpu:0' if is_cpu_ps else '/gpu:0'
with tf.device(ps_device):
with tf.name_scope('gradient_averaging'):
loss = tf.reduce_mean(tower_losses)
loss = tf.reduce_mean(tower_losses, name='loss')
for zipped_gradvars in zip(*tower_gradvars):
# Averaging one var's gradients computed from multiple towers
var = zipped_gradvars[0][1]
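
The loop in this hunk is the standard multi-tower reduction: each variable's gradients, computed once per tower, are averaged before being applied. A simplified standalone sketch of that pattern (illustrative, not the commit's exact code):

    # Simplified multi-tower gradient averaging.
    # tower_gradvars: one [(grad, var), ...] list per tower; towers share variables.
    def average_gradients(tower_gradvars):
        averaged = []
        for grads_and_var in zip(*tower_gradvars):
            var = grads_and_var[0][1]             # the variable shared by all towers
            grads = [g for g, _ in grads_and_var]
            averaged.append((tf.multiply(tf.add_n(grads), 1.0 / len(grads)), var))
        return averaged
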
@@ -403,7 +475,7 @@ def main(unused_argv):
train_steps = FLAGS.train_steps
eval_steps = num_eval_examples // FLAGS.eval_batch_size
# session configuration
# Session configuration.
sess_config = tf.ConfigProto()
sess_config.allow_soft_placement = True
sess_config.log_device_placement = FLAGS.log_device_placement
@@ -411,10 +483,18 @@
sess_config.inter_op_parallelism_threads = FLAGS.num_inter_threads
sess_config.gpu_options.force_gpu_compatible = FLAGS.force_gpu_compatible
# log learning_rate
tensors_to_log = {'learning_rate': 'learning_rate'}
# Hooks that add extra logging: the loss is printed to the console more
# often, along with examples per second.
tensors_to_log = {'learning_rate': 'learning_rate',
'loss': 'gradient_averaging/loss'}
logging_hook = tf.train.LoggingTensorHook(
tensors=tensors_to_log, every_n_iter=100)
examples_sec_hook = ExamplesPerSecondHook(
FLAGS.train_batch_size, every_n_steps=10)
hooks = [logging_hook, examples_sec_hook]
if FLAGS.run_experiment:
config = tf.contrib.learn.RunConfig(model_dir=FLAGS.model_dir)
@@ -422,7 +502,7 @@
tf.contrib.learn.learn_runner.run(
get_experiment_fn(train_input_fn, eval_input_fn,
train_steps, eval_steps,
[logging_hook]), run_config=config)
hooks), run_config=config)
else:
config = tf.estimator.RunConfig()
@@ -433,7 +513,7 @@
print('Starting to train...')
classifier.train(input_fn=train_input_fn,
steps=train_steps,
hooks=[logging_hook])
hooks=hooks)
print('Starting to evaluate...')
eval_results = classifier.evaluate(
......