Commit 84b58a60 authored by Jianmin Chen's avatar Jianmin Chen Committed by Derek Murray
Browse files

Implement distributed inception (#44)

Implements a distributed trainer for Inception.
parent 9a1dfdf2
This diff is collapsed.
...@@ -102,6 +102,17 @@ py_binary( ...@@ -102,6 +102,17 @@ py_binary(
], ],
) )
py_binary(
name = "imagenet_distributed_train",
srcs = [
"imagenet_distributed_train.py",
],
deps = [
":imagenet_data",
":inception_distributed_train",
],
)
py_binary( py_binary(
name = "flowers_train", name = "flowers_train",
srcs = [ srcs = [
...@@ -124,6 +135,17 @@ py_library( ...@@ -124,6 +135,17 @@ py_library(
], ],
) )
py_library(
name = "inception_distributed_train",
srcs = [
"inception_distributed_train.py",
],
deps = [
":image_processing",
":inception",
],
)
py_binary( py_binary(
name = "build_image_data", name = "build_image_data",
srcs = ["data/build_image_data.py"], srcs = ["data/build_image_data.py"],
......
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
# pylint: disable=line-too-long
"""A binary to train Inception in a distributed manner using multiple systems.
Please see accompanying README.md for details and instructions.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import tensorflow as tf
from inception import inception_distributed_train
from inception.imagenet_data import ImagenetData
FLAGS = tf.app.flags.FLAGS
def main(unused_args):
assert FLAGS.job_name in ['ps', 'worker'], 'job_name must be ps or worker'
# Extract all the hostnames for the ps and worker jobs to construct the
# cluster spec.
ps_hosts = FLAGS.ps_hosts.split(',')
worker_hosts = FLAGS.worker_hosts.split(',')
tf.logging.info('PS hosts are: %s' % ps_hosts)
tf.logging.info('Worker hosts are: %s' % worker_hosts)
cluster_spec = tf.train.ClusterSpec({'ps': ps_hosts,
'worker': worker_hosts})
server = tf.train.Server(
{'ps': ps_hosts,
'worker': worker_hosts},
job_name=FLAGS.job_name,
task_index=FLAGS.task_id)
if FLAGS.job_name == 'ps':
# `ps` jobs wait for incoming connections from the workers.
server.join()
else:
# `worker` jobs will actually do the work.
dataset = ImagenetData(subset=FLAGS.subset)
assert dataset.data_files()
# Only the chief checks for or creates train_dir.
if FLAGS.task_id == 0:
if not tf.gfile.Exists(FLAGS.train_dir):
tf.gfile.MakeDirs(FLAGS.train_dir)
inception_distributed_train.train(server.target, dataset, cluster_spec)
if __name__ == '__main__':
tf.logging.set_verbosity(tf.logging.INFO)
tf.app.run()
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""A library to train Inception using multiple replicas with synchronous update.
Please see accompanying README.md for details and instructions.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from datetime import datetime
import os.path
import time
import numpy as np
import tensorflow as tf
from inception import image_processing
from inception import inception_model as inception
from inception.slim import slim
FLAGS = tf.app.flags.FLAGS
tf.app.flags.DEFINE_string('job_name', '', 'One of "ps", "worker"')
tf.app.flags.DEFINE_string('ps_hosts', '',
"""Comma-separated list of hostname:port for the """
"""parameter server jobs. e.g. """
"""'machine1:2222,machine2:1111,machine2:2222'""")
tf.app.flags.DEFINE_string('worker_hosts', '',
"""Comma-separated list of hostname:port for the """
"""worker jobs. e.g. """
"""'machine1:2222,machine2:1111,machine2:2222'""")
tf.app.flags.DEFINE_string('train_dir', '/tmp/imagenet_train',
"""Directory where to write event logs """
"""and checkpoint.""")
tf.app.flags.DEFINE_integer('max_steps', 1000000, 'Number of batches to run.')
tf.app.flags.DEFINE_string('subset', 'train', 'Either "train" or "validation".')
tf.app.flags.DEFINE_boolean('log_device_placement', False,
'Whether to log device placement.')
# Task ID is used to select the chief and also to access the local_step for
# each replica to check staleness of the gradients in sync_replicas_optimizer.
tf.app.flags.DEFINE_integer(
'task_id', 0, 'Task ID of the worker/replica running the training.')
# More details can be found in the sync_replicas_optimizer class:
# tensorflow/python/training/sync_replicas_optimizer.py
tf.app.flags.DEFINE_integer('num_replicas_to_aggregate', -1,
"""Number of gradients to collect before """
"""updating the parameters.""")
tf.app.flags.DEFINE_integer('save_interval_secs', 10 * 60,
'Save interval seconds.')
tf.app.flags.DEFINE_integer('save_summaries_secs', 180,
'Save summaries interval seconds.')
# **IMPORTANT**
# Please note that this learning rate schedule is heavily dependent on the
# hardware architecture, batch size and any changes to the model architecture
# specification. Selecting a finely tuned learning rate schedule is an
# empirical process that requires some experimentation. Please see README.md
# more guidance and discussion.
#
# Learning rate decay factor selected from https://arxiv.org/abs/1604.00981
tf.app.flags.DEFINE_float('initial_learning_rate', 0.045,
'Initial learning rate.')
tf.app.flags.DEFINE_float('num_epochs_per_decay', 2.0,
'Epochs after which learning rate decays.')
tf.app.flags.DEFINE_float('learning_rate_decay_factor', 0.94,
'Learning rate decay factor.')
# Constants dictating the learning rate schedule.
RMSPROP_DECAY = 0.9 # Decay term for RMSProp.
RMSPROP_MOMENTUM = 0.9 # Momentum in RMSProp.
RMSPROP_EPSILON = 1.0 # Epsilon term for RMSProp.
def train(target, dataset, cluster_spec):
"""Train Inception on a dataset for a number of steps."""
# Number of workers and parameter servers are infered from the workers and ps
# hosts string.
num_workers = len(cluster_spec.as_dict()['worker'])
num_parameter_servers = len(cluster_spec.as_dict()['ps'])
# If no value is given, num_replicas_to_aggregate defaults to be the number of
# workers.
if FLAGS.num_replicas_to_aggregate == -1:
num_replicas_to_aggregate = num_workers
else:
num_replicas_to_aggregate = FLAGS.num_replicas_to_aggregate
# Both should be greater than 0 in a distributed training.
assert num_workers > 0 and num_parameter_servers > 0, (' num_workers and '
'num_parameter_servers'
' must be > 0.')
# Choose worker 0 as the chief. Note that any worker could be the chief
# but there should be only one chief.
is_chief = (FLAGS.task_id == 0)
# Ops are assigned to worker by default.
with tf.device('/job:worker/task:%d' % FLAGS.task_id):
# Variables and its related init/assign ops are assigned to ps.
with slim.scopes.arg_scope(
[slim.variables.variable, slim.variables.global_step],
device=slim.variables.VariableDeviceChooser(num_parameter_servers)):
# Create a variable to count the number of train() calls. This equals the
# number of updates applied to the variables.
global_step = slim.variables.global_step()
# Calculate the learning rate schedule.
num_batches_per_epoch = (dataset.num_examples_per_epoch() /
FLAGS.batch_size)
# Decay steps need to be divided by the number of replicas to aggregate.
decay_steps = int(num_batches_per_epoch * FLAGS.num_epochs_per_decay /
num_replicas_to_aggregate)
# Decay the learning rate exponentially based on the number of steps.
lr = tf.train.exponential_decay(FLAGS.initial_learning_rate,
global_step,
decay_steps,
FLAGS.learning_rate_decay_factor,
staircase=True)
# Add a summary to track the learning rate.
tf.scalar_summary('learning_rate', lr)
# Create an optimizer that performs gradient descent.
opt = tf.train.RMSPropOptimizer(lr,
RMSPROP_DECAY,
momentum=RMSPROP_MOMENTUM,
epsilon=RMSPROP_EPSILON)
images, labels = image_processing.distorted_inputs(
dataset,
batch_size=FLAGS.batch_size,
num_preprocess_threads=FLAGS.num_preprocess_threads)
# Number of classes in the Dataset label set plus 1.
# Label 0 is reserved for an (unused) background class.
num_classes = dataset.num_classes() + 1
logits = inception.inference(images, num_classes, for_training=True)
# Add classification loss.
inception.loss(logits, labels)
# Gather all of the losses including regularization losses.
losses = tf.get_collection(slim.losses.LOSSES_COLLECTION)
losses += tf.get_collection(tf.GraphKeys.REGULARIZATION_LOSSES)
total_loss = tf.add_n(losses, name='total_loss')
if is_chief:
# Compute the moving average of all individual losses and the
# total loss.
loss_averages = tf.train.ExponentialMovingAverage(0.9, name='avg')
loss_averages_op = loss_averages.apply(losses + [total_loss])
# Attach a scalar summmary to all individual losses and the total loss;
# do the same for the averaged version of the losses.
for l in losses + [total_loss]:
loss_name = l.op.name
# Name each loss as '(raw)' and name the moving average version of the
# loss as the original loss name.
tf.scalar_summary(loss_name + ' (raw)', l)
tf.scalar_summary(loss_name, loss_averages.average(l))
# Add dependency to compute loss_averages.
with tf.control_dependencies([loss_averages_op]):
total_loss = tf.identity(total_loss)
# Track the moving averages of all trainable variables.
# Note that we maintain a 'double-average' of the BatchNormalization
# global statistics.
# This is not needed when the number of replicas are small but important
# for synchronous distributed training with tens of workers/replicas.
exp_moving_averager = tf.train.ExponentialMovingAverage(
inception.MOVING_AVERAGE_DECAY, global_step)
variables_to_average = (
tf.trainable_variables() + tf.moving_average_variables())
# Add histograms for model variables.
for var in variables_to_average:
tf.histogram_summary(var.op.name, var)
# Create synchronous replica optimizer.
opt = tf.train.SyncReplicasOptimizer(
opt,
replicas_to_aggregate=num_replicas_to_aggregate,
replica_id=FLAGS.task_id,
total_num_replicas=num_workers,
variable_averages=exp_moving_averager,
variables_to_average=variables_to_average)
batchnorm_updates = tf.get_collection(slim.ops.UPDATE_OPS_COLLECTION)
assert batchnorm_updates, 'Batchnorm updates are missing'
batchnorm_updates_op = tf.group(*batchnorm_updates)
# Add dependency to compute batchnorm_updates.
with tf.control_dependencies([batchnorm_updates_op]):
total_loss = tf.identity(total_loss)
# Compute gradients with respect to the loss.
grads = opt.compute_gradients(total_loss)
# Add histograms for gradients.
for grad, var in grads:
if grad is not None:
tf.histogram_summary(var.op.name + '/gradients', grad)
apply_gradients_op = opt.apply_gradients(grads, global_step=global_step)
with tf.control_dependencies([apply_gradients_op]):
train_op = tf.identity(total_loss, name='train_op')
# Get chief queue_runners, init_tokens and clean_up_op, which is used to
# synchronize replicas.
# More details can be found in sync_replicas_optimizer.
chief_queue_runners = [opt.get_chief_queue_runner()]
init_tokens_op = opt.get_init_tokens_op()
clean_up_op = opt.get_clean_up_op()
# Create a saver.
saver = tf.train.Saver()
# Build the summary operation based on the TF collection of Summaries.
summary_op = tf.merge_all_summaries()
# Build an initialization operation to run below.
init_op = tf.initialize_all_variables()
# We run the summaries in the same thread as the training operations by
# passing in None for summary_op to avoid a summary_thread being started.
# Running summaries and training operations in parallel could run out of
# GPU memory.
sv = tf.train.Supervisor(is_chief=is_chief,
logdir=FLAGS.train_dir,
init_op=init_op,
summary_op=None,
global_step=global_step,
saver=saver,
save_model_secs=FLAGS.save_interval_secs)
tf.logging.info('%s Supervisor' % datetime.now())
sess_config = tf.ConfigProto(
allow_soft_placement=True,
log_device_placement=FLAGS.log_device_placement)
# Get a session.
sess = sv.prepare_or_wait_for_session(target, config=sess_config)
# Start the queue runners.
queue_runners = tf.get_collection(tf.GraphKeys.QUEUE_RUNNERS)
sv.start_queue_runners(sess, queue_runners)
tf.logging.info('Started %d queues for processing input data.',
len(queue_runners))
if is_chief:
sv.start_queue_runners(sess, chief_queue_runners)
sess.run(init_tokens_op)
# Train, checking for Nans. Concurrently run the summary operation at a
# specified interval. Note that the summary_op and train_op never run
# simultaneously in order to prevent running out of GPU memory.
next_summary_time = time.time() + FLAGS.save_summaries_secs
while not sv.should_stop():
try:
start_time = time.time()
loss_value, step = sess.run([train_op, global_step])
assert not np.isnan(loss_value), 'Model diverged with loss = NaN'
if step > FLAGS.max_steps:
break
duration = time.time() - start_time
if step % 30 == 0:
examples_per_sec = FLAGS.batch_size / float(duration)
format_str = ('Worker %d: %s: step %d, loss = %.2f'
'(%.1f examples/sec; %.3f sec/batch)')
tf.logging.info(format_str %
(FLAGS.task_id, datetime.now(), step, loss_value,
examples_per_sec, duration))
# Determine if the summary_op should be run on the chief worker.
if is_chief and next_summary_time < time.time():
tf.logging.info('Running Summary operation on the chief.')
summary_str = sess.run(summary_op)
sv.summary_computed(sess, summary_str)
tf.logging.info('Finished running Summary operation.')
# Determine the next time for running the summary.
next_summary_time += FLAGS.save_summaries_secs
except:
if is_chief:
tf.logging.info('About to execute sync_clean_up_op!')
sess.run(clean_up_op)
raise
# Stop the supervisor. This also waits for service threads to finish.
sv.stop()
# Save after the training ends.
if is_chief:
saver.save(sess,
os.path.join(FLAGS.train_dir, 'model.ckpt'),
global_step=global_step)
...@@ -26,7 +26,6 @@ from __future__ import print_function ...@@ -26,7 +26,6 @@ from __future__ import print_function
import re import re
import tensorflow as tf import tensorflow as tf
from inception.slim import slim from inception.slim import slim
...@@ -79,15 +78,13 @@ def inference(images, num_classes, for_training=False, restore_logits=True, ...@@ -79,15 +78,13 @@ def inference(images, num_classes, for_training=False, restore_logits=True,
stddev=0.1, stddev=0.1,
activation=tf.nn.relu, activation=tf.nn.relu,
batch_norm_params=batch_norm_params): batch_norm_params=batch_norm_params):
# Force all Variables to reside on the CPU. logits, endpoints = slim.inception.inception_v3(
with slim.arg_scope([slim.variables.variable], device='/cpu:0'): images,
logits, endpoints = slim.inception.inception_v3( dropout_keep_prob=0.8,
images, num_classes=num_classes,
dropout_keep_prob=0.8, is_training=for_training,
num_classes=num_classes, restore_logits=restore_logits,
is_training=for_training, scope=scope)
restore_logits=restore_logits,
scope=scope)
# Add summaries for viewing model statistics on TensorBoard. # Add summaries for viewing model statistics on TensorBoard.
_activation_summaries(endpoints) _activation_summaries(endpoints)
......
...@@ -24,8 +24,6 @@ import os.path ...@@ -24,8 +24,6 @@ import os.path
import re import re
import time import time
import numpy as np import numpy as np
import tensorflow as tf import tensorflow as tf
...@@ -215,7 +213,6 @@ def train(dataset): ...@@ -215,7 +213,6 @@ def train(dataset):
num_preprocess_threads = FLAGS.num_preprocess_threads * FLAGS.num_gpus num_preprocess_threads = FLAGS.num_preprocess_threads * FLAGS.num_gpus
images, labels = image_processing.distorted_inputs( images, labels = image_processing.distorted_inputs(
dataset, dataset,
batch_size=split_batch_size,
num_preprocess_threads=num_preprocess_threads) num_preprocess_threads=num_preprocess_threads)
input_summaries = copy.copy(tf.get_collection(tf.GraphKeys.SUMMARIES)) input_summaries = copy.copy(tf.get_collection(tf.GraphKeys.SUMMARIES))
...@@ -229,10 +226,22 @@ def train(dataset): ...@@ -229,10 +226,22 @@ def train(dataset):
for i in xrange(FLAGS.num_gpus): for i in xrange(FLAGS.num_gpus):
with tf.device('/gpu:%d' % i): with tf.device('/gpu:%d' % i):
with tf.name_scope('%s_%d' % (inception.TOWER_NAME, i)) as scope: with tf.name_scope('%s_%d' % (inception.TOWER_NAME, i)) as scope:
# Calculate the loss for one tower of the ImageNet model. This # Split the batch of images and labels.
# function constructs the entire ImageNet model but shares the batch_start = split_batch_size * i
# variables across all towers. images_batch = tf.slice(images,
loss = _tower_loss(images, labels, num_classes, scope) begin=[batch_start, 0, 0, 0],
size=[split_batch_size, -1, -1, -1])
labels_batch = tf.slice(labels,
begin=[batch_start],
size=[split_batch_size])
# Force all Variables to reside on the CPU.
with slim.arg_scope([slim.variables.variable], device='/cpu:0'):
# Calculate the loss for one tower of the ImageNet model. This
# function constructs the entire ImageNet model but shares the
# variables across all towers.
loss = _tower_loss(images_batch, labels_batch, num_classes, scope)
# Reuse variables for the next tower. # Reuse variables for the next tower.
tf.get_variable_scope().reuse_variables() tf.get_variable_scope().reuse_variables()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment