Commit 75d592e9 authored by Reed, committed by Taylor Robie

Add --use_synthetic_data option to NCF. (#5468)

* Add --use_synthetic_data option to NCF.

* Add comment to _SYNTHETIC_BATCHES_PER_EPOCH

* Fix test

* Hopefully fix lint issue
parent 42f98218
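
The --use_synthetic_data flag is not defined in this diff; it comes from the shared performance flags that define_ncf_flags enables at the bottom of the change (synthetic_data=True). Below is a minimal standalone sketch of that wiring, assuming absl flags: only the flag name is confirmed by the FLAGS.use_synthetic_data reads in the diff, and the default and help text here are illustrative.

from absl import flags

# Approximation of the shared performance-flag definition; the real one
# lives in the official models' flag utilities, not in this commit.
flags.DEFINE_boolean(
    name="use_synthetic_data", default=False,
    help="If set, generate random batches instead of reading a dataset.")

FLAGS = flags.FLAGS
FLAGS(["ncf_main.py", "--use_synthetic_data"])  # argv; program name is a placeholder
assert FLAGS.use_synthetic_data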
@@ -48,6 +48,18 @@ from official.recommendation import stat_utils
 from official.recommendation import popen_helper


+DATASET_TO_NUM_USERS_AND_ITEMS = {
+    "ml-1m": (6040, 3706),
+    "ml-20m": (138493, 26744)
+}
+
+# Number of batches to run per epoch when using synthetic data. At high batch
+# sizes, we run for more batches than with real data, which is good since
+# running more batches reduces noise when measuring the average batches/second.
+_SYNTHETIC_BATCHES_PER_EPOCH = 2000
+
+
 class NCFDataset(object):
   """Container for training and testing data."""
@@ -376,6 +388,14 @@ def construct_cache(dataset, data_dir, num_data_readers, match_mlperf,
   raw_rating_path = os.path.join(data_dir, dataset, movielens.RATINGS_FILE)
   df, user_map, item_map = _filter_index_sort(raw_rating_path, match_mlperf)

+  num_users, num_items = DATASET_TO_NUM_USERS_AND_ITEMS[dataset]
+
+  if num_users != len(user_map):
+    raise ValueError("Expected to find {} users, but found {}".format(
+        num_users, len(user_map)))
+  if num_items != len(item_map):
+    raise ValueError("Expected to find {} items, but found {}".format(
+        num_items, len(item_map)))
+
   generate_train_eval_data(df=df, approx_num_shards=approx_num_shards,
                            num_items=len(item_map), cache_paths=cache_paths,
@@ -570,9 +590,12 @@ def hash_pipeline(dataset, deterministic):

 def make_train_input_fn(ncf_dataset):
-  # type: (NCFDataset) -> (typing.Callable, str, int)
+  # type: (typing.Optional[NCFDataset]) -> (typing.Callable, str, int)
   """Construct training input_fn for the current epoch."""
+  if ncf_dataset is None:
+    return make_train_synthetic_input_fn()
+
   if not tf.gfile.Exists(ncf_dataset.cache_paths.subproc_alive):
     # The generation subprocess must have been alive at some point, because we
     # earlier checked that the subproc_alive file existed.
@@ -644,10 +667,40 @@ def make_train_input_fn(ncf_dataset):

   return input_fn, record_dir, batch_count


+def make_train_synthetic_input_fn():
+  """Construct training input_fn that uses synthetic data."""
+  def input_fn(params):
+    """Generated input_fn for the given epoch."""
+    batch_size = params["batch_size"]
+    num_users = params["num_users"]
+    num_items = params["num_items"]
+
+    users = tf.random_uniform([batch_size], dtype=tf.int32, minval=0,
+                              maxval=num_users)
+    items = tf.random_uniform([batch_size], dtype=tf.int32, minval=0,
+                              maxval=num_items)
+    labels = tf.random_uniform([batch_size], dtype=tf.int32, minval=0,
+                               maxval=2)
+
+    data = {
+        movielens.USER_COLUMN: users,
+        movielens.ITEM_COLUMN: items,
+    }, labels
+
+    dataset = tf.data.Dataset.from_tensors(data).repeat(
+        _SYNTHETIC_BATCHES_PER_EPOCH)
+    dataset = dataset.prefetch(32)
+    return dataset
+
+  return input_fn, None, _SYNTHETIC_BATCHES_PER_EPOCH
+
+
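A quick way to sanity-check the synthetic training input_fn in isolation, as a sketch: it assumes this module's imports and a TF 1.x session, and the params values are illustrative (the Estimator normally supplies that dict).

input_fn, record_dir, batch_count = make_train_synthetic_input_fn()
assert record_dir is None and batch_count == _SYNTHETIC_BATCHES_PER_EPOCH

dataset = input_fn({"batch_size": 1024, "num_users": 6040, "num_items": 3706})
features, labels = dataset.make_one_shot_iterator().get_next()
with tf.Session() as sess:
  f, l = sess.run((features, labels))
assert f[movielens.USER_COLUMN].shape == (1024,)  # random int32 user ids
assert set(l) <= {0, 1}                           # random binary labels
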
 def make_pred_input_fn(ncf_dataset):
-  # type: (NCFDataset) -> typing.Callable
+  # type: (typing.Optional[NCFDataset]) -> typing.Callable
   """Construct input_fn for metric evaluation."""
+  if ncf_dataset is None:
+    return make_synthetic_pred_input_fn()
+
   def input_fn(params):
     """Input function based on eval batch size."""
@@ -672,3 +725,32 @@ def make_pred_input_fn(ncf_dataset):
     return dataset

   return input_fn
+
+
+def make_synthetic_pred_input_fn():
+  """Construct input_fn for metric evaluation that uses synthetic data."""
+  def input_fn(params):
+    """Generated input_fn for the given epoch."""
+    batch_size = params["eval_batch_size"]
+    num_users = params["num_users"]
+    num_items = params["num_items"]
+
+    users = tf.random_uniform([batch_size], dtype=tf.int32, minval=0,
+                              maxval=num_users)
+    items = tf.random_uniform([batch_size], dtype=tf.int32, minval=0,
+                              maxval=num_items)
+    dupe_mask = tf.cast(tf.random_uniform([batch_size], dtype=tf.int32,
+                                          minval=0, maxval=2), tf.bool)
+
+    data = {
+        movielens.USER_COLUMN: users,
+        movielens.ITEM_COLUMN: items,
+        rconst.DUPLICATE_MASK: dupe_mask,
+    }
+
+    dataset = tf.data.Dataset.from_tensors(data).repeat(
+        _SYNTHETIC_BATCHES_PER_EPOCH)
+    dataset = dataset.prefetch(16)
+    return dataset
+
+  return input_fn
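
The eval variant can be exercised the same way. Note that it reads params["eval_batch_size"] rather than params["batch_size"], and it returns features only (no labels), including a random stand-in for the duplicate mask the eval metrics consume. A sketch with illustrative values:

pred_input_fn = make_synthetic_pred_input_fn()
dataset = pred_input_fn(
    {"eval_batch_size": 160, "num_users": 6040, "num_items": 3706})
features = dataset.make_one_shot_iterator().get_next()
# features[rconst.DUPLICATE_MASK] is a random [160] bool tensor standing in
# for the real mask that eval uses to ignore duplicate candidate items.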
@@ -80,6 +80,8 @@ class BaseTest(tf.test.TestCase):
     movielens.download = mock_download
     movielens.NUM_RATINGS[DATASET] = NUM_PTS
+    data_preprocessing.DATASET_TO_NUM_USERS_AND_ITEMS[DATASET] = (NUM_USERS,
+                                                                  NUM_ITEMS)

   def test_preprocessing(self):
     # For the most part the necessary checks are performed within
@@ -118,7 +118,7 @@ def main(_):

 def run_ncf(_):
   """Run NCF training and eval loop."""
-  if FLAGS.download_if_missing:
+  if FLAGS.download_if_missing and not FLAGS.use_synthetic_data:
     movielens.download(FLAGS.dataset, FLAGS.data_dir)

   if FLAGS.seed is not None:
@@ -137,14 +137,25 @@ def run_ncf(_):
         "eval examples per user does not evenly divide eval_batch_size. "
         "Overriding to {}".format(eval_batch_size))

-  ncf_dataset, cleanup_fn = data_preprocessing.instantiate_pipeline(
-      dataset=FLAGS.dataset, data_dir=FLAGS.data_dir,
-      batch_size=batch_size,
-      eval_batch_size=eval_batch_size,
-      num_neg=FLAGS.num_neg,
-      epochs_per_cycle=FLAGS.epochs_between_evals,
-      match_mlperf=FLAGS.ml_perf,
-      deterministic=FLAGS.seed is not None)
+  if FLAGS.use_synthetic_data:
+    ncf_dataset = None
+    cleanup_fn = lambda: None
+    num_users, num_items = data_preprocessing.DATASET_TO_NUM_USERS_AND_ITEMS[
+        FLAGS.dataset]
+    approx_train_steps = None
+  else:
+    ncf_dataset, cleanup_fn = data_preprocessing.instantiate_pipeline(
+        dataset=FLAGS.dataset, data_dir=FLAGS.data_dir,
+        batch_size=batch_size,
+        eval_batch_size=eval_batch_size,
+        num_neg=FLAGS.num_neg,
+        epochs_per_cycle=FLAGS.epochs_between_evals,
+        match_mlperf=FLAGS.ml_perf,
+        deterministic=FLAGS.seed is not None)
+    num_users = ncf_dataset.num_users
+    num_items = ncf_dataset.num_items
+    approx_train_steps = int(ncf_dataset.num_train_positives
+                             * (1 + FLAGS.num_neg) // FLAGS.batch_size)

   model_helpers.apply_clean(flags.FLAGS)
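
For the real-data branch, approx_train_steps is the positive count scaled by (1 + num_neg), i.e. positives plus sampled negatives, divided by the batch size. A worked sketch with hypothetical values:

# Hypothetical values; the real num_train_positives comes from the pipeline.
num_train_positives = 19861770  # e.g. ml-20m ratings minus one eval item/user
num_neg = 4                     # FLAGS.num_neg
batch_size = 16384
print(int(num_train_positives * (1 + num_neg) // batch_size))  # -> 6061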
@@ -153,9 +164,10 @@ def run_ncf(_):
       "use_seed": FLAGS.seed is not None,
       "hash_pipeline": FLAGS.hash_pipeline,
       "batch_size": batch_size,
+      "eval_batch_size": eval_batch_size,
       "learning_rate": FLAGS.learning_rate,
-      "num_users": ncf_dataset.num_users,
-      "num_items": ncf_dataset.num_items,
+      "num_users": num_users,
+      "num_items": num_items,
       "mf_dim": FLAGS.num_factors,
       "model_layers": [int(layer) for layer in FLAGS.layers],
       "mf_regularization": FLAGS.mf_regularization,
@@ -192,8 +204,6 @@ def run_ncf(_):
       run_params=run_params,
       test_id=FLAGS.benchmark_test_id)

-  approx_train_steps = int(ncf_dataset.num_train_positives
-                           * (1 + FLAGS.num_neg) // FLAGS.batch_size)
   pred_input_fn = data_preprocessing.make_pred_input_fn(ncf_dataset=ncf_dataset)

   total_training_cycle = FLAGS.train_epochs // FLAGS.epochs_between_evals
@@ -205,14 +215,15 @@ def run_ncf(_):
     train_input_fn, train_record_dir, batch_count = \
       data_preprocessing.make_train_input_fn(ncf_dataset=ncf_dataset)

-    if np.abs(approx_train_steps - batch_count) > 1:
+    if approx_train_steps and np.abs(approx_train_steps - batch_count) > 1:
       tf.logging.warning(
           "Estimated ({}) and reported ({}) number of batches differ by more "
           "than one".format(approx_train_steps, batch_count))

     train_estimator.train(input_fn=train_input_fn, hooks=train_hooks,
                           steps=batch_count)
-    tf.gfile.DeleteRecursively(train_record_dir)
+    if train_record_dir:
+      tf.gfile.DeleteRecursively(train_record_dir)

     tf.logging.info("Beginning evaluation.")
     eval_results = eval_estimator.evaluate(pred_input_fn)
@@ -245,7 +256,7 @@ def define_ncf_flags():
       num_parallel_calls=False,
       inter_op=False,
       intra_op=False,
-      synthetic_data=False,
+      synthetic_data=True,
       max_train_steps=False,
       dtype=False,
       all_reduce_alg=False