Commit ee3997b3 authored by qianyj

new tf branch for dtk21.10.1

parent 2795dc1f
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Benchmarks the all-reduce algorithms of tf_cnn_benchmarks.
tf_cnn_benchmarks uses all-reduce to aggregate gradients. This benchmark is
useful for benchmarking the performance of just this gradient aggregation,
instead of the entire model. All the flags that tf_cnn_benchmarks accepts are
also accepted by this script, although many are silently ignored.
The number and shapes of the tensors all-reduced are those of the variables of
the model specified by the --model flag.
TODO(reedwm): Allow custom sizes to be specified.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import time
from absl import app
from absl import flags as absl_flags
import tensorflow.compat.v1 as tf
from tensorflow.python.ops import control_flow_ops
import benchmark_cnn
import cnn_util
import flags
from cnn_util import log_fn
absl_flags.DEFINE_integer('iters_per_step', 5,
'Number of iterations to run all-reduce for, per '
'step. Every step, a session will be run on a Graph '
'that contains this many copies of the all-reduce. '
'The copies are run sequentially. Setting this above '
'1 is useful to lower the overhead of starting the '
'session run, running the VariableV2 ops at the '
'start of the step, etc.')
flags.define_flags()
for name in flags.param_specs.keys():
absl_flags.declare_key_flag(name)
def get_var_shapes(model):
"""Returns the list of variable shapes for a tf_cnn_benchmarks Model."""
with tf.Graph().as_default():
# The variable shapes do not depend on the batch size.
images = tf.placeholder(tf.float32, model.get_input_shapes('train')[0])
model.build_network([images])
return [[int(d) for d in v.shape.dims] for v in tf.trainable_variables()]
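# Added illustration (not part of the original script): the return value is a
# plain list of per-variable dimension lists. The shapes below are hypothetical
# and only show the format (e.g. a 7x7 conv kernel with 3 input and 64 output
# channels, followed by its bias).
_EXAMPLE_VAR_SHAPES = [[7, 7, 3, 64], [64]]  # hypothetical output format of get_var_shapes()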
def all_reduce(all_device_tensors, variable_mgr):
"""Performs a single batch all-reduce.
Args:
all_device_tensors: List of lists of tensors. all_device_tensors[t][i] is
a tensor, where t is the tower the tensor is on and i is the index of
the tensor.
variable_mgr: The VariableMgr to perform the all-reduce.
Returns:
List of lists of tensors in the same form as `all_device_tensors`, except the
tensors are aggregated across towers.
"""
tower_grads = [[(g, None) for g in device_tensors] for
device_tensors in all_device_tensors]
_, aggregated_tower_grads = variable_mgr.preprocess_device_grads(tower_grads)
return [
[g for g, _ in agg_device_tensors]
for agg_device_tensors in aggregated_tower_grads]
def build_all_reduce_iterations(all_device_tensors, tower_devices, variable_mgr,
num_iters):
"""Builds the all-reduce ops for multiple iterations to aggregate tensors.
The tensors in `all_device_tensors` are aggregated `num_iters` times. Each
iteration aggregates the results from the previous iteration. The iterations
are run sequentially, so the aggregations for an iteration do not start
running until the previous iteration has completed. Each iteration after the
first is aggregating already-aggregated values, but it does not matter because
we are only aggregating for benchmarking purposes.
Args:
all_device_tensors: List of lists of tensors. all_device_tensors[t][i] is
a tensor, where t is the tower the tensor is on and i is the index of
the tensor.
tower_devices: A list of device strings. tower_devices[t] is the device
of the tensors in all_device_tensors[t].
variable_mgr: The VariableMgr to perform the all-reduce.
num_iters: Number of iterations to aggregate tensors for.
Returns:
An op that when run, causes the all-reduce ops to run.
"""
for i in range(num_iters):
with tf.name_scope('iteration_%d' % i):
# Step 1: Do the aggregation.
with tf.name_scope('tensor_aggregation'):
all_device_tensors = all_reduce(all_device_tensors, variable_mgr)
# Step 2. Create identity ops, to bring the aggregated results back to
# each device.
new_all_device_tensors = []
for device, device_tensors in zip(tower_devices, all_device_tensors):
with tf.device(device):
new_all_device_tensors.append([
tf.identity(t, name='identity_after_allreduce')
for t in device_tensors
])
all_device_tensors = new_all_device_tensors
# Step 3. Add control dependencies to delay the next iteration until this
# iteration is complete. To avoid extra overhead, we do not have any
# cross-device control dependencies, which means it's possible for two
# iterations to slightly overlap.
new_all_device_tensors = []
for device_tensors in all_device_tensors:
new_all_device_tensors.append([
control_flow_ops.with_dependencies(
device_tensors, t, name='identity_after_dependencies')
for t in device_tensors
])
all_device_tensors = new_all_device_tensors
# To prevent the dependency optimizer from removing every op we created,
# we store the results in variables.
ops_to_run = []
for device, device_tensors in zip(tower_devices, all_device_tensors):
with tf.device(device):
for t in device_tensors:
# The placeholder initial value is never run.
var = tf.Variable(tf.placeholder(tf.float32, t.shape), collections=[])
ops_to_run.append(var.assign(t))
return tf.group(*ops_to_run)
def build_graph(tower_devices, tensor_shapes, variable_mgr, num_iters):
"""Builds the graph for the benchmark.
Args:
tower_devices: A list of device strings of the devices to run the all-reduce
benchmark on.
tensor_shapes: A list of shapes of the tensors that will be aggregated for
the all-reduce.
variable_mgr: The VariableMgr to perform the all-reduce.
num_iters: Number of iterations to aggregate tensors for.
Returns:
An op that runs the benchmark.
"""
all_device_tensors = []
for i, tower_device in enumerate(tower_devices):
with tf.device(tower_device):
device_tensors = []
for j, shape in enumerate(tensor_shapes):
tensor = tf.Variable(tf.random_normal(shape, dtype=tf.float32),
name='tensor_%d_on_device_%d' % (j, i))
device_tensors.append(tensor)
all_device_tensors.append(device_tensors)
log_fn('Building all-reduce ops')
benchmark_op = build_all_reduce_iterations(all_device_tensors, tower_devices,
variable_mgr, num_iters)
log_fn('Done building all-reduce ops')
return benchmark_op
def run_graph(benchmark_op, bench_cnn, init_ops, dummy_loss_op):
"""Runs the graph for the benchmark.
Args:
benchmark_op: An op that runs the benchmark.
bench_cnn: The BenchmarkCNN where params and other attributes are obtained.
init_ops: A list of ops that are run before `benchmark_op` for
initialization.
dummy_loss_op: Any op. We must pass a loss op to
`benchmark_cnn.benchmark_one_step`, but the result of the op is never
actually used.
"""
config = benchmark_cnn.create_config_proto(bench_cnn.params)
with tf.Session(config=config) as sess:
for op in init_ops:
sess.run(op)
step_train_times = []
fetches = {'average_loss': dummy_loss_op, 'benchmark_op': benchmark_op}
log_fn('Running warmup')
for i in range(-bench_cnn.num_warmup_batches, bench_cnn.num_batches):
if i == 0:
log_fn('Running all-reduce ops')
start = time.time()
if i > 0 and i % bench_cnn.params.display_every == 0:
log_fn('Iteration: %d. Average time per step so far: %s' %
(i, (time.time() - start) / i))
# Call benchmark_one_step instead of directly calling sess.run(...), to
# potentially get a trace file, partitioned graphs, etc.
benchmark_cnn.benchmark_one_step(
sess=sess,
fetches=fetches,
step=i,
# The batch size is only used for the images/sec calculation, which is
# not actually calculated because we pass show_images_per_sec=False.
batch_size=None,
step_train_times=step_train_times,
trace_filename=bench_cnn.trace_filename,
partitioned_graph_file_prefix=(
bench_cnn.params.partitioned_graph_file_prefix),
profiler=None,
image_producer=None,
params=bench_cnn.params,
show_images_per_sec=False)
log_fn('Average time per step: %s' %
((time.time() - start) / bench_cnn.num_batches))
def run_benchmark(bench_cnn, num_iters):
"""Runs the all-reduce benchmark.
Args:
bench_cnn: The BenchmarkCNN where params, the variable manager, and other
attributes are obtained.
num_iters: Number of iterations to do all-reduce for.
Raises:
ValueError: Invalid params of bench_cnn.
"""
if bench_cnn.params.variable_update != 'replicated':
raise ValueError('--variable_update=replicated must be specified to use '
'the all-reduce benchmark')
if bench_cnn.params.variable_consistency == 'relaxed':
raise ValueError('--variable_consistency=relaxed is not supported')
benchmark_op = build_graph(bench_cnn.raw_devices,
get_var_shapes(bench_cnn.model),
bench_cnn.variable_mgr, num_iters)
init_ops = [
tf.global_variables_initializer(),
bench_cnn.variable_mgr.get_post_init_ops()
]
loss_op = tf.no_op()
if bench_cnn.graph_file:
path, filename = os.path.split(bench_cnn.graph_file)
as_text = filename.endswith('txt')
log_fn('Writing GraphDef as %s to %s' % (
'text' if as_text else 'binary', bench_cnn.graph_file))
tf.train.write_graph(tf.get_default_graph().as_graph_def(add_shapes=True),
path, filename, as_text)
run_graph(benchmark_op, bench_cnn, init_ops, loss_op)
# TODO(reedwm): Reduce redundancy with tf_cnn_benchmarks
def main(positional_arguments):
# Command-line arguments like '--distortions False' are equivalent to
# '--distortions=True False', where False is a positional argument. To prevent
# this from silently running with distortions, we do not allow positional
# arguments.
assert len(positional_arguments) >= 1
if len(positional_arguments) > 1:
raise ValueError('Received unknown positional arguments: %s'
% positional_arguments[1:])
params = benchmark_cnn.make_params_from_flags()
params = benchmark_cnn.setup(params)
bench = benchmark_cnn.BenchmarkCNN(params)
tfversion = cnn_util.tensorflow_version_tuple()
log_fn('TensorFlow: %i.%i' % (tfversion[0], tfversion[1]))
run_benchmark(bench, absl_flags.FLAGS.iters_per_step)
if __name__ == '__main__':
tf.disable_v2_behavior()
app.run(main) # Raises error on invalid flags, unlike tf.app.run()
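# Example invocation (a hedged sketch; --model and --num_gpus are standard
# tf_cnn_benchmarks flags, --iters_per_step is defined above, and
# --variable_update=replicated is required by run_benchmark()):
#
#   python all_reduce_benchmark.py --variable_update=replicated \
#       --model=resnet50 --num_gpus=2 --iters_per_step=5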
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for tf_cnn_benchmark.allreduce."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import collections as pycoll
import numpy as np
import tensorflow.compat.v1 as tf
from tensorflow.python.framework import ops
from tensorflow.python.framework import test_util
from tensorflow.python.ops import variables
import allreduce
class AllReduceTest(tf.test.TestCase):
def testGroupKey(self):
d0 = ['/job:worker/replica:0/task:0/device:GPU:1',
'/job:worker/replica:0/task:0/device:GPU:0',
'/job:worker/replica:0/task:0/device:GPU:3',]
d1 = ['/job:worker/replica:0/task:1/device:GPU:1',
'/job:worker/replica:0/task:1/device:GPU:0',
'/job:worker/replica:0/task:1/device:GPU:3',]
d2 = ['/job:worker/replica:0/task:1/device:GPU:1',
'/job:worker/replica:0/task:1/device:GPU:3',
'/job:worker/replica:0/task:1/device:GPU:0',]
d3 = ['/job:worker/replica:0/task:1/device:GPU:1',
'/job:worker/replica:0/task:1/device:GPU:3',
'/job:worker/replica:0/task:1/device:GPU:2',]
d4 = ['/job:worker/task:0/device:GPU:1',
'/job:worker/task:0/device:GPU:2',
'/job:worker/task:0/device:GPU:3',]
d5 = ['/job:worker/task:0/device:CPU:1',
'/job:worker/task:0/device:CPU:2']
d6 = ['/job:worker/task:0/device:CPU:2',
'/job:worker/task:0/device:CPU:1']
g0 = allreduce.collective_group_key(d0)
g1 = allreduce.collective_group_key(d1)
g2 = allreduce.collective_group_key(d2)
g3 = allreduce.collective_group_key(d3)
g4 = allreduce.collective_group_key(d4)
g5 = allreduce.collective_group_key(d5)
g6 = allreduce.collective_group_key(d6)
self.assertEqual(g0, g1)
self.assertEqual(g0, g2)
self.assertTrue(g0 != g3)
self.assertEqual(g3, g4)
self.assertEqual(g5, g6)
self.assertTrue(g4 != g5)
def testExtractRanges(self):
x = []
expected_ranges = []
expected_singles = []
ranges, singles = allreduce.extract_ranges(x)
self.assertEqual(expected_ranges, ranges)
self.assertEqual(expected_singles, singles)
x = [1, 3, 4, 6, 7, 8, 9]
expected_ranges = [[3, 4], [6, 9]]
expected_singles = [1]
ranges, singles = allreduce.extract_ranges(x)
self.assertEqual(expected_ranges, ranges)
self.assertEqual(expected_singles, singles)
x = [1, 2, 3, 4, 6, 7, 8, 9]
expected_ranges = [[1, 4], [6, 9]]
expected_singles = []
ranges, singles = allreduce.extract_ranges(x)
self.assertEqual(expected_ranges, ranges)
self.assertEqual(expected_singles, singles)
x = [1, 3, 4, 6, 7, 9]
expected_ranges = [[3, 4], [6, 7]]
expected_singles = [1, 9]
ranges, singles = allreduce.extract_ranges(x)
self.assertEqual(expected_ranges, ranges)
self.assertEqual(expected_singles, singles)
x = [1, 3, 6, 9]
expected_ranges = []
expected_singles = [1, 3, 6, 9]
ranges, singles = allreduce.extract_ranges(x)
self.assertEqual(expected_ranges, ranges)
self.assertEqual(expected_singles, singles)
def testPackRange(self):
packing = {}
t0 = tf.constant([0, 1, 2, 3], dtype=tf.float32)
t1 = tf.constant([4, 5, 6, 7], dtype=tf.float32)
gv = [(t0, 'v0'), (t1, 'v1')]
new_t = allreduce.pack_range('0:0', packing, gv, [0, 1])
self.assertEqual(1, new_t.shape.ndims)
self.assertEqual(8, new_t.shape.dims[0])
self.assertEqual(
packing, {
'0:0':
allreduce.GradPackTuple(
indices=range(2),
vars=['v0', 'v1'],
shapes=[tf.TensorShape([4]),
tf.TensorShape([4])])
})
t2 = tf.constant([[0, 1, 2], [3, 4, 5], [6, 7, 8]], dtype=tf.float32)
t3 = tf.constant([[0, 1, 2], [3, 4, 5], [6, 7, 8]], dtype=tf.float32)
gv = [(t0, 'v0'), (t1, 'v1'), (t2, 'v2'), (t3, 'v3')]
packing = {}
new_t = allreduce.pack_range('1:0', packing, gv, [0, 3])
self.assertEqual(1, new_t.shape.ndims)
self.assertEqual(26, new_t.shape.dims[0])
self.assertEqual(
packing, {
'1:0':
allreduce.GradPackTuple(
indices=range(4),
vars=['v0', 'v1', 'v2', 'v3'],
shapes=[
tf.TensorShape([4]),
tf.TensorShape([4]),
tf.TensorShape([3, 3]),
tf.TensorShape([3, 3])
])
})
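# Added note (not in the original test): pack_range() concatenates the
# flattened inputs, so shapes [4], [4], [3, 3], [3, 3] pack into a single
# vector of 4 + 4 + 9 + 9 == 26 elements, which is the dims[0] == 26 asserted
# above and again in testPackSmallTensors below.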
def testUnpackGradTuple(self):
packing = {
'0:0':
allreduce.GradPackTuple(
indices=range(4),
vars=['v0', 'v1', 'v2', 'v3'],
shapes=[
tf.TensorShape([4]),
tf.TensorShape([4]),
tf.TensorShape([3, 3]),
tf.TensorShape([3, 3])
])
}
tc = tf.constant([0, 1, 2, 3, 4, 5, 6, 7,
0, 1, 2, 3, 4, 5, 6, 7, 8,
0, 1, 2, 3, 4, 5, 6, 7, 8], dtype=tf.float32)
packed_gv = [tc, 'packing_var_placeholder']
gv = allreduce.unpack_grad_tuple(packed_gv, packing['0:0'])
self.assertEqual(4, len(gv))
self.assertEqual('v0', gv[0][1])
self.assertEqual('v1', gv[1][1])
self.assertEqual('v2', gv[2][1])
self.assertEqual('v3', gv[3][1])
self.assertEqual(1, gv[0][0].shape.ndims)
self.assertEqual(4, gv[0][0].shape.dims[0])
self.assertEqual(1, gv[1][0].shape.ndims)
self.assertEqual(4, gv[1][0].shape.dims[0])
self.assertEqual(2, gv[2][0].shape.ndims)
self.assertEqual(3, gv[2][0].shape.dims[0])
self.assertEqual(3, gv[2][0].shape.dims[1])
def testPackSmallTensors(self):
t0 = tf.constant([0, 1, 2, 3], dtype=tf.float32)
t1 = tf.constant([4, 5, 6, 7], dtype=tf.float32)
t2 = tf.constant([[0, 1, 2], [3, 4, 5], [6, 7, 8]], dtype=tf.float32)
t3 = tf.constant([[0, 1, 2], [3, 4, 5], [6, 7, 8]], dtype=tf.float32)
tower_grads = []
for d in range(0, 3):
gv = [(t0, 'v_%d_0' % d), (t1, 'v_%d_1' %d), (t2, 'v_%d_2' %d),
(t3, 'v_%d_3' % d)]
tower_grads.append(gv)
# 1) Set the size limit so small that nothing gets concatenated.
new_tower_grads, packing = allreduce.pack_small_tensors(
tower_grads, max_bytes=12,
max_group=10)
self.assertEqual(tower_grads, new_tower_grads)
self.assertTrue(packing is None)
# 2) Set the size limit so only the first two tensors get concatenated
new_tower_grads, packing = allreduce.pack_small_tensors(
tower_grads, max_bytes=16, # 16 bytes == 4 elements
max_group=10)
self.assertEqual(3, len(new_tower_grads))
self.assertEqual(4, len(tower_grads[0]))
first_tower = new_tower_grads[0]
self.assertEqual(3, len(first_tower))
self.assertEqual(1, first_tower[0][0].shape.ndims)
self.assertEqual(8, first_tower[0][0].shape.dims[0])
self.assertEqual(packing,
{'0:0': allreduce.GradPackTuple(
indices=range(2),
vars=['v_0_0', 'v_0_1'],
shapes=[tf.TensorShape([4]),
tf.TensorShape([4])]),
'1:0': allreduce.GradPackTuple(
indices=range(2),
vars=['v_1_0', 'v_1_1'],
shapes=[tf.TensorShape([4]),
tf.TensorShape([4])]),
'2:0': allreduce.GradPackTuple(
indices=range(2),
vars=['v_2_0', 'v_2_1'],
shapes=[tf.TensorShape([4]),
tf.TensorShape([4])])})
# 3) Set the size limit so all tensors get concatenated
new_tower_grads, packing = allreduce.pack_small_tensors(
tower_grads, max_bytes=256, # 256 bytes == 64 elements
max_group=10)
self.assertEqual(3, len(new_tower_grads))
self.assertEqual(4, len(tower_grads[0]))
self.assertEqual(1, len(new_tower_grads[0]))
first_tower = new_tower_grads[0]
self.assertEqual(1, first_tower[0][0].shape.ndims)
self.assertEqual(26, first_tower[0][0].shape.dims[0])
self.assertEqual(packing,
{'0:0': allreduce.GradPackTuple(
indices=range(4),
vars=['v_0_0', 'v_0_1', 'v_0_2', 'v_0_3'],
shapes=[tf.TensorShape([4]),
tf.TensorShape([4]),
tf.TensorShape([3, 3,]),
tf.TensorShape([3, 3,])]),
'1:0': allreduce.GradPackTuple(
indices=range(4),
vars=['v_1_0', 'v_1_1', 'v_1_2', 'v_1_3'],
shapes=[tf.TensorShape([4]),
tf.TensorShape([4]),
tf.TensorShape([3, 3,]),
tf.TensorShape([3, 3,])]),
'2:0': allreduce.GradPackTuple(
indices=range(4),
vars=['v_2_0', 'v_2_1', 'v_2_2', 'v_2_3'],
shapes=[tf.TensorShape([4]),
tf.TensorShape([4]),
tf.TensorShape([3, 3,]),
tf.TensorShape([3, 3,])])})
def testUnpackSmallTensors(self):
packing = {'0:0': allreduce.GradPackTuple(indices=range(2),
vars=['v_0_0', 'v_0_1'],
shapes=[tf.TensorShape([4]),
tf.TensorShape([4])]),
'0:1': allreduce.GradPackTuple(indices=range(3, 5),
vars=['v_0_3', 'v_0_4'],
shapes=[tf.TensorShape([3, 3,]),
tf.TensorShape([3, 3,])]),
'1:0': allreduce.GradPackTuple(indices=range(2),
vars=['v_1_0', 'v_1_1'],
shapes=[tf.TensorShape([4]),
tf.TensorShape([4])]),
'1:1': allreduce.GradPackTuple(indices=range(3, 5),
vars=['v_1_3', 'v_1_4'],
shapes=[tf.TensorShape([3, 3,]),
tf.TensorShape([3, 3,])])}
t0 = tf.constant([0, 1, 2, 3, 4, 5, 6, 7], dtype=tf.float32)
t1 = tf.constant([17, 17], dtype=tf.float32)
t2 = tf.constant([0, 1, 2, 3, 4, 5, 6, 7, 8,
0, 1, 2, 3, 4, 5, 6, 7, 8], dtype=tf.float32)
t3 = tf.constant([0], dtype=tf.float32)
tower_grads = []
for d in range(0, 2):
one_tower = [(t0, 'packing_var_placeholder'),
(t2, 'packing_var_placeholder'),
(t1, 'v_%d_2' % d), (t3, 'v_%d_5' %d)]
tower_grads.append(one_tower)
new_tower_grads = allreduce.unpack_small_tensors(tower_grads, packing)
self.assertEqual(2, len(new_tower_grads))
for d, tg in enumerate(new_tower_grads):
self.assertEqual(6, len(tg))
self.assertEqual('v_%d_0' % d, tg[0][1])
self.assertEqual('v_%d_1' % d, tg[1][1])
self.assertEqual('v_%d_2' % d, tg[2][1])
self.assertEqual('v_%d_3' % d, tg[3][1])
self.assertEqual('v_%d_4' % d, tg[4][1])
self.assertEqual('v_%d_5' % d, tg[5][1])
self.assertEqual(1, tg[0][0].shape.ndims)
self.assertEqual(4, tg[0][0].shape.dims[0])
self.assertEqual(1, tg[1][0].shape.ndims)
self.assertEqual(4, tg[1][0].shape.dims[0])
self.assertEqual(1, tg[2][0].shape.ndims)
self.assertEqual(2, tg[2][0].shape.dims[0])
self.assertEqual(2, tg[3][0].shape.ndims)
self.assertEqual(3, tg[3][0].shape.dims[0])
self.assertEqual(3, tg[3][0].shape.dims[1])
self.assertEqual(2, tg[4][0].shape.ndims)
self.assertEqual(3, tg[4][0].shape.dims[0])
self.assertEqual(3, tg[4][0].shape.dims[1])
self.assertEqual(1, tg[5][0].shape.ndims)
self.assertEqual(1, tg[5][0].shape.dims[0])
class DynamicPackingTest(test_util.TensorFlowTestCase):
"""Packing/Unpacking tests that require executing a TensorFlow session."""
def _init_tensors(self, num_towers, tensor_shapes):
"""Construct a collection of tensors across multiple devices."""
num_tensors = len(tensor_shapes)
consts = []
tensors = []
vrbls = []
tower_grads = []
tf.Variable([-1], dtype=tf.int32, name='packing_var_placeholder')
for dev_idx in range(0, num_towers):
devname = '/job:localhost/device:GPU:%d' % dev_idx
consts.append([])
tensors.append([])
vrbls.append([])
with tf.device(devname):
base_value = 0
gv_tuples = []
for t_idx in range(0, num_tensors):
shape = tensor_shapes[t_idx]
num_elts = 0
for d in shape:
num_elts = (num_elts or 1) * d
c = np.fromiter(range(base_value, base_value + num_elts),
dtype=np.float32).reshape(shape)
base_value += num_elts
consts[dev_idx].append(c)
tensors[dev_idx].append(tf.constant(c))
vrbls[dev_idx].append(
tf.Variable(c, name='v_d%d_t%d' % (dev_idx, t_idx)))
gv_tuples.append((tensors[dev_idx][-1], vrbls[dev_idx][-1]))
tower_grads.append(gv_tuples)
return tower_grads, consts, tensors, vrbls
_test_tuple = pycoll.namedtuple('_test_tuple',
'num_devices, in_shapes out_shapes out_i')
def _do_pack_unpack_test(self, tt):
"""Do a single pack-unpack test.
Args:
tt: A _test_tuple defining the parameters of the test to do.
This test executes a graph that performs a pack of tower_grads
followed by an unpack and verifies that the shapes and values
of gradient tensors are unchanged, along with paired variables.
"""
with ops.Graph().as_default():
tower_grads, consts, _, vrbls = self._init_tensors(
tt.num_devices, tt.in_shapes)
packed_tg, packing = allreduce.pack_small_tensors(
tower_grads, max_bytes=40, max_group=10)
unpacked_tg = allreduce.unpack_small_tensors(packed_tg, packing)
with self.test_session() as sess:
sess.run(variables.global_variables_initializer())
packed = sess.run(packed_tg)
for d in range(0, tt.num_devices):
for t in range(0, len(tt.out_shapes)):
num_elts = 0
for dim in tt.out_shapes[t]:
num_elts = (num_elts or 1) * dim
self.assertTrue(np.array_equal(
np.array(range(tt.out_i[t], tt.out_i[t] + num_elts),
dtype=np.float32).reshape(tt.out_shapes[t]),
packed[d][t][0]))
unpacked = sess.run(unpacked_tg)
for d in range(0, tt.num_devices):
for t in range(0, len(tt.in_shapes)):
self.assertTrue(np.array_equal(consts[d][t], unpacked[d][t][0]))
self.assertEqual(vrbls[d][t], unpacked_tg[d][t][1])
def testPackUnpack0(self):
self._do_pack_unpack_test(
self._test_tuple(num_devices=3,
in_shapes=[[8], [3, 3], [12], [5, 5, 5]],
out_shapes=[[17], [12], [5, 5, 5]],
out_i=[0, 17, 29]))
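# Added note (not in the original test): with max_bytes=40, only tensors of at
# most 10 float32 elements get packed, so above the [8] and [3, 3] inputs
# (8 + 9 elements) concatenate into [17] while [12] and [5, 5, 5] pass through.
# _init_tensors() numbers values consecutively from 0, so the packed tensor
# starts at value 0, [12] starts at 8 + 9 == 17, and [5, 5, 5] starts at
# 17 + 12 == 29, matching out_i=[0, 17, 29].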
def testPackUnpack1(self):
self._do_pack_unpack_test(
self._test_tuple(num_devices=4,
in_shapes=[[5, 5, 5], [2, 3], [5]],
out_shapes=[[11], [5, 5, 5]],
out_i=[125, 0]))
def testPackUnpack2(self):
self._do_pack_unpack_test(
self._test_tuple(num_devices=2,
in_shapes=[[5, 5, 5], [2, 3], [1, 5], [7], [100]],
out_shapes=[[18], [5, 5, 5], [100]],
out_i=[125, 0, 143]))
def _do_all_reduce_pack_test(self, tt):
"""Test that all-reduce results are the same with or without packing."""
with ops.Graph().as_default():
tower_grads, consts, _, _ = self._init_tensors(
tt.num_devices, tt.in_shapes)
dev_prefixes = ['/job:localhost']
num_workers = 1
alg = 'xring'
shards = 1
single_session = True
gpu_indices = range(0, tt.num_devices)
assert len(gpu_indices) == len(tower_grads)
no_pack_all_reduce = allreduce.sum_gradients_all_reduce(
single_session,
dev_prefixes, tower_grads, num_workers, alg, shards,
gpu_indices,
agg_small_grads_max_bytes=0, agg_small_grads_max_group=1)
packed_tg, packing = allreduce.pack_small_tensors(tower_grads, 100, 100)
packed_all_reduce = allreduce.sum_gradients_all_reduce(
single_session,
dev_prefixes, packed_tg, num_workers, alg, shards,
gpu_indices,
agg_small_grads_max_bytes=0, agg_small_grads_max_group=1)
unpacked_tg = allreduce.unpack_small_tensors(packed_all_reduce, packing)
with self.test_session() as sess:
sess.run(variables.global_variables_initializer())
no_pack_values = sess.run(no_pack_all_reduce)
pack_unpack_values = sess.run(unpacked_tg)
for d in range(1, tt.num_devices):
for t in range(0, len(tt.in_shapes)):
self.assertTrue(np.allclose(no_pack_values[d][t][0],
tt.num_devices * consts[0][t]))
self.assertTrue(np.array_equal(no_pack_values[d][t][0],
pack_unpack_values[d][t][0]))
def testAllReducePacked0(self):
self._do_all_reduce_pack_test(
self._test_tuple(num_devices=3,
in_shapes=[[8], [3, 3], [12], [5, 5, 5]],
out_shapes=[[17], [12], [5, 5, 5]],
out_i=[0, 17, 29]))
def testAllReducePacked1(self):
self._do_all_reduce_pack_test(
self._test_tuple(num_devices=2,
in_shapes=[[8], [3, 3], [12], [5, 5, 5], [3], [4]],
out_shapes=[[17], [7], [12], [5, 5, 5]],
out_i=[0, 17, 29, 154, 157]))
if __name__ == '__main__':
tf.disable_v2_behavior()
tf.test.main()
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Utilities for CNN benchmarks."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import sys
import threading
import numpy as np
import tensorflow.compat.v1 as tf
def tensorflow_version_tuple():
v = tf.__version__
major, minor, patch = v.split('.')
return (int(major), int(minor), patch)
def tensorflow_version():
vt = tensorflow_version_tuple()
return vt[0] * 1000 + vt[1]
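# Added illustration (not part of the original module): tensorflow_version()
# encodes the version as major * 1000 + minor, so e.g. TF 1.15.x maps to 1015.
# The version string below is hypothetical and used only for this illustration.
def _example_version_encoding():
  major, minor, _ = '1.15.0'.split('.')
  return int(major) * 1000 + int(minor)  # 1015, same encoding as tensorflow_version()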
def log_fn(log):
print(log)
def roll_numpy_batches(array, batch_size, shift_ratio):
"""Moves a proportion of batches from start to the end of the array.
This function moves a proportion of batches, specified by `shift_ratio`, from
the starts of the array to the end. The number of batches moved is rounded
down to the nearest integer. For example,
```
roll_numpy_batches([1, 2, 3, 4, 5, 6], 2, 0.34) == [3, 4, 5, 6, 1, 2]
```
Args:
array: A Numpy array whose first dimension is the batch dimension.
batch_size: The batch size.
shift_ratio: Proportion of batches to move from the start of the array to
the end of the array.
Returns:
A new Numpy array, with a proportion of the batches at the start of `array`
moved to the end.
"""
num_items = array.shape[0]
assert num_items % batch_size == 0
num_batches = num_items // batch_size
starting_batch = int(num_batches * shift_ratio)
starting_item = starting_batch * batch_size
return np.roll(array, -starting_item, axis=0)
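# Added usage sketch (not part of the original module), checking the docstring
# example: 6 items with batch_size=2 is 3 batches, and int(3 * 0.34) == 1 batch
# (2 items) is moved from the front to the back.
def _example_roll_numpy_batches():
  array = np.array([1, 2, 3, 4, 5, 6])
  rolled = roll_numpy_batches(array, batch_size=2, shift_ratio=0.34)
  assert rolled.tolist() == [3, 4, 5, 6, 1, 2]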
# For Python 2.7 compatibility, we do not use threading.Barrier.
class Barrier(object):
"""Implements a lightweight Barrier.
Useful for synchronizing a fixed number of threads at known synchronization
points. Threads block on 'wait()' and simultaneously return once they have
all made that call.
# Implementation adapted from boost/thread/barrier.hpp
"""
def __init__(self, parties):
"""Create a barrier, initialised to 'parties' threads."""
self.cond = threading.Condition(threading.Lock())
self.parties = parties
# Indicates the number of waiting parties.
self.waiting = 0
# generation is needed to deal with spurious wakeups. If self.cond.wait()
# wakes up for other reasons, generation will force it to go back to wait().
self.generation = 0
self.broken = False
def wait(self):
"""Wait for the barrier."""
with self.cond:
# Check if the barrier has been disabled or not.
if self.broken:
return
gen = self.generation
self.waiting += 1
if self.waiting == self.parties:
self.waiting = 0
self.generation += 1
self.cond.notify_all()
# loop because of spurious wakeups
while gen == self.generation:
self.cond.wait()
# TODO(huangyp): Remove this method once we find a way to know which step
# is the last barrier.
def abort(self):
"""Clear existing barrier and disable this barrier."""
with self.cond:
if self.waiting > 0:
self.generation += 1
self.cond.notify_all()
self.broken = True
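# Added usage sketch (not part of the original module): two threads block on
# wait() and are released together once both have reached the barrier.
def _example_barrier_usage():
  barrier = Barrier(2)
  released = []
  def _worker():
    barrier.wait()  # blocks until the second thread also calls wait()
    released.append(True)
  workers = [threading.Thread(target=_worker) for _ in range(2)]
  for w in workers:
    w.start()
  for w in workers:
    w.join()
  assert len(released) == 2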
class ImageProducer(object):
"""An image producer that puts images into a staging area periodically.
This class is useful for periodically running a set of ops, `put_ops` on a
different thread every `batch_group_size` steps.
The notify_image_consumption() method is used to increment an internal counter
so that every `batch_group_size` times it is called, `put_ops` is executed. A
barrier is placed so that notify_image_consumption() will block until
the previous call to `put_ops` has been executed.
The start() method is used to start the thread that runs `put_ops`.
The done() method waits until the last put_ops is executed and stops the
thread.
The purpose of this class is to fill an image input pipeline every
`batch_group_size` steps. Suppose `put_ops` supplies `batch_group_size` images
to the input pipeline when run, and that every step, 1 batch of images is
consumed. Then, by calling notify_image_consumption() every step, images are
supplied to the input pipeline at the same rate as they are consumed.
Example usage:
```
put_ops = ... # Enqueues `batch_group_size` batches to a StagingArea
get_op = ... # Dequeues 1 batch, and does some operations on it
batch_group_size = 4
with tf.Session() as sess:
image_producer = cnn_util.ImageProducer(sess, put_ops, batch_group_size, use_python32_barrier=False)
image_producer.start()
for _ in range(100):
sess.run(get_op)
image_producer.notify_image_consumption()
```
"""
def __init__(self, sess, put_ops, batch_group_size, use_python32_barrier):
self.sess = sess
self.num_gets = 0
self.put_ops = put_ops
self.batch_group_size = batch_group_size
self.done_event = threading.Event()
if (use_python32_barrier and
sys.version_info[0] == 3 and sys.version_info[1] >= 2):
self.put_barrier = threading.Barrier(2)
else:
self.put_barrier = Barrier(2)
def _should_put(self):
return (self.num_gets + 1) % self.batch_group_size == 0
def done(self):
"""Stop the image producer."""
self.done_event.set()
self.put_barrier.abort()
self.thread.join()
def start(self):
"""Start the image producer."""
self.sess.run([self.put_ops])
self.thread = threading.Thread(target=self._loop_producer)
# Set daemon to true to allow Ctrl + C to terminate all threads.
self.thread.daemon = True
self.thread.start()
def notify_image_consumption(self):
"""Increment the counter of image_producer by 1.
This should only be called by the main thread that consumes images and runs
the model computation. One batch of images should be consumed between
calling start() and the first call to this method. Then, one batch of images
should be consumed between any two successive calls to this method.
"""
if self._should_put():
self.put_barrier.wait()
self.num_gets += 1
def _loop_producer(self):
while not self.done_event.isSet():
self.sess.run([self.put_ops])
self.put_barrier.wait()
class BaseClusterManager(object):
"""The manager for the cluster of servers running the benchmark."""
def __init__(self, params):
worker_hosts = params.worker_hosts.split(',')
ps_hosts = params.ps_hosts.split(',') if params.ps_hosts else []
cluster = {'worker': worker_hosts}
if ps_hosts:
cluster['ps'] = ps_hosts
self._cluster_spec = tf.train.ClusterSpec(cluster)
def get_target(self):
"""Returns a target to be passed to tf.Session()."""
raise NotImplementedError('get_target must be implemented by subclass')
def join_server(self):
raise NotImplementedError('join must be implemented by subclass')
def get_cluster_spec(self):
return self._cluster_spec
def num_workers(self):
return len(self._cluster_spec.job_tasks('worker'))
def num_ps(self):
if 'ps' in self._cluster_spec.jobs:
return len(self._cluster_spec.job_tasks('ps'))
else:
return 0
class GrpcClusterManager(BaseClusterManager):
"""A cluster manager for a cluster networked with gRPC."""
def __init__(self, params, config_proto):
super(GrpcClusterManager, self).__init__(params)
if params.job_name == 'controller':
self._target = 'grpc://%s' % self._cluster_spec.job_tasks('worker')[0]
else:
self._server = tf.train.Server(self._cluster_spec,
job_name=params.job_name,
task_index=params.task_index,
config=config_proto,
protocol=params.server_protocol)
self._target = self._server.target
def get_target(self):
return self._target
def join_server(self):
return self._server.join()
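# Added sketch (not part of the original module) of the params fields the
# cluster managers read. FakeParams is hypothetical; real runs pass the flags
# object produced by benchmark_cnn.make_params_from_flags(). A job_name of
# 'controller' is used here so that no tf.train.Server is actually started.
def _example_grpc_cluster_manager():
  import collections
  FakeParams = collections.namedtuple(
      'FakeParams',
      ['worker_hosts', 'ps_hosts', 'job_name', 'task_index', 'server_protocol'])
  params = FakeParams(worker_hosts='worker0:2222,worker1:2222', ps_hosts='',
                      job_name='controller', task_index=0,
                      server_protocol='grpc')
  manager = GrpcClusterManager(params, config_proto=None)
  return manager.get_target()  # 'grpc://worker0:2222'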