Commit 4749cd5e authored by huchen's avatar huchen
Browse files

del the tf2x benchmark

parent 772777c3
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Used to run benchmark_cnn for distributed tests.
In distributed tests, we spawn processes to run tf_cnn_benchmark tasks. We could
directly spawn tf_cnn_benchmark processes, but we want some added functionality,
such as being able to inject custom images during training. So instead, this
file is spawned as a Python process, which supports the added functionality.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from absl import flags as absl_flags
import numpy as np
import tensorflow.compat.v1 as tf
import benchmark_cnn
import flags
import preprocessing
import test_util
absl_flags.DEFINE_string('fake_input', 'none',
"""What fake input to inject into benchmark_cnn. This
is ignored if --model=test_model.
Options are:
none: Do not inject any fake input.
zeros_and_ones: Half the images will be all 0s with
a label of 0. Half the images will be all 1s with a
label of 1.""")
flags.define_flags()
FLAGS = flags.FLAGS
def get_test_image_preprocessor(batch_size, params):
  """Builds the preprocessing.TestImagePreprocessor to inject, if any.

  Args:
    batch_size: The batch size across all GPUs.
    params: BenchmarkCNN's parameters.

  Returns:
    A preprocessing.TestImagePreprocessor, or None if no preprocessor should
    be injected.

  Raises:
    ValueError: Flag --fake_input is an invalid value.
  """
  fake_input = FLAGS.fake_input
  if fake_input == 'none':
    return None
  if fake_input != 'zeros_and_ones':
    raise ValueError('Invalid --fake_input: %s' % fake_input)
  # First half of the batch is all-zero images labeled 0; second half is
  # all-one images labeled 1.
  half = batch_size // 2
  images = np.zeros((batch_size, 227, 227, 3), dtype=np.float32)
  images[half:, :, :, :] = 1
  labels = np.array([0] * half + [1] * half, dtype=np.int32)
  preprocessor = preprocessing.TestImagePreprocessor(
      batch_size, [227, 227, 3], params.num_gpus,
      benchmark_cnn.get_data_type(params))
  preprocessor.set_fake_data(images, labels)
  preprocessor.expected_subset = 'validation' if params.eval else 'train'
  return preprocessor
def run_with_real_model(params):
  """Runs tf_cnn_benchmarks with a real model."""
  benchmark = benchmark_cnn.BenchmarkCNN(params)
  benchmark.print_info()
  injected = get_test_image_preprocessor(benchmark.batch_size, params)
  if injected is not None:
    # The test image preprocessor requires queue runners. Since this file is
    # used for testing, it is OK to access protected members.
    benchmark.dataset._queue_runner_required = True  # pylint: disable=protected-access
    benchmark.input_preprocessor = injected
  benchmark.run()
def run_with_test_model(params):
  """Runs tf_cnn_benchmarks with a test model."""
  test_model = test_util.TestCNNModel()
  fake_inputs = test_util.get_fake_var_update_inputs()
  patch = test_util.monkey_patch(benchmark_cnn,
                                 LOSS_AND_ACCURACY_DIGITS_TO_SHOW=15)
  with patch:
    benchmark = benchmark_cnn.BenchmarkCNN(
        params, dataset=test_util.TestDataSet(), model=test_model)
    # The test model does not use labels when computing loss, so the label
    # values do not matter as long as it's the right shape.
    fake_labels = np.array([1] * fake_inputs.shape[0])
    benchmark.input_preprocessor.set_fake_data(fake_inputs, fake_labels)
    benchmark.run()
def main(_):
  """Builds benchmark parameters from flags and dispatches to a runner."""
  params = benchmark_cnn.setup(benchmark_cnn.make_params_from_flags())
  if params.model != 'test_model':
    run_with_real_model(params)
  else:
    run_with_test_model(params)
if __name__ == '__main__':
  # This runner uses TF1-style APIs (tf.app.run, graph/session execution via
  # benchmark_cnn), so TF2 behavior must be disabled first.
  tf.disable_v2_behavior()
  tf.app.run()
# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Utilities for CNN benchmarks."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import sys
import threading
import numpy as np
import tensorflow.compat.v1 as tf
def tensorflow_version_tuple():
  """Returns the TensorFlow version as a (major, minor, patch) tuple.

  major and minor are ints; patch is kept as a string since it may contain
  non-numeric content (e.g. '0-rc1').
  """
  v = tf.__version__
  # Split at most twice so version strings with extra dotted components
  # (e.g. '2.4.0.dev1') keep everything past the minor version in `patch`
  # instead of raising "too many values to unpack".
  major, minor, patch = v.split('.', 2)
  return (int(major), int(minor), patch)
def tensorflow_version():
  """Returns the TF version encoded as a single int: major * 1000 + minor."""
  major, minor = tensorflow_version_tuple()[:2]
  return major * 1000 + minor
def log_fn(log):
  """Writes a log message to stdout."""
  print(log)
def roll_numpy_batches(array, batch_size, shift_ratio):
  """Moves a proportion of batches from start to the end of the array.

  This function moves a proportion of batches, specified by `shift_ratio`, from
  the starts of the array to the end. The number of batches moved is rounded
  down to the nearest integer. For example,

  ```
  roll_numpy_batches([1, 2, 3, 4, 5, 6], 2, 0.34) == [3, 4, 5, 6, 1, 2]
  ```

  Args:
    array: A Numpy array whose first dimension is the batch dimension.
    batch_size: The batch size.
    shift_ratio: Proportion of batches to move from the start of the array to
      the end of the array.

  Returns:
    A new Numpy array, with a proportion of the batches at the start of `array`
    moved to the end.

  Raises:
    ValueError: If the first dimension of `array` is not a multiple of
      `batch_size`.
  """
  num_items = array.shape[0]
  # Validate with an explicit exception rather than `assert`, which is
  # silently stripped when Python runs with -O.
  if num_items % batch_size != 0:
    raise ValueError('Array size %d is not a multiple of batch size %d' %
                     (num_items, batch_size))
  num_batches = num_items // batch_size
  # int() rounds the number of shifted batches down to the nearest integer.
  starting_batch = int(num_batches * shift_ratio)
  starting_item = starting_batch * batch_size
  return np.roll(array, -starting_item, axis=0)
# For Python 2.7 compatibility, we do not use threading.Barrier.
class Barrier(object):
  """Implements a lightweight Barrier.

  Useful for synchronizing a fixed number of threads at known synchronization
  points. Threads block on 'wait()' and simultaneously return once they have
  all made that call.

  # Implementation adopted from boost/thread/barrier.hpp
  """

  def __init__(self, parties):
    """Create a barrier, initialised to 'parties' threads."""
    self.cond = threading.Condition(threading.Lock())
    # Number of threads that must call wait() before any are released.
    self.parties = parties
    # Indicates the number of waiting parties.
    self.waiting = 0
    # generation is needed to deal with spurious wakeups. If self.cond.wait()
    # wakes up for other reasons, generation will force it go back to wait().
    self.generation = 0
    # Once set via abort(), wait() returns immediately for all callers.
    self.broken = False

  def wait(self):
    """Wait for the barrier."""
    with self.cond:
      # Check if the barrier has been disabled or not.
      if self.broken:
        return
      gen = self.generation
      self.waiting += 1
      if self.waiting == self.parties:
        # Last arriving thread: reset the count, advance the generation so
        # blocked threads can exit their wait loops, and wake them all.
        self.waiting = 0
        self.generation += 1
        self.cond.notify_all()
      # loop because of spurious wakeups
      while gen == self.generation:
        self.cond.wait()

  # TODO(huangyp): Remove this method once we find a way to know which step
  # is the last barrier.
  def abort(self):
    """Clear existing barrier and disable this barrier."""
    with self.cond:
      if self.waiting > 0:
        # Advance the generation to release any threads blocked in wait().
        self.generation += 1
        self.cond.notify_all()
      self.broken = True
class ImageProducer(object):
  """An image producer that puts images into a staging area periodically.

  This class is useful for periodically running a set of ops, `put_ops` on a
  different thread every `batch_group_size` steps.

  The notify_image_consumption() method is used to increment an internal counter
  so that every `batch_group_size` times it is called, `put_ops` is executed. A
  barrier is placed so that notify_image_consumption() will block until
  the previous call to `put_ops` has been executed.

  The start() method is used to start the thread that runs `put_ops`.

  The done() method waits until the last put_ops is executed and stops the
  thread.

  The purpose of this class is to fill an image input pipeline every
  `batch_group_size` steps. Suppose `put_ops` supplies `batch_group_size` images
  to the input pipeline when run, and that every step, 1 batch of images is
  consumed. Then, by calling notify_image_consumption() every step, images are
  supplied to the input pipeline at the same amount they are consumed.

  Example usage:
  ```
  put_ops = ...  # Enqueues `batch_group_size` batches to a StagingArea
  get_op = ...  # Dequeues 1 batch, and does some operations on it
  batch_group_size = 4
  with tf.Session() as sess:
    image_producer = cnn_util.ImageProducer(sess, put_op, batch_group_size)
    image_producer.start()
    for _ in range(100):
      sess.run(get_op)
      image_producer.notify_image_consumption()
  ```
  """

  def __init__(self, sess, put_ops, batch_group_size, use_python32_barrier):
    self.sess = sess
    # Count of notify_image_consumption() calls; drives _should_put().
    self.num_gets = 0
    self.put_ops = put_ops
    self.batch_group_size = batch_group_size
    # Set in done() to tell the producer thread to exit its loop.
    self.done_event = threading.Event()
    # On Python >= 3.2 optionally use the stdlib threading.Barrier; otherwise
    # fall back to the local Barrier class defined above.
    if (use_python32_barrier and
        sys.version_info[0] == 3 and sys.version_info[1] >= 2):
      self.put_barrier = threading.Barrier(2)
    else:
      self.put_barrier = Barrier(2)

  def _should_put(self):
    # True on every batch_group_size-th consumption notification.
    return (self.num_gets + 1) % self.batch_group_size == 0

  def done(self):
    """Stop the image producer."""
    self.done_event.set()
    # Unblock the producer thread if it is parked on the barrier.
    self.put_barrier.abort()
    self.thread.join()

  def start(self):
    """Start the image producer."""
    # Run put_ops once synchronously so data is staged before the consumer's
    # first step.
    self.sess.run([self.put_ops])
    self.thread = threading.Thread(target=self._loop_producer)
    # Set daemon to true to allow Ctrl + C to terminate all threads.
    self.thread.daemon = True
    self.thread.start()

  def notify_image_consumption(self):
    """Increment the counter of image_producer by 1.

    This should only be called by the main thread that consumes images and runs
    the model computation. One batch of images should be consumed between
    calling start() and the first call to this method. Then, one batch of images
    should be consumed between any two successive calls to this method.
    """
    if self._should_put():
      # Rendezvous with the producer thread: blocks until the previous
      # put_ops run has finished, then lets the producer run it again.
      self.put_barrier.wait()
    self.num_gets += 1

  def _loop_producer(self):
    # Producer thread body: run put_ops, then wait at the barrier for the
    # consumer to request the next batch group (or for abort() in done()).
    while not self.done_event.isSet():
      self.sess.run([self.put_ops])
      self.put_barrier.wait()
class BaseClusterManager(object):
  """The manager for the cluster of servers running the benchmark."""

  def __init__(self, params):
    """Builds a tf.train.ClusterSpec from the worker/ps host params."""
    workers = params.worker_hosts.split(',')
    ps = params.ps_hosts.split(',') if params.ps_hosts else []
    jobs = {'worker': workers}
    if ps:
      jobs['ps'] = ps
    self._cluster_spec = tf.train.ClusterSpec(jobs)

  def get_target(self):
    """Returns a target to be passed to tf.Session()."""
    raise NotImplementedError('get_target must be implemented by subclass')

  def join_server(self):
    raise NotImplementedError('join must be implemented by subclass')

  def get_cluster_spec(self):
    """Returns the tf.train.ClusterSpec built from params."""
    return self._cluster_spec

  def num_workers(self):
    """Returns the number of worker tasks in the cluster."""
    return len(self._cluster_spec.job_tasks('worker'))

  def num_ps(self):
    """Returns the number of parameter-server tasks (0 if none configured)."""
    if 'ps' not in self._cluster_spec.jobs:
      return 0
    return len(self._cluster_spec.job_tasks('ps'))
class GrpcClusterManager(BaseClusterManager):
  """A cluster manager for a cluster networked with gRPC."""

  def __init__(self, params, config_proto):
    """Resolves the session target, starting a tf.train.Server if needed."""
    super(GrpcClusterManager, self).__init__(params)
    if params.job_name == 'controller':
      # The controller runs no server of its own; it connects to the first
      # worker task.
      first_worker = self._cluster_spec.job_tasks('worker')[0]
      self._target = 'grpc://%s' % first_worker
    else:
      self._server = tf.train.Server(self._cluster_spec,
                                     job_name=params.job_name,
                                     task_index=params.task_index,
                                     config=config_proto,
                                     protocol=params.server_protocol)
      self._target = self._server.target

  def get_target(self):
    """Returns the gRPC session target."""
    return self._target

  def join_server(self):
    """Blocks until the in-process server shuts down."""
    return self._server.join()
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for tf_cnn_benchmarks.cnn_util."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import threading
import time
import tensorflow.compat.v1 as tf
import cnn_util
class CnnUtilBarrierTest(tf.test.TestCase):
  """Tests for cnn_util.Barrier."""

  def _run_task(self, barrier, sync_times):
    """Records a timestamp before each of len(sync_times) barrier waits."""
    for idx in range(len(sync_times)):
      sync_times[idx] = time.time()
      barrier.wait()

  def testBarrier(self):
    num_tasks = 20
    num_waits = 4
    barrier = cnn_util.Barrier(num_tasks)
    sync_matrix = [[0] * num_waits for _ in range(num_tasks)]
    workers = []
    for row in sync_matrix:
      worker = threading.Thread(target=self._run_task, args=(barrier, row))
      worker.start()
      workers.append(worker)
    for worker in workers:
      worker.join()
    for wait_index in range(num_waits - 1):
      # Max of times at iteration i < min of times at iteration i + 1
      self.assertLessEqual(
          max(row[wait_index] for row in sync_matrix),
          min(row[wait_index + 1] for row in sync_matrix))

  def testBarrierAbort(self):
    num_tasks = 2
    num_waits = 1
    sync_times = [0] * num_waits
    barrier = cnn_util.Barrier(num_tasks)
    worker = threading.Thread(
        target=self._run_task, args=(barrier, sync_times))
    worker.start()
    barrier.abort()
    # thread won't be blocked by done barrier.
    worker.join()
class ImageProducerTest(tf.test.TestCase):
  """Tests for cnn_util.ImageProducer."""

  def _slow_tensorflow_op(self):
    """Returns a TensorFlow op that takes approximately 0.1s to complete."""
    def slow_func(v):
      time.sleep(0.1)
      return v
    return tf.py_func(slow_func, [tf.constant(0.)], tf.float32).op

  def _test_image_producer(self, batch_group_size, put_slower_than_get):
    """Runs an ImageProducer cycle and checks the staged-batch count bounds.

    Args:
      batch_group_size: number of batches each put adds to the staging area.
      put_slower_than_get: if True the put op is artificially slow, otherwise
        the get op is — exercising both producer/consumer speed orderings.
    """
    # We use the variable x to simulate a staging area of images. x represents
    # the number of batches in the staging area.
    x = tf.Variable(0, dtype=tf.int32)
    if put_slower_than_get:
      put_dep = self._slow_tensorflow_op()
      get_dep = tf.no_op()
    else:
      put_dep = tf.no_op()
      get_dep = self._slow_tensorflow_op()
    with tf.control_dependencies([put_dep]):
      put_op = x.assign_add(batch_group_size, use_locking=True)
    with tf.control_dependencies([get_dep]):
      get_op = x.assign_sub(1, use_locking=True)
    with self.test_session() as sess:
      sess.run(tf.variables_initializer([x]))
      image_producer = cnn_util.ImageProducer(sess, put_op, batch_group_size,
                                              use_python32_barrier=False)
      image_producer.start()
      for _ in range(5 * batch_group_size):
        sess.run(get_op)
        # We assert x is nonnegative, to ensure image_producer never causes
        # an unstage op to block. We assert x is at most 2 * batch_group_size,
        # to ensure it doesn't use too much memory by storing too many batches
        # in the staging area.
        self.assertGreaterEqual(sess.run(x), 0)
        self.assertLessEqual(sess.run(x), 2 * batch_group_size)
        image_producer.notify_image_consumption()
      self.assertGreaterEqual(sess.run(x), 0)
      self.assertLessEqual(sess.run(x), 2 * batch_group_size)

      image_producer.done()
      # Give the producer thread a moment to finish before the final checks.
      time.sleep(0.1)
      self.assertGreaterEqual(sess.run(x), 0)
      self.assertLessEqual(sess.run(x), 2 * batch_group_size)

  def test_image_producer(self):
    # Cover several batch group sizes with both speed orderings.
    self._test_image_producer(1, False)
    self._test_image_producer(1, True)
    self._test_image_producer(2, False)
    self._test_image_producer(2, True)
    self._test_image_producer(3, False)
    self._test_image_producer(3, True)
    self._test_image_producer(8, False)
    self._test_image_producer(8, True)
if __name__ == '__main__':
  # These tests use TF1 sessions and py_func, so disable TF2 behavior first.
  tf.disable_v2_behavior()
  tf.test.main()
# Copyright 2018 Google. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""COCO-style evaluation metrics.
Forked from reference model implementation.
COCO API: github.com/cocodataset/cocoapi/
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import atexit
import tempfile
from absl import flags
import numpy as np
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
import six
import tensorflow.compat.v1 as tf
import mlperf
import ssd_constants
FLAGS = flags.FLAGS
# https://github.com/cocodataset/cocoapi/issues/49
# pycocotools references the Python-2-only `unicode` builtin; alias it to str
# on Python 3 so COCO annotation loading works.
if six.PY3:
  import pycocotools.coco
  pycocotools.coco.unicode = str
def async_eval_runner(queue_predictions, queue_results, val_json_file):
  """Consumes prediction batches from a queue and emits COCO metrics.

  Loops until a 'STOP' poison pill arrives on queue_predictions. Every other
  message is a (step, predictions) tuple; the computed metrics are pushed to
  queue_results as (step, metrics).
  """
  while True:
    work_item = queue_predictions.get()
    if work_item == 'STOP':  # poison pill
      break
    step, predictions = work_item
    metrics = compute_map(predictions, val_json_file)
    queue_results.put((step, metrics))
def compute_map(predictions, val_json_file):
  """Use model predictions to compute mAP.

  Args:
    predictions: a list of tuples returned by decoded_predictions function,
      each containing the following elements:
      image source_id, box coordinates in XYWH order, probability score, label
    val_json_file: path to COCO annotation file

  Returns:
    A dictionary that maps all COCO metrics (keys) to their values
  """
  # The COCO API reads from the local filesystem, so annotation files on GCS
  # are first copied to a local temp file (removed again at process exit).
  if val_json_file.startswith("gs://"):
    _, local_val_json = tempfile.mkstemp(suffix=".json")
    tf.gfile.Remove(local_val_json)
    tf.gfile.Copy(val_json_file, local_val_json)
    atexit.register(tf.gfile.Remove, local_val_json)
  else:
    local_val_json = val_json_file

  ground_truth = COCO(local_val_json)
  detections = ground_truth.loadRes(np.array(predictions))
  evaluation = COCOeval(ground_truth, detections, iouType='bbox')
  evaluation.evaluate()
  evaluation.accumulate()
  evaluation.summarize()
  print("Current AP: {:.5f}".format(evaluation.stats[0]))
  metric_names = ['AP', 'AP50', 'AP75', 'APs', 'APm', 'APl', 'ARmax1',
                  'ARmax10', 'ARmax100', 'ARs', 'ARm', 'ARl']

  # Prefix with "COCO" to group in TensorBoard.
  return {"COCO/" + name: value
          for name, value in zip(metric_names, evaluation.stats)}
def calc_iou(target, candidates):
  """Computes IoU between one target box and each candidate box.

  Boxes use a [min, min, max, max] corner layout (first two entries are the
  top-left corner, last two the bottom-right corner, in consistent units).

  Args:
    target: 1-D array of 4 box coordinates.
    candidates: 2-D array of shape (num_candidates, 4).

  Returns:
    1-D array of length num_candidates: IoU of target with each candidate.
  """
  target_tiled = np.tile(target[np.newaxis, :], (candidates.shape[0], 1))
  # Left Top & Right Bottom of the intersection rectangle.
  lt = np.maximum(target_tiled[:, :2], candidates[:, :2])
  rb = np.minimum(target_tiled[:, 2:], candidates[:, 2:])
  # Clamp at 0 so disjoint boxes get zero intersection instead of negative.
  delta = np.maximum(rb - lt, 0)
  intersect = delta[:, 0] * delta[:, 1]
  # Bug fix: each area must be computed from the corners of the SAME box.
  # The previous code computed both deltas as
  # `target_tiled[:,2:] - candidates[:,:2]`, mixing target and candidate
  # corners and yielding identical, incorrect areas.
  delta1 = target_tiled[:, 2:] - target_tiled[:, :2]
  area1 = delta1[:, 0] * delta1[:, 1]
  delta2 = candidates[:, 2:] - candidates[:, :2]
  area2 = delta2[:, 0] * delta2[:, 1]
  iou = intersect / (area1 + area2 - intersect)
  return iou
# TODO(haoyuzhang): Rewrite this NumPy based implementation to TensorFlow based
# implementation under ssd_model.py accuracy_function.
def decode_predictions(labels_and_predictions):
  """Decode predictions and remove unused boxes and labels."""
  predictions = []
  for example in labels_and_predictions:
    source_id = int(example[ssd_constants.SOURCE_ID])
    boxes, classes, probabilities = decode_single(
        example[ssd_constants.PRED_BOXES], example[ssd_constants.PRED_SCORES],
        ssd_constants.OVERLAP_CRITERIA, ssd_constants.MAX_NUM_EVAL_BOXES,
        ssd_constants.MAX_NUM_EVAL_BOXES)

    raw_height, raw_width, _ = example[ssd_constants.RAW_SHAPE]
    for box, cls, prob in zip(boxes, classes, probabilities):
      # Ordering convention differs, hence [1], [0] rather than [0], [1]
      left = box[1] * raw_width
      top = box[0] * raw_height
      width = (box[3] - box[1]) * raw_width
      height = (box[2] - box[0]) * raw_height
      predictions.append([source_id, left, top, width, height, prob,
                          ssd_constants.CLASS_INV_MAP[cls]])

  mlperf.logger.log(key=mlperf.tags.NMS_THRESHOLD,
                    value=ssd_constants.OVERLAP_CRITERIA)
  mlperf.logger.log(key=mlperf.tags.NMS_MAX_DETECTIONS,
                    value=ssd_constants.MAX_NUM_EVAL_BOXES)
  return predictions
def decode_single(bboxes_in, scores_in, criteria, max_output, max_num=200):
  """Per-image non-maximum suppression over per-class scores.

  Reference to https://github.com/amdegroot/ssd.pytorch

  Args:
    bboxes_in: candidate boxes, one row per anchor.
    scores_in: per-class scores of shape (num_anchors, num_classes); class 0
      is treated as background and skipped.
    criteria: IoU threshold at or above which overlapping boxes are dropped.
    max_output: maximum number of detections returned overall.
    max_num: maximum number of top-scoring candidates per class fed to NMS.

  Returns:
    A (boxes, labels, scores) tuple of NumPy arrays. If no score exceeds
    ssd_constants.MIN_SCORE, returns a single dummy detection instead.
  """
  bboxes_out = []
  scores_out = []
  labels_out = []

  # np.split yields one (num_anchors, 1) score column per class.
  for i, score in enumerate(np.split(scores_in, scores_in.shape[1], 1)):
    score = np.squeeze(score, 1)

    # skip background
    if i == 0:
      continue

    # Keep only candidates above the minimum score threshold.
    mask = score > ssd_constants.MIN_SCORE
    if not np.any(mask):
      continue

    bboxes, score = bboxes_in[mask, :], score[mask]

    # Indices sorted ascending by score; keep at most the max_num best.
    score_idx_sorted = np.argsort(score)
    score_sorted = score[score_idx_sorted]

    score_idx_sorted = score_idx_sorted[-max_num:]
    candidates = []

    # perform non-maximum suppression
    while len(score_idx_sorted):
      # Highest-scoring remaining candidate.
      idx = score_idx_sorted[-1]
      bboxes_sorted = bboxes[score_idx_sorted, :]
      bboxes_idx = bboxes[idx, :]
      iou = calc_iou(bboxes_idx, bboxes_sorted)
      # Remove every candidate (including idx itself) whose IoU with the
      # chosen box reaches `criteria`.
      score_idx_sorted = score_idx_sorted[iou < criteria]
      candidates.append(idx)

    bboxes_out.append(bboxes[candidates, :])
    scores_out.append(score[candidates])
    labels_out.extend([i]*len(candidates))

  if len(scores_out) == 0:
    tf.logging.info("No objects detected. Returning dummy values.")
    return (
        np.zeros(shape=(1, 4), dtype=np.float32),
        np.zeros(shape=(1,), dtype=np.int32),
        np.ones(shape=(1,), dtype=np.float32) * ssd_constants.DUMMY_SCORE,
    )

  bboxes_out = np.concatenate(bboxes_out, axis=0)
  scores_out = np.concatenate(scores_out, axis=0)
  labels_out = np.array(labels_out)

  # Keep only the max_output highest-scoring detections across all classes.
  max_ids = np.argsort(scores_out)[-max_output:]
  return bboxes_out[max_ids, :], labels_out[max_ids], scores_out[max_ids]
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Constants used in tf_cnn_benchmarks."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from enum import Enum
# Results fetched with this prefix will not be reduced. Instead, they will be
# passed as matrices to model's postprocess function.
UNREDUCED_ACCURACY_OP_PREFIX = "tensor:"

# Eval result values with this name prefix will be included in summary.
SIMPLE_VALUE_RESULT_PREFIX = "simple_value:"
class BenchmarkMode(object):
  """Benchmark running mode."""
  # Values are human-readable mode names.
  TRAIN = "training"
  EVAL = "evaluation"
  TRAIN_AND_EVAL = "training + evaluation"
  FORWARD_ONLY = "forward only"
class NetworkTopology(str, Enum):
  """Network topology describes how multiple GPUs are inter-connected.

  Subclassing str as well as Enum makes the members usable directly as their
  string values (e.g. in flag parsing and comparisons).
  """

  # DGX-1 uses hybrid cube mesh topology with the following device peer to peer
  # matrix:
  # DMA: 0 1 2 3 4 5 6 7
  # 0:   Y Y Y Y Y N N N
  # 1:   Y Y Y Y N Y N N
  # 2:   Y Y Y Y N N Y N
  # 3:   Y Y Y Y N N N Y
  # 4:   Y N N N Y Y Y Y
  # 5:   N Y N N Y Y Y Y
  # 6:   N N Y N Y Y Y Y
  # 7:   N N N Y Y Y Y Y
  DGX1 = "dgx1"

  # V100 in GCP are connected with the following device peer to peer matrix.
  # In this topology, bandwidth of the connection depends on if it uses NVLink
  # or PCIe link.
  # DMA: 0 1 2 3 4 5 6 7
  # 0:   Y Y Y Y N Y N N
  # 1:   Y Y Y Y N N N N
  # 2:   Y Y Y Y N N N Y
  # 3:   Y Y Y Y N N N N
  # 4:   N N N N Y Y Y Y
  # 5:   Y N N N Y Y Y Y
  # 6:   N N N N Y Y Y Y
  # 7:   N N Y N Y Y Y Y
  GCP_V100 = "gcp_v100"
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment