Commit 7e9e15ad authored by Toby Boyd, committed by GitHub

Merge pull request #2056 from tfboyd/cifar_mkl

Added data_format flag to support MKL and other interesting tests
parents 3bf85a4e 90fbe70e
@@ -74,8 +74,8 @@ class Cifar10DataSet(object):
dataset = tf.contrib.data.TFRecordDataset(filenames).repeat()
# Parse records.
dataset = dataset.map(self.parser, num_threads=batch_size,
output_buffer_size=2 * batch_size)
dataset = dataset.map(
self.parser, num_threads=batch_size, output_buffer_size=2 * batch_size)
# Potentially shuffle records.
if self.subset == 'train':
......
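For reference only, not part of this commit: the hunk above targets the tf.contrib.data API of TF 1.3. In later 1.x releases the same pipeline would use tf.data, where num_parallel_calls and prefetch play the roles of num_threads and output_buffer_size. A minimal sketch under that assumption:

import tensorflow as tf

def make_dataset(filenames, parser, batch_size):
    # Read and endlessly repeat the TFRecord files, parse records in
    # parallel, and keep a small buffer of parsed examples ready.
    dataset = tf.data.TFRecordDataset(filenames).repeat()
    dataset = dataset.map(parser, num_parallel_calls=batch_size)
    dataset = dataset.prefetch(2 * batch_size)
    return dataset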
@@ -32,21 +32,21 @@ import argparse
import functools
import itertools
import os
import six
import numpy as np
from six.moves import xrange # pylint: disable=redefined-builtin
import tensorflow as tf
import cifar10
import cifar10_model
import cifar10_utils
import numpy as np
import six
from six.moves import xrange # pylint: disable=redefined-builtin
import tensorflow as tf
tf.logging.set_verbosity(tf.logging.INFO)
def get_model_fn(num_gpus, variable_strategy, num_workers, sync):
def get_model_fn(num_gpus, variable_strategy, num_workers):
"""Returns a function that will build the resnet model."""
def _resnet_model_fn(features, labels, mode, params):
"""Resnet model body.
@@ -74,6 +74,16 @@ def get_model_fn(num_gpus, variable_strategy, num_workers, sync):
tower_gradvars = []
tower_preds = []
# channels first (NCHW) is normally optimal on GPU and channels last (NHWC)
# on CPU. The exception is Intel MKL on CPU which is optimal with
# channels_last.
data_format = params.data_format
if not data_format:
if num_gpus == 0:
data_format = 'channels_last'
else:
data_format = 'channels_first'
if num_gpus == 0:
num_devices = 1
device_type = 'cpu'
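The default-selection logic added above reduces to a small pure function; this standalone sketch (the helper name is illustrative, not in the repo) captures the same rule:

def default_data_format(requested_format, num_gpus):
    """Return the data format to use when --data-format is unset."""
    if requested_format:
        # An explicit flag always wins.
        return requested_format
    # channels_first (NCHW) is normally fastest on GPU; channels_last
    # (NHWC) is the CPU default, per the comment above.
    return 'channels_last' if num_gpus == 0 else 'channels_first'

# default_data_format(None, 0)             -> 'channels_last'
# default_data_format(None, 4)             -> 'channels_first'
# default_data_format('channels_last', 4)  -> 'channels_last'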
@@ -84,28 +94,20 @@ def get_model_fn(num_gpus, variable_strategy, num_workers, sync):
for i in range(num_devices):
worker_device = '/{}:{}'.format(device_type, i)
if variable_strategy == 'CPU':
device_setter = cifar10_utils.local_device_setter(
worker_device=worker_device)
device_setter = cifar10_utils.local_device_setter(
worker_device=worker_device)
elif variable_strategy == 'GPU':
device_setter = cifar10_utils.local_device_setter(
ps_device_type='gpu',
worker_device=worker_device,
ps_strategy=tf.contrib.training.GreedyLoadBalancingStrategy(
num_gpus,
tf.contrib.training.byte_size_load_fn
)
)
device_setter = cifar10_utils.local_device_setter(
ps_device_type='gpu',
worker_device=worker_device,
ps_strategy=tf.contrib.training.GreedyLoadBalancingStrategy(
num_gpus, tf.contrib.training.byte_size_load_fn))
with tf.variable_scope('resnet', reuse=bool(i != 0)):
with tf.name_scope('tower_%d' % i) as name_scope:
with tf.device(device_setter):
loss, gradvars, preds = _tower_fn(
is_training,
weight_decay,
tower_features[i],
tower_labels[i],
(device_type == 'cpu'),
params.num_layers,
params.batch_norm_decay,
is_training, weight_decay, tower_features[i], tower_labels[i],
data_format, params.num_layers, params.batch_norm_decay,
params.batch_norm_epsilon)
tower_losses.append(loss)
tower_gradvars.append(gradvars)
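The local_device_setter calls above live in cifar10_utils.py, which this diff does not touch. As a rough illustration of the mechanism (the helper name and op-type list below are illustrative, not from the repo): tf.device accepts a callable that maps each op to a device string, which is how variables can be pinned to a parameter device while the tower's compute ops stay on the worker device.

import tensorflow as tf

def simple_local_device_setter(worker_device, ps_device='/cpu:0'):
    """Toy stand-in for cifar10_utils.local_device_setter (illustrative)."""
    variable_op_types = ('Variable', 'VariableV2', 'VarHandleOp')

    def _assign(op):
        # Variables go to the parameter device; everything else runs on the
        # worker device for this tower.
        return ps_device if op.type in variable_op_types else worker_device

    return _assign

# Usage sketch:
# with tf.device(simple_local_device_setter('/gpu:0')):
#     w = tf.get_variable('w', shape=[3, 3])   # placed on /cpu:0
#     y = tf.matmul(tf.ones([1, 3]), w)        # placed on /gpu:0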
@@ -136,7 +138,6 @@ def get_model_fn(num_gpus, variable_strategy, num_workers, sync):
avg_grad = tf.multiply(tf.add_n(grads), 1. / len(grads))
gradvars.append((avg_grad, var))
# Device that runs the ops to apply global gradient updates.
consolidation_device = '/gpu:0' if variable_strategy == 'GPU' else '/cpu:0'
with tf.device(consolidation_device):
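The avg_grad lines in this hunk are the core of the cross-tower gradient average; the grouping loop around them sits outside the hunk. A self-contained sketch of the whole step (the function name and grouping code are illustrative) looks roughly like this:

import collections

import tensorflow as tf

def average_tower_gradients(tower_gradvars):
    """Average each variable's gradients across all towers (sketch)."""
    grads_by_var = collections.defaultdict(list)
    for gradvars in tower_gradvars:
        for grad, var in gradvars:
            if grad is not None:
                grads_by_var[var].append(grad)

    averaged = []
    for var, grads in grads_by_var.items():
        if len(grads) == 1:
            avg_grad = grads[0]
        else:
            avg_grad = tf.multiply(tf.add_n(grads), 1. / len(grads))
        averaged.append((avg_grad, var))
    return averaged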
@@ -159,10 +160,9 @@ def get_model_fn(num_gpus, variable_strategy, num_workers, sync):
learning_rate=learning_rate, momentum=momentum)
chief_hooks = []
if sync:
if params.sync:
optimizer = tf.train.SyncReplicasOptimizer(
optimizer,
replicas_to_aggregate=num_workers)
optimizer, replicas_to_aggregate=num_workers)
sync_replicas_hook = optimizer.make_session_run_hook(True)
chief_hooks.append(sync_replicas_hook)
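For context on the params.sync branch above, a minimal sketch of the TF 1.x synchronous-training wiring (only the tf.train calls are library API; the helper name is illustrative):

import tensorflow as tf

def wrap_for_sync_training(base_optimizer, num_workers, is_chief):
    """Make gradient updates wait for gradients from all workers (sketch)."""
    optimizer = tf.train.SyncReplicasOptimizer(
        base_optimizer, replicas_to_aggregate=num_workers)
    # The chief worker runs extra initialization ops, so pass is_chief through.
    sync_hook = optimizer.make_session_run_hook(is_chief)
    return optimizer, sync_hook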
@@ -182,7 +182,8 @@ def get_model_fn(num_gpus, variable_strategy, num_workers, sync):
}
stacked_labels = tf.concat(labels, axis=0)
metrics = {
'accuracy': tf.metrics.accuracy(stacked_labels, predictions['classes'])
'accuracy':
tf.metrics.accuracy(stacked_labels, predictions['classes'])
}
loss = tf.reduce_mean(tower_losses, name='loss')
@@ -193,35 +194,35 @@ def get_model_fn(num_gpus, variable_strategy, num_workers, sync):
train_op=train_op,
training_chief_hooks=chief_hooks,
eval_metric_ops=metrics)
return _resnet_model_fn
def _tower_fn(is_training,
weight_decay,
feature,
label,
is_cpu,
num_layers,
batch_norm_decay,
batch_norm_epsilon):
"""Build computation tower for each device (CPU or GPU).
def _tower_fn(is_training, weight_decay, feature, label, data_format,
num_layers, batch_norm_decay, batch_norm_epsilon):
"""Build computation tower (Resnet).
Args:
is_training: true if this is the training graph.
weight_decay: weight regularization strength, a float.
feature: a Tensor.
label: a Tensor.
tower_losses: a list to be appended with current tower's loss.
tower_gradvars: a list to be appended with current tower's gradients.
tower_preds: a list to be appended with current tower's predictions.
is_cpu: true if build tower on CPU.
data_format: channels_last (NHWC) or channels_first (NCHW).
num_layers: number of layers, an int.
batch_norm_decay: decay for batch normalization, a float.
batch_norm_epsilon: epsilon for batch normalization, a float.
Returns:
A tuple with the loss for the tower, the gradients and parameters, and
predictions.
"""
data_format = 'channels_last' if is_cpu else 'channels_first'
model = cifar10_model.ResNetCifar10(
num_layers,
batch_norm_decay=batch_norm_decay,
batch_norm_epsilon=batch_norm_epsilon,
is_training=is_training, data_format=data_format)
is_training=is_training,
data_format=data_format)
logits = model.forward_pass(feature, input_data_format='channels_last')
tower_pred = {
'classes': tf.argmax(input=logits, axis=1),
@@ -241,13 +242,20 @@ def _tower_fn(is_training,
return tower_loss, zip(tower_grad, model_params), tower_pred
def input_fn(data_dir, subset, num_shards, batch_size,
def input_fn(data_dir,
subset,
num_shards,
batch_size,
use_distortion_for_training=True):
"""Create input graph for model.
Args:
data_dir: Directory where TFRecords representing the dataset are located.
subset: one of 'train', 'validate' and 'eval'.
num_shards: num of towers participating in data-parallel training.
batch_size: total batch size for training to be divided by the number of
shards.
use_distortion_for_training: True to use distortions.
Returns:
two lists of tensors for features and labels, each of num_shards length.
"""
@@ -276,10 +284,10 @@ def input_fn(data_dir, subset, num_shards, batch_size,
return feature_shards, label_shards
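The sharding itself happens in the elided body of input_fn. Purely to illustrate the contract stated in the docstring (two lists of length num_shards), one plausible way to split a single batch into per-tower shards is:

import tensorflow as tf

def shard_batch(image_batch, label_batch, num_shards):
    """Split one batch into num_shards per-tower pieces (illustrative)."""
    if num_shards <= 1:
        # No data parallelism: a single tower consumes the whole batch.
        return [image_batch], [label_batch]
    # Requires the batch size to be divisible by num_shards, which the
    # argument checks at the bottom of this file enforce.
    feature_shards = tf.split(image_batch, num_shards)
    label_shards = tf.split(label_batch, num_shards)
    return feature_shards, label_shards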
# create experiment
def get_experiment_fn(data_dir, num_gpus, is_gpu_ps,
use_distortion_for_training=True,
sync=True):
def get_experiment_fn(data_dir,
num_gpus,
variable_strategy,
use_distortion_for_training=True):
"""Returns an Experiment function.
Experiments perform training on several workers in parallel,
@@ -291,9 +299,9 @@ def get_experiment_fn(data_dir, num_gpus, is_gpu_ps,
Args:
data_dir: str. Location of the data for input_fns.
num_gpus: int. Number of GPUs on each worker.
is_gpu_ps: bool. If true, average gradients on GPUs.
variable_strategy: String. CPU to use CPU as the parameter server
and GPU to use the GPUs as the parameter server.
use_distortion_for_training: bool. See cifar10.Cifar10DataSet.
sync: bool. If true synchronizes variable updates across workers.
Returns:
A function (tf.estimator.RunConfig, tf.contrib.training.HParams) ->
tf.contrib.learn.Experiment.
@@ -302,6 +310,7 @@ def get_experiment_fn(data_dir, num_gpus, is_gpu_ps,
methods on Experiment (train, evaluate) based on information
about the current runner in `run_config`.
"""
def _experiment_fn(run_config, hparams):
"""Returns an Experiment."""
# Create estimator.
@@ -311,40 +320,37 @@ def get_experiment_fn(data_dir, num_gpus, is_gpu_ps,
subset='train',
num_shards=num_gpus,
batch_size=hparams.train_batch_size,
use_distortion_for_training=use_distortion_for_training
)
use_distortion_for_training=use_distortion_for_training)
eval_input_fn = functools.partial(
input_fn,
data_dir,
subset='eval',
batch_size=hparams.eval_batch_size,
num_shards=num_gpus
)
num_shards=num_gpus)
num_eval_examples = cifar10.Cifar10DataSet.num_examples_per_epoch('eval')
if num_eval_examples % hparams.eval_batch_size != 0:
raise ValueError('validation set size must be multiple of eval_batch_size')
raise ValueError(
'validation set size must be multiple of eval_batch_size')
train_steps = hparams.train_steps
eval_steps = num_eval_examples // hparams.eval_batch_size
examples_sec_hook = cifar10_utils.ExamplesPerSecondHook(
hparams.train_batch_size, every_n_steps=10)
hparams.train_batch_size, every_n_steps=10)
tensors_to_log = {'learning_rate': 'learning_rate',
'loss': 'loss'}
tensors_to_log = {'learning_rate': 'learning_rate', 'loss': 'loss'}
logging_hook = tf.train.LoggingTensorHook(
tensors=tensors_to_log, every_n_iter=100)
tensors=tensors_to_log, every_n_iter=100)
hooks = [logging_hook, examples_sec_hook]
classifier = tf.estimator.Estimator(
model_fn=get_model_fn(
num_gpus, is_gpu_ps, run_config.num_worker_replicas or 1, sync),
model_fn=get_model_fn(num_gpus, variable_strategy,
run_config.num_worker_replicas or 1),
config=run_config,
params=hparams
)
params=hparams)
# Create experiment.
experiment = tf.contrib.learn.Experiment(
@@ -356,45 +362,31 @@ def get_experiment_fn(data_dir, num_gpus, is_gpu_ps,
# Adding hooks to be used by the estimator on training modes
experiment.extend_train_hooks(hooks)
return experiment
return _experiment_fn
def main(job_dir,
data_dir,
num_gpus,
variable_strategy,
use_distortion_for_training,
log_device_placement,
num_intra_threads,
sync,
def main(job_dir, data_dir, num_gpus, variable_strategy,
use_distortion_for_training, log_device_placement, num_intra_threads,
**hparams):
# The env variable is on its deprecation path; the default is set to off.
os.environ['TF_SYNC_ON_FINISH'] = '0'
os.environ['TF_ENABLE_WINOGRAD_NONFUSED'] = '1'
# Session configuration.
sess_config = tf.ConfigProto(
allow_soft_placement=True,
log_device_placement=log_device_placement,
intra_op_parallelism_threads=num_intra_threads,
gpu_options=tf.GPUOptions(
force_gpu_compatible=True
)
)
gpu_options=tf.GPUOptions(force_gpu_compatible=True))
config = cifar10_utils.RunConfig(
session_config=sess_config,
model_dir=job_dir)
session_config=sess_config, model_dir=job_dir)
tf.contrib.learn.learn_runner.run(
get_experiment_fn(
data_dir,
num_gpus,
variable_strategy,
use_distortion_for_training,
sync
),
get_experiment_fn(data_dir, num_gpus, variable_strategy,
use_distortion_for_training),
run_config=config,
hparams=tf.contrib.training.HParams(**hparams)
)
hparams=tf.contrib.training.HParams(**hparams))
if __name__ == '__main__':
@@ -403,63 +395,53 @@ if __name__ == '__main__':
'--data-dir',
type=str,
required=True,
help='The directory where the CIFAR-10 input data is stored.'
)
help='The directory where the CIFAR-10 input data is stored.')
parser.add_argument(
'--job-dir',
type=str,
required=True,
help='The directory where the model will be stored.'
)
help='The directory where the model will be stored.')
parser.add_argument(
'--variable-strategy',
choices=['CPU', 'GPU'],
type=str,
default='CPU',
help='Where to locate variable operations'
)
help='Where to locate variable operations')
parser.add_argument(
'--num-gpus',
type=int,
default=1,
help='The number of gpus used. Uses only CPU if set to 0.'
)
help='The number of gpus used. Uses only CPU if set to 0.')
parser.add_argument(
'--num-layers',
type=int,
default=44,
help='The number of layers of the model.'
)
help='The number of layers of the model.')
parser.add_argument(
'--train-steps',
type=int,
default=80000,
help='The number of steps to use for training.'
)
help='The number of steps to use for training.')
parser.add_argument(
'--train-batch-size',
type=int,
default=128,
help='Batch size for training.'
)
help='Batch size for training.')
parser.add_argument(
'--eval-batch-size',
type=int,
default=100,
help='Batch size for validation.'
)
help='Batch size for validation.')
parser.add_argument(
'--momentum',
type=float,
default=0.9,
help='Momentum for MomentumOptimizer.'
)
help='Momentum for MomentumOptimizer.')
parser.add_argument(
'--weight-decay',
type=float,
default=2e-4,
help='Weight decay for convolutions.'
)
help='Weight decay for convolutions.')
parser.add_argument(
'--learning-rate',
type=float,
@@ -468,22 +450,19 @@ if __name__ == '__main__':
This is the initial learning rate value. The learning rate will decrease
during training. For more details check the model_fn implementation in
this file.\
"""
)
""")
parser.add_argument(
'--use-distortion-for-training',
type=bool,
default=True,
help='If doing image distortion for training.'
)
help='If doing image distortion for training.')
parser.add_argument(
'--sync',
action='store_true',
default=False,
help="""\
If present when running in a distributed environment, runs in sync mode.\
"""
)
""")
parser.add_argument(
'--num-intra-threads',
type=int,
@@ -492,8 +471,7 @@ if __name__ == '__main__':
Number of threads to use for intra-op parallelism. When training on CPU
set to 0 to have the system pick the appropriate number or alternatively
set it to the number of physical CPU cores.\
"""
)
""")
parser.add_argument(
'--num-inter-threads',
type=int,
@@ -501,35 +479,38 @@ if __name__ == '__main__':
help="""\
Number of threads to use for inter-op parallelism. If set to 0, the
system will pick an appropriate number.\
"""
)
""")
parser.add_argument(
'--data-format',
type=str,
default=None,
help="""\
If not set, the data format best for the training device is used.
Allowed values: channels_first (NCHW) or channels_last (NHWC).\
""")
parser.add_argument(
'--log-device-placement',
action='store_true',
default=False,
help='Whether to log device placement.'
)
help='Whether to log device placement.')
parser.add_argument(
'--batch-norm-decay',
type=float,
default=0.997,
help='Decay for batch norm.'
)
help='Decay for batch norm.')
parser.add_argument(
'--batch-norm-epsilon',
type=float,
default=1e-5,
help='Epsilon for batch norm.'
)
help='Epsilon for batch norm.')
args = parser.parse_args()
if args.num_gpus < 0:
raise ValueError(
'Invalid GPU count: \"--num-gpus\" must be 0 or a positive integer.')
if args.num_gpus == 0 and args.variable_strategy == 'GPU':
raise ValueError(
'num-gpus=0, CPU must be used as parameter server. Set'
'--variable-strategy=CPU.')
raise ValueError('num-gpus=0, CPU must be used as parameter server. Set '
'--variable-strategy=CPU.')
if (args.num_layers - 2) % 6 != 0:
raise ValueError('Invalid --num-layers parameter.')
if args.num_gpus != 0 and args.train_batch_size % args.num_gpus != 0:
......