Unverified Commit 4dc1080d authored by Taylor Robie's avatar Taylor Robie Committed by GitHub
Browse files

Fix/ncf mlperf tweaks: robustness and determinism (#5334)

* bug fixes and add seed

* more random corrections

* make cleanup more robust

* return cleanup fn

* delint and address PR comments.

* delint and fix tests

* delinting is never done

* add pipeline hashing

* delint
parent 903194c5
......@@ -160,7 +160,8 @@ def _construct_training_records(
train_batch_size, # type: int
training_shards, # type: typing.List[str]
spillover, # type: bool
carryover=None # type: typing.Union[typing.List[np.ndarray], None]
carryover=None, # type: typing.Union[typing.List[np.ndarray], None]
deterministic=False # type: bool
):
"""Generate false negatives and write TFRecords files.
......@@ -204,7 +205,8 @@ def _construct_training_records(
with contextlib.closing(multiprocessing.Pool(
processes=num_workers, initializer=init_worker)) as pool:
data_generator = pool.imap_unordered(_process_shard, map_args) # pylint: disable=no-member
map_fn = pool.imap if deterministic else pool.imap_unordered # pylint: disable=no-member
data_generator = map_fn(_process_shard, map_args)
data = [
np.zeros(shape=(num_pts,), dtype=np.int32) - 1,
np.zeros(shape=(num_pts,), dtype=np.uint16),
......@@ -339,16 +341,31 @@ def _construct_eval_record(cache_paths, eval_batch_size):
log_msg("Eval TFRecords file successfully constructed.")
def _generation_loop(
num_workers, cache_paths, num_readers, num_neg, num_train_positives,
num_items, spillover, epochs_per_cycle, train_batch_size, eval_batch_size):
# type: (int, rconst.Paths, int, int, int, int, bool, int, int, int) -> None
def _generation_loop(num_workers, # type: int
cache_paths, # type: rconst.Paths
num_readers, # type: int
num_neg, # type: int
num_train_positives, # type: int
num_items, # type: int
spillover, # type: bool
epochs_per_cycle, # type: int
train_batch_size, # type: int
eval_batch_size, # type: int
deterministic # type: bool
):
# type: (...) -> None
"""Primary run loop for data file generation."""
log_msg("Signaling that I am alive.")
with tf.gfile.Open(cache_paths.subproc_alive, "w") as f:
f.write("Generation subproc has started.")
atexit.register(tf.gfile.Remove, filename=cache_paths.subproc_alive)
@atexit.register
def remove_alive_file():
try:
tf.gfile.Remove(cache_paths.subproc_alive)
except tf.errors.NotFoundError:
return # Main thread has already deleted the entire cache dir.
log_msg("Entering generation loop.")
tf.gfile.MakeDirs(cache_paths.train_epoch_dir)
......@@ -364,7 +381,8 @@ def _generation_loop(
cache_paths=cache_paths, num_readers=num_readers, num_neg=num_neg,
num_train_positives=num_train_positives, num_items=num_items,
epochs_per_cycle=epochs_per_cycle, train_batch_size=train_batch_size,
training_shards=training_shards, spillover=spillover, carryover=None)
training_shards=training_shards, spillover=spillover, carryover=None,
deterministic=deterministic)
_construct_eval_record(cache_paths=cache_paths,
eval_batch_size=eval_batch_size)
......@@ -397,7 +415,7 @@ def _generation_loop(
num_train_positives=num_train_positives, num_items=num_items,
epochs_per_cycle=epochs_per_cycle, train_batch_size=train_batch_size,
training_shards=training_shards, spillover=spillover,
carryover=carryover)
carryover=carryover, deterministic=deterministic)
wait_count = 0
start_time = time.time()
......@@ -441,6 +459,7 @@ def main(_):
epochs_per_cycle=flags.FLAGS.epochs_per_cycle,
train_batch_size=flags.FLAGS.train_batch_size,
eval_batch_size=flags.FLAGS.eval_batch_size,
deterministic=flags.FLAGS.seed is not None,
)
except KeyboardInterrupt:
log_msg("KeyboardInterrupt registered.")
......
......@@ -21,6 +21,7 @@ from __future__ import print_function
import atexit
import contextlib
import gc
import hashlib
import multiprocessing
import json
import os
......@@ -50,7 +51,7 @@ class NCFDataset(object):
"""Container for training and testing data."""
def __init__(self, user_map, item_map, num_data_readers, cache_paths,
num_train_positives):
num_train_positives, deterministic=False):
# type: (dict, dict, int, rconst.Paths, int, bool) -> None
"""Assign key values for recommendation dataset.
......@@ -61,6 +62,8 @@ class NCFDataset(object):
cache_paths: Object containing locations for various cache files.
num_train_positives: The number of positive training examples in the
dataset.
deterministic: Operations should use deterministic, order preserving
methods, even at the cost of performance.
"""
self.user_map = {int(k): int(v) for k, v in user_map.items()}
......@@ -70,6 +73,7 @@ class NCFDataset(object):
self.num_data_readers = num_data_readers
self.cache_paths = cache_paths
self.num_train_positives = num_train_positives
self.deterministic = deterministic
def _filter_index_sort(raw_rating_path, match_mlperf):
......@@ -340,7 +344,8 @@ def generate_train_eval_data(df, approx_num_shards, num_items, cache_paths,
pickle.dump(eval_data, f, protocol=pickle.HIGHEST_PROTOCOL)
def construct_cache(dataset, data_dir, num_data_readers, match_mlperf):
def construct_cache(dataset, data_dir, num_data_readers, match_mlperf,
deterministic):
# type: (str, str, int, bool, bool) -> NCFDataset
"""Load and digest data CSV into a usable form.
......@@ -351,6 +356,8 @@ def construct_cache(dataset, data_dir, num_data_readers, match_mlperf):
data during training.
match_mlperf: If True, change the behavior of the cache construction to
match the MLPerf reference implementation.
deterministic: Try to enforce repeatable behavior, even at the cost of
performance.
"""
cache_paths = rconst.Paths(data_dir=data_dir)
num_data_readers = (num_data_readers or int(multiprocessing.cpu_count() / 2)
......@@ -377,7 +384,8 @@ def construct_cache(dataset, data_dir, num_data_readers, match_mlperf):
ncf_dataset = NCFDataset(user_map=user_map, item_map=item_map,
num_data_readers=num_data_readers,
cache_paths=cache_paths,
num_train_positives=len(df) - len(user_map))
num_train_positives=len(df) - len(user_map),
deterministic=deterministic)
run_time = timeit.default_timer() - st
tf.logging.info("Cache construction complete. Time: {:.1f} sec."
......@@ -403,13 +411,15 @@ def _shutdown(proc):
def instantiate_pipeline(dataset, data_dir, batch_size, eval_batch_size,
num_data_readers=None, num_neg=4, epochs_per_cycle=1,
match_mlperf=False):
match_mlperf=False, deterministic=False):
# type: (...) -> (NCFDataset, typing.Callable)
"""Preprocess data and start negative generation subprocess."""
tf.logging.info("Beginning data preprocessing.")
ncf_dataset = construct_cache(dataset=dataset, data_dir=data_dir,
num_data_readers=num_data_readers,
match_mlperf=match_mlperf)
match_mlperf=match_mlperf,
deterministic=deterministic)
tf.logging.info("Creating training file subprocess.")
......@@ -439,18 +449,30 @@ def instantiate_pipeline(dataset, data_dir, batch_size, eval_batch_size,
# guarantee batch size and significantly improves
# performance. (~5% increase in examples/sec on
# GPU, and needed for TPU XLA.)
"--redirect_logs", "True",
"--seed", str(int(stat_utils.random_int32()))
"--redirect_logs", "True"
]
if ncf_dataset.deterministic:
subproc_args.extend(["--seed", str(int(stat_utils.random_int32()))])
tf.logging.info(
"Generation subprocess command: {}".format(" ".join(subproc_args)))
proc = subprocess.Popen(args=subproc_args, shell=False, env=subproc_env)
atexit.register(_shutdown, proc=proc)
atexit.register(tf.gfile.DeleteRecursively,
ncf_dataset.cache_paths.cache_root)
cleanup_called = {"finished": False}
@atexit.register
def cleanup():
"""Remove files and subprocess from data generation."""
if cleanup_called["finished"]:
return
_shutdown(proc)
try:
tf.gfile.DeleteRecursively(ncf_dataset.cache_paths.cache_root)
except tf.errors.NotFoundError:
pass
cleanup_called["finished"] = True
for _ in range(300):
if tf.gfile.Exists(ncf_dataset.cache_paths.subproc_alive):
......@@ -460,7 +482,7 @@ def instantiate_pipeline(dataset, data_dir, batch_size, eval_batch_size,
raise ValueError("Generation subprocess did not start correctly. Data will "
"not be available; exiting to avoid waiting forever.")
return ncf_dataset
return ncf_dataset, cleanup
def make_deserialize(params, batch_size, training=False):
......@@ -498,6 +520,44 @@ def make_deserialize(params, batch_size, training=False):
return deserialize
def hash_pipeline(dataset, deterministic):
  # type: (tf.data.Dataset, bool) -> None
  """Utility function for detecting non-determinism in the data pipeline.

  Drains `dataset` to exhaustion in a dedicated tf.Session, folding the user
  and item feature columns of every batch into a running MD5. It logs the
  hash after the first batch and after all batches, so two runs of the same
  pipeline can be compared: identical hashes imply the pipeline produced the
  same batches in the same order.

  Args:
    dataset: a tf.data.Dataset generated by the input_fn
    deterministic: Does the input_fn expect the dataset to be deterministic.
      (i.e. fixed seed, sloppy=False, etc.)
  """
  if not deterministic:
    # Hashes are still computed, but without a fixed seed / order-preserving
    # ops two runs are not expected to match.
    tf.logging.warning("Data pipeline is not marked as deterministic. Hash "
                       "values are not expected to be meaningful.")

  batch = dataset.make_one_shot_iterator().get_next()
  md5 = hashlib.md5()
  count = 0
  # NOTE(review): initialized as bytes but later assigned hexdigest() (a str);
  # only ever logged, so harmless, but the types are inconsistent — confirm.
  first_batch_hash = b""
  with tf.Session() as sess:
    while True:
      try:
        result = sess.run(batch)
        if isinstance(result, tuple):
          result = result[0]  # only hash features
      except tf.errors.OutOfRangeError:
        # Dataset exhausted; fall through to summary logging.
        break

      count += 1
      # memoryview().tobytes() gives the raw buffer of the numpy arrays so the
      # hash reflects exact values and dtypes, not their repr.
      md5.update(memoryview(result[movielens.USER_COLUMN]).tobytes())
      md5.update(memoryview(result[movielens.ITEM_COLUMN]).tobytes())
      if count == 1:
        # Snapshot the digest after one batch to localize divergence: if the
        # first-batch hashes match but the overall hashes differ, the
        # non-determinism arises later in the epoch.
        first_batch_hash = md5.hexdigest()
  overall_hash = md5.hexdigest()
  tf.logging.info("Batch count: {}".format(count))
  tf.logging.info(" [pipeline_hash] First batch hash: {}".format(
      first_batch_hash))
  tf.logging.info(" [pipeline_hash] All batches hash: {}".format(overall_hash))
def make_train_input_fn(ncf_dataset):
# type: (NCFDataset) -> (typing.Callable, str, int)
"""Construct training input_fn for the current epoch."""
......@@ -556,14 +616,19 @@ def make_train_input_fn(ncf_dataset):
tf.data.TFRecordDataset,
cycle_length=4,
block_length=100000,
sloppy=True,
sloppy=not ncf_dataset.deterministic,
prefetch_input_elements=4,
)
deserialize = make_deserialize(params, batch_size, True)
dataset = record_files.apply(interleave)
dataset = dataset.map(deserialize, num_parallel_calls=4)
return dataset.prefetch(32)
dataset = dataset.prefetch(32)
if params.get("hash_pipeline"):
hash_pipeline(dataset, ncf_dataset.deterministic)
return dataset
return input_fn, record_dir, batch_count
......@@ -588,7 +653,11 @@ def make_pred_input_fn(ncf_dataset):
deserialize = make_deserialize(params, batch_size, False)
dataset = dataset.map(deserialize, num_parallel_calls=4)
dataset = dataset.prefetch(16)
if params.get("hash_pipeline"):
hash_pipeline(dataset, ncf_dataset.deterministic)
return dataset.prefetch(16)
return dataset
return input_fn
......@@ -85,14 +85,14 @@ class BaseTest(tf.test.TestCase):
# construct_cache()
ncf_dataset = data_preprocessing.construct_cache(
dataset=DATASET, data_dir=self.temp_data_dir, num_data_readers=2,
match_mlperf=False)
match_mlperf=False, deterministic=False)
assert ncf_dataset.num_users == NUM_USERS
assert ncf_dataset.num_items == NUM_ITEMS
time.sleep(1) # Ensure we create the next cache in a new directory.
ncf_dataset = data_preprocessing.construct_cache(
dataset=DATASET, data_dir=self.temp_data_dir, num_data_readers=2,
match_mlperf=True)
match_mlperf=True, deterministic=False)
assert ncf_dataset.num_users == NUM_USERS
assert ncf_dataset.num_items == NUM_ITEMS
......@@ -110,7 +110,7 @@ class BaseTest(tf.test.TestCase):
return output
def test_end_to_end(self):
ncf_dataset = data_preprocessing.instantiate_pipeline(
ncf_dataset, _ = data_preprocessing.instantiate_pipeline(
dataset=DATASET, data_dir=self.temp_data_dir,
batch_size=BATCH_SIZE, eval_batch_size=BATCH_SIZE, num_data_readers=2,
num_neg=NUM_NEG)
......
......@@ -247,6 +247,8 @@ def construct_estimator(num_gpus, model_dir, params, batch_size,
zone=params["tpu_zone"],
project=params["tpu_gcp_project"],
)
tf.logging.info("Issuing reset command to TPU to ensure a clean state.")
tf.Session.reset(tpu_cluster_resolver.get_master())
tpu_config = tf.contrib.tpu.TPUConfig(
iterations_per_loop=100,
......@@ -297,22 +299,28 @@ def run_ncf(_):
if FLAGS.download_if_missing:
movielens.download(FLAGS.dataset, FLAGS.data_dir)
if FLAGS.seed is not None:
np.random.seed(FLAGS.seed)
num_gpus = flags_core.get_num_gpus(FLAGS)
batch_size = distribution_utils.per_device_batch_size(
int(FLAGS.batch_size), num_gpus)
eval_batch_size = int(FLAGS.eval_batch_size or FLAGS.batch_size)
ncf_dataset = data_preprocessing.instantiate_pipeline(
ncf_dataset, cleanup_fn = data_preprocessing.instantiate_pipeline(
dataset=FLAGS.dataset, data_dir=FLAGS.data_dir,
batch_size=batch_size,
eval_batch_size=eval_batch_size,
num_neg=FLAGS.num_neg,
epochs_per_cycle=FLAGS.epochs_between_evals,
match_mlperf=FLAGS.ml_perf)
match_mlperf=FLAGS.ml_perf,
deterministic=FLAGS.seed is not None)
model_helpers.apply_clean(flags.FLAGS)
train_estimator, eval_estimator = construct_estimator(
num_gpus=num_gpus, model_dir=FLAGS.model_dir, params={
"use_seed": FLAGS.seed is not None,
"hash_pipeline": FLAGS.hash_pipeline,
"batch_size": batch_size,
"learning_rate": FLAGS.learning_rate,
"num_users": ncf_dataset.num_users,
......@@ -365,6 +373,7 @@ def run_ncf(_):
tf.logging.warning(
"Estimated ({}) and reported ({}) number of batches differ by more "
"than one".format(approx_train_steps, batch_count))
train_estimator.train(input_fn=train_input_fn, hooks=train_hooks,
steps=batch_count)
tf.gfile.DeleteRecursively(train_record_dir)
......@@ -390,6 +399,8 @@ def run_ncf(_):
if model_helpers.past_stop_threshold(FLAGS.hr_threshold, hr):
break
cleanup_fn() # Cleanup data construction artifacts and subprocess.
# Clear the session explicitly to avoid session delete error
tf.keras.backend.clear_session()
......@@ -496,6 +507,17 @@ def define_ncf_flags():
"which performs better due to the fact the sorting algorithms are "
"not stable."))
flags.DEFINE_integer(
name="seed", default=None, help=flags_core.help_wrap(
"This value will be used to seed both NumPy and TensorFlow."))
flags.DEFINE_bool(
name="hash_pipeline", default=False, help=flags_core.help_wrap(
"This flag will perform a separate run of the pipeline and hash "
"batches as they are produced. \nNOTE: this will significantly slow "
"training. However it is useful to confirm that a random seed is "
"does indeed make the data pipeline deterministic."))
if __name__ == "__main__":
tf.logging.set_verbosity(tf.logging.INFO)
......
......@@ -40,10 +40,14 @@ from six.moves import xrange # pylint: disable=redefined-builtin
import tensorflow as tf
from official.datasets import movielens # pylint: disable=g-bad-import-order
from official.recommendation import stat_utils
def neumf_model_fn(features, labels, mode, params):
"""Model Function for NeuMF estimator."""
if params.get("use_seed"):
tf.set_random_seed(stat_utils.random_int32())
users = features[movielens.USER_COLUMN]
items = tf.cast(features[movielens.ITEM_COLUMN], tf.int32)
......
......@@ -42,21 +42,27 @@ do
echo "Beginning run ${i}"
echo " Complete logs are in ${RUN_LOG}"
# Note: The hit rate threshold has been set to 0.62 rather than the MLPerf 0.635
# The reason why the TF implementation does not reach 0.635 is still unknown.
# To reduce variation set the seed flag:
# --seed ${i}
#
# And to confirm that the pipeline is deterministic pass the flag:
# --hash_pipeline
#
# (`--hash_pipeline` will slow down training)
python ncf_main.py --model_dir ${MODEL_DIR} \
--data_dir ${DATA_DIR} \
--dataset ${DATASET} --hooks "" \
${DEVICE_FLAG} \
--clean \
--train_epochs 100 \
--train_epochs 20 \
--batch_size 2048 \
--eval_batch_size 65536 \
--learning_rate 0.0005 \
--layers 256,256,128,64 --num_factors 64 \
--hr_threshold 0.62 \
--hr_threshold 0.635 \
--ml_perf \
|& tee ${RUN_LOG} \
| grep --line-buffered -E --regexp="Iteration [0-9]+: HR = [0-9\.]+, NDCG = [0-9\.]+"
| grep --line-buffered -E --regexp="(Iteration [0-9]+: HR = [0-9\.]+, NDCG = [0-9\.]+)|(pipeline_hash)"
END_TIME=$(date +%s)
echo "Run ${i} complete: $(( $END_TIME - $START_TIME )) seconds."
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment