Unverified commit 19d4eaaf authored by Taylor Robie, committed by GitHub

Reorder NCF data pipeline (#5536)

* intermediate commit

finish replacing spillover with resampled padding

intermediate commit

* resolve merge conflict

* intermediate commit

* further consolidate the data pipeline

* complete first pass at data pipeline refactor

* remove some leftover code

* fix test

* remove resampling, and move train padding logic into neumf.py

* small tweaks

* fix weight bug

* address PR comments

* fix dict zip. (Reed led me astray)

* delint

* make data test deterministic and delint

* Reed didn't lead me astray. I just can't read.

* more delinting

* even more delinting

* use resampling for last batch padding

* pad last batch with unique data

* Revert "pad last batch with unique data"

This reverts commit cbdf46efcd5c7907038a24105b88d38e7f1d6da2.

* move padded batch to the beginning

* delint

* fix step check for synthetic data
parent 413f15ba
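
Before the per-file diff, here is a minimal NumPy sketch of the batching scheme this commit series converges on: instead of spilling a partial final batch into the next cycle (the old `spillover`/`carryover` machinery), the point count is rounded up to a whole number of batches and the tail is padded, by resampling existing points for training and by zeros for evaluation. The function and argument names below are illustrative only, not identifiers from the repository.

```python
import numpy as np

def pad_to_full_batches(users, items, labels, batch_size, is_training,
                        rng=np.random):
  """Round a data set up to a whole number of batches (illustration only)."""
  num_pts = users.shape[0]
  # Integer ceiling without float precision concerns, as in the diff below.
  num_pts_with_padding = (num_pts + batch_size - 1) // batch_size * batch_size
  num_padding = num_pts_with_padding - num_pts
  if not num_padding:
    return users, items, labels

  if is_training:
    # Training: resample existing points so every batch is full.
    pad_idx = rng.randint(0, num_pts, size=(num_padding,))
    pad = lambda a: np.concatenate([a, a[pad_idx]])
  else:
    # Eval: zero padding; the eval input_fn later discards padded entries.
    pad = lambda a: np.concatenate([a, np.zeros((num_padding,), dtype=a.dtype)])
  return pad(users), pad(items), pad(labels)

# Example: 10 points with batch size 4 are padded to 12 points (3 full batches).
u = np.arange(10, dtype=np.int32)
print(pad_to_full_batches(u, u, u, batch_size=4, is_training=True)[0].shape)
```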
@@ -35,16 +35,16 @@ class Paths(object):
                                              "positive_shard_{}.pickle")
     self.train_epoch_dir = os.path.join(self.cache_root, "training_epochs")
     self.eval_data_subdir = os.path.join(self.cache_root, "eval_data")
-    self.eval_raw_file = os.path.join(self.eval_data_subdir, "raw.pickle")
-    self.eval_record_template_temp = os.path.join(self.eval_data_subdir,
-                                                  "eval_records.temp")
-    self.eval_record_template = os.path.join(
-        self.eval_data_subdir, "padded_eval_batch_size_{}.tfrecords")
     self.subproc_alive = os.path.join(self.cache_root, "subproc.alive")

 APPROX_PTS_PER_TRAIN_SHARD = 128000

+# Keys for data shards
+TRAIN_KEY = "train"
+EVAL_KEY = "eval"
+
 # In both datasets, each user has at least 20 ratings.
 MIN_NUM_RATINGS = 20

@@ -68,7 +68,9 @@ FLAGFILE_TEMP = "flagfile.temp"
 FLAGFILE = "flagfile"
 READY_FILE_TEMP = "ready.json.temp"
 READY_FILE = "ready.json"
 TRAIN_RECORD_TEMPLATE = "train_{}.tfrecords"
+EVAL_RECORD_TEMPLATE = "eval_{}.tfrecords"

 TIMEOUT_SECONDS = 3600 * 2  # If the train loop goes more than two hours without
                             # consuming an epoch of data, this is a good
...
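With `TRAIN_KEY` and `EVAL_KEY` added above, each raw positive shard becomes a single pickle holding both the training positives and the per-user holdout positive. A hedged sketch of that layout follows; the dict keys match the diff, but the literal column strings are assumptions standing in for `movielens.USER_COLUMN` / `movielens.ITEM_COLUMN`, and the file name only mirrors the `positive_shard_{}.pickle` template.

```python
import pickle
import numpy as np

TRAIN_KEY, EVAL_KEY = "train", "eval"
USER_COLUMN, ITEM_COLUMN = "user_id", "item_id"  # assumed; defined in movielens.py

shard = {
    TRAIN_KEY: {  # all but the last rating of each user
        USER_COLUMN: np.array([0, 0, 0, 1, 1], dtype=np.int32),
        ITEM_COLUMN: np.array([10, 11, 12, 20, 21], dtype=np.uint16),
    },
    EVAL_KEY: {   # one held-out positive per user
        USER_COLUMN: np.array([0, 1], dtype=np.int32),
        ITEM_COLUMN: np.array([13, 22], dtype=np.uint16),
    },
}

with open("positive_shard_00000.pickle", "wb") as f:
  pickle.dump(shard, f)
```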
@@ -69,7 +69,7 @@ def get_cycle_folder_name(i):
 def _process_shard(args):
-  # type: ((str, int, int, int)) -> (np.ndarray, np.ndarray, np.ndarray)
+  # type: ((str, int, int, int, bool)) -> (np.ndarray, np.ndarray, np.ndarray)
   """Read a shard of training data and return training vectors.

   Args:
@@ -77,8 +77,10 @@ def _process_shard(args):
     num_items: The cardinality of the item set.
     num_neg: The number of negatives to generate per positive example.
     seed: Random seed to be used when generating negatives.
+    is_training: Generate training (True) or eval (False) data.
+    match_mlperf: Match the MLPerf reference behavior
   """
-  shard_path, num_items, num_neg, seed = args
+  shard_path, num_items, num_neg, seed, is_training, match_mlperf = args
   np.random.seed(seed)

   # The choice to store the training shards in files rather than in memory
@@ -92,8 +94,14 @@ def _process_shard(args):
   with tf.gfile.Open(shard_path, "rb") as f:
     shard = pickle.load(f)

-  users = shard[movielens.USER_COLUMN]
-  items = shard[movielens.ITEM_COLUMN]
+  users = shard[rconst.TRAIN_KEY][movielens.USER_COLUMN]
+  items = shard[rconst.TRAIN_KEY][movielens.ITEM_COLUMN]
+
+  if not is_training:
+    # For eval, there is one positive which was held out from the training set.
+    test_positive_dict = dict(zip(
+        shard[rconst.EVAL_KEY][movielens.USER_COLUMN],
+        shard[rconst.EVAL_KEY][movielens.ITEM_COLUMN]))

   delta = users[1:] - users[:-1]
   boundaries = ([0] + (np.argwhere(delta)[:, 0] + 1).tolist() +
@@ -104,16 +112,34 @@ def _process_shard(args):
   label_blocks = []
   for i in range(len(boundaries) - 1):
     assert len(set(users[boundaries[i]:boundaries[i+1]])) == 1
+    current_user = users[boundaries[i]]
     positive_items = items[boundaries[i]:boundaries[i+1]]
     positive_set = set(positive_items)
     if positive_items.shape[0] != len(positive_set):
       raise ValueError("Duplicate entries detected.")
-    n_pos = len(positive_set)
-    negatives = stat_utils.sample_with_exclusion(
-        num_items, positive_set, n_pos * num_neg)
-    user_blocks.append(users[boundaries[i]] * np.ones(
+
+    if is_training:
+      n_pos = len(positive_set)
+      negatives = stat_utils.sample_with_exclusion(
+          num_items, positive_set, n_pos * num_neg, replacement=True)
+    else:
+      if not match_mlperf:
+        # The mlperf reference allows the holdout item to appear as a negative.
+        # Including it in the positive set makes the eval more stringent,
+        # because an appearance of the test item would be removed by
+        # deduplication rules. (Effectively resulting in a minute reduction of
+        # NUM_EVAL_NEGATIVES)
+        positive_set.add(test_positive_dict[current_user])
+
+      negatives = stat_utils.sample_with_exclusion(
+          num_items, positive_set, num_neg, replacement=match_mlperf)
+      positive_set = [test_positive_dict[current_user]]
+      n_pos = len(positive_set)
+      assert n_pos == 1
+
+    user_blocks.append(current_user * np.ones(
         (n_pos * (1 + num_neg),), dtype=np.int32))
     item_blocks.append(
         np.array(list(positive_set) + negatives, dtype=np.uint16))
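
To make the eval branch above concrete, here is a small self-contained sketch of what one user's evaluation block looks like: the held-out positive first, followed by `NUM_EVAL_NEGATIVES` sampled negatives. `stat_utils.sample_with_exclusion` is replaced by a plain NumPy stand-in with assumed semantics; `match_mlperf` only controls whether the holdout item may reappear as a negative and whether sampling is with replacement.

```python
import numpy as np

NUM_EVAL_NEGATIVES = 999  # the "typically 999" negatives per validation point

def sample_negatives(num_items, exclude, n, replacement, rng=np.random):
  """Toy stand-in for stat_utils.sample_with_exclusion (illustration only)."""
  if replacement:
    out = []
    while len(out) < n:
      candidate = int(rng.randint(num_items))
      if candidate not in exclude:
        out.append(candidate)
    return out
  pool = np.setdiff1d(np.arange(num_items),
                      np.fromiter(exclude, dtype=np.int64))
  return rng.choice(pool, size=n, replace=False).tolist()

def eval_block_for_user(train_positives, holdout_item, num_items, match_mlperf):
  exclude = set(train_positives)
  if not match_mlperf:
    # Stricter eval: never let the holdout item reappear as a negative.
    exclude.add(holdout_item)
  negatives = sample_negatives(num_items, exclude, NUM_EVAL_NEGATIVES,
                               replacement=match_mlperf)
  # Positive first, then negatives: one user's items stay contiguous in a batch.
  return np.array([holdout_item] + negatives, dtype=np.uint16)
```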
@@ -157,75 +183,83 @@ def init_worker():
   signal.signal(signal.SIGINT, sigint_handler)

-def _construct_training_records(
-    train_cycle,          # type: int
+def _construct_records(
+    is_training,          # type: bool
+    train_cycle,          # type: typing.Optional[int]
     num_workers,          # type: int
     cache_paths,          # type: rconst.Paths
     num_readers,          # type: int
     num_neg,              # type: int
-    num_train_positives,  # type: int
+    num_positives,        # type: int
     num_items,            # type: int
     epochs_per_cycle,     # type: int
-    train_batch_size,     # type: int
+    batch_size,           # type: int
     training_shards,      # type: typing.List[str]
-    spillover,            # type: bool
-    carryover=None,       # type: typing.Union[typing.List[np.ndarray], None]
-    deterministic=False   # type: bool
+    deterministic=False,  # type: bool
+    match_mlperf=False    # type: bool
     ):
   """Generate false negatives and write TFRecords files.

   Args:
+    is_training: Are training records (True) or eval records (False) created.
     train_cycle: Integer of which cycle the generated data is for.
     num_workers: Number of multiprocessing workers to use for negative
      generation.
     cache_paths: Paths object with information of where to write files.
-    num_readers: The number of reader datasets in the train input_fn.
+    num_readers: The number of reader datasets in the input_fn.
     num_neg: The number of false negatives per positive example.
-    num_train_positives: The number of positive examples. This value is used
+    num_positives: The number of positive examples. This value is used
      to pre-allocate arrays while the imap is still running. (NumPy does not
      allow dynamic arrays.)
     num_items: The cardinality of the item set.
     epochs_per_cycle: The number of epochs worth of data to construct.
-    train_batch_size: The expected batch size used during training. This is used
+    batch_size: The expected batch size used during training. This is used
      to properly batch data when writing TFRecords.
     training_shards: The picked positive examples from which to generate
      negatives.
-    spillover: If the final batch is incomplete, push it to the next
-      cycle (True) or include a partial batch (False).
-    carryover: The data points to be spilled over to the next cycle.
   """
   st = timeit.default_timer()

+  if not is_training:
+    # Later logic assumes that all items for a given user are in the same batch.
+    assert not batch_size % (rconst.NUM_EVAL_NEGATIVES + 1)
+    assert num_neg == rconst.NUM_EVAL_NEGATIVES
+
+  assert epochs_per_cycle == 1 or is_training
   num_workers = min([num_workers, len(training_shards) * epochs_per_cycle])
-  carryover = carryover or [
-      np.zeros((0,), dtype=np.int32),
-      np.zeros((0,), dtype=np.uint16),
-      np.zeros((0,), dtype=np.int8),
-  ]
-  num_carryover = carryover[0].shape[0]
-  num_pts = num_carryover + num_train_positives * (1 + num_neg)
+
+  num_pts = num_positives * (1 + num_neg)
+
+  # Equivalent to `int(ceil(num_pts / batch_size)) * batch_size`, but without
+  # precision concerns
+  num_pts_with_padding = (num_pts + batch_size - 1) // batch_size * batch_size
+  num_padding = num_pts_with_padding - num_pts

   # We choose a different random seed for each process, so that the processes
   # will not all choose the same random numbers.
-  process_seeds = [np.random.randint(2**32)
+  process_seeds = [stat_utils.random_int32()
                    for _ in training_shards * epochs_per_cycle]
-  map_args = [(shard, num_items, num_neg, process_seeds[i])
-              for i, shard in enumerate(training_shards * epochs_per_cycle)]
+  map_args = [
+      (shard, num_items, num_neg, process_seeds[i], is_training, match_mlperf)
+      for i, shard in enumerate(training_shards * epochs_per_cycle)]

   with popen_helper.get_pool(num_workers, init_worker) as pool:
     map_fn = pool.imap if deterministic else pool.imap_unordered  # pylint: disable=no-member
     data_generator = map_fn(_process_shard, map_args)
     data = [
-        np.zeros(shape=(num_pts,), dtype=np.int32) - 1,
-        np.zeros(shape=(num_pts,), dtype=np.uint16),
-        np.zeros(shape=(num_pts,), dtype=np.int8),
+        np.zeros(shape=(num_pts_with_padding,), dtype=np.int32) - 1,
+        np.zeros(shape=(num_pts_with_padding,), dtype=np.uint16),
+        np.zeros(shape=(num_pts_with_padding,), dtype=np.int8),
     ]

-    # The carryover data is always first.
-    for i in range(3):
-      data[i][:num_carryover] = carryover[i]
-
-    index_destinations = np.random.permutation(
-        num_train_positives * (1 + num_neg)) + num_carryover
+    # Training data is shuffled. Evaluation data MUST not be shuffled.
+    # Downstream processing depends on the fact that evaluation data for a given
+    # user is grouped within a batch.
+    if is_training:
+      index_destinations = np.random.permutation(num_pts)
+    else:
+      index_destinations = np.arange(num_pts)
+
     start_ind = 0
     for data_segment in data_generator:
       n_in_segment = data_segment[0].shape[0]
@@ -234,63 +268,84 @@ def _construct_training_records(
       for i in range(3):
         data[i][dest] = data_segment[i]

-  # Check that no points were dropped.
-  assert (num_pts - num_carryover) == start_ind
-  assert not np.sum(data[0] == -1)
-
-  record_dir = os.path.join(cache_paths.train_epoch_dir,
-                            get_cycle_folder_name(train_cycle))
-  tf.gfile.MakeDirs(record_dir)
-
-  batches_per_file = np.ceil(num_pts / train_batch_size / num_readers)
+  assert np.sum(data[0] == -1) == num_padding
+
+  if is_training:
+    if num_padding:
+      # In order to have a full batch, randomly include points from earlier in
+      # the batch.
+      pad_sample_indices = np.random.randint(
+          low=0, high=num_pts, size=(num_padding,))
+      dest = np.arange(start=start_ind, stop=start_ind + num_padding)
+      start_ind += num_padding
+      for i in range(3):
+        data[i][dest] = data[i][pad_sample_indices]
+  else:
+    # For Evaluation, padding is all zeros. The evaluation input_fn knows how
+    # to interpret and discard the zero padded entries.
+    data[0][num_pts:] = 0
+
+  # Check that no points were overlooked.
+  assert not np.sum(data[0] == -1)
+
+  batches_per_file = np.ceil(num_pts_with_padding / batch_size / num_readers)
   current_file_id = -1
   current_batch_id = -1
   batches_by_file = [[] for _ in range(num_readers)]

-  output_carryover = [
-      np.zeros(shape=(0,), dtype=np.int32),
-      np.zeros(shape=(0,), dtype=np.uint16),
-      np.zeros(shape=(0,), dtype=np.int8),
-  ]
-
   while True:
     current_batch_id += 1
     if (current_batch_id % batches_per_file) == 0:
       current_file_id += 1
-    end_ind = (current_batch_id + 1) * train_batch_size
-    if end_ind > num_pts:
-      if spillover:
-        output_carryover = [data[i][current_batch_id*train_batch_size:num_pts]
-                            for i in range(3)]
-        break
-      else:
-        batches_by_file[current_file_id].append(current_batch_id)
-        break
+
+    start_ind = current_batch_id * batch_size
+    end_ind = start_ind + batch_size
+    if end_ind > num_pts_with_padding:
+      if start_ind != num_pts_with_padding:
+        raise ValueError("Batch padding does not line up")
+      break
     batches_by_file[current_file_id].append(current_batch_id)

+  if is_training:
+    # Empirically it is observed that placing the batch with repeated values at
+    # the start rather than the end improves convergence.
+    batches_by_file[0][0], batches_by_file[-1][-1] = \
+      batches_by_file[-1][-1], batches_by_file[0][0]
+
+  if is_training:
+    template = rconst.TRAIN_RECORD_TEMPLATE
+    record_dir = os.path.join(cache_paths.train_epoch_dir,
+                              get_cycle_folder_name(train_cycle))
+    tf.gfile.MakeDirs(record_dir)
+  else:
+    template = rconst.EVAL_RECORD_TEMPLATE
+    record_dir = cache_paths.eval_data_subdir
+
   batch_count = 0
   for i in range(num_readers):
-    fpath = os.path.join(record_dir, rconst.TRAIN_RECORD_TEMPLATE.format(i))
+    fpath = os.path.join(record_dir, template.format(i))
     log_msg("Writing {}".format(fpath))
     with tf.python_io.TFRecordWriter(fpath) as writer:
       for j in batches_by_file[i]:
-        start_ind = j * train_batch_size
-        end_ind = start_ind + train_batch_size
-        batch_bytes = _construct_record(
+        start_ind = j * batch_size
+        end_ind = start_ind + batch_size
+        record_kwargs = dict(
             users=data[0][start_ind:end_ind],
             items=data[1][start_ind:end_ind],
-            labels=data[2][start_ind:end_ind],
         )
+
+        if is_training:
+          record_kwargs["labels"] = data[2][start_ind:end_ind]
+        else:
+          record_kwargs["dupe_mask"] = stat_utils.mask_duplicates(
+              record_kwargs["items"].reshape(-1, num_neg + 1),
+              axis=1).flatten().astype(np.int8)
+
+        batch_bytes = _construct_record(**record_kwargs)
+
         writer.write(batch_bytes)
         batch_count += 1

-  if spillover:
-    written_pts = output_carryover[0].shape[0] + batch_count * train_batch_size
-    if num_pts != written_pts:
-      raise ValueError("Error detected: point counts do not match: {} vs. {}"
-                       .format(num_pts, written_pts))
-
   # We write to a temp file then atomically rename it to the final file, because
   # writing directly to the final file can cause the main process to read a
@@ -298,65 +353,18 @@ def _construct_training_records(
   ready_file_temp = os.path.join(record_dir, rconst.READY_FILE_TEMP)
   with tf.gfile.Open(ready_file_temp, "w") as f:
     json.dump({
-        "batch_size": train_batch_size,
+        "batch_size": batch_size,
         "batch_count": batch_count,
     }, f)
   ready_file = os.path.join(record_dir, rconst.READY_FILE)
   tf.gfile.Rename(ready_file_temp, ready_file)

-  log_msg("Cycle {} complete. Total time: {:.1f} seconds"
-          .format(train_cycle, timeit.default_timer() - st))
-
-  return output_carryover
-
-
-def _construct_eval_record(cache_paths, eval_batch_size):
-  """Convert Eval data to a single TFRecords file."""
-
-  # Later logic assumes that all items for a given user are in the same batch.
-  assert not eval_batch_size % (rconst.NUM_EVAL_NEGATIVES + 1)
-
-  log_msg("Beginning construction of eval TFRecords file.")
-  raw_fpath = cache_paths.eval_raw_file
-  intermediate_fpath = cache_paths.eval_record_template_temp
-  dest_fpath = cache_paths.eval_record_template.format(eval_batch_size)
-  with tf.gfile.Open(raw_fpath, "rb") as f:
-    eval_data = pickle.load(f)
-
-  users = eval_data[0][movielens.USER_COLUMN]
-  items = eval_data[0][movielens.ITEM_COLUMN]
-  assert users.shape == items.shape
-
-  # eval_data[1] is the labels, but during evaluation they are infered as they
-  # have a set structure. They are included the the data artifact for debug
-  # purposes.
-
-  # This packaging assumes that the caller knows to drop the padded values.
-  n_pts = users.shape[0]
-  n_pad = eval_batch_size - (n_pts % eval_batch_size)
-  assert not (n_pts + n_pad) % eval_batch_size
-  users = np.concatenate([users, np.zeros(shape=(n_pad,), dtype=np.int32)])\
-      .reshape((-1, eval_batch_size))
-  items = np.concatenate([items, np.zeros(shape=(n_pad,), dtype=np.uint16)])\
-      .reshape((-1, eval_batch_size))
-
-  num_batches = users.shape[0]
-  with tf.python_io.TFRecordWriter(intermediate_fpath) as writer:
-    for i in range(num_batches):
-      batch_users = users[i, :]
-      batch_items = items[i, :]
-      dupe_mask = stat_utils.mask_duplicates(
-          batch_items.reshape(-1, rconst.NUM_EVAL_NEGATIVES + 1),
-          axis=1).flatten().astype(np.int8)
-      batch_bytes = _construct_record(
-          users=batch_users,
-          items=batch_items,
-          dupe_mask=dupe_mask
-      )
-      writer.write(batch_bytes)
-  tf.gfile.Rename(intermediate_fpath, dest_fpath)
-  log_msg("Eval TFRecords file successfully constructed.")
+  if is_training:
+    log_msg("Cycle {} complete. Total time: {:.1f} seconds"
+            .format(train_cycle, timeit.default_timer() - st))
+  else:
+    log_msg("Eval construction complete. Total time: {:.1f} seconds"
+            .format(timeit.default_timer() - st))


 def _generation_loop(num_workers,           # type: int
@@ -365,11 +373,12 @@ def _generation_loop(num_workers,           # type: int
                      num_neg,                # type: int
                      num_train_positives,    # type: int
                      num_items,              # type: int
-                     spillover,              # type: bool
+                     num_users,              # type: int
                      epochs_per_cycle,       # type: int
                      train_batch_size,       # type: int
                      eval_batch_size,        # type: int
-                     deterministic           # type: bool
+                     deterministic,          # type: bool
+                     match_mlperf            # type: bool
                     ):
   # type: (...) -> None
   """Primary run loop for data file generation."""

@@ -387,23 +396,32 @@ def _generation_loop(num_workers,           # type: int
   log_msg("Entering generation loop.")
   tf.gfile.MakeDirs(cache_paths.train_epoch_dir)
+  tf.gfile.MakeDirs(cache_paths.eval_data_subdir)

   training_shards = [os.path.join(cache_paths.train_shard_subdir, i) for i in
                      tf.gfile.ListDirectory(cache_paths.train_shard_subdir)]

+  shared_kwargs = dict(
+      num_workers=multiprocessing.cpu_count(), cache_paths=cache_paths,
+      num_readers=num_readers, num_items=num_items,
+      training_shards=training_shards, deterministic=deterministic,
+      match_mlperf=match_mlperf
+  )
+
   # Training blocks on the creation of the first epoch, so the num_workers
   # limit is not respected for this invocation
   train_cycle = 0
-  carryover = _construct_training_records(
-      train_cycle=train_cycle, num_workers=multiprocessing.cpu_count(),
-      cache_paths=cache_paths, num_readers=num_readers, num_neg=num_neg,
-      num_train_positives=num_train_positives, num_items=num_items,
-      epochs_per_cycle=epochs_per_cycle, train_batch_size=train_batch_size,
-      training_shards=training_shards, spillover=spillover, carryover=None,
-      deterministic=deterministic)
-
-  _construct_eval_record(cache_paths=cache_paths,
-                         eval_batch_size=eval_batch_size)
+  _construct_records(
+      is_training=True, train_cycle=train_cycle, num_neg=num_neg,
+      num_positives=num_train_positives, epochs_per_cycle=epochs_per_cycle,
+      batch_size=train_batch_size, **shared_kwargs)
+
+  # Construct evaluation set.
+  shared_kwargs["num_workers"] = num_workers
+  _construct_records(
+      is_training=False, train_cycle=None, num_neg=rconst.NUM_EVAL_NEGATIVES,
+      num_positives=num_users, epochs_per_cycle=1, batch_size=eval_batch_size,
+      **shared_kwargs)

   wait_count = 0
   start_time = time.time()

@@ -427,13 +445,10 @@ def _generation_loop(num_workers,           # type: int
       continue

     train_cycle += 1
-    carryover = _construct_training_records(
-        train_cycle=train_cycle, num_workers=num_workers,
-        cache_paths=cache_paths, num_readers=num_readers, num_neg=num_neg,
-        num_train_positives=num_train_positives, num_items=num_items,
-        epochs_per_cycle=epochs_per_cycle, train_batch_size=train_batch_size,
-        training_shards=training_shards, spillover=spillover,
-        carryover=carryover, deterministic=deterministic)
+    _construct_records(
+        is_training=True, train_cycle=train_cycle, num_neg=num_neg,
+        num_positives=num_train_positives, epochs_per_cycle=epochs_per_cycle,
+        batch_size=train_batch_size, **shared_kwargs)

     wait_count = 0
     start_time = time.time()

@@ -499,11 +514,12 @@ def main(_):
         num_neg=flags.FLAGS.num_neg,
         num_train_positives=flags.FLAGS.num_train_positives,
         num_items=flags.FLAGS.num_items,
-        spillover=flags.FLAGS.spillover,
+        num_users=flags.FLAGS.num_users,
         epochs_per_cycle=flags.FLAGS.epochs_per_cycle,
         train_batch_size=flags.FLAGS.train_batch_size,
         eval_batch_size=flags.FLAGS.eval_batch_size,
         deterministic=flags.FLAGS.seed is not None,
+        match_mlperf=flags.FLAGS.ml_perf,
     )
   except KeyboardInterrupt:
     log_msg("KeyboardInterrupt registered.")

@@ -536,6 +552,8 @@ def define_flags():
                        help="The number of positive training examples.")
   flags.DEFINE_integer(name="num_items", default=None,
                        help="Number of items from which to select negatives.")
+  flags.DEFINE_integer(name="num_users", default=None,
+                       help="The number of unique users. Used for evaluation.")
   flags.DEFINE_integer(name="epochs_per_cycle", default=1,
                        help="The number of epochs of training data to produce"
                             "at a time.")

@@ -545,11 +563,6 @@ def define_flags():
   flags.DEFINE_integer(name="eval_batch_size", default=None,
                        help="The batch size with which evaluation TFRecords "
                             "will be chunked.")
-  flags.DEFINE_boolean(
-      name="spillover", default=True,
-      help="If a complete batch cannot be provided, return an empty batch and "
-           "start the next epoch from a non-empty buffer. This guarantees "
-           "fixed batch sizes.")
   flags.DEFINE_boolean(name="redirect_logs", default=False,
                        help="Catch logs and write them to a file. "
                             "(Useful if this is run as a subprocess)")

@@ -558,6 +571,8 @@ def define_flags():
   flags.DEFINE_integer(name="seed", default=None,
                        help="NumPy random seed to set at startup. If not "
                             "specified, a seed will not be set.")
+  flags.DEFINE_boolean(name="ml_perf", default=None,
+                       help="Match MLPerf. See ncf_main.py for details.")

   flags.mark_flags_as_required(["data_dir", "cache_id"])
...
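The eval records written by `_construct_records` above carry a `dupe_mask` alongside users and items. A hedged NumPy sketch of what `stat_utils.mask_duplicates` is assumed to compute along axis 1: a 1 marks repeated items within a user's row so they can be discarded when scoring.

```python
import numpy as np

def mask_duplicates_rowwise(items_2d):
  """Mark repeated items within each row; first occurrences stay 0 (illustration)."""
  mask = np.zeros_like(items_2d, dtype=np.int8)
  for r, row in enumerate(items_2d):
    seen = set()
    for c, item in enumerate(row):
      if item in seen:
        mask[r, c] = 1
      seen.add(item)
  return mask

# One eval "row" per user: holdout positive followed by sampled negatives.
items = np.array([[13, 4, 7, 4],
                  [22, 9, 9, 1]], dtype=np.uint16)
print(mask_duplicates_rowwise(items))
# [[0 0 0 1]
#  [0 0 1 0]]
```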
@@ -57,7 +57,7 @@ DATASET_TO_NUM_USERS_AND_ITEMS = {
 # Number of batches to run per epoch when using synthetic data. At high batch
 # sizes, we run for more batches than with real data, which is good since
 # running more batches reduces noise when measuring the average batches/second.
-_SYNTHETIC_BATCHES_PER_EPOCH = 2000
+SYNTHETIC_BATCHES_PER_EPOCH = 2000


 class NCFDataset(object):
@@ -65,7 +65,7 @@ class NCFDataset(object):

   def __init__(self, user_map, item_map, num_data_readers, cache_paths,
                num_train_positives, deterministic=False):
-    # type: (dict, dict, int, rconst.Paths) -> None
+    # type: (dict, dict, int, rconst.Paths, int, bool) -> None
     """Assign key values for recommendation dataset.

     Args:
@@ -175,7 +175,6 @@ def _filter_index_sort(raw_rating_path, match_mlperf):

 def _train_eval_map_fn(args):
-  # type: (...) -> typing.Dict(np.ndarray)
   """Split training and testing data and generate testing negatives.

   This function is called as part of a multiprocessing map. The principle
@@ -186,9 +185,8 @@ def _train_eval_map_fn(args):
   For each user, all but the last item is written into a pickle file which the
   training data producer can consume on as needed. The last item for a user
-  is a validation point; for each validation point a number of negatives are
-  generated (typically 999). The validation data is returned by this function,
-  as it is held in memory for the remainder of the run.
+  is a validation point; it is written under a separate key and will be used
+  later to generate the evaluation data.

   Args:
     shard: A dict containing the user and item arrays.
@@ -198,16 +196,10 @@ def _train_eval_map_fn(args):
       which validation negatives should be drawn.
     cache_paths: rconst.Paths object containing locations for various cache
       files.
-    seed: Random seed to be used when generating testing negatives.
-    match_mlperf: If True, sample eval negative with replacements, which the
-      MLPerf reference implementation does.
-
-  Returns:
-    A dict containing the evaluation data for a given shard.
   """
-  shard, shard_id, num_items, cache_paths, seed, match_mlperf = args
-  np.random.seed(seed)
+  shard, shard_id, num_items, cache_paths = args

   users = shard[movielens.USER_COLUMN]
   items = shard[movielens.ITEM_COLUMN]
@@ -218,7 +210,6 @@ def _train_eval_map_fn(args):
                 [users.shape[0]])

   train_blocks = []
-  test_blocks = []
   test_positives = []
   for i in range(len(boundaries) - 1):
     # This is simply a vector of repeated values such that the shard could be
@@ -233,38 +224,30 @@ def _train_eval_map_fn(args):
     block_items = items[boundaries[i]:boundaries[i+1]]
     train_blocks.append((block_user[:-1], block_items[:-1]))
-
-    test_negatives = stat_utils.sample_with_exclusion(
-        num_items=num_items, positive_set=set(block_items),
-        n=rconst.NUM_EVAL_NEGATIVES, replacement=match_mlperf)
-    test_blocks.append((
-        block_user[0] * np.ones((rconst.NUM_EVAL_NEGATIVES + 1,),
-                                dtype=np.int32),
-        np.array([block_items[-1]] + test_negatives, dtype=np.uint16)
-    ))
     test_positives.append((block_user[0], block_items[-1]))

   train_users = np.concatenate([i[0] for i in train_blocks])
   train_items = np.concatenate([i[1] for i in train_blocks])
+
+  test_pos_users = np.array([i[0] for i in test_positives],
+                            dtype=train_users.dtype)
+  test_pos_items = np.array([i[1] for i in test_positives],
+                            dtype=train_items.dtype)

   train_shard_fpath = cache_paths.train_shard_template.format(
       str(shard_id).zfill(5))

   with tf.gfile.Open(train_shard_fpath, "wb") as f:
     pickle.dump({
-        movielens.USER_COLUMN: train_users,
-        movielens.ITEM_COLUMN: train_items,
-    }, f)
-
-  test_users = np.concatenate([i[0] for i in test_blocks])
-  test_items = np.concatenate([i[1] for i in test_blocks])
-
-  assert test_users.shape == test_items.shape
-  assert test_items.shape[0] % (rconst.NUM_EVAL_NEGATIVES + 1) == 0
-
-  return {
-      movielens.USER_COLUMN: test_users,
-      movielens.ITEM_COLUMN: test_items,
-  }
+        rconst.TRAIN_KEY: {
+            movielens.USER_COLUMN: train_users,
+            movielens.ITEM_COLUMN: train_items,
+        },
+        rconst.EVAL_KEY: {
+            movielens.USER_COLUMN: test_pos_users,
+            movielens.ITEM_COLUMN: test_pos_items,
+        }
+    }, f)


 def generate_train_eval_data(df, approx_num_shards, num_items, cache_paths,
@@ -327,38 +310,16 @@ def generate_train_eval_data(df, approx_num_shards, num_items, cache_paths,
              "negatives per user...".format(rconst.NUM_EVAL_NEGATIVES))
   tf.gfile.MakeDirs(cache_paths.train_shard_subdir)

-  # We choose a different random seed for each process, so that the processes
-  # will not all choose the same random numbers.
-  process_seeds = [np.random.randint(2**32) for _ in range(approx_num_shards)]
-  map_args = [(shards[i], i, num_items, cache_paths, process_seeds[i],
-               match_mlperf)
+  map_args = [(shards[i], i, num_items, cache_paths)
               for i in range(approx_num_shards)]

   with popen_helper.get_pool(multiprocessing.cpu_count()) as pool:
-    test_shards = pool.map(_train_eval_map_fn, map_args)  # pylint: disable=no-member
-
-  tf.logging.info("Merging test shards...")
-  test_users = np.concatenate([i[movielens.USER_COLUMN] for i in test_shards])
-  test_items = np.concatenate([i[movielens.ITEM_COLUMN] for i in test_shards])
-
-  assert test_users.shape == test_items.shape
-  assert test_items.shape[0] % (rconst.NUM_EVAL_NEGATIVES + 1) == 0
-
-  test_labels = np.zeros(shape=test_users.shape)
-  test_labels[0::(rconst.NUM_EVAL_NEGATIVES + 1)] = 1
-
-  eval_data = ({
-      movielens.USER_COLUMN: test_users,
-      movielens.ITEM_COLUMN: test_items,
-  }, test_labels)
-
-  tf.logging.info("Writing test data to file.")
-  tf.gfile.MakeDirs(cache_paths.eval_data_subdir)
-  with tf.gfile.Open(cache_paths.eval_raw_file, "wb") as f:
-    pickle.dump(eval_data, f, protocol=pickle.HIGHEST_PROTOCOL)
+    pool.map(_train_eval_map_fn, map_args)  # pylint: disable=no-member


 def construct_cache(dataset, data_dir, num_data_readers, match_mlperf,
                     deterministic, cache_id=None):
-  # type: (str, str, int, bool, typing.Optional[int]) -> NCFDataset
+  # type: (str, str, int, bool, bool, typing.Optional[int]) -> NCFDataset
   """Load and digest data CSV into a usable form.

   Args:
@@ -419,18 +380,21 @@ def _shutdown(proc):
   """Convenience function to cleanly shut down async generation process."""
   tf.logging.info("Shutting down train data creation subprocess.")
-  try:
-    proc.send_signal(signal.SIGINT)
-    time.sleep(1)
-    if proc.returncode is not None:
-      return  # SIGINT was handled successfully within 1 sec
-  except socket.error:
-    pass
-
-  # Otherwise another second of grace period and then forcibly kill the process.
-  time.sleep(1)
-  proc.terminate()
+  try:
+    try:
+      proc.send_signal(signal.SIGINT)
+      time.sleep(5)
+      if proc.returncode is not None:
+        return  # SIGINT was handled successfully within 5 seconds
+    except socket.error:
+      pass
+
+    # Otherwise another second of grace period and then force kill the process.
+    time.sleep(1)
+    proc.terminate()
+  except:  # pylint: disable=broad-except
+    tf.logging.error("Data generation subprocess could not be killed.")


 def instantiate_pipeline(dataset, data_dir, batch_size, eval_batch_size,
@@ -456,18 +420,17 @@ def instantiate_pipeline(dataset, data_dir, batch_size, eval_batch_size,
       "num_neg": num_neg,
       "num_train_positives": ncf_dataset.num_train_positives,
       "num_items": ncf_dataset.num_items,
+      "num_users": ncf_dataset.num_users,
       "num_readers": ncf_dataset.num_data_readers,
       "epochs_per_cycle": epochs_per_cycle,
       "train_batch_size": batch_size,
       "eval_batch_size": eval_batch_size,
       "num_workers": num_workers,
-      # This allows the training input function to guarantee batch size and
-      # significantly improves performance. (~5% increase in examples/sec on
-      # GPU, and needed for TPU XLA.)
-      "spillover": True,
       "redirect_logs": use_subprocess,
       "use_tf_logging": not use_subprocess,
+      "ml_perf": match_mlperf,
   }
   if ncf_dataset.deterministic:
     flags_["seed"] = stat_utils.random_int32()

   tf.gfile.MakeDirs(data_dir)
@@ -608,12 +571,12 @@ def hash_pipeline(dataset, deterministic):
   tf.logging.info("  [pipeline_hash] All batches hash: {}".format(overall_hash))


-def make_train_input_fn(ncf_dataset):
-  # type: (typing.Optional[NCFDataset]) -> (typing.Callable, str, int)
+def make_input_fn(ncf_dataset, is_training):
+  # type: (typing.Optional[NCFDataset], bool) -> (typing.Callable, str, int)
   """Construct training input_fn for the current epoch."""

   if ncf_dataset is None:
-    return make_train_synthetic_input_fn()
+    return make_synthetic_input_fn(is_training)

   if not tf.gfile.Exists(ncf_dataset.cache_paths.subproc_alive):
     # The generation subprocess must have been alive at some point, because we
@@ -621,6 +584,7 @@ def make_train_input_fn(ncf_dataset):
     raise ValueError("Generation subprocess unexpectedly died. Data will not "
                      "be available; exiting to avoid waiting forever.")

+  if is_training:
     train_epoch_dir = ncf_dataset.cache_paths.train_epoch_dir
     while not tf.gfile.Exists(train_epoch_dir):
       tf.logging.info("Waiting for {} to exist.".format(train_epoch_dir))
@@ -634,6 +598,10 @@ def make_train_input_fn(ncf_dataset):
     train_data_dirs.sort()  # names are zfilled so that
                             # lexicographic sort == numeric sort
     record_dir = os.path.join(train_epoch_dir, train_data_dirs[0])
+    template = rconst.TRAIN_RECORD_TEMPLATE
+  else:
+    record_dir = ncf_dataset.cache_paths.eval_data_subdir
+    template = rconst.EVAL_RECORD_TEMPLATE

   ready_file = os.path.join(record_dir, rconst.READY_FILE)
   while not tf.gfile.Exists(ready_file):
@@ -643,16 +611,18 @@ def make_train_input_fn(ncf_dataset):
   with tf.gfile.Open(ready_file, "r") as f:
     epoch_metadata = json.load(f)

-  # The data pipeline uses spillover to guarantee static batch sizes. This
-  # means that an extra batch will need to be run every few epochs. TPUs
-  # require that the number of batches to be run is known at the time that
-  # estimator.train() is called, so having the generation pipeline report
-  # number of batches guarantees that this count is correct.
+  # This value is used to check that the batch count from the subprocess matches
+  # the batch count expected by the main thread.
   batch_count = epoch_metadata["batch_count"]

   def input_fn(params):
     """Generated input_fn for the given epoch."""
+    if is_training:
       batch_size = params["batch_size"]
+    else:
+      # Estimator has "eval_batch_size" included in the params, but TPUEstimator
+      # populates "batch_size" to the appropriate value.
+      batch_size = params.get("eval_batch_size") or params["batch_size"]

     if epoch_metadata["batch_size"] != batch_size:
       raise ValueError(
@@ -662,8 +632,7 @@ def make_train_input_fn(ncf_dataset):
           .format(epoch_metadata["batch_size"], batch_size))

     record_files = tf.data.Dataset.list_files(
-        os.path.join(record_dir, rconst.TRAIN_RECORD_TEMPLATE.format("*")),
-        shuffle=False)
+        os.path.join(record_dir, template.format("*")), shuffle=False)

     interleave = tf.contrib.data.parallel_interleave(
         tf.data.TFRecordDataset,
@@ -673,7 +642,7 @@ def make_train_input_fn(ncf_dataset):
         prefetch_input_elements=4,
     )

-    deserialize = make_deserialize(params, batch_size, True)
+    deserialize = make_deserialize(params, batch_size, is_training)
     dataset = record_files.apply(interleave)
     dataset = dataset.map(deserialize, num_parallel_calls=4)
     dataset = dataset.prefetch(32)
@@ -686,11 +655,12 @@ def make_train_input_fn(ncf_dataset):

   return input_fn, record_dir, batch_count


-def make_train_synthetic_input_fn():
+def make_synthetic_input_fn(is_training):
   """Construct training input_fn that uses synthetic data."""
   def input_fn(params):
     """Generated input_fn for the given epoch."""
-    batch_size = params["batch_size"]
+    batch_size = (params["batch_size"] if is_training else
+                  params["eval_batch_size"] or params["batch_size"])
     num_users = params["num_users"]
     num_items = params["num_items"]
@@ -698,78 +668,26 @@ def make_train_synthetic_input_fn():
                               maxval=num_users)
     items = tf.random_uniform([batch_size], dtype=tf.int32, minval=0,
                               maxval=num_items)

-    labels = tf.random_uniform([batch_size], dtype=tf.int32, minval=0,
-                               maxval=2)
-    data = {
-        movielens.USER_COLUMN: users,
-        movielens.ITEM_COLUMN: items,
-    }, labels
-
-    dataset = tf.data.Dataset.from_tensors(data).repeat(
-        _SYNTHETIC_BATCHES_PER_EPOCH)
-    dataset = dataset.prefetch(32)
-    return dataset
-
-  return input_fn, None, _SYNTHETIC_BATCHES_PER_EPOCH
-
-
-def make_pred_input_fn(ncf_dataset):
-  # type: (typing.Optional[NCFDataset]) -> typing.Callable
-  """Construct input_fn for metric evaluation."""
-  if ncf_dataset is None:
-    return make_synthetic_pred_input_fn()
-
-  def input_fn(params):
-    """Input function based on eval batch size."""
-    # Estimator has "eval_batch_size" included in the params, but TPUEstimator
-    # populates "batch_size" to the appropriate value.
-    batch_size = params.get("eval_batch_size") or params["batch_size"]
-    record_file = ncf_dataset.cache_paths.eval_record_template.format(
-        batch_size)
-    while not tf.gfile.Exists(record_file):
-      tf.logging.info(
-          "Waiting for eval data to be written to {}".format(record_file))
-      time.sleep(1)
-
-    dataset = tf.data.TFRecordDataset(record_file)
-    deserialize = make_deserialize(params, batch_size, False)
-    dataset = dataset.map(deserialize, num_parallel_calls=4)
-    dataset = dataset.prefetch(16)
-
-    if params.get("hash_pipeline"):
-      hash_pipeline(dataset, ncf_dataset.deterministic)
-
-    return dataset
-
-  return input_fn
-
-
-def make_synthetic_pred_input_fn():
-  """Construct input_fn for metric evaluation that uses synthetic data."""
-  def input_fn(params):
-    """Generated input_fn for the given epoch."""
-    batch_size = params["eval_batch_size"]
-    num_users = params["num_users"]
-    num_items = params["num_items"]
-
-    users = tf.random_uniform([batch_size], dtype=tf.int32, minval=0,
-                              maxval=num_users)
-    items = tf.random_uniform([batch_size], dtype=tf.int32, minval=0,
-                              maxval=num_items)
-    dupe_mask = tf.cast(tf.random_uniform([batch_size], dtype=tf.int32,
-                                          minval=0, maxval=2), tf.bool)
-    data = {
-        movielens.USER_COLUMN: users,
-        movielens.ITEM_COLUMN: items,
-        rconst.DUPLICATE_MASK: dupe_mask,
-    }
-    dataset = tf.data.Dataset.from_tensors(data).repeat(
-        _SYNTHETIC_BATCHES_PER_EPOCH)
-    dataset = dataset.prefetch(16)
-    return dataset
-
-  return input_fn
+    if is_training:
+      labels = tf.random_uniform([batch_size], dtype=tf.int32, minval=0,
+                                 maxval=2)
+      data = {
+          movielens.USER_COLUMN: users,
+          movielens.ITEM_COLUMN: items,
+      }, labels
+    else:
+      dupe_mask = tf.cast(tf.random_uniform([batch_size], dtype=tf.int32,
+                                            minval=0, maxval=2), tf.bool)
+      data = {
+          movielens.USER_COLUMN: users,
+          movielens.ITEM_COLUMN: items,
+          rconst.DUPLICATE_MASK: dupe_mask,
+      }

+    dataset = tf.data.Dataset.from_tensors(data).repeat(
+        SYNTHETIC_BATCHES_PER_EPOCH)
+    dataset = dataset.prefetch(32)
+    return dataset

+  return input_fn, None, SYNTHETIC_BATCHES_PER_EPOCH
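
The eval path above relies on two simple invariants worth stating explicitly: the eval batch size must be a multiple of `NUM_EVAL_NEGATIVES + 1` so that one user's candidates never straddle a batch boundary, and metrics can then be computed by reshaping each batch into rows of `NUM_EVAL_NEGATIVES + 1` items per user. A small sketch under those assumptions; the helper names are illustrative, not repository functions.

```python
import numpy as np

NUM_EVAL_NEGATIVES = 999
EVAL_GROUP = NUM_EVAL_NEGATIVES + 1  # holdout positive + negatives per user

def check_eval_batch_size(eval_batch_size):
  # Mirrors the `assert not batch_size % (rconst.NUM_EVAL_NEGATIVES + 1)` check.
  if eval_batch_size % EVAL_GROUP:
    raise ValueError("eval_batch_size must be a multiple of {}".format(EVAL_GROUP))
  return eval_batch_size // EVAL_GROUP  # users per eval batch

def group_batch_by_user(batch_items):
  # Each row then holds one user's candidates, with the positive in column 0.
  return np.asarray(batch_items).reshape(-1, EVAL_GROUP)

print(check_eval_batch_size(16000))  # 16 users per eval batch
```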
@@ -28,7 +28,9 @@ import tensorflow as tf

 from official.datasets import movielens
 from official.recommendation import constants as rconst
+from official.recommendation import data_async_generation
 from official.recommendation import data_preprocessing
+from official.recommendation import stat_utils


 DATASET = "ml-test"
@@ -121,7 +123,7 @@ class BaseTest(tf.test.TestCase):
     g = tf.Graph()
     with g.as_default():
       input_fn, record_dir, batch_count = \
-        data_preprocessing.make_train_input_fn(ncf_dataset)
+        data_preprocessing.make_input_fn(ncf_dataset, True)
       dataset = input_fn({"batch_size": BATCH_SIZE, "use_tpu": False})
     first_epoch = self.drain_dataset(dataset=dataset, g=g)

     user_inv_map = {v: k for k, v in ncf_dataset.user_map.items()}
@@ -134,6 +136,7 @@ class BaseTest(tf.test.TestCase):
     for features, labels in first_epoch:
       for u, i, l in zip(features[movielens.USER_COLUMN],
                          features[movielens.ITEM_COLUMN], labels):
+
         u_raw = user_inv_map[u]
         i_raw = item_inv_map[i]
         if ((u_raw, i_raw) in self.seen_pairs) != l:
@@ -145,9 +148,7 @@ class BaseTest(tf.test.TestCase):
           train_examples[l].add((u_raw, i_raw))
     num_positives_seen = len(train_examples[True])

-    # The numbers don't match exactly because the last batch spills over into
-    # the next epoch
-    assert ncf_dataset.num_train_positives - num_positives_seen < BATCH_SIZE
+    assert ncf_dataset.num_train_positives == num_positives_seen

     # This check is more heuristic because negatives are sampled with
     # replacement. It only checks that negative generation is reasonably random.
@@ -162,20 +163,42 @@ class BaseTest(tf.test.TestCase):
                        movielens.TIMESTAMP_COLUMN: times})

     cache_paths = rconst.Paths(data_dir=self.temp_data_dir)
     np.random.seed(1)
-    data_preprocessing.generate_train_eval_data(df, approx_num_shards=2,
-                                                num_items=10,
-                                                cache_paths=cache_paths,
-                                                match_mlperf=True)
-    with tf.gfile.Open(cache_paths.eval_raw_file, "rb") as f:
-      eval_data = pickle.load(f)
+
+    num_shards = 2
+    num_items = 10
+    data_preprocessing.generate_train_eval_data(
+        df, approx_num_shards=num_shards, num_items=num_items,
+        cache_paths=cache_paths, match_mlperf=True)
+
+    raw_shards = tf.gfile.ListDirectory(cache_paths.train_shard_subdir)
+    assert len(raw_shards) == num_shards
+
+    sharded_eval_data = []
+    for i in range(2):
+      sharded_eval_data.append(data_async_generation._process_shard(
+          (os.path.join(cache_paths.train_shard_subdir, raw_shards[i]),
+           num_items, rconst.NUM_EVAL_NEGATIVES, stat_utils.random_int32(),
+           False, True)))
+
+    if sharded_eval_data[0][0][0] == 1:
+      # Order is not assured for this part of the pipeline.
+      sharded_eval_data.reverse()
+
+    eval_data = [np.concatenate([shard[i] for shard in sharded_eval_data])
+                 for i in range(3)]
+    eval_data = {
+        movielens.USER_COLUMN: eval_data[0],
+        movielens.ITEM_COLUMN: eval_data[1],
+    }

     eval_items_per_user = rconst.NUM_EVAL_NEGATIVES + 1
-    self.assertAllClose(eval_data[0][movielens.USER_COLUMN],
+    self.assertAllClose(eval_data[movielens.USER_COLUMN],
                         [0] * eval_items_per_user + [1] * eval_items_per_user)

     # Each shard process should generate different random items.
     self.assertNotAllClose(
-        eval_data[0][movielens.ITEM_COLUMN][:eval_items_per_user],
-        eval_data[0][movielens.ITEM_COLUMN][eval_items_per_user:])
+        eval_data[movielens.ITEM_COLUMN][:eval_items_per_user],
+        eval_data[movielens.ITEM_COLUMN][eval_items_per_user:])


 if __name__ == "__main__":
...
@@ -142,7 +142,8 @@ def run_ncf(_):
     cleanup_fn = lambda: None
     num_users, num_items = data_preprocessing.DATASET_TO_NUM_USERS_AND_ITEMS[
         FLAGS.dataset]
-    approx_train_steps = None
+    num_train_steps = data_preprocessing.SYNTHETIC_BATCHES_PER_EPOCH
+    num_eval_steps = data_preprocessing.SYNTHETIC_BATCHES_PER_EPOCH
   else:
     ncf_dataset, cleanup_fn = data_preprocessing.instantiate_pipeline(
         dataset=FLAGS.dataset, data_dir=FLAGS.data_dir,
@@ -156,8 +157,11 @@ def run_ncf(_):
         cache_id=FLAGS.cache_id)
     num_users = ncf_dataset.num_users
     num_items = ncf_dataset.num_items
-    approx_train_steps = int(ncf_dataset.num_train_positives
-                             * (1 + FLAGS.num_neg) // FLAGS.batch_size)
+    num_train_steps = int(np.ceil(
+        FLAGS.epochs_between_evals * ncf_dataset.num_train_positives *
+        (1 + FLAGS.num_neg) / FLAGS.batch_size))
+    num_eval_steps = int(np.ceil((1 + rconst.NUM_EVAL_NEGATIVES) *
+                                 ncf_dataset.num_users / eval_batch_size))

   model_helpers.apply_clean(flags.FLAGS)
@@ -206,8 +210,8 @@ def run_ncf(_):
       run_params=run_params,
       test_id=FLAGS.benchmark_test_id)

-  pred_input_fn = data_preprocessing.make_pred_input_fn(ncf_dataset=ncf_dataset)
-
+  pred_input_fn = None
   total_training_cycle = FLAGS.train_epochs // FLAGS.epochs_between_evals
   for cycle_index in range(total_training_cycle):
     tf.logging.info("Starting a training cycle: {}/{}".format(
@@ -215,20 +219,31 @@ def run_ncf(_):

     # Train the model
     train_input_fn, train_record_dir, batch_count = \
-      data_preprocessing.make_train_input_fn(ncf_dataset=ncf_dataset)
+      data_preprocessing.make_input_fn(
+          ncf_dataset=ncf_dataset, is_training=True)

-    if approx_train_steps and np.abs(approx_train_steps - batch_count) > 1:
-      tf.logging.warning(
-          "Estimated ({}) and reported ({}) number of batches differ by more "
-          "than one".format(approx_train_steps, batch_count))
+    if batch_count != num_train_steps:
+      raise ValueError(
+          "Step counts do not match. ({} vs. {}) The async process is "
+          "producing incorrect shards.".format(batch_count, num_train_steps))

     train_estimator.train(input_fn=train_input_fn, hooks=train_hooks,
-                          steps=batch_count)
+                          steps=num_train_steps)
     if train_record_dir:
       tf.gfile.DeleteRecursively(train_record_dir)

     tf.logging.info("Beginning evaluation.")
-    eval_results = eval_estimator.evaluate(pred_input_fn)
+    if pred_input_fn is None:
+      pred_input_fn, _, eval_batch_count = data_preprocessing.make_input_fn(
+          ncf_dataset=ncf_dataset, is_training=False)
+
+      if eval_batch_count != num_eval_steps:
+        raise ValueError(
+            "Step counts do not match. ({} vs. {}) The async process is "
+            "producing incorrect shards.".format(
+                eval_batch_count, num_eval_steps))
+
+    eval_results = eval_estimator.evaluate(pred_input_fn, steps=num_eval_steps)
     tf.logging.info("Evaluation complete.")

     # Benchmark the evaluation results
...
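A quick worked version of the step arithmetic `run_ncf` now enforces against the async generator's reported batch counts. The dataset sizes and batch sizes below are assumptions for illustration (roughly MovieLens ML-20M), not values taken from this PR.

```python
import math

# Illustrative numbers; real values come from the dataset and command-line flags.
num_train_positives = 19_861_770   # assumed: ~ML-20M ratings minus one holdout per user
num_users = 138_493                # assumed
num_neg = 4
epochs_between_evals = 1
batch_size = 16384
eval_batch_size = 100_000          # a multiple of NUM_EVAL_NEGATIVES + 1
NUM_EVAL_NEGATIVES = 999

num_train_steps = math.ceil(
    epochs_between_evals * num_train_positives * (1 + num_neg) / batch_size)
num_eval_steps = math.ceil((1 + NUM_EVAL_NEGATIVES) * num_users / eval_batch_size)

# The async generation process must report exactly these batch counts,
# otherwise run_ncf raises "Step counts do not match."
print(num_train_steps, num_eval_steps)
```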
@@ -48,7 +48,7 @@ do
 # And to confirm that the pipeline is deterministic pass the flag:
 #   --hash_pipeline
 #
-# (`--hash_pipeline` will slow down training)
+# (`--hash_pipeline` will slow down training, though not as much as one might imagine.)

 python ncf_main.py --model_dir ${MODEL_DIR} \
                    --data_dir ${DATA_DIR} \
                    --dataset ${DATASET} --hooks "" \
...