Commit 64710c05 authored by Reed, committed by Taylor Robie

Fix convergence issues for MLPerf. (#5161)

* Fix convergence issues for MLPerf.

Thank you to @robieta for helping me find these issues, and for providing an algorithm for the `get_hit_rate_and_ndcg_mlperf` function.

This change causes every forked process to set a new seed, so that forked processes do not generate the same set of random numbers. This improves evaluation hit rates.
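For context, here is a minimal, self-contained sketch of the per-process seeding pattern this change introduces (the `_draw` helper and the toy sizes are illustrative, not part of the commit):

```python
import multiprocessing

import numpy as np


def _draw(args):
  # Each worker re-seeds NumPy with the seed it was handed. Without this,
  # every forked worker inherits the parent's RNG state and draws the same
  # "random" numbers.
  seed, n = args
  np.random.seed(seed)
  return np.random.randint(0, 100, size=n)


if __name__ == "__main__":
  # Draw one independent seed per task in the parent process, as the commit
  # does in _construct_training_records and generate_train_eval_data.
  seeds = [np.random.randint(2**32) for _ in range(4)]
  with multiprocessing.Pool(4) as pool:
    rows = pool.map(_draw, [(seed, 5) for seed in seeds])
  print(rows)  # The rows differ because each worker got its own seed.
```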

Additionally, it adds a flag, --ml_perf, that makes further changes so that the evaluation hit rate can match the MLPerf reference implementation.
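One of those changes is the sort applied to the ratings table. A toy pandas sketch of why the two-pass stable sort matters (column names here are illustrative):

```python
import pandas as pd

# The last two rows tie on (user, timestamp), so their relative order after
# sorting depends on the algorithm used.
df = pd.DataFrame({"user": [0, 0, 0],
                   "item": [7, 9, 5],
                   "timestamp": [3, 1, 1]})

# Default path: a single sort with pandas' default (unstable) algorithm.
# Rows that tie on the sort keys can land in either order.
default_order = df.sort_values(["user", "timestamp"])

# --ml_perf path: pre-sort by timestamp, then stable-mergesort by
# (user, timestamp). The stable second pass preserves the first pass's order
# for tied keys, reproducing the reference implementation's row order.
mlperf_order = df.sort_values(by="timestamp")
mlperf_order = mlperf_order.sort_values(["user", "timestamp"],
                                        kind="mergesort")
print(default_order)
print(mlperf_order)
```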

I ran 4 times with --ml_perf and 4 times without. Without --ml_perf, the highest hit rates achieved by each run were 0.6278, 0.6287, 0.6289, and 0.6241. With --ml_perf, the highest hit rates were 0.6353, 0.6356, 0.6367, and 0.6353.
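For intuition about where the improvement comes from, here is a minimal NumPy sketch of the duplicate-masking step inside the new `get_hit_rate_and_ndcg`, using the example values from its docstring:

```python
import numpy as np

predicted_scores_by_user = np.array([[2., 3., 3.],
                                     [5., 4., 4.]])
items_by_user = np.array([[10, 20, 20],
                          [30, 40, 40]])
num_users = predicted_scores_by_user.shape[0]

# Sort each user's row by item ID (stable mergesort, as in the commit).
order = items_by_user.argsort(kind="mergesort")
rows = np.arange(num_users)[:, np.newaxis]
sorted_items = items_by_user[rows, order]
sorted_predictions = predicted_scores_by_user[rows, order]

# Zero wherever an item repeats its left neighbor; those duplicate slots get
# a score of -inf so they cannot crowd the true item out of the top_k.
diffs = np.concatenate(
    [np.ones((num_users, 1), dtype=sorted_items.dtype),
     sorted_items[:, :-1] - sorted_items[:, 1:]], axis=1)
deduped_scores = np.where(diffs, sorted_predictions, -np.inf)
print(deduped_scores)  # [[2., 3., -inf], [5., 4., -inf]]
```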

* fix lint error

* Fix failing test

* Address @robieta's feedback

* Address more feedback
parent b4cd5f5c
@@ -22,7 +22,6 @@ import atexit
 import contextlib
 import datetime
 import gc
-import functools
 import multiprocessing
 import json
 import os
@@ -64,8 +63,8 @@ def get_cycle_folder_name(i):
   return "cycle_{}".format(str(i).zfill(5))

-def _process_shard(shard_path, num_items, num_neg):
-  # type: (str, int, int) -> (np.ndarray, np.ndarray, np.ndarray)
+def _process_shard(args):
+  # type: ((str, int, int, int)) -> (np.ndarray, np.ndarray, np.ndarray)
   """Read a shard of training data and return training vectors.

   Args:
@@ -74,6 +73,8 @@ def _process_shard(shard_path, num_items, num_neg):
     num_neg: The number of negatives to generate per positive example.
     seed: Random seed to be used when generating negatives.
   """
+  shard_path, num_items, num_neg, seed = args
+  np.random.seed(seed)

   # The choice to store the training shards in files rather than in memory
   # is motivated by the fact that multiprocessing serializes arguments,
@@ -194,13 +195,16 @@ def _construct_training_records(
   num_carryover = carryover[0].shape[0]
   num_pts = num_carryover + num_train_positives * (1 + num_neg)
-  map_args = [i for i in training_shards * epochs_per_cycle]
-  map_fn = functools.partial(_process_shard, num_neg=num_neg,
-                             num_items=num_items)
+  # We choose a different random seed for each process, so that the processes
+  # will not all choose the same random numbers.
+  process_seeds = [np.random.randint(2**32)
+                   for _ in training_shards * epochs_per_cycle]
+  map_args = [(shard, num_items, num_neg, process_seeds[i])
+              for i, shard in enumerate(training_shards * epochs_per_cycle)]

   with contextlib.closing(multiprocessing.Pool(
       processes=num_workers, initializer=init_worker)) as pool:
-    data_generator = pool.imap_unordered(map_fn, map_args)  # pylint: disable=no-member
+    data_generator = pool.imap_unordered(_process_shard, map_args)  # pylint: disable=no-member
     data = [
         np.zeros(shape=(num_pts,), dtype=np.int32) - 1,
         np.zeros(shape=(num_pts,), dtype=np.uint16),
...
@@ -72,8 +72,8 @@ class NCFDataset(object):
     self.num_train_positives = num_train_positives

-def _filter_index_sort(raw_rating_path):
-  # type: (str) -> (pd.DataFrame, dict, dict)
+def _filter_index_sort(raw_rating_path, match_mlperf):
+  # type: (str, bool) -> (pd.DataFrame, dict, dict)
   """Read in data CSV, and output structured data.

   This function reads in the raw CSV of positive items, and performs three
@@ -98,6 +98,8 @@ def _filter_index_sort(raw_rating_path):
   Args:
     raw_rating_path: The path to the CSV which contains the raw dataset.
+    match_mlperf: If True, change the sorting algorithm to match the MLPerf
+      reference implementation.

   Returns:
     A filtered, zero-index remapped, sorted dataframe, a dict mapping raw user
@@ -136,6 +138,16 @@ def _filter_index_sort(raw_rating_path):
   # This sort is used to shard the dataframe by user, and later to select
   # the last item for a user to be used in validation.
   tf.logging.info("Sorting by user, timestamp...")
-  df.sort_values([movielens.USER_COLUMN, movielens.TIMESTAMP_COLUMN],
-                 inplace=True)
+  if match_mlperf:
+    # This sort is equivalent to the non-MLPerf sort, except that the order of
+    # items with the same user and timestamp is sometimes different. For some
+    # reason, this sort results in a better hit rate during evaluation,
+    # matching the performance of the MLPerf reference implementation.
+    df.sort_values(by=movielens.TIMESTAMP_COLUMN, inplace=True)
+    df.sort_values([movielens.USER_COLUMN, movielens.TIMESTAMP_COLUMN],
+                   inplace=True, kind="mergesort")
+  else:
+    df.sort_values([movielens.USER_COLUMN, movielens.TIMESTAMP_COLUMN],
+                   inplace=True)
@@ -169,12 +181,16 @@ def _train_eval_map_fn(args):
       which validation negatives should be drawn.
     cache_paths: rconst.Paths object containing locations for various cache
      files.
+    seed: Random seed to be used when generating testing negatives.
+    match_mlperf: If True, sample eval negatives with replacement, as the
+      MLPerf reference implementation does.

   Returns:
     A dict containing the evaluation data for a given shard.
   """
-  shard, shard_id, num_items, cache_paths = args
+  shard, shard_id, num_items, cache_paths, seed, match_mlperf = args
+  np.random.seed(seed)
   users = shard[movielens.USER_COLUMN]
   items = shard[movielens.ITEM_COLUMN]
@@ -203,7 +219,7 @@ def _train_eval_map_fn(args):
       test_negatives = stat_utils.sample_with_exclusion(
           num_items=num_items, positive_set=set(block_items),
-          n=rconst.NUM_EVAL_NEGATIVES, replacement=False)
+          n=rconst.NUM_EVAL_NEGATIVES, replacement=match_mlperf)
       test_blocks.append((
           block_user[0] * np.ones((rconst.NUM_EVAL_NEGATIVES + 1,),
                                   dtype=np.int32),
@@ -234,8 +250,9 @@ def _train_eval_map_fn(args):
   }

-def generate_train_eval_data(df, approx_num_shards, num_items, cache_paths):
-  # type: (pd.DataFrame, int, int, rconst.Paths) -> None
+def generate_train_eval_data(df, approx_num_shards, num_items, cache_paths,
+                             match_mlperf):
+  # type: (pd.DataFrame, int, int, rconst.Paths, bool) -> None
   """Construct training and evaluation datasets.

   This function manages dataset construction and validation that the
@@ -257,6 +274,8 @@ def generate_train_eval_data(df, approx_num_shards, num_items, cache_paths):
     num_items: The cardinality of the item set.
     cache_paths: rconst.Paths object containing locations for various cache
       files.
+    match_mlperf: If True, sample eval negatives with replacement, as the
+      MLPerf reference implementation does.
   """
   num_rows = len(df)
@@ -290,9 +309,13 @@ def generate_train_eval_data(df, approx_num_shards, num_items, cache_paths):
   tf.logging.info("Splitting train and test data and generating {} test "
                   "negatives per user...".format(rconst.NUM_EVAL_NEGATIVES))
   tf.gfile.MakeDirs(cache_paths.train_shard_subdir)
-  map_args = [(shards[i], i, num_items, cache_paths)
-              for i in range(approx_num_shards)]
+  # We choose a different random seed for each process, so that the processes
+  # will not all choose the same random numbers.
+  process_seeds = [np.random.randint(2**32) for _ in range(approx_num_shards)]
+  map_args = [(shards[i], i, num_items, cache_paths, process_seeds[i],
+               match_mlperf)
+              for i in range(approx_num_shards)]
   with contextlib.closing(
       multiprocessing.Pool(multiprocessing.cpu_count())) as pool:
     test_shards = pool.map(_train_eval_map_fn, map_args)  # pylint: disable=no-member
@@ -317,8 +340,8 @@ def generate_train_eval_data(df, approx_num_shards, num_items, cache_paths):
     pickle.dump(eval_data, f, protocol=pickle.HIGHEST_PROTOCOL)

-def construct_cache(dataset, data_dir, num_data_readers):
-  # type: (str, str, int, int, bool) -> NCFDataset
+def construct_cache(dataset, data_dir, num_data_readers, match_mlperf):
+  # type: (str, str, int, bool) -> NCFDataset
   """Load and digest data CSV into a usable form.

   Args:
@@ -326,6 +349,8 @@ def construct_cache(dataset, data_dir, num_data_readers):
     data_dir: The root directory of the dataset.
     num_data_readers: The number of parallel processes which will request
       data during training.
+    match_mlperf: If True, change the behavior of the cache construction to
+      match the MLPerf reference implementation.
   """
   cache_paths = rconst.Paths(data_dir=data_dir)
   num_data_readers = (num_data_readers or int(multiprocessing.cpu_count() / 2)
@@ -342,10 +367,11 @@ def construct_cache(dataset, data_dir, num_data_readers):
   tf.gfile.MakeDirs(cache_paths.cache_root)
   raw_rating_path = os.path.join(data_dir, dataset, movielens.RATINGS_FILE)
-  df, user_map, item_map = _filter_index_sort(raw_rating_path)
+  df, user_map, item_map = _filter_index_sort(raw_rating_path, match_mlperf)
   generate_train_eval_data(df=df, approx_num_shards=approx_num_shards,
-                           num_items=len(item_map), cache_paths=cache_paths)
+                           num_items=len(item_map), cache_paths=cache_paths,
+                           match_mlperf=match_mlperf)
   del approx_num_shards  # value may have changed.

   ncf_dataset = NCFDataset(user_map=user_map, item_map=item_map,
@@ -376,12 +402,14 @@ def _shutdown(proc):
 def instantiate_pipeline(dataset, data_dir, batch_size, eval_batch_size,
-                         num_data_readers=None, num_neg=4, epochs_per_cycle=1):
+                         num_data_readers=None, num_neg=4, epochs_per_cycle=1,
+                         match_mlperf=False):
   """Preprocess data and start negative generation subprocess."""
   tf.logging.info("Beginning data preprocessing.")
   ncf_dataset = construct_cache(dataset=dataset, data_dir=data_dir,
-                                num_data_readers=num_data_readers)
+                                num_data_readers=num_data_readers,
+                                match_mlperf=match_mlperf)
   tf.logging.info("Creating training file subprocess.")
...
@@ -19,9 +19,11 @@ from __future__ import division
 from __future__ import print_function

 import os
+import pickle
 import time

 import numpy as np
+import pandas as pd
 import tensorflow as tf

 from official.datasets import movielens
@@ -82,7 +84,15 @@ class BaseTest(tf.test.TestCase):
     # For the most part the necessary checks are performed within
     # construct_cache()

     ncf_dataset = data_preprocessing.construct_cache(
-        dataset=DATASET, data_dir=self.temp_data_dir, num_data_readers=2)
+        dataset=DATASET, data_dir=self.temp_data_dir, num_data_readers=2,
+        match_mlperf=False)
+    assert ncf_dataset.num_users == NUM_USERS
+    assert ncf_dataset.num_items == NUM_ITEMS
+
+    time.sleep(1)  # Ensure we create the next cache in a new directory.
+    ncf_dataset = data_preprocessing.construct_cache(
+        dataset=DATASET, data_dir=self.temp_data_dir, num_data_readers=2,
+        match_mlperf=True)
     assert ncf_dataset.num_users == NUM_USERS
     assert ncf_dataset.num_items == NUM_ITEMS
@@ -145,6 +155,30 @@ class BaseTest(tf.test.TestCase):
     # replacement. It only checks that negative generation is reasonably random.
     assert len(train_examples[False]) / NUM_NEG / num_positives_seen > 0.9

+  def test_shard_randomness(self):
+    users = [0, 0, 0, 0, 1, 1, 1, 1]
+    items = [0, 2, 4, 6, 0, 2, 4, 6]
+    times = [1, 2, 3, 4, 1, 2, 3, 4]
+    df = pd.DataFrame({movielens.USER_COLUMN: users,
+                       movielens.ITEM_COLUMN: items,
+                       movielens.TIMESTAMP_COLUMN: times})
+    cache_paths = rconst.Paths(data_dir=self.temp_data_dir)
+    np.random.seed(1)
+
+    data_preprocessing.generate_train_eval_data(df, approx_num_shards=2,
+                                                num_items=10,
+                                                cache_paths=cache_paths,
+                                                match_mlperf=True)
+
+    with tf.gfile.Open(cache_paths.eval_raw_file, "rb") as f:
+      eval_data = pickle.load(f)
+
+    eval_items_per_user = rconst.NUM_EVAL_NEGATIVES + 1
+    self.assertAllClose(eval_data[0][movielens.USER_COLUMN],
+                        [0] * eval_items_per_user + [1] * eval_items_per_user)
+
+    # Each shard process should generate different random items.
+    self.assertNotAllClose(
+        eval_data[0][movielens.ITEM_COLUMN][:eval_items_per_user],
+        eval_data[0][movielens.ITEM_COLUMN][eval_items_per_user:])

 if __name__ == "__main__":
   tf.logging.set_verbosity(tf.logging.INFO)
...
@@ -54,6 +54,107 @@ _HR_KEY = "HR"
 _NDCG_KEY = "NDCG"

+def get_hit_rate_and_ndcg(predicted_scores_by_user, items_by_user, top_k=_TOP_K,
+                          match_mlperf=False):
+  """Returns the hit rate and the normalized DCG for evaluation.
+
+  `predicted_scores_by_user` and `items_by_user` are parallel NumPy arrays with
+  shape (num_users, num_items) such that `predicted_scores_by_user[i, j]` is
+  the predicted score that user `i` would give item `items_by_user[i][j]`.
+  `items_by_user[i, 0]` is the item that user `i` interacted with, while
+  `items_by_user[i, 1:]` are items that user `i` did not interact with. The
+  goal of the NCF model is to give a high score to
+  `predicted_scores_by_user[i, 0]` compared to
+  `predicted_scores_by_user[i, 1:]`, and the returned HR and NDCG will be
+  higher the more successful the model is at this goal.
+
+  If `match_mlperf` is True, then the HR and NDCG computations are done in a
+  slightly unusual way to match the MLPerf reference implementation.
+  Specifically, if `items_by_user[i, :]` contains duplicate items, it will be
+  treated as if the item only appeared once. Effectively, for duplicate items
+  in a row, the predicted score for all but one of the items will be set to
+  -infinity.
+
+  For example, suppose we have the following inputs:
+  predicted_scores_by_user: [[ 2, 3, 3],
+                             [ 5, 4, 4]]
+  items_by_user:            [[10, 20, 20],
+                             [30, 40, 40]]
+  top_k: 2
+
+  Then with match_mlperf=True, the HR would be 2/2 = 1.0. With
+  match_mlperf=False, the HR would be 1/2 = 0.5. This is because each user has
+  predicted scores for only 2 unique items: 10 and 20 for the first user, and
+  30 and 40 for the second. Therefore, with match_mlperf=True, it's guaranteed
+  the first item's score is in the top 2. With match_mlperf=False, this
+  function would determine that the first user's first item is not in the
+  top 2, because item 20 has a higher score, and item 20 occurs twice.
+
+  Args:
+    predicted_scores_by_user: 2D NumPy array of the predicted scores.
+      `predicted_scores_by_user[i, j]` is the predicted score that user `i`
+      would give item `items_by_user[i][j]`.
+    items_by_user: 2D NumPy array of the item IDs. For user `i`,
+      `items_by_user[i][0]` is the item that user `i` interacted with, while
+      `items_by_user[i, 1:]` are items that user `i` did not interact with.
+    top_k: Only consider the highest rated `top_k` items per user. The HR and
+      NDCG for that user will only be nonzero if the predicted score for that
+      user's first item is in the `top_k` top scores.
+    match_mlperf: If True, compute HR and NDCG slightly differently to match
+      the MLPerf reference implementation.
+
+  Returns:
+    (hr, ndcg) tuple of floats, averaged across all users.
+  """
+  num_users = predicted_scores_by_user.shape[0]
+  zero_indices = np.zeros((num_users, 1), dtype=np.int32)
+
+  if match_mlperf:
+    predicted_scores_by_user = predicted_scores_by_user.copy()
+    items_by_user = items_by_user.copy()
+
+    # For each user, sort the items and predictions by increasing item number.
+    # We use mergesort since it's the only stable sort, which we need to be
+    # equivalent to the MLPerf reference implementation.
+    sorted_items_indices = items_by_user.argsort(kind="mergesort")
+    sorted_items = items_by_user[
+        np.arange(num_users)[:, np.newaxis], sorted_items_indices]
+    sorted_predictions = predicted_scores_by_user[
+        np.arange(num_users)[:, np.newaxis], sorted_items_indices]
+
+    # For items that occur more than once in a user's row, set the predicted
+    # score of the subsequent occurrences to -infinity, which effectively
+    # removes them from the array.
+    diffs = sorted_items[:, :-1] - sorted_items[:, 1:]
+    diffs = np.concatenate(
+        [np.ones((diffs.shape[0], 1), dtype=diffs.dtype), diffs], axis=1)
+    predicted_scores_by_user = np.where(diffs, sorted_predictions, -np.inf)
+
+    # After this block, `zero_indices` will be a (num_users, 1) shaped array
+    # indicating, for each user, the index of the item originally in column 0
+    # within `sorted_items_indices`. This is the item whose membership in the
+    # top_k items we want to check.
+    zero_indices = np.array(np.where(sorted_items_indices == 0))
+    assert np.array_equal(zero_indices[0, :], np.arange(num_users))
+    zero_indices = zero_indices[1, :, np.newaxis]
+
+  # NumPy has an np.argpartition() method; however, log(1000) is so small that
+  # sorting the whole array is simpler and fast enough.
+  top_indices = np.argsort(predicted_scores_by_user, axis=1)[:, -top_k:]
+  top_indices = np.flip(top_indices, axis=1)
+
+  # Both the HR and NDCG vectorized computations take advantage of the fact
+  # that if the positive example for a user is not in the top k, that index
+  # does not appear. That is to say: hit_ind.shape[0] <= num_users
+  hit_ind = np.argwhere(np.equal(top_indices, zero_indices))
+  hr = hit_ind.shape[0] / num_users
+  ndcg = np.sum(np.log(2) / np.log(hit_ind[:, 1] + 2)) / num_users
+  return hr, ndcg
+
+
 def evaluate_model(estimator, ncf_dataset, pred_input_fn):
   # type: (tf.estimator.Estimator, prepare.NCFDataset, typing.Callable) -> dict
   """Model evaluation with HR and NDCG metrics.
@@ -95,28 +196,25 @@ def evaluate_model(estimator, ncf_dataset, pred_input_fn):
   # Get predictions
   predictions = estimator.predict(input_fn=pred_input_fn,
                                   yield_single_examples=False)
+  predictions = list(predictions)
   prediction_batches = [p[movielens.RATING_COLUMN] for p in predictions]
+  item_batches = [p[movielens.ITEM_COLUMN] for p in predictions]

-  # Reshape the predicted scores and each user takes one row
+  # Reshape the predicted scores and items. Each user takes one row.
   prediction_with_padding = np.concatenate(prediction_batches, axis=0)
   predicted_scores_by_user = prediction_with_padding[
       :ncf_dataset.num_users * (1 + rconst.NUM_EVAL_NEGATIVES)]\
       .reshape(ncf_dataset.num_users, -1)
+  item_with_padding = np.concatenate(item_batches, axis=0)
+  items_by_user = item_with_padding[
+      :ncf_dataset.num_users * (1 + rconst.NUM_EVAL_NEGATIVES)]\
+      .reshape(ncf_dataset.num_users, -1)

   tf.logging.info("Computing metrics...")

-  # NumPy has an np.argparition() method, however log(1000) is so small that
-  # sorting the whole array is simpler and fast enough.
-  top_indicies = np.argsort(predicted_scores_by_user, axis=1)[:, -_TOP_K:]
-  top_indicies = np.flip(top_indicies, axis=1)
-
-  # Both HR and NDCG vectorized computation takes advantage of the fact that if
-  # the positive example for a user is not in the top k, that index does not
-  # appear. That is to say: hit_ind.shape[0] <= num_users
-  hit_ind = np.argwhere(np.equal(top_indicies, 0))
-  hr = hit_ind.shape[0] / ncf_dataset.num_users
-  ndcg = np.sum(np.log(2) / np.log(hit_ind[:, 1] + 2)) / ncf_dataset.num_users
+  hr, ndcg = get_hit_rate_and_ndcg(predicted_scores_by_user, items_by_user,
+                                   match_mlperf=FLAGS.ml_perf)

   global_step = estimator.get_variable_value(tf.GraphKeys.GLOBAL_STEP)
   eval_results = {
@@ -208,7 +306,8 @@ def run_ncf(_):
       batch_size=batch_size,
       eval_batch_size=eval_batch_size,
       num_neg=FLAGS.num_neg,
-      epochs_per_cycle=FLAGS.epochs_between_evals)
+      epochs_per_cycle=FLAGS.epochs_between_evals,
+      match_mlperf=FLAGS.ml_perf)

   model_helpers.apply_clean(flags.FLAGS)
@@ -382,6 +481,21 @@ def define_ncf_flags():
           "For dataset ml-20m, the threshold can be set as 0.95 which is "
           "achieved by MLPerf implementation."))

+  flags.DEFINE_bool(
+      name="ml_perf", default=None,
+      help=flags_core.help_wrap(
+          "If set, changes the behavior of the model slightly to match the "
+          "MLPerf reference implementation here: \n"
+          "https://github.com/mlperf/reference/tree/master/recommendation/"
+          "pytorch\n"
+          "The two changes are:\n"
+          "1. When computing the HR and NDCG during evaluation, remove "
+          "duplicate user-item pairs before the computation. This results in "
+          "better HRs and NDCGs.\n"
+          "2. Use a different sorting algorithm when sorting the input data, "
+          "which performs better because the default sorting algorithm is "
+          "not stable."))

 if __name__ == "__main__":
   tf.logging.set_verbosity(tf.logging.INFO)
...
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests NCF."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import math
import numpy as np
import tensorflow as tf
from official.recommendation import ncf_main
class NcfTest(tf.test.TestCase):
def test_hit_rate_and_ndcg(self):
# Test with no duplicate items
predictions = np.array([
[1., 2., 0.], # In top 2
[2., 1., 0.], # In top 1
[0., 2., 1.], # In top 3
[2., 3., 4.] # In top 3
])
items = np.array([
[1, 2, 3],
[2, 3, 1],
[3, 2, 1],
[2, 1, 3],
])
hr, ndcg = ncf_main.get_hit_rate_and_ndcg(predictions, items, 1)
self.assertAlmostEqual(hr, 1 / 4)
self.assertAlmostEqual(ndcg, 1 / 4)
hr, ndcg = ncf_main.get_hit_rate_and_ndcg(predictions, items, 2)
self.assertAlmostEqual(hr, 2 / 4)
self.assertAlmostEqual(ndcg, (1 + math.log(2) / math.log(3)) / 4)
hr, ndcg = ncf_main.get_hit_rate_and_ndcg(predictions, items, 3)
self.assertAlmostEqual(hr, 4 / 4)
self.assertAlmostEqual(ndcg, (1 + math.log(2) / math.log(3) +
2 * math.log(2) / math.log(4)) / 4)
hr, ndcg = ncf_main.get_hit_rate_and_ndcg(predictions, items, 1,
match_mlperf=True)
self.assertAlmostEqual(hr, 1 / 4)
self.assertAlmostEqual(ndcg, 1 / 4)
hr, ndcg = ncf_main.get_hit_rate_and_ndcg(predictions, items, 2,
match_mlperf=True)
self.assertAlmostEqual(hr, 2 / 4)
self.assertAlmostEqual(ndcg, (1 + math.log(2) / math.log(3)) / 4)
hr, ndcg = ncf_main.get_hit_rate_and_ndcg(predictions, items, 3,
match_mlperf=True)
self.assertAlmostEqual(hr, 4 / 4)
self.assertAlmostEqual(ndcg, (1 + math.log(2) / math.log(3) +
2 * math.log(2) / math.log(4)) / 4)
# Test with duplicate items. In the MLPerf case, we treat the duplicates as
# a single item. Otherwise, we treat the duplicates as separate items.
predictions = np.array([
[1., 2., 2., 3.], # In top 4. MLPerf: In top 3
[3., 1., 0., 2.], # In top 1. MLPerf: In top 1
[0., 2., 3., 2.], # In top 4. MLPerf: In top 3
[3., 2., 4., 2.] # In top 2. MLPerf: In top 2
])
items = np.array([
[1, 2, 2, 3],
[1, 2, 3, 4],
[1, 2, 3, 2],
[4, 3, 2, 1],
])
hr, ndcg = ncf_main.get_hit_rate_and_ndcg(predictions, items, 1)
self.assertAlmostEqual(hr, 1 / 4)
self.assertAlmostEqual(ndcg, 1 / 4)
hr, ndcg = ncf_main.get_hit_rate_and_ndcg(predictions, items, 2)
self.assertAlmostEqual(hr, 2 / 4)
self.assertAlmostEqual(ndcg, (1 + math.log(2) / math.log(3)) / 4)
hr, ndcg = ncf_main.get_hit_rate_and_ndcg(predictions, items, 3)
self.assertAlmostEqual(hr, 2 / 4)
self.assertAlmostEqual(ndcg, (1 + math.log(2) / math.log(3)) / 4)
hr, ndcg = ncf_main.get_hit_rate_and_ndcg(predictions, items, 4)
self.assertAlmostEqual(hr, 4 / 4)
self.assertAlmostEqual(ndcg, (1 + math.log(2) / math.log(3) +
2 * math.log(2) / math.log(5)) / 4)
hr, ndcg = ncf_main.get_hit_rate_and_ndcg(predictions, items, 1,
match_mlperf=True)
self.assertAlmostEqual(hr, 1 / 4)
self.assertAlmostEqual(ndcg, 1 / 4)
hr, ndcg = ncf_main.get_hit_rate_and_ndcg(predictions, items, 2,
match_mlperf=True)
self.assertAlmostEqual(hr, 2 / 4)
self.assertAlmostEqual(ndcg, (1 + math.log(2) / math.log(3)) / 4)
hr, ndcg = ncf_main.get_hit_rate_and_ndcg(predictions, items, 3,
match_mlperf=True)
self.assertAlmostEqual(hr, 4 / 4)
self.assertAlmostEqual(ndcg, (1 + math.log(2) / math.log(3) +
2 * math.log(2) / math.log(4)) / 4)
hr, ndcg = ncf_main.get_hit_rate_and_ndcg(predictions, items, 4,
match_mlperf=True)
self.assertAlmostEqual(hr, 4 / 4)
self.assertAlmostEqual(ndcg, (1 + math.log(2) / math.log(3) +
2 * math.log(2) / math.log(4)) / 4)
# Test with duplicate items, where the predictions for the same item can
# differ. In the MLPerf case, we should take the first prediction.
predictions = np.array([
[3., 2., 4., 4.], # In top 3. MLPerf: In top 2
[3., 4., 2., 4.], # In top 3. MLPerf: In top 3
[2., 3., 4., 1.], # In top 3. MLPerf: In top 2
[4., 3., 5., 2.] # In top 2. MLPerf: In top 1
])
items = np.array([
[1, 2, 2, 3],
[4, 3, 3, 2],
[2, 1, 1, 1],
[4, 2, 2, 1],
])
hr, ndcg = ncf_main.get_hit_rate_and_ndcg(predictions, items, 1)
self.assertAlmostEqual(hr, 0 / 4)
self.assertAlmostEqual(ndcg, 0 / 4)
hr, ndcg = ncf_main.get_hit_rate_and_ndcg(predictions, items, 2)
self.assertAlmostEqual(hr, 1 / 4)
self.assertAlmostEqual(ndcg, (math.log(2) / math.log(3)) / 4)
hr, ndcg = ncf_main.get_hit_rate_and_ndcg(predictions, items, 3)
self.assertAlmostEqual(hr, 4 / 4)
self.assertAlmostEqual(ndcg, (math.log(2) / math.log(3) +
3 * math.log(2) / math.log(4)) / 4)
hr, ndcg = ncf_main.get_hit_rate_and_ndcg(predictions, items, 4)
self.assertAlmostEqual(hr, 4 / 4)
self.assertAlmostEqual(ndcg, (math.log(2) / math.log(3) +
3 * math.log(2) / math.log(4)) / 4)
hr, ndcg = ncf_main.get_hit_rate_and_ndcg(predictions, items, 1,
match_mlperf=True)
self.assertAlmostEqual(hr, 1 / 4)
self.assertAlmostEqual(ndcg, 1 / 4)
hr, ndcg = ncf_main.get_hit_rate_and_ndcg(predictions, items, 2,
match_mlperf=True)
self.assertAlmostEqual(hr, 3 / 4)
self.assertAlmostEqual(ndcg, (1 + 2 * math.log(2) / math.log(3)) / 4)
hr, ndcg = ncf_main.get_hit_rate_and_ndcg(predictions, items, 3,
match_mlperf=True)
self.assertAlmostEqual(hr, 4 / 4)
self.assertAlmostEqual(ndcg, (1 + 2 * math.log(2) / math.log(3) +
math.log(2) / math.log(4)) / 4)
hr, ndcg = ncf_main.get_hit_rate_and_ndcg(predictions, items, 4,
match_mlperf=True)
self.assertAlmostEqual(hr, 4 / 4)
self.assertAlmostEqual(ndcg, (1 + 2 * math.log(2) / math.log(3) +
math.log(2) / math.log(4)) / 4)
if __name__ == "__main__":
tf.logging.set_verbosity(tf.logging.INFO)
tf.test.main()
@@ -51,6 +51,7 @@ def neumf_model_fn(features, labels, mode, params):
   if mode == tf.estimator.ModeKeys.PREDICT:
     predictions = {
+        movielens.ITEM_COLUMN: items,
         movielens.RATING_COLUMN: logits,
     }
...