"vscode:/vscode.git/clone" did not exist on "d92f67be0931690e4a782bf2da2ed5fdbe59625f"
Commit 2164c8db authored by Eli Bixby

Move device and hook to utils. Fix device stuff

parent 8b829873
README.md
@@ -59,7 +59,6 @@ $ python cifar10_main.py --data-dir=/prefix/to/downloaded/data/cifar-10-batches-
# Run the model on 2 GPUs using CPU as parameter server. After training, it runs the evaluation.
$ python cifar10_main.py --data-dir=/prefix/to/downloaded/data/cifar-10-batches-py \
--job-dir=/tmp/cifar10 \
--force-gpu-compatible \
--num-gpus=2 \
--train-steps=1000
@@ -68,8 +67,7 @@ $ python cifar10_main.py --data-dir=/prefix/to/downloaded/data/cifar-10-batches-
# a couple of times to perform evaluation.
$ python cifar10_main.py --data-dir=/prefix/to/downloaded/data/cifar-10-batches-bin \
--job-dir=/tmp/cifar10 \
--avg-on-gpu \
--force-gpu-compatible \
--variable-strategy GPU \
--num-gpus=2 \
@@ -102,7 +100,6 @@ gcloud ml-engine jobs submit training cifarmultigpu \
--module-name cifar10_estimator.cifar10_main \
-- \
--data-dir=$MY_BUCKET/cifar-10-batches-py \
--force-gpu-compatible \
--num-gpus=4 \
--train-steps=1000
```
@@ -183,11 +180,9 @@ Once you have a `TF_CONFIG` configured properly on each host you're ready to run
# Make sure the model_dir is the same as defined in TF_CONFIG.
$ python cifar10_main.py --data-dir=gs://path/cifar-10-batches-py \
--job-dir=gs://path/model_dir/ \
--force-gpu-compatible \
--num-gpus=4 \
--train-steps=40000 \
--sync \
--num-workers=2
```
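For reference, here is a minimal sketch of the `TF_CONFIG` layout the distributed run above expects, set before launching the process on each host. The hostnames, ports, and task assignment below are placeholders; each machine gets its own `task` entry.

```python
# Hypothetical TF_CONFIG for worker 0 of a two-worker cluster; hostnames
# and ports are placeholders. Repeat per host with the matching task index.
import json
import os

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'master': ['master-host:2222'],
        'worker': ['worker-host-0:2222', 'worker-host-1:2222'],
        'ps': ['ps-host:2222'],
    },
    'task': {'type': 'worker', 'index': 0},
})
```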
@@ -325,7 +320,6 @@ INFO:tensorflow:Saving dict for global step 1: accuracy = 0.0994, global_step =
# Make sure the model_dir is the same as defined in TF_CONFIG.
$ python cifar10_main.py --data-dir=gs://path/cifar-10-batches-py \
--job-dir=gs://path/model_dir/ \
--force-gpu-compatible \
--num-gpus=4 \
--train-steps=40000 \
--sync
cifar10_main.py
@@ -30,138 +30,28 @@ from __future__ import print_function
import argparse
import functools
import operator
import itertools
import os
import six
import numpy as np
from six.moves import xrange # pylint: disable=redefined-builtin
import tensorflow as tf
from tensorflow.python.platform import tf_logging as logging
from tensorflow.python.training import basic_session_run_hooks
from tensorflow.python.training import session_run_hook
from tensorflow.python.training import training_util
import cifar10
import cifar10_model
import cifar10_utils
tf.logging.set_verbosity(tf.logging.INFO)
class ExamplesPerSecondHook(session_run_hook.SessionRunHook):
"""Hook to print out examples per second.
Total time is tracked and then divided by the total number of steps
to get the average step time and then batch_size is used to determine
the running average of examples per second. The examples per second for the
most recent interval is also logged.
"""
def __init__(
self,
batch_size,
every_n_steps=100,
every_n_secs=None,):
"""Initializer for ExamplesPerSecondHook.
Args:
batch_size: Total batch size used to calculate examples/second from
global time.
every_n_steps: Log stats every n steps.
every_n_secs: Log stats every n seconds.
"""
if (every_n_steps is None) == (every_n_secs is None):
raise ValueError('exactly one of every_n_steps'
' and every_n_secs should be provided.')
self._timer = basic_session_run_hooks.SecondOrStepTimer(
every_steps=every_n_steps, every_secs=every_n_secs)
self._step_train_time = 0
self._total_steps = 0
self._batch_size = batch_size
def begin(self):
self._global_step_tensor = training_util.get_global_step()
if self._global_step_tensor is None:
raise RuntimeError(
'Global step should be created to use ExamplesPerSecondHook.')
def before_run(self, run_context): # pylint: disable=unused-argument
return basic_session_run_hooks.SessionRunArgs(self._global_step_tensor)
def after_run(self, run_context, run_values):
_ = run_context
global_step = run_values.results
if self._timer.should_trigger_for_step(global_step):
elapsed_time, elapsed_steps = self._timer.update_last_triggered_step(
global_step)
if elapsed_time is not None:
steps_per_sec = elapsed_steps / elapsed_time
self._step_train_time += elapsed_time
self._total_steps += elapsed_steps
average_examples_per_sec = self._batch_size * (
self._total_steps / self._step_train_time)
current_examples_per_sec = steps_per_sec * self._batch_size
# Average examples/sec followed by current examples/sec
logging.info('%s: %g (%g), step = %g', 'Average examples/sec',
average_examples_per_sec, current_examples_per_sec,
self._total_steps)
class GpuParamServerDeviceSetter(object):
"""Used with tf.device() to place variables on the least loaded GPU.
A common use for this class is to pass a list of GPU devices, e.g. ['gpu:0',
'gpu:1', 'gpu:2'], as ps_devices. When each variable is placed, it will be
placed on the least loaded gpu. All other Ops, which will be the computation
Ops, will be placed on the worker_device.
"""
def __init__(self, worker_device, ps_devices):
"""Initializer for GpuParamServerDeviceSetter.
Args:
worker_device: the device to use for computation Ops.
ps_devices: a list of devices to use for Variable Ops. Each variable is
assigned to the least loaded device.
"""
self.ps_devices = ps_devices
self.worker_device = worker_device
self.ps_sizes = [0] * len(self.ps_devices)
def __call__(self, op):
if op.device:
return op.device
if op.type not in ['Variable', 'VariableV2', 'VarHandleOp']:
return self.worker_device
# Gets the least loaded ps_device
device_index, _ = min(enumerate(self.ps_sizes), key=operator.itemgetter(1))
device_name = self.ps_devices[device_index]
var_size = op.outputs[0].get_shape().num_elements()
self.ps_sizes[device_index] += var_size
return device_name
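A short usage sketch for this setter, assuming two GPUs; the variable and ops below are illustrative. Because the instance is callable, it can be passed straight to `tf.device()`:

```python
# Hypothetical usage: each variable goes to whichever GPU has accumulated the
# fewest variable elements so far; all other ops run on the worker device.
setter = GpuParamServerDeviceSetter(worker_device='/gpu:0',
                                    ps_devices=['/gpu:0', '/gpu:1'])
with tf.device(setter):
    w = tf.get_variable('w', shape=[1024, 1024])  # least-loaded GPU
    y = tf.matmul(tf.ones([1, 1024]), w)          # worker device /gpu:0
```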
def _create_device_setter(avg_on_gpu, worker, num_gpus):
"""Create device setter object."""
if avg_on_gpu:
gpus = ['/gpu:%d' % i for i in range(num_gpus)]
return GpuParamServerDeviceSetter(worker, gpus)
else:
# tf.train.replica_device_setter supports placing variables on the CPU, all
# on one GPU, or on ps_servers defined in a cluster_spec.
return tf.train.replica_device_setter(
worker_device=worker, ps_device='/cpu:0', ps_tasks=1)
def get_model_fn(num_gpus, avg_on_gpu, num_workers):
def get_model_fn(num_gpus, variable_strategy, num_workers):
def _resnet_model_fn(features, labels, mode, params):
"""Resnet model body.
Support single host, one or more GPU training. Parameter distribution can be
either one of the following scheme.
Support single host, one or more GPU training. Parameter distribution can
be either of the following schemes:
1. CPU is the parameter server and manages gradient updates.
2. Parameters are distributed evenly across all GPUs, and the first GPU
manages gradient updates.
@@ -186,8 +76,19 @@ def get_model_fn(num_gpus, avg_on_gpu, num_workers):
if num_gpus != 0:
for i in range(num_gpus):
worker = '/gpu:%d' % i
device_setter = _create_device_setter(avg_on_gpu, worker, num_gpus)
worker_device = '/gpu:{}'.format(i)
if variable_strategy == 'CPU':
device_setter = cifar10_utils.local_device_setter(
worker_device=worker_device)
elif variable_strategy == 'GPU':
device_setter = cifar10_utils.local_device_setter(
ps_device_type='gpu',
worker_device=worker_device,
ps_strategy=tf.contrib.training.GreedyLoadBalancingStrategy(
num_gpus,
tf.contrib.training.byte_size_load_fn
)
)
with tf.variable_scope('resnet', reuse=bool(i != 0)):
with tf.name_scope('tower_%d' % i) as name_scope:
with tf.device(device_setter):
@@ -231,15 +132,14 @@ def get_model_fn(num_gpus, avg_on_gpu, num_workers):
# Now compute global loss and gradients.
gradvars = []
# Server that runs the ops to apply global gradient updates.
avg_device = '/gpu:0' if avg_on_gpu else '/cpu:0'
with tf.device(avg_device):
with tf.name_scope('gradient_averaging'):
loss = tf.reduce_mean(tower_losses, name='loss')
for zipped_gradvars in zip(*tower_gradvars):
# Averaging one var's gradients computed from multiple towers
var = zipped_gradvars[0][1]
grads = [gv[0] for gv in zipped_gradvars]
all_grads = {}
for grad, var in itertools.chain(*tower_gradvars):
if grad is not None:
all_grads.setdefault(var, []).append(grad)
for var, grads in six.iteritems(all_grads):
# Average gradients on the same device as the variables
# to which they apply.
with tf.device(var.device):
if len(grads) == 1:
avg_grad = grads[0]
@@ -247,6 +147,10 @@ def get_model_fn(num_gpus, avg_on_gpu, num_workers):
avg_grad = tf.multiply(tf.add_n(grads), 1. / len(grads))
gradvars.append((avg_grad, var))
# Device that runs the ops to apply global gradient updates.
consolidation_device = '/gpu:0' if variable_strategy == 'GPU' else '/cpu:0'
with tf.device(consolidation_device):
# Suggested learning rate scheduling from
# https://github.com/ppwwyyxx/tensorpack/blob/master/examples/ResNet/cifar10-resnet.py#L155
# users could apply other scheduling.
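To see the new per-variable averaging (the `all_grads` loop above) in isolation: gradients from all towers are grouped by variable, then averaged on the device where that variable lives. A standalone sketch with fabricated tower gradients:

```python
# Standalone sketch of the averaging loop above; the two towers and their
# gradients are fabricated for illustration.
import itertools

import six
import tensorflow as tf

x = tf.get_variable('x', shape=[2])
tower_gradvars = [[(tf.constant([1., 2.]), x)],   # tower 0: (grad, var) pairs
                  [(tf.constant([3., 4.]), x)]]   # tower 1

all_grads = {}
for grad, var in itertools.chain(*tower_gradvars):
    if grad is not None:
        all_grads.setdefault(var, []).append(grad)

gradvars = []
for var, grads in six.iteritems(all_grads):
    with tf.device(var.device):  # average where the variable is placed
        avg_grad = (grads[0] if len(grads) == 1
                    else tf.multiply(tf.add_n(grads), 1. / len(grads)))
        gradvars.append((avg_grad, var))
```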
@@ -292,6 +196,7 @@ def get_model_fn(num_gpus, avg_on_gpu, num_workers):
metrics = {
'accuracy': tf.metrics.accuracy(stacked_labels, predictions['classes'])
}
loss = tf.reduce_mean(tower_losses, name='loss')
return tf.estimator.EstimatorSpec(
mode=mode,
@@ -345,7 +250,7 @@ def _tower_fn(is_training,
tower_grad = tf.gradients(tower_loss, model_params)
return tower_loss, tower_grad, tower_pred
return tower_loss, zip(tower_grad, model_params), tower_pred
def input_fn(data_dir, subset, num_shards, batch_size,
@@ -433,11 +338,11 @@ def get_experiment_fn(data_dir, num_gpus, is_gpu_ps,
train_steps = hparams.train_steps
eval_steps = num_eval_examples // hparams.eval_batch_size
examples_sec_hook = ExamplesPerSecondHook(
examples_sec_hook = cifar10_utils.ExamplesPerSecondHook(
hparams.train_batch_size, every_n_steps=10)
tensors_to_log = {'learning_rate': 'learning_rate',
'loss': 'gradient_averaging/loss'}
'loss': 'loss'}
logging_hook = tf.train.LoggingTensorHook(
tensors=tensors_to_log, every_n_iter=100)
@@ -446,7 +351,7 @@ def get_experiment_fn(data_dir, num_gpus, is_gpu_ps,
classifier = tf.estimator.Estimator(
model_fn=get_model_fn(
num_gpus, is_gpu_ps, run_config.num_worker_replicas),
num_gpus, is_gpu_ps, run_config.num_worker_replicas or 1),
config=run_config,
params=vars(hparams)
)
@@ -467,11 +372,10 @@ def get_experiment_fn(data_dir, num_gpus, is_gpu_ps,
def main(job_dir,
data_dir,
num_gpus,
avg_on_gpu,
variable_strategy,
use_distortion_for_training,
log_device_placement,
num_intra_threads,
force_gpu_compatible,
**hparams):
# The env variable is on the deprecation path; the default is off.
os.environ['TF_SYNC_ON_FINISH'] = '0'
@@ -482,7 +386,7 @@ def main(job_dir,
log_device_placement=log_device_placement,
intra_op_parallelism_threads=num_intra_threads,
gpu_options=tf.GPUOptions(
force_gpu_compatible=force_gpu_compatible
force_gpu_compatible=True
)
)
@@ -493,7 +397,7 @@ def main(job_dir,
get_experiment_fn(
data_dir,
num_gpus,
avg_on_gpu,
variable_strategy,
use_distortion_for_training
),
run_config=config,
@@ -516,10 +420,11 @@ if __name__ == '__main__':
help='The directory where the model will be stored.'
)
parser.add_argument(
'--avg-on-gpu',
action='store_true',
default=False,
help='If present, use GPU to average gradients.'
'--variable-strategy',
choices=['CPU', 'GPU'],
type=str,
default='CPU',
help='Where to locate variable operations.'
)
parser.add_argument(
'--num-gpus',
@@ -563,7 +468,6 @@ if __name__ == '__main__':
default=2e-4,
help='Weight decay for convolutions.'
)
parser.add_argument(
'--learning-rate',
type=float,
@@ -609,15 +513,6 @@ if __name__ == '__main__':
system will pick an appropriate number.\
"""
)
parser.add_argument(
'--force-gpu-compatible',
action='store_true',
default=False,
help="""\
Whether to enable force_gpu_compatible in GPU_Options. Check
tensorflow/core/protobuf/config.proto#L69 for details.\
"""
)
parser.add_argument(
'--log-device-placement',
action='store_true',
@@ -641,7 +536,7 @@ if __name__ == '__main__':
if args.num_gpus < 0:
raise ValueError(
'Invalid GPU count: \"num_gpus\" must be 0 or a positive integer.')
if args.num_gpus == 0 and args.avg_on_gpu:
if args.num_gpus == 0 and args.variable_strategy == 'GPU':
raise ValueError(
'No GPUs available for variable placement; use --variable-strategy CPU.')
if (args.num_layers - 2) % 6 != 0:
cifar10_utils.py (new file)
import six
from tensorflow.python.platform import tf_logging as logging
from tensorflow.core.framework import node_def_pb2
from tensorflow.python.framework import device as pydev
from tensorflow.python.training import basic_session_run_hooks
from tensorflow.python.training import session_run_hook
from tensorflow.python.training import training_util
from tensorflow.python.training import device_setter
class ExamplesPerSecondHook(session_run_hook.SessionRunHook):
"""Hook to print out examples per second.
Total time is tracked and then divided by the total number of steps
to get the average step time and then batch_size is used to determine
the running average of examples per second. The examples per second for the
most recent interval is also logged.
"""
def __init__(
self,
batch_size,
every_n_steps=100,
every_n_secs=None,):
"""Initializer for ExamplesPerSecondHook.
Args:
batch_size: Total batch size used to calculate examples/second from
global time.
every_n_steps: Log stats every n steps.
every_n_secs: Log stats every n seconds.
"""
if (every_n_steps is None) == (every_n_secs is None):
raise ValueError('exactly one of every_n_steps'
' and every_n_secs should be provided.')
self._timer = basic_session_run_hooks.SecondOrStepTimer(
every_steps=every_n_steps, every_secs=every_n_secs)
self._step_train_time = 0
self._total_steps = 0
self._batch_size = batch_size
def begin(self):
self._global_step_tensor = training_util.get_global_step()
if self._global_step_tensor is None:
raise RuntimeError(
'Global step should be created to use ExamplesPerSecondHook.')
def before_run(self, run_context): # pylint: disable=unused-argument
return basic_session_run_hooks.SessionRunArgs(self._global_step_tensor)
def after_run(self, run_context, run_values):
_ = run_context
global_step = run_values.results
if self._timer.should_trigger_for_step(global_step):
elapsed_time, elapsed_steps = self._timer.update_last_triggered_step(
global_step)
if elapsed_time is not None:
steps_per_sec = elapsed_steps / elapsed_time
self._step_train_time += elapsed_time
self._total_steps += elapsed_steps
average_examples_per_sec = self._batch_size * (
self._total_steps / self._step_train_time)
current_examples_per_sec = steps_per_sec * self._batch_size
# Average examples/sec followed by current examples/sec
logging.info('%s: %g (%g), step = %g', 'Average examples/sec',
average_examples_per_sec, current_examples_per_sec,
self._total_steps)
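A sketch of attaching the relocated hook to an Estimator's train call; `model_fn` and `train_input_fn` are placeholders defined elsewhere.

```python
# Hypothetical wiring: log average and current examples/sec every 10 steps.
# model_fn and train_input_fn are assumed to be defined elsewhere.
import tensorflow as tf
import cifar10_utils

hook = cifar10_utils.ExamplesPerSecondHook(batch_size=128, every_n_steps=10)
estimator = tf.estimator.Estimator(model_fn=model_fn, model_dir='/tmp/model')
estimator.train(input_fn=train_input_fn, steps=1000, hooks=[hook])
```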
def local_device_setter(num_devices=1,
ps_device_type='cpu',
worker_device='/cpu:0',
ps_ops=None,
ps_strategy=None):
if ps_ops is None:
ps_ops = ['Variable', 'VariableV2', 'VarHandleOp']
if ps_strategy is None:
ps_strategy = device_setter._RoundRobinStrategy(num_devices)
if not six.callable(ps_strategy):
raise TypeError("ps_strategy must be callable")
def _local_device_chooser(op):
current_device = pydev.DeviceSpec.from_string(op.device or "")
node_def = op if isinstance(op, node_def_pb2.NodeDef) else op.node_def
if node_def.op in ps_ops:
ps_device_spec = pydev.DeviceSpec.from_string(
'/{}:{}'.format(ps_device_type, ps_strategy(op)))
ps_device_spec.merge_from(current_device)
return ps_device_spec.to_string()
else:
worker_device_spec = pydev.DeviceSpec.from_string(worker_device or "")
worker_device_spec.merge_from(current_device)
return worker_device_spec.to_string()
return _local_device_chooser
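Two sketches of how the trainer calls this helper, matching the `CPU` and `GPU` variable strategies; the tower device and GPU count here are illustrative.

```python
import tensorflow as tf

# 'CPU' strategy (defaults): variables round-robin onto a single /cpu:0
# device, while compute ops stay on the tower's GPU.
cpu_setter = local_device_setter(worker_device='/gpu:0')

# 'GPU' strategy: variables spread across 2 GPUs greedily by byte size.
gpu_setter = local_device_setter(
    ps_device_type='gpu',
    worker_device='/gpu:0',
    ps_strategy=tf.contrib.training.GreedyLoadBalancingStrategy(
        2, tf.contrib.training.byte_size_load_fn))

with tf.device(gpu_setter):
    w = tf.get_variable('w', shape=[1024, 1024])  # goes to the emptier GPU
    y = tf.matmul(tf.ones([1, 1024]), w)          # stays on /gpu:0
```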
generate_cifar10_tfrecords.py
@@ -28,10 +28,6 @@ import os
import tensorflow as tf
FLAGS = None
def _int64_feature(value):
return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
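For context, a sketch of how a helper like this feeds into a `tf.train.Example`; the `_bytes_feature` twin, the feature keys, and the output path are assumptions for illustration.

```python
import numpy as np
import tensorflow as tf

def _bytes_feature(value):  # hypothetical companion helper
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

# Pack one fake 32x32x3 image and its label into an Example record,
# using the _int64_feature helper defined above.
image = np.zeros((32, 32, 3), dtype=np.uint8)
example = tf.train.Example(features=tf.train.Features(feature={
    'image': _bytes_feature(image.tobytes()),  # illustrative key
    'label': _int64_feature(7),                # illustrative key
}))
with tf.python_io.TFRecordWriter('/tmp/example.tfrecords') as writer:
    writer.write(example.SerializeToString())
```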
@@ -75,12 +71,12 @@ def convert_to_tfrecord(input_files, output_file):
record_writer.write(example.SerializeToString())
def main(unused_argv):
def main(input_dir, output_dir):
file_names = _get_file_names()
for mode, files in file_names.items():
input_files = [
os.path.join(FLAGS.input_dir, f) for f in files]
output_file = os.path.join(FLAGS.output_dir, mode + '.tfrecords')
os.path.join(input_dir, f) for f in files]
output_file = os.path.join(output_dir, mode + '.tfrecords')
# Convert to Examples and write the result to TFRecords.
convert_to_tfrecord(input_files, output_file)
print('Done!')
@@ -89,13 +85,13 @@ def main(unused_argv):
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
'--input_dir',
'--input-dir',
type=str,
default='',
help='Directory where CIFAR10 data is located.'
)
parser.add_argument(
'--output_dir',
'--output-dir',
type=str,
default='',
help="""\
@@ -103,6 +99,5 @@ if __name__ == '__main__':
name as the CIFAR10 inputs + .tfrecords.\
"""
)
FLAGS = parser.parse_args()
tf.app.run(main)
args = parser.parse_args()
main(args.input_dir, args.output_dir)
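With the `FLAGS` global gone, the CLI still works the same way (e.g. `python generate_cifar10_tfrecords.py --input-dir=... --output-dir=...`, with the flags now spelled with dashes), and `main()` can also be called directly; the paths below are placeholders.

```python
# Hypothetical direct use after the refactor: main() now takes the
# directories explicitly instead of reading a FLAGS global.
main('/tmp/cifar-10-batches-py', '/tmp/cifar10-tfrecords')
```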