Commit 27b4acd4 authored by Aman Gupta

Merge remote-tracking branch 'upstream/master'

parents 5133522f d4e1f97f
......@@ -6,10 +6,12 @@
/research/adv_imagenet_models/ @AlexeyKurakin
/research/attention_ocr/ @alexgorban
/research/audioset/ @plakal @dpwe
/research/autoaugment/* @barretzoph
/research/autoencoders/ @snurkabill
/research/brain_coder/ @danabo
/research/cognitive_mapping_and_planning/ @s-gupta
/research/compression/ @nmjohn
/research/cvt_text/ @clarkkev @lmthang
/research/deep_contextual_bandits/ @rikel
/research/deeplab/ @aquariusjay @yknzhu @gpapan
/research/delf/ @andrefaraujo
......
......@@ -54,7 +54,7 @@ our objectives of readable, usable, and maintainable code.
* Runnable from a blank environment with relative ease.
* Trainable on: single GPU/CPU (baseline), multiple GPUs, TPU
* Compatible with Python 2 and 3 (using [six](https://pythonhosted.org/six/) when necessary; see the sketch after this list)
* Conform to [Google Python Style Guide](https://google.github.io/styleguide/pyguide.html)
* Conform to [Google Python Style Guide](https://github.com/google/styleguide/blob/gh-pages/pyguide.md)
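As a small, hypothetical illustration of the Python 2/3 requirement above (this snippet is not code from the repository), `six` smooths over the builtins and dict-iteration differences between the two interpreters:

```python
from __future__ import print_function

import six
from six.moves import range  # pylint: disable=redefined-builtin

params = {"batch_size": 32, "train_epochs": 10}

# six.iteritems works on both Python 2 and Python 3 dicts.
for name, value in six.iteritems(params):
  print(name, value)

# six.moves.range maps to xrange on Python 2 and range on Python 3.
for step in range(3):
  print("step", step)
```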
**Implementation guidelines**
......
......@@ -72,7 +72,7 @@ NUM_USER_IDS = {
ML_20M: 138493,
}
# Note: Users are indexed [1, k], not [0, k-1]
# Note: Movies are indexed [1, k], not [0, k-1]
# Both the 1m and 20m datasets use the same movie set.
NUM_ITEM_IDS = 3952
......
......@@ -58,6 +58,7 @@ NUM_EVAL_NEGATIVES = 999
CYCLES_TO_BUFFER = 3 # The number of train cycles worth of data to "run ahead"
# of the main training loop.
READY_FILE_TEMP = "ready.json.temp"
READY_FILE = "ready.json"
TRAIN_RECORD_TEMPLATE = "train_{}.tfrecords"
......
......@@ -160,7 +160,8 @@ def _construct_training_records(
train_batch_size, # type: int
training_shards, # type: typing.List[str]
spillover, # type: bool
carryover=None # type: typing.Union[typing.List[np.ndarray], None]
carryover=None, # type: typing.Union[typing.List[np.ndarray], None]
deterministic=False # type: bool
):
"""Generate false negatives and write TFRecords files.
......@@ -204,7 +205,8 @@ def _construct_training_records(
with contextlib.closing(multiprocessing.Pool(
processes=num_workers, initializer=init_worker)) as pool:
data_generator = pool.imap_unordered(_process_shard, map_args) # pylint: disable=no-member
map_fn = pool.imap if deterministic else pool.imap_unordered # pylint: disable=no-member
data_generator = map_fn(_process_shard, map_args)
data = [
np.zeros(shape=(num_pts,), dtype=np.int32) - 1,
np.zeros(shape=(num_pts,), dtype=np.uint16),
......@@ -282,11 +284,17 @@ def _construct_training_records(
raise ValueError("Error detected: point counts do not match: {} vs. {}"
.format(num_pts, written_pts))
with tf.gfile.Open(os.path.join(record_dir, rconst.READY_FILE), "w") as f:
# We write to a temp file and then atomically rename it to the final file,
# because writing directly to the final file can cause the main process to
# read a partially written JSON file.
ready_file_temp = os.path.join(record_dir, rconst.READY_FILE_TEMP)
with tf.gfile.Open(ready_file_temp, "w") as f:
json.dump({
"batch_size": train_batch_size,
"batch_count": batch_count,
}, f)
ready_file = os.path.join(record_dir, rconst.READY_FILE)
tf.gfile.Rename(ready_file_temp, ready_file)
log_msg("Cycle {} complete. Total time: {:.1f} seconds"
.format(train_cycle, timeit.default_timer() - st))
......@@ -329,21 +337,35 @@ def _construct_eval_record(cache_paths, eval_batch_size):
items=items[i, :]
)
writer.write(batch_bytes)
tf.gfile.Copy(intermediate_fpath, dest_fpath)
tf.gfile.Remove(intermediate_fpath)
tf.gfile.Rename(intermediate_fpath, dest_fpath)
log_msg("Eval TFRecords file successfully constructed.")
def _generation_loop(
num_workers, cache_paths, num_readers, num_neg, num_train_positives,
num_items, spillover, epochs_per_cycle, train_batch_size, eval_batch_size):
# type: (int, rconst.Paths, int, int, int, int, bool, int, int, int) -> None
def _generation_loop(num_workers, # type: int
cache_paths, # type: rconst.Paths
num_readers, # type: int
num_neg, # type: int
num_train_positives, # type: int
num_items, # type: int
spillover, # type: bool
epochs_per_cycle, # type: int
train_batch_size, # type: int
eval_batch_size, # type: int
deterministic # type: bool
):
# type: (...) -> None
"""Primary run loop for data file generation."""
log_msg("Signaling that I am alive.")
with tf.gfile.Open(cache_paths.subproc_alive, "w") as f:
f.write("Generation subproc has started.")
atexit.register(tf.gfile.Remove, filename=cache_paths.subproc_alive)
@atexit.register
def remove_alive_file():
try:
tf.gfile.Remove(cache_paths.subproc_alive)
except tf.errors.NotFoundError:
return # Main thread has already deleted the entire cache dir.
log_msg("Entering generation loop.")
tf.gfile.MakeDirs(cache_paths.train_epoch_dir)
......@@ -359,7 +381,8 @@ def _generation_loop(
cache_paths=cache_paths, num_readers=num_readers, num_neg=num_neg,
num_train_positives=num_train_positives, num_items=num_items,
epochs_per_cycle=epochs_per_cycle, train_batch_size=train_batch_size,
training_shards=training_shards, spillover=spillover, carryover=None)
training_shards=training_shards, spillover=spillover, carryover=None,
deterministic=deterministic)
_construct_eval_record(cache_paths=cache_paths,
eval_batch_size=eval_batch_size)
......@@ -392,7 +415,7 @@ def _generation_loop(
num_train_positives=num_train_positives, num_items=num_items,
epochs_per_cycle=epochs_per_cycle, train_batch_size=train_batch_size,
training_shards=training_shards, spillover=spillover,
carryover=carryover)
carryover=carryover, deterministic=deterministic)
wait_count = 0
start_time = time.time()
......@@ -436,6 +459,7 @@ def main(_):
epochs_per_cycle=flags.FLAGS.epochs_per_cycle,
train_batch_size=flags.FLAGS.train_batch_size,
eval_batch_size=flags.FLAGS.eval_batch_size,
deterministic=flags.FLAGS.seed is not None,
)
except KeyboardInterrupt:
log_msg("KeyboardInterrupt registered.")
......
......@@ -21,6 +21,7 @@ from __future__ import print_function
import atexit
import contextlib
import gc
import hashlib
import multiprocessing
import json
import os
......@@ -50,7 +51,7 @@ class NCFDataset(object):
"""Container for training and testing data."""
def __init__(self, user_map, item_map, num_data_readers, cache_paths,
num_train_positives):
num_train_positives, deterministic=False):
# type: (dict, dict, int, rconst.Paths) -> None
"""Assign key values for recommendation dataset.
......@@ -61,6 +62,8 @@ class NCFDataset(object):
cache_paths: Object containing locations for various cache files.
num_train_positives: The number of positive training examples in the
dataset.
deterministic: Operations should use deterministic, order-preserving
methods, even at the cost of performance.
"""
self.user_map = {int(k): int(v) for k, v in user_map.items()}
......@@ -70,6 +73,7 @@ class NCFDataset(object):
self.num_data_readers = num_data_readers
self.cache_paths = cache_paths
self.num_train_positives = num_train_positives
self.deterministic = deterministic
def _filter_index_sort(raw_rating_path, match_mlperf):
......@@ -340,7 +344,8 @@ def generate_train_eval_data(df, approx_num_shards, num_items, cache_paths,
pickle.dump(eval_data, f, protocol=pickle.HIGHEST_PROTOCOL)
def construct_cache(dataset, data_dir, num_data_readers, match_mlperf):
def construct_cache(dataset, data_dir, num_data_readers, match_mlperf,
deterministic):
# type: (str, str, int, bool) -> NCFDataset
"""Load and digest data CSV into a usable form.
......@@ -351,6 +356,8 @@ def construct_cache(dataset, data_dir, num_data_readers, match_mlperf):
data during training.
match_mlperf: If True, change the behavior of the cache construction to
match the MLPerf reference implementation.
deterministic: Try to enforce repeatable behavior, even at the cost of
performance.
"""
cache_paths = rconst.Paths(data_dir=data_dir)
num_data_readers = (num_data_readers or int(multiprocessing.cpu_count() / 2)
......@@ -377,7 +384,8 @@ def construct_cache(dataset, data_dir, num_data_readers, match_mlperf):
ncf_dataset = NCFDataset(user_map=user_map, item_map=item_map,
num_data_readers=num_data_readers,
cache_paths=cache_paths,
num_train_positives=len(df) - len(user_map))
num_train_positives=len(df) - len(user_map),
deterministic=deterministic)
run_time = timeit.default_timer() - st
tf.logging.info("Cache construction complete. Time: {:.1f} sec."
......@@ -403,13 +411,15 @@ def _shutdown(proc):
def instantiate_pipeline(dataset, data_dir, batch_size, eval_batch_size,
num_data_readers=None, num_neg=4, epochs_per_cycle=1,
match_mlperf=False):
match_mlperf=False, deterministic=False):
# type: (...) -> (NCFDataset, typing.Callable)
"""Preprocess data and start negative generation subprocess."""
tf.logging.info("Beginning data preprocessing.")
ncf_dataset = construct_cache(dataset=dataset, data_dir=data_dir,
num_data_readers=num_data_readers,
match_mlperf=match_mlperf)
match_mlperf=match_mlperf,
deterministic=deterministic)
tf.logging.info("Creating training file subprocess.")
......@@ -439,20 +449,40 @@ def instantiate_pipeline(dataset, data_dir, batch_size, eval_batch_size,
# guarantee batch size and significantly improves
# performance. (~5% increase in examples/sec on
# GPU, and needed for TPU XLA.)
"--redirect_logs", "True",
"--seed", str(int(stat_utils.random_int32()))
"--redirect_logs", "True"
]
if ncf_dataset.deterministic:
subproc_args.extend(["--seed", str(int(stat_utils.random_int32()))])
tf.logging.info(
"Generation subprocess command: {}".format(" ".join(subproc_args)))
proc = subprocess.Popen(args=subproc_args, shell=False, env=subproc_env)
atexit.register(_shutdown, proc=proc)
atexit.register(tf.gfile.DeleteRecursively,
ncf_dataset.cache_paths.cache_root)
cleanup_called = {"finished": False}
@atexit.register
def cleanup():
"""Remove files and subprocess from data generation."""
if cleanup_called["finished"]:
return
_shutdown(proc)
try:
tf.gfile.DeleteRecursively(ncf_dataset.cache_paths.cache_root)
except tf.errors.NotFoundError:
pass
cleanup_called["finished"] = True
for _ in range(300):
if tf.gfile.Exists(ncf_dataset.cache_paths.subproc_alive):
break
time.sleep(1) # allow `alive` file to be written
if not tf.gfile.Exists(ncf_dataset.cache_paths.subproc_alive):
raise ValueError("Generation subprocess did not start correctly. Data will "
"not be available; exiting to avoid waiting forever.")
return ncf_dataset
return ncf_dataset, cleanup
def make_deserialize(params, batch_size, training=False):
......@@ -490,13 +520,53 @@ def make_deserialize(params, batch_size, training=False):
return deserialize
def hash_pipeline(dataset, deterministic):
# type: (tf.data.Dataset, bool) -> None
"""Utility function for detecting non-determinism in the data pipeline.
Args:
dataset: a tf.data.Dataset generated by the input_fn
deterministic: Whether the input_fn expects the dataset to be deterministic
(i.e. fixed seed, sloppy=False, etc.).
"""
if not deterministic:
tf.logging.warning("Data pipeline is not marked as deterministic. Hash "
"values are not expected to be meaningful.")
batch = dataset.make_one_shot_iterator().get_next()
md5 = hashlib.md5()
count = 0
first_batch_hash = b""
with tf.Session() as sess:
while True:
try:
result = sess.run(batch)
if isinstance(result, tuple):
result = result[0] # only hash features
except tf.errors.OutOfRangeError:
break
count += 1
md5.update(memoryview(result[movielens.USER_COLUMN]).tobytes())
md5.update(memoryview(result[movielens.ITEM_COLUMN]).tobytes())
if count == 1:
first_batch_hash = md5.hexdigest()
overall_hash = md5.hexdigest()
tf.logging.info("Batch count: {}".format(count))
tf.logging.info(" [pipeline_hash] First batch hash: {}".format(
first_batch_hash))
tf.logging.info(" [pipeline_hash] All batches hash: {}".format(overall_hash))
def make_train_input_fn(ncf_dataset):
# type: (NCFDataset) -> (typing.Callable, str, int)
"""Construct training input_fn for the current epoch."""
if not tf.gfile.Exists(ncf_dataset.cache_paths.subproc_alive):
raise ValueError("Generation subprocess did not start correctly. Data will "
"not be available; exiting to avoid waiting forever.")
# The generation subprocess must have been alive at some point, because we
# earlier checked that the subproc_alive file existed.
raise ValueError("Generation subprocess unexpectedly died. Data will not "
"be available; exiting to avoid waiting forever.")
train_epoch_dir = ncf_dataset.cache_paths.train_epoch_dir
while not tf.gfile.Exists(train_epoch_dir):
......@@ -546,14 +616,19 @@ def make_train_input_fn(ncf_dataset):
tf.data.TFRecordDataset,
cycle_length=4,
block_length=100000,
sloppy=True,
sloppy=not ncf_dataset.deterministic,
prefetch_input_elements=4,
)
deserialize = make_deserialize(params, batch_size, True)
dataset = record_files.apply(interleave)
dataset = dataset.map(deserialize, num_parallel_calls=4)
return dataset.prefetch(32)
dataset = dataset.prefetch(32)
if params.get("hash_pipeline"):
hash_pipeline(dataset, ncf_dataset.deterministic)
return dataset
return input_fn, record_dir, batch_count
......@@ -578,7 +653,11 @@ def make_pred_input_fn(ncf_dataset):
deserialize = make_deserialize(params, batch_size, False)
dataset = dataset.map(deserialize, num_parallel_calls=4)
dataset = dataset.prefetch(16)
if params.get("hash_pipeline"):
hash_pipeline(dataset, ncf_dataset.deterministic)
return dataset.prefetch(16)
return dataset
return input_fn
......@@ -85,14 +85,14 @@ class BaseTest(tf.test.TestCase):
# construct_cache()
ncf_dataset = data_preprocessing.construct_cache(
dataset=DATASET, data_dir=self.temp_data_dir, num_data_readers=2,
match_mlperf=False)
match_mlperf=False, deterministic=False)
assert ncf_dataset.num_users == NUM_USERS
assert ncf_dataset.num_items == NUM_ITEMS
time.sleep(1) # Ensure we create the next cache in a new directory.
ncf_dataset = data_preprocessing.construct_cache(
dataset=DATASET, data_dir=self.temp_data_dir, num_data_readers=2,
match_mlperf=True)
match_mlperf=True, deterministic=False)
assert ncf_dataset.num_users == NUM_USERS
assert ncf_dataset.num_items == NUM_ITEMS
......@@ -110,16 +110,11 @@ class BaseTest(tf.test.TestCase):
return output
def test_end_to_end(self):
ncf_dataset = data_preprocessing.instantiate_pipeline(
ncf_dataset, _ = data_preprocessing.instantiate_pipeline(
dataset=DATASET, data_dir=self.temp_data_dir,
batch_size=BATCH_SIZE, eval_batch_size=BATCH_SIZE, num_data_readers=2,
num_neg=NUM_NEG)
for _ in range(30):
if tf.gfile.Exists(ncf_dataset.cache_paths.subproc_alive):
break
time.sleep(1) # allow `alive` file to be written
g = tf.Graph()
with g.as_default():
input_fn, record_dir, batch_count = \
......
......@@ -247,6 +247,8 @@ def construct_estimator(num_gpus, model_dir, params, batch_size,
zone=params["tpu_zone"],
project=params["tpu_gcp_project"],
)
tf.logging.info("Issuing reset command to TPU to ensure a clean state.")
tf.Session.reset(tpu_cluster_resolver.get_master())
tpu_config = tf.contrib.tpu.TPUConfig(
iterations_per_loop=100,
......@@ -297,22 +299,28 @@ def run_ncf(_):
if FLAGS.download_if_missing:
movielens.download(FLAGS.dataset, FLAGS.data_dir)
if FLAGS.seed is not None:
np.random.seed(FLAGS.seed)
num_gpus = flags_core.get_num_gpus(FLAGS)
batch_size = distribution_utils.per_device_batch_size(
int(FLAGS.batch_size), num_gpus)
eval_batch_size = int(FLAGS.eval_batch_size or FLAGS.batch_size)
ncf_dataset = data_preprocessing.instantiate_pipeline(
ncf_dataset, cleanup_fn = data_preprocessing.instantiate_pipeline(
dataset=FLAGS.dataset, data_dir=FLAGS.data_dir,
batch_size=batch_size,
eval_batch_size=eval_batch_size,
num_neg=FLAGS.num_neg,
epochs_per_cycle=FLAGS.epochs_between_evals,
match_mlperf=FLAGS.ml_perf)
match_mlperf=FLAGS.ml_perf,
deterministic=FLAGS.seed is not None)
model_helpers.apply_clean(flags.FLAGS)
train_estimator, eval_estimator = construct_estimator(
num_gpus=num_gpus, model_dir=FLAGS.model_dir, params={
"use_seed": FLAGS.seed is not None,
"hash_pipeline": FLAGS.hash_pipeline,
"batch_size": batch_size,
"learning_rate": FLAGS.learning_rate,
"num_users": ncf_dataset.num_users,
......@@ -365,6 +373,7 @@ def run_ncf(_):
tf.logging.warning(
"Estimated ({}) and reported ({}) number of batches differ by more "
"than one".format(approx_train_steps, batch_count))
train_estimator.train(input_fn=train_input_fn, hooks=train_hooks,
steps=batch_count)
tf.gfile.DeleteRecursively(train_record_dir)
......@@ -390,6 +399,8 @@ def run_ncf(_):
if model_helpers.past_stop_threshold(FLAGS.hr_threshold, hr):
break
cleanup_fn() # Cleanup data construction artifacts and subprocess.
# Clear the session explicitly to avoid session delete error
tf.keras.backend.clear_session()
......@@ -496,6 +507,17 @@ def define_ncf_flags():
"which performs better due to the fact the sorting algorithms are "
"not stable."))
flags.DEFINE_integer(
name="seed", default=None, help=flags_core.help_wrap(
"This value will be used to seed both NumPy and TensorFlow."))
flags.DEFINE_bool(
name="hash_pipeline", default=False, help=flags_core.help_wrap(
"This flag will perform a separate run of the pipeline and hash "
"batches as they are produced. \nNOTE: this will significantly slow "
"training. However it is useful to confirm that a random seed is "
"does indeed make the data pipeline deterministic."))
if __name__ == "__main__":
tf.logging.set_verbosity(tf.logging.INFO)
......
......@@ -40,10 +40,14 @@ from six.moves import xrange # pylint: disable=redefined-builtin
import tensorflow as tf
from official.datasets import movielens # pylint: disable=g-bad-import-order
from official.recommendation import stat_utils
def neumf_model_fn(features, labels, mode, params):
"""Model Function for NeuMF estimator."""
if params.get("use_seed"):
tf.set_random_seed(stat_utils.random_int32())
users = features[movielens.USER_COLUMN]
items = tf.cast(features[movielens.ITEM_COLUMN], tf.int32)
......
......@@ -42,21 +42,27 @@ do
echo "Beginning run ${i}"
echo " Complete logs are in ${RUN_LOG}"
# Note: The hit rate threshold has been set to 0.62 rather than the MLPerf 0.635
# The reason why the TF implementation does not reach 0.635 is still unknown.
# To reduce variation, set the seed flag:
# --seed ${i}
#
# And to confirm that the pipeline is deterministic pass the flag:
# --hash_pipeline
#
# (`--hash_pipeline` will slow down training)
python ncf_main.py --model_dir ${MODEL_DIR} \
--data_dir ${DATA_DIR} \
--dataset ${DATASET} --hooks "" \
${DEVICE_FLAG} \
--clean \
--train_epochs 100 \
--train_epochs 20 \
--batch_size 2048 \
--eval_batch_size 65536 \
--learning_rate 0.0005 \
--layers 256,256,128,64 --num_factors 64 \
--hr_threshold 0.62 \
--hr_threshold 0.635 \
--ml_perf \
|& tee ${RUN_LOG} \
| grep --line-buffered -E --regexp="Iteration [0-9]+: HR = [0-9\.]+, NDCG = [0-9\.]+"
| grep --line-buffered -E --regexp="(Iteration [0-9]+: HR = [0-9\.]+, NDCG = [0-9\.]+)|(pipeline_hash)"
END_TIME=$(date +%s)
echo "Run ${i} complete: $(( $END_TIME - $START_TIME )) seconds."
......
......@@ -8,7 +8,23 @@ See the following papers for more background:
[2] [Identity Mappings in Deep Residual Networks](https://arxiv.org/pdf/1603.05027.pdf) by Kaiming He, Xiangyu Zhang, Shaoqing Ren, and Jian Sun, Jul 2016.
In code v1 refers to the resnet defined in [1], while v2 correspondingly refers to [2]. The principle difference between the two versions is that v1 applies batch normalization and activation after convolution, while v2 applies batch normalization, then activation, and finally convolution. A schematic comparison is presented in Figure 1 (left) of [2].
In code, v1 refers to the ResNet defined in [1], except that a stride of 2 is
used on the 3x3 conv rather than on the first 1x1 conv in the bottleneck. This
change yields higher and more stable accuracy in fewer epochs than the original
v1 and has been shown to scale to larger batch sizes with minimal degradation
in accuracy. There is no originating paper; the earliest mention we are aware
of is in the [torch version of ResNetv1](https://github.com/facebook/fb.resnet.torch).
Most popular v1 implementations are actually this variant, which we call
ResNetv1.5. In testing we found that v1.5 requires ~12% more compute to train
and has 6% lower inference throughput than ResNetv1. Comparing the v1 model to
the v1.5 model, as has happened in some blog posts, is an apples-to-oranges
comparison, especially with regard to hardware or platform performance. The
CIFAR-10 ResNet does not use the bottleneck and is not affected by these nuances.
v2 refers to [2]. The principal difference between the two versions is that v1
applies batch normalization and activation after convolution, while v2 applies
batch normalization, then activation, and finally convolution. A schematic
comparison is presented in Figure 1 (left) of [2].
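To make the ordering concrete, here is a minimal, illustrative sketch of where batch normalization and activation sit relative to convolution in v1 vs. v2, and where the downsampling stride lands in the v1 vs. v1.5 bottleneck. The helper names (`v1_unit`, `v2_unit`, `bottleneck_convs`) are hypothetical; this is not the code in `resnet_model.py`, which composes these pieces with projection shortcuts and residual additions.

```python
import tensorflow as tf


def v1_unit(x, filters, training):
  """v1 ordering: convolution -> batch norm -> activation."""
  x = tf.layers.conv2d(x, filters, 3, padding='same', use_bias=False)
  x = tf.layers.batch_normalization(x, training=training)
  return tf.nn.relu(x)


def v2_unit(x, filters, training):
  """v2 ordering: batch norm -> activation -> convolution."""
  x = tf.layers.batch_normalization(x, training=training)
  x = tf.nn.relu(x)
  return tf.layers.conv2d(x, filters, 3, padding='same', use_bias=False)


def bottleneck_convs(x, filters, stride, v1_5=True):
  """v1 strides the first 1x1 conv; v1.5 strides the 3x3 conv instead.

  Batch norm and activation are omitted here to keep the focus on stride
  placement in the standard 1x1 -> 3x3 -> 1x1 bottleneck shape.
  """
  x = tf.layers.conv2d(x, filters, 1, strides=1 if v1_5 else stride)
  x = tf.layers.conv2d(x, filters, 3, strides=stride if v1_5 else 1,
                       padding='same')
  return tf.layers.conv2d(x, 4 * filters, 1, strides=1)
```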
Please proceed according to which dataset you would like to train/evaluate on:
......
......@@ -66,7 +66,7 @@ def get_filenames(is_training, data_dir):
return [os.path.join(data_dir, 'test_batch.bin')]
def parse_record(raw_record, is_training):
def parse_record(raw_record, is_training, dtype):
"""Parse CIFAR-10 image and label from a raw record."""
# Convert bytes to a vector of uint8 that is record_bytes long.
record_vector = tf.decode_raw(raw_record, tf.uint8)
......@@ -85,6 +85,7 @@ def parse_record(raw_record, is_training):
image = tf.cast(tf.transpose(depth_major, [1, 2, 0]), tf.float32)
image = preprocess_image(image, is_training)
image = tf.cast(image, dtype)
return image, label
......@@ -107,8 +108,9 @@ def preprocess_image(image, is_training):
return image
def input_fn(is_training, data_dir, batch_size, num_epochs=1, num_gpus=None):
"""Input_fn using the tf.data input pipeline for CIFAR-10 dataset.
def input_fn(is_training, data_dir, batch_size, num_epochs=1, num_gpus=None,
dtype=tf.float32):
"""Input function which provides batches for train or eval.
Args:
is_training: A boolean denoting whether the input is for training.
......@@ -116,6 +118,7 @@ def input_fn(is_training, data_dir, batch_size, num_epochs=1, num_gpus=None):
batch_size: The number of samples per batch.
num_epochs: The number of epochs to repeat the dataset.
num_gpus: The number of gpus used for training.
dtype: Data type to use for images/features
Returns:
A dataset that can be used for iteration.
......@@ -131,13 +134,14 @@ def input_fn(is_training, data_dir, batch_size, num_epochs=1, num_gpus=None):
parse_record_fn=parse_record,
num_epochs=num_epochs,
num_gpus=num_gpus,
examples_per_epoch=_NUM_IMAGES['train'] if is_training else None
examples_per_epoch=_NUM_IMAGES['train'] if is_training else None,
dtype=dtype
)
def get_synth_input_fn():
def get_synth_input_fn(dtype):
return resnet_run_loop.get_synth_input_fn(
_HEIGHT, _WIDTH, _NUM_CHANNELS, _NUM_CLASSES)
_HEIGHT, _WIDTH, _NUM_CHANNELS, _NUM_CLASSES, dtype=dtype)
###############################################################################
......@@ -180,7 +184,6 @@ class Cifar10Model(resnet_model.Model):
first_pool_stride=None,
block_sizes=[num_blocks] * 3,
block_strides=[1, 2, 2],
final_size=64,
resnet_version=resnet_version,
data_format=data_format,
dtype=dtype
......@@ -243,8 +246,9 @@ def run_cifar(flags_obj):
Args:
flags_obj: An object containing parsed flag values.
"""
input_function = (flags_obj.use_synthetic_data and get_synth_input_fn()
or input_fn)
input_function = (flags_obj.use_synthetic_data and
get_synth_input_fn(flags_core.get_tf_dtype(flags_obj)) or
input_fn)
resnet_run_loop.resnet_main(
flags_obj, cifar10_model_fn, input_function, DATASET_NAME,
shape=[_HEIGHT, _WIDTH, _NUM_CHANNELS])
......
......@@ -61,7 +61,7 @@ class BaseTest(tf.test.TestCase):
fake_dataset = tf.data.FixedLengthRecordDataset(
filename, cifar10_main._RECORD_BYTES) # pylint: disable=protected-access
fake_dataset = fake_dataset.map(
lambda val: cifar10_main.parse_record(val, False))
lambda val: cifar10_main.parse_record(val, False, tf.float32))
image, label = fake_dataset.make_one_shot_iterator().get_next()
self.assertAllEqual(label.shape, ())
......@@ -77,9 +77,9 @@ class BaseTest(tf.test.TestCase):
self.assertAllClose(pixel, np.array([-1.225, 0., 1.225]), rtol=1e-3)
def cifar10_model_fn_helper(self, mode, resnet_version, dtype):
input_fn = cifar10_main.get_synth_input_fn()
input_fn = cifar10_main.get_synth_input_fn(dtype)
dataset = input_fn(True, '', _BATCH_SIZE)
iterator = dataset.make_one_shot_iterator()
iterator = dataset.make_initializable_iterator()
features, labels = iterator.get_next()
spec = cifar10_main.cifar10_model_fn(
features, labels, mode, {
......
......@@ -129,7 +129,7 @@ def _parse_example_proto(example_serialized):
return features['image/encoded'], label, bbox
def parse_record(raw_record, is_training):
def parse_record(raw_record, is_training, dtype):
"""Parses a record containing a training example of an image.
The input record is parsed into a label and image, and the image is passed
......@@ -139,6 +139,7 @@ def parse_record(raw_record, is_training):
raw_record: scalar Tensor tf.string containing a serialized
Example protocol buffer.
is_training: A boolean denoting whether the input is for training.
dtype: data type to use for images/features.
Returns:
Tuple with processed image tensor and one-hot-encoded label tensor.
......@@ -152,11 +153,13 @@ def parse_record(raw_record, is_training):
output_width=_DEFAULT_IMAGE_SIZE,
num_channels=_NUM_CHANNELS,
is_training=is_training)
image = tf.cast(image, dtype)
return image, label
def input_fn(is_training, data_dir, batch_size, num_epochs=1, num_gpus=None):
def input_fn(is_training, data_dir, batch_size, num_epochs=1, num_gpus=None,
dtype=tf.float32):
"""Input function which provides batches for train or eval.
Args:
......@@ -165,6 +168,7 @@ def input_fn(is_training, data_dir, batch_size, num_epochs=1, num_gpus=None):
batch_size: The number of samples per batch.
num_epochs: The number of epochs to repeat the dataset.
num_gpus: The number of gpus used for training.
dtype: Data type to use for images/features
Returns:
A dataset that can be used for iteration.
......@@ -192,13 +196,15 @@ def input_fn(is_training, data_dir, batch_size, num_epochs=1, num_gpus=None):
parse_record_fn=parse_record,
num_epochs=num_epochs,
num_gpus=num_gpus,
examples_per_epoch=_NUM_IMAGES['train'] if is_training else None
examples_per_epoch=_NUM_IMAGES['train'] if is_training else None,
dtype=dtype
)
def get_synth_input_fn():
def get_synth_input_fn(dtype):
return resnet_run_loop.get_synth_input_fn(
_DEFAULT_IMAGE_SIZE, _DEFAULT_IMAGE_SIZE, _NUM_CHANNELS, _NUM_CLASSES)
_DEFAULT_IMAGE_SIZE, _DEFAULT_IMAGE_SIZE, _NUM_CHANNELS, _NUM_CLASSES,
dtype=dtype)
###############################################################################
......@@ -226,10 +232,8 @@ class ImagenetModel(resnet_model.Model):
# For bigger models, we want to use "bottleneck" layers
if resnet_size < 50:
bottleneck = False
final_size = 512
else:
bottleneck = True
final_size = 2048
super(ImagenetModel, self).__init__(
resnet_size=resnet_size,
......@@ -242,7 +246,6 @@ class ImagenetModel(resnet_model.Model):
first_pool_stride=2,
block_sizes=_get_block_sizes(resnet_size),
block_strides=[1, 2, 2, 2],
final_size=final_size,
resnet_version=resnet_version,
data_format=data_format,
dtype=dtype
......@@ -322,7 +325,7 @@ def define_imagenet_flags():
resnet_run_loop.define_resnet_flags(
resnet_size_choices=['18', '34', '50', '101', '152', '200'])
flags.adopt_module_key_flags(resnet_run_loop)
flags_core.set_defaults(train_epochs=100)
flags_core.set_defaults(train_epochs=90)
def run_imagenet(flags_obj):
......@@ -331,8 +334,9 @@ def run_imagenet(flags_obj):
Args:
flags_obj: An object containing parsed flag values.
"""
input_function = (flags_obj.use_synthetic_data and get_synth_input_fn()
or input_fn)
input_function = (flags_obj.use_synthetic_data and
get_synth_input_fn(flags_core.get_tf_dtype(flags_obj)) or
input_fn)
resnet_run_loop.resnet_main(
flags_obj, imagenet_model_fn, input_function, DATASET_NAME,
......
......@@ -191,9 +191,9 @@ class BaseTest(tf.test.TestCase):
"""Tests that the EstimatorSpec is given the appropriate arguments."""
tf.train.create_global_step()
input_fn = imagenet_main.get_synth_input_fn()
input_fn = imagenet_main.get_synth_input_fn(dtype)
dataset = input_fn(True, '', _BATCH_SIZE)
iterator = dataset.make_one_shot_iterator()
iterator = dataset.make_initializable_iterator()
features, labels = iterator.get_next()
spec = imagenet_main.imagenet_model_fn(
features, labels, mode, {
......
......@@ -354,7 +354,7 @@ class Model(object):
kernel_size,
conv_stride, first_pool_size, first_pool_stride,
block_sizes, block_strides,
final_size, resnet_version=DEFAULT_VERSION, data_format=None,
resnet_version=DEFAULT_VERSION, data_format=None,
dtype=DEFAULT_DTYPE):
"""Creates a model for classifying an image.
......@@ -376,7 +376,6 @@ class Model(object):
i-th set.
block_strides: List of integers representing the desired stride size for
each of the sets of block layers. Should be same length as block_sizes.
final_size: The expected size of the model after the second pooling.
resnet_version: Integer representing which version of the ResNet network
to use. See README for details. Valid values: [1, 2]
data_format: Input format ('channels_last', 'channels_first', or None).
......@@ -422,7 +421,6 @@ class Model(object):
self.first_pool_stride = first_pool_stride
self.block_sizes = block_sizes
self.block_strides = block_strides
self.final_size = final_size
self.dtype = dtype
self.pre_activation = resnet_version == 2
......@@ -542,7 +540,7 @@ class Model(object):
inputs = tf.reduce_mean(inputs, axes, keepdims=True)
inputs = tf.identity(inputs, 'final_reduce_mean')
inputs = tf.reshape(inputs, [-1, self.final_size])
inputs = tf.squeeze(inputs, axes)
inputs = tf.layers.dense(inputs=inputs, units=self.num_classes)
inputs = tf.identity(inputs, 'final_dense')
return inputs
......@@ -45,7 +45,7 @@ from official.utils.misc import model_helpers
################################################################################
def process_record_dataset(dataset, is_training, batch_size, shuffle_buffer,
parse_record_fn, num_epochs=1, num_gpus=None,
examples_per_epoch=None):
examples_per_epoch=None, dtype=tf.float32):
"""Given a Dataset with raw records, return an iterator over the records.
Args:
......@@ -60,6 +60,7 @@ def process_record_dataset(dataset, is_training, batch_size, shuffle_buffer,
num_epochs: The number of epochs to repeat the dataset.
num_gpus: The number of gpus used for training.
examples_per_epoch: The number of examples in an epoch.
dtype: Data type to use for images/features.
Returns:
Dataset of (image, label) pairs ready for iteration.
......@@ -92,7 +93,7 @@ def process_record_dataset(dataset, is_training, batch_size, shuffle_buffer,
# batch_size is almost always much greater than the number of CPU cores.
dataset = dataset.apply(
tf.contrib.data.map_and_batch(
lambda value: parse_record_fn(value, is_training),
lambda value: parse_record_fn(value, is_training, dtype),
batch_size=batch_size,
num_parallel_batches=1,
drop_remainder=False))
......@@ -108,11 +109,14 @@ def process_record_dataset(dataset, is_training, batch_size, shuffle_buffer,
return dataset
def get_synth_input_fn(height, width, num_channels, num_classes):
"""Returns an input function that returns a dataset with zeroes.
def get_synth_input_fn(height, width, num_channels, num_classes,
dtype=tf.float32):
"""Returns an input function that returns a dataset with random data.
This is useful in debugging input pipeline performance, as it removes all
elements of file reading and image preprocessing.
This input_fn returns a dataset that iterates over a set of random data and
bypasses all preprocessing, e.g. jpeg decode and copy. The host-to-device
copy is still included. This is used to find the upper throughput bound when
tuning the full input pipeline.
Args:
height: Integer height that will be used to create a fake image tensor.
......@@ -120,17 +124,32 @@ def get_synth_input_fn(height, width, num_channels, num_classes):
num_channels: Integer depth that will be used to create a fake image tensor.
num_classes: Number of classes that should be represented in the fake labels
tensor
dtype: Data type for features/images.
Returns:
An input_fn that can be used in place of a real one to return a dataset
that can be used for iteration.
"""
def input_fn(is_training, data_dir, batch_size, *args, **kwargs): # pylint: disable=unused-argument
return model_helpers.generate_synthetic_data(
input_shape=tf.TensorShape([batch_size, height, width, num_channels]),
input_dtype=tf.float32,
label_shape=tf.TensorShape([batch_size]),
label_dtype=tf.int32)
# pylint: disable=unused-argument
def input_fn(is_training, data_dir, batch_size, *args, **kwargs):
"""Returns dataset filled with random data."""
# Synthetic input should be within [0, 255].
inputs = tf.truncated_normal(
[batch_size] + [height, width, num_channels],
dtype=dtype,
mean=127,
stddev=60,
name='synthetic_inputs')
labels = tf.random_uniform(
[batch_size],
minval=0,
maxval=num_classes - 1,
dtype=tf.int32,
name='synthetic_labels')
data = tf.data.Dataset.from_tensors((inputs, labels)).repeat()
data = data.prefetch(buffer_size=tf.contrib.data.AUTOTUNE)
return data
return input_fn
......@@ -230,8 +249,8 @@ def resnet_model_fn(features, labels, mode, model_class,
# Generate a summary node for the images
tf.summary.image('images', features, max_outputs=6)
features = tf.cast(features, dtype)
# Check that features/images have the same data type used for calculations.
assert features.dtype == dtype
model = model_class(resnet_size, data_format, resnet_version=resnet_version,
dtype=dtype)
......@@ -436,14 +455,16 @@ def resnet_main(
batch_size=distribution_utils.per_device_batch_size(
flags_obj.batch_size, flags_core.get_num_gpus(flags_obj)),
num_epochs=num_epochs,
num_gpus=flags_core.get_num_gpus(flags_obj))
num_gpus=flags_core.get_num_gpus(flags_obj),
dtype=flags_core.get_tf_dtype(flags_obj))
def input_fn_eval():
return input_function(
is_training=False, data_dir=flags_obj.data_dir,
batch_size=distribution_utils.per_device_batch_size(
flags_obj.batch_size, flags_core.get_num_gpus(flags_obj)),
num_epochs=1)
num_epochs=1,
dtype=flags_core.get_tf_dtype(flags_obj))
if flags_obj.eval_only or not flags_obj.train_epochs:
# If --eval_only is set, perform a single loop with zero train epochs.
......@@ -501,7 +522,7 @@ def define_resnet_flags(resnet_size_choices=None):
flags.adopt_module_key_flags(flags_core)
flags.DEFINE_enum(
name='resnet_version', short_name='rv', default='2',
name='resnet_version', short_name='rv', default='1',
enum_values=['1', '2'],
help=flags_core.help_wrap(
'Version of ResNet. (1 or 2) See README.md for details.'))
......@@ -515,7 +536,7 @@ def define_resnet_flags(resnet_size_choices=None):
'If not None initialize all the network except the final layer with '
'these values'))
flags.DEFINE_boolean(
name="eval_only", default=False,
name='eval_only', default=False,
help=flags_core.help_wrap('Skip training and only perform evaluation on '
'the latest checkpoint.'))
......
......@@ -291,12 +291,11 @@ class BenchmarkBigQueryLogger(BaseBenchmarkLogger):
RUN_STATUS_RUNNING))
def on_finish(self, status):
thread.start_new_thread(
self._bigquery_uploader.update_run_status,
(self._bigquery_data_set,
self._bigquery_uploader.update_run_status(
self._bigquery_data_set,
self._bigquery_run_status_table,
self._run_id,
status))
status)
def _gather_run_info(model_name, dataset_name, run_params, test_id):
......@@ -308,11 +307,14 @@ def _gather_run_info(model_name, dataset_name, run_params, test_id):
"test_id": test_id,
"run_date": datetime.datetime.utcnow().strftime(
_DATE_TIME_FORMAT_PATTERN)}
session_config = None
if "session_config" in run_params:
session_config = run_params["session_config"]
_collect_tensorflow_info(run_info)
_collect_tensorflow_environment_variables(run_info)
_collect_run_params(run_info, run_params)
_collect_cpu_info(run_info)
_collect_gpu_info(run_info)
_collect_gpu_info(run_info, session_config)
_collect_memory_info(run_info)
_collect_test_environment(run_info)
return run_info
......@@ -386,10 +388,10 @@ def _collect_cpu_info(run_info):
tf.logging.warn("'cpuinfo' not imported. CPU info will not be logged.")
def _collect_gpu_info(run_info):
def _collect_gpu_info(run_info, session_config=None):
"""Collect local GPU information by TF device library."""
gpu_info = {}
local_device_protos = device_lib.list_local_devices()
local_device_protos = device_lib.list_local_devices(session_config)
gpu_info["count"] = len([d for d in local_device_protos
if d.device_type == "GPU"])
......
......@@ -22,6 +22,8 @@ request.
for visual navigation.
- [compression](compression): compressing and decompressing images using a
pre-trained Residual GRU network.
- [cvt_text](cvt_text): semi-supervised sequence learning with cross-view
training.
- [deep_contextual_bandits](deep_contextual_bandits): code for a variety of contextual bandits algorithms using deep neural networks and Thompson sampling.
- [deep_speech](deep_speech): automatic speech recognition.
- [deeplab](deeplab): deep labeling for semantic image segmentation.
......@@ -29,6 +31,8 @@ request.
- [differential_privacy](differential_privacy): differential privacy for training
data.
- [domain_adaptation](domain_adaptation): domain separation networks.
- [fivo](fivo): filtering variational objectives for training generative
sequence models.
- [gan](gan): generative adversarial networks.
- [im2txt](im2txt): image-to-text neural network for image captioning.
- [inception](inception): deep convolutional networks for computer vision.
......
......@@ -347,12 +347,10 @@ class Worker(threading.Thread):
value_loss = advantage ** 2
# Calculate our policy loss
actions_one_hot = tf.one_hot(memory.actions, self.action_size, dtype=tf.float32)
policy = tf.nn.softmax(logits)
entropy = tf.reduce_sum(policy * tf.log(policy + 1e-20), axis=1)
entropy = tf.nn.softmax_cross_entropy_with_logits_v2(labels=policy, logits=logits)
policy_loss = tf.nn.softmax_cross_entropy_with_logits_v2(labels=actions_one_hot,
policy_loss = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=memory.actions,
logits=logits)
policy_loss *= tf.stop_gradient(advantage)
policy_loss -= 0.01 * entropy
......