Unverified commit 023fc2b2, authored by Yanhui Liang, committed by GitHub

Add unit test, official flags, and benchmark logs for recommendation model (#4343)

* Add unit test, official flags, and benchmark logs

* Fix checking errors

* Reorder imports to fix lints

* Address comments and correct model layers

* Add dataset checking
parent 3a624c29
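
Note on the flag changes below: both entry points drop argparse in favor of absl flags and the shared helpers in official.utils. A minimal, self-contained sketch of that flag pattern, written for this note rather than copied from the diff (the flag names and defaults mirror define_data_download_flags further down):

from absl import app as absl_app
from absl import flags

flags.DEFINE_string(name="data_dir", default="/tmp/movielens-data/",
                    help="Directory to download and extract data.")
flags.DEFINE_enum(name="dataset", default=None,
                  enum_values=["ml-1m", "ml-20m"], case_sensitive=False,
                  help="Dataset to download: ml-1m or ml-20m.")
FLAGS = flags.FLAGS


def main(_):
  # Flags are parsed by absl_app.run() before main() is called.
  print("data_dir=%s, dataset=%s" % (FLAGS.data_dir, FLAGS.dataset))


if __name__ == "__main__":
  absl_app.run(main)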
@@ -18,9 +18,6 @@
 TRAIN_RATINGS_FILENAME = 'train-ratings.csv'
 TEST_RATINGS_FILENAME = 'test-ratings.csv'
 TEST_NEG_FILENAME = 'test-negative.csv'
-TRAIN_DATA = 'train_data.csv'
-TEST_DATA = 'test_data.csv'
-
 USER = "user_id"
 ITEM = "item_id"
 RATING = "rating"
@@ -21,19 +21,23 @@
 from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function

-import argparse
 import collections
 import os
 import sys
 import time
 import zipfile

+# pylint: disable=g-bad-import-order
 import numpy as np
 import pandas as pd
 from six.moves import urllib  # pylint: disable=redefined-builtin
+from absl import app as absl_app
+from absl import flags
 import tensorflow as tf
+# pylint: enable=g-bad-import-order

-from official.recommendation import constants  # pylint: disable=g-bad-import-order
+from official.recommendation import constants
+from official.utils.flags import core as flags_core

 # URL to download dataset
 _DATA_URL = "http://files.grouplens.org/datasets/movielens/"
@@ -306,6 +310,10 @@ def main(_):
   make_dir(FLAGS.data_dir)

+  assert FLAGS.dataset, (
+      "Please specify which dataset to download. "
+      "Two datasets are available: ml-1m and ml-20m.")
+
   # Download the zip dataset
   dataset_zip = FLAGS.dataset + ".zip"
   file_path = os.path.join(FLAGS.data_dir, dataset_zip)
@@ -335,14 +343,23 @@ def main(_):
   parse_file_to_csv(FLAGS.data_dir, FLAGS.dataset)


+def define_data_download_flags():
+  """Add flags specifying data download arguments."""
+  flags.DEFINE_string(
+      name="data_dir", default="/tmp/movielens-data/",
+      help=flags_core.help_wrap(
+          "Directory to download and extract data."))
+
+  flags.DEFINE_enum(
+      name="dataset", default=None,
+      enum_values=["ml-1m", "ml-20m"], case_sensitive=False,
+      help=flags_core.help_wrap(
+          "Dataset to be trained and evaluated. Two datasets are available: "
+          "ml-1m and ml-20m."))
+
+
 if __name__ == "__main__":
-  parser = argparse.ArgumentParser()
-  parser.add_argument(
-      "--data_dir", type=str, default="/tmp/movielens-data/",
-      help="Directory to download data and extract the zip.")
-  parser.add_argument(
-      "--dataset", type=str, default="ml-1m", choices=["ml-1m", "ml-20m"],
-      help="Dataset to be trained and evaluated.")
-  FLAGS, unparsed = parser.parse_known_args()
-  tf.app.run(argv=[sys.argv[0]] + unparsed)
+  tf.logging.set_verbosity(tf.logging.INFO)
+  define_data_download_flags()
+  FLAGS = flags.FLAGS
+  absl_app.run(main)
@@ -17,18 +17,12 @@
 Load the training dataset and evaluation dataset from csv file into memory.
 Prepare input for model training and evaluation.
 """
-import time

 import numpy as np
 from six.moves import xrange  # pylint: disable=redefined-builtin
 import tensorflow as tf

 from official.recommendation import constants  # pylint: disable=g-bad-import-order

-# The column names and types of csv file
-_CSV_COLUMN_NAMES = [constants.USER, constants.ITEM, constants.RATING]
-_CSV_TYPES = [[0], [0], [0]]
-
 # The buffer size for shuffling train dataset.
 _SHUFFLE_BUFFER_SIZE = 1024

@@ -37,7 +31,7 @@ class NCFDataSet(object):
   """A class containing data information for model training and evaluation."""

   def __init__(self, train_data, num_users, num_items, num_negatives,
-               true_items, all_items):
+               true_items, all_items, all_eval_data):
     """Initialize NCFDataset class.

     Args:
@@ -50,6 +44,7 @@ class NCFDataSet(object):
         evaluation. Each entry is a latest positive instance for one user.
       all_items: A nested list, all items for evaluation, and each entry is the
         evaluation items for one user.
+      all_eval_data: A numpy array of eval/test dataset.
     """
     self.train_data = train_data
     self.num_users = num_users
@@ -57,10 +52,11 @@ class NCFDataSet(object):
     self.num_negatives = num_negatives
     self.eval_true_items = true_items
     self.eval_all_items = all_items
+    self.all_eval_data = all_eval_data


 def load_data(file_name):
-  """Load data from a csv file which splits on \t."""
+  """Load data from a csv file which splits on tab key."""
   lines = tf.gfile.Open(file_name, "r").readlines()

   # Process the file line by line
@@ -122,13 +118,11 @@ def data_preprocessing(train_fname, test_fname, test_neg_fname, num_negatives):
     all_items.append(items)  # All items (including positive and negative items)
     all_test_data.extend(users_items)  # Generate test dataset

-  # Save test dataset into csv file
-  np.savetxt(constants.TEST_DATA, np.asarray(all_test_data).astype(int),
-             fmt="%i", delimiter=",")
-
   # Create NCFDataset object
   ncf_dataset = NCFDataSet(
-      train_data, num_users, num_items, num_negatives, true_items, all_items)
+      train_data, num_users, num_items, num_negatives, true_items, all_items,
+      np.asarray(all_test_data)
+  )

   return ncf_dataset

@@ -144,6 +138,9 @@ def generate_train_dataset(train_data, num_items, num_negatives):
     num_items: An integer, the number of items in positive training instances.
     num_negatives: An integer, the number of negative training instances
       following positive training instances. It is 4 by default.
+
+  Returns:
+    A numpy array of training dataset.
   """
   all_train_data = []
   # A set with user-item tuples
@@ -158,13 +155,10 @@ def generate_train_dataset(train_data, num_items, num_negatives):
       j = np.random.randint(num_items)
       all_train_data.append([u, j, 0])

-  # Save the train dataset into a csv file
-  np.savetxt(constants.TRAIN_DATA, np.asarray(all_train_data).astype(int),
-             fmt="%i", delimiter=",")
+  return np.asarray(all_train_data)


-def input_fn(training, batch_size, repeat=1, ncf_dataset=None,
-             num_parallel_calls=1):
+def input_fn(training, batch_size, ncf_dataset, repeat=1):
   """Input function for model training and evaluation.

   The train input consists of 1 positive instance (user and item have
@@ -176,55 +170,39 @@ def input_fn(training, batch_size, ncf_dataset, repeat=1):
   Args:
     training: A boolean flag for training mode.
     batch_size: An integer, batch size for training and evaluation.
+    ncf_dataset: An NCFDataSet object, which contains the information about
+      training and test data.
     repeat: An integer, how many times to repeat the dataset.
-    ncf_dataset: An NCFDataSet object, which contains the information to
-      generate negative training instances.
-    num_parallel_calls: An integer, number of cpu cores for parallel input
-      processing.

   Returns:
     dataset: A tf.data.Dataset object containing examples loaded from the files.
   """
-  # Default test file name
-  file_name = constants.TEST_DATA
   # Generate random negative instances for training in each epoch
   if training:
-    t1 = time.time()
-    generate_train_dataset(
+    train_data = generate_train_dataset(
         ncf_dataset.train_data, ncf_dataset.num_items,
         ncf_dataset.num_negatives)
-    file_name = constants.TRAIN_DATA
-    tf.logging.info(
-        "Generating training instances: {:.1f}s".format(time.time() - t1))
-
-  # Create a dataset containing the text lines.
-  dataset = tf.data.TextLineDataset(file_name)
-
-  # Test dataset only has two fields while train dataset has three
-  num_cols = len(_CSV_COLUMN_NAMES) - 1
-  # Shuffle the dataset for training
-  if training:
-    dataset = dataset.shuffle(buffer_size=_SHUFFLE_BUFFER_SIZE)
-    num_cols += 1
-
-  def _parse_csv(line):
-    """Parse each line of the csv file."""
-    # Decode the line into its fields
-    fields = tf.decode_csv(line, record_defaults=_CSV_TYPES[0:num_cols])
-    fields = [tf.expand_dims(field, axis=0) for field in fields]
-    # Pack the result into a dictionary
-    features = dict(zip(_CSV_COLUMN_NAMES[0:num_cols], fields))
-    # Separate the labels from the features for training
-    if training:
-      labels = features.pop(constants.RATING)
-      return features, labels
-    # Return features only for test/prediction
-    return features
-
-  # Parse each line into a dictionary
-  dataset = dataset.map(_parse_csv, num_parallel_calls=num_parallel_calls)
+    # Get train features and labels
+    train_features = [
+        (constants.USER, np.expand_dims(train_data[:, 0], axis=1)),
+        (constants.ITEM, np.expand_dims(train_data[:, 1], axis=1))
+    ]
+    train_labels = [
+        (constants.RATING, np.expand_dims(train_data[:, 2], axis=1))]
+
+    dataset = tf.data.Dataset.from_tensor_slices(
+        (dict(train_features), dict(train_labels))
+    )
+    dataset = dataset.shuffle(buffer_size=_SHUFFLE_BUFFER_SIZE)
+  else:
+    # Create eval/test dataset
+    test_user = ncf_dataset.all_eval_data[:, 0]
+    test_item = ncf_dataset.all_eval_data[:, 1]
+    test_features = [
+        (constants.USER, np.expand_dims(test_user, axis=1)),
+        (constants.ITEM, np.expand_dims(test_item, axis=1))]
+
+    dataset = tf.data.Dataset.from_tensor_slices(dict(test_features))

   # Repeat and batch the dataset
   dataset = dataset.repeat(repeat)
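
The input_fn rewrite above drops the intermediate train_data.csv / test_data.csv files and feeds the generated numpy arrays straight into tf.data. A hedged, standalone sketch of that pattern (the toy rows are invented for illustration; the column names mirror constants.USER/ITEM/RATING from constants.py):

import numpy as np
import tensorflow as tf

# Toy [user, item, rating] rows standing in for generate_train_dataset() output.
train_data = np.array([[0, 5, 1], [0, 9, 0], [1, 3, 1], [1, 7, 0]])

features = {
    "user_id": np.expand_dims(train_data[:, 0], axis=1),
    "item_id": np.expand_dims(train_data[:, 1], axis=1),
}
labels = {"rating": np.expand_dims(train_data[:, 2], axis=1)}

# Build the pipeline from in-memory tensors, then shuffle, repeat, and batch.
dataset = tf.data.Dataset.from_tensor_slices((features, labels))
dataset = dataset.shuffle(buffer_size=1024).repeat(1).batch(2)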
......
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Unit tests for dataset.py."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import numpy as np
import tensorflow as tf # pylint: disable=g-bad-import-order
from official.recommendation import dataset
_TRAIN_FNAME = os.path.join(
    os.path.dirname(__file__), "unittest_data/test_train_ratings.csv")
_TEST_FNAME = os.path.join(
    os.path.dirname(__file__), "unittest_data/test_eval_ratings.csv")
_TEST_NEG_FNAME = os.path.join(
    os.path.dirname(__file__), "unittest_data/test_eval_negative.csv")
_NUM_NEG = 4


class DatasetTest(tf.test.TestCase):

  def test_load_data(self):
    data = dataset.load_data(_TEST_FNAME)
    self.assertEqual(len(data), 2)

    self.assertEqual(data[0][0], 0)
    self.assertEqual(data[0][2], 1)

    self.assertEqual(data[-1][0], 1)
    self.assertEqual(data[-1][2], 1)

  def test_data_preprocessing(self):
    ncf_dataset = dataset.data_preprocessing(
        _TRAIN_FNAME, _TEST_FNAME, _TEST_NEG_FNAME, _NUM_NEG)

    # Check train data preprocessing
    self.assertAllEqual(np.array(ncf_dataset.train_data)[:, 2],
                        np.full(len(ncf_dataset.train_data), 1))
    self.assertEqual(ncf_dataset.num_users, 2)
    self.assertEqual(ncf_dataset.num_items, 175)

    # Check test dataset
    test_dataset = ncf_dataset.all_eval_data
    first_true_item = test_dataset[100]
    self.assertEqual(first_true_item[1], ncf_dataset.eval_true_items[0])
    self.assertEqual(first_true_item[1], ncf_dataset.eval_all_items[0][-1])

    last_gt_item = test_dataset[-1]
    self.assertEqual(last_gt_item[1], ncf_dataset.eval_true_items[-1])
    self.assertEqual(last_gt_item[1], ncf_dataset.eval_all_items[-1][-1])

    test_list = test_dataset.tolist()

    first_test_items = [x[1] for x in test_list if x[0] == 0]
    self.assertAllEqual(first_test_items, ncf_dataset.eval_all_items[0])

    last_test_items = [x[1] for x in test_list if x[0] == 1]
    self.assertAllEqual(last_test_items, ncf_dataset.eval_all_items[-1])

  def test_generate_train_dataset(self):
    # Check train dataset
    ncf_dataset = dataset.data_preprocessing(
        _TRAIN_FNAME, _TEST_FNAME, _TEST_NEG_FNAME, _NUM_NEG)
    train_dataset = dataset.generate_train_dataset(
        ncf_dataset.train_data, ncf_dataset.num_items, _NUM_NEG)

    # Each user has 1 positive instance followed by _NUM_NEG negative instances
    train_data_0 = train_dataset[0]
    self.assertEqual(train_data_0[2], 1)
    for i in range(1, _NUM_NEG + 1):
      train_data = train_dataset[i]
      self.assertEqual(train_data_0[0], train_data[0])
      self.assertNotEqual(train_data_0[1], train_data[1])
      self.assertEqual(0, train_data[2])

    train_data_last = train_dataset[-1 - _NUM_NEG]
    self.assertEqual(train_data_last[2], 1)
    for i in range(-_NUM_NEG, 0):
      train_data = train_dataset[i]
      self.assertEqual(train_data_last[0], train_data[0])
      self.assertNotEqual(train_data_last[1], train_data[1])
      self.assertEqual(0, train_data[2])


if __name__ == "__main__":
  tf.test.main()
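
The fixtures these tests read live under unittest_data/. Assuming, per the load_data docstring above, that the columns are tab-separated, each ratings line is "user_id<TAB>item_id<TAB>rating". A tiny sketch of that parsing with invented rows, purely for illustration:

rows = ["0\t21\t1", "1\t146\t1"]
parsed = [[int(x) for x in line.split("\t")] for line in rows]
print(parsed)  # [[0, 21, 1], [1, 146, 1]]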
@@ -21,28 +21,32 @@
 from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function

-import argparse
-import ast
 import heapq
 import math
 import os
-import sys
-import time

+# pylint: disable=g-bad-import-order
 import numpy as np
+from absl import app as absl_app
+from absl import flags
 import tensorflow as tf
+# pylint: enable=g-bad-import-order

-# pylint: disable=g-bad-import-order
 from official.recommendation import constants
 from official.recommendation import dataset
 from official.recommendation import neumf_model
+from official.utils.flags import core as flags_core
+from official.utils.logs import hooks_helper
+from official.utils.logs import logger
+from official.utils.misc import model_helpers

 _TOP_K = 10  # Top-k list for evaluation
-_EVAL_BATCH_SIZE = 100
+# keys for evaluation metrics
+_HR_KEY = "HR"
+_NDCG_KEY = "NDCG"


-def evaluate_model(estimator, batch_size, num_gpus, true_items, all_items,
-                   num_parallel_calls):
+def evaluate_model(estimator, batch_size, num_gpus, ncf_dataset):
   """Model evaluation with HR and NDCG metrics.

   The evaluation protocol is to rank the test interacted item (truth items)
@@ -60,22 +64,28 @@ def evaluate_model(estimator, batch_size, num_gpus, ncf_dataset):
     estimator: The Estimator.
     batch_size: An integer, the batch size specified by user.
     num_gpus: An integer, the number of gpus specified by user.
-    true_items: A list of test items (true items) for HR and NDCG calculation.
-      Each item is for one user.
-    all_items: A nested list. Each entry is the 101 items (1 ground truth item
-      and 100 negative items) for one user.
-    num_parallel_calls: An integer, number of cpu cores for parallel input
-      processing in input_fn.
+    ncf_dataset: An NCFDataSet object, which contains the information about
+      the test/eval dataset, such as:
+      eval_true_items, a list of test items (true items) for HR and NDCG
+        calculation. Each item is for one user.
+      eval_all_items, a nested list. Each entry is the 101 items
+        (1 ground truth item and 100 negative items) for one user.

   Returns:
-    hit: An integer, the average HR scores for all users.
-    ndcg: An integer, the average NDCG scores for all users.
+    eval_results: A dict of evaluation results for benchmark logging:
+      eval_results = {
+          _HR_KEY: hr,
+          _NDCG_KEY: ndcg,
+          tf.GraphKeys.GLOBAL_STEP: global_step
+      }
+      where hr is the average HR score across all users, ndcg is the average
+      NDCG score across all users, and global_step is the current global step.
   """
   # Define prediction input function
   def pred_input_fn():
     return dataset.input_fn(
-        False, per_device_batch_size(batch_size, num_gpus),
-        num_parallel_calls=num_parallel_calls)
+        False, per_device_batch_size(batch_size, num_gpus), ncf_dataset)

   # Get predictions
   predictions = estimator.predict(input_fn=pred_input_fn)
@@ -92,13 +102,13 @@ def evaluate_model(estimator, batch_size, num_gpus, ncf_dataset):
       return 0

   hits, ndcgs = [], []
-  num_users = len(true_items)
+  num_users = len(ncf_dataset.eval_true_items)

   # Reshape the predicted scores and each user takes one row
   predicted_scores_list = np.asarray(
       all_predicted_scores).reshape(num_users, -1)

   for i in range(num_users):
-    items = all_items[i]
+    items = ncf_dataset.eval_all_items[i]
     predicted_scores = predicted_scores_list[i]
     # Map item and score for each user
     map_item_score = {}
@@ -108,7 +118,7 @@ def evaluate_model(estimator, batch_size, num_gpus, ncf_dataset):
     # Evaluate top rank list with HR and NDCG
     ranklist = heapq.nlargest(_TOP_K, map_item_score, key=map_item_score.get)
-    true_item = true_items[i]
+    true_item = ncf_dataset.eval_true_items[i]
     hr = _get_hr(ranklist, true_item)
     ndcg = _get_ndcg(ranklist, true_item)
     hits.append(hr)
@@ -116,17 +126,13 @@ def evaluate_model(estimator, batch_size, num_gpus, ncf_dataset):
   # Get average HR and NDCG scores
   hr, ndcg = np.array(hits).mean(), np.array(ndcgs).mean()
-  return hr, ndcg
-
-
-def get_num_gpus(num_gpus):
-  """Treat num_gpus=-1 as "use all"."""
-  if num_gpus != -1:
-    return num_gpus
-  from tensorflow.python.client import device_lib  # pylint: disable=g-import-not-at-top
-  local_device_protos = device_lib.list_local_devices()
-  return sum([1 for d in local_device_protos if d.device_type == "GPU"])
+
+  global_step = estimator.get_variable_value(tf.GraphKeys.GLOBAL_STEP)
+  eval_results = {
+      _HR_KEY: hr,
+      _NDCG_KEY: ndcg,
+      tf.GraphKeys.GLOBAL_STEP: global_step
+  }
+  return eval_results


 def convert_keras_to_estimator(keras_model, num_gpus, model_dir):
@@ -139,7 +145,6 @@ def convert_keras_to_estimator(keras_model, num_gpus, model_dir):
   Returns:
     est_model: The converted Estimator.
   """
-
   # TODO(b/79866338): update GradientDescentOptimizer with AdamOptimizer
   optimizer = tf.train.GradientDescentOptimizer(
@@ -201,100 +206,154 @@ def main(_):
       FLAGS.data_dir, FLAGS.dataset + "-" + constants.TEST_RATINGS_FILENAME)
   neg_fname = os.path.join(
       FLAGS.data_dir, FLAGS.dataset + "-" + constants.TEST_NEG_FILENAME)

-  t1 = time.time()
+  assert os.path.exists(train_fname), (
+      "Run data_download.py first to download and extract {} dataset".format(
+          FLAGS.dataset))
+
+  tf.logging.info("Data preprocessing...")
   ncf_dataset = dataset.data_preprocessing(
       train_fname, test_fname, neg_fname, FLAGS.num_neg)
-  tf.logging.info("Data preprocessing: {:.1f} s".format(time.time() - t1))

   # Create NeuMF model and convert it to Estimator
   tf.logging.info("Creating Estimator from Keras model...")
+  layers = [int(layer) for layer in FLAGS.layers]
+  mlp_regularization = [float(reg) for reg in FLAGS.mlp_regularization]
   keras_model = neumf_model.NeuMF(
       ncf_dataset.num_users, ncf_dataset.num_items, FLAGS.num_factors,
-      ast.literal_eval(FLAGS.layers), FLAGS.batch_size, FLAGS.mf_regularization)
-  num_gpus = get_num_gpus(FLAGS.num_gpus)
+      layers, FLAGS.batch_size, FLAGS.mf_regularization,
+      mlp_regularization)
+  num_gpus = flags_core.get_num_gpus(FLAGS)
   estimator = convert_keras_to_estimator(keras_model, num_gpus, FLAGS.model_dir)

+  # Create hooks that log information about the training and metric values
+  train_hooks = hooks_helper.get_train_hooks(
+      FLAGS.hooks,
+      batch_size=FLAGS.batch_size  # for ExamplesPerSecondHook
+  )
+  run_params = {
+      "batch_size": FLAGS.batch_size,
+      "number_factors": FLAGS.num_factors,
+      "hr_threshold": FLAGS.hr_threshold,
+      "train_epochs": FLAGS.train_epochs,
+  }
+  benchmark_logger = logger.config_benchmark_logger(FLAGS)
+  benchmark_logger.log_run_info(
+      model_name="recommendation",
+      dataset_name=FLAGS.dataset,
+      run_params=run_params)
+
   # Training and evaluation cycle
   def train_input_fn():
     return dataset.input_fn(
         True, per_device_batch_size(FLAGS.batch_size, num_gpus),
-        FLAGS.epochs_between_evals, ncf_dataset, FLAGS.num_parallel_calls)
+        ncf_dataset, FLAGS.epochs_between_evals)

-  total_training_cycle = (FLAGS.train_epochs //
-                          FLAGS.epochs_between_evals)
+  total_training_cycle = FLAGS.train_epochs // FLAGS.epochs_between_evals

   for cycle_index in range(total_training_cycle):
     tf.logging.info("Starting a training cycle: {}/{}".format(
-        cycle_index, total_training_cycle - 1))
+        cycle_index + 1, total_training_cycle))

     # Train the model
-    train_cycle_begin = time.time()
-    estimator.train(input_fn=train_input_fn,
-                    hooks=[tf.train.ProfilerHook(save_steps=10000)])
-    train_cycle_end = time.time()
+    estimator.train(input_fn=train_input_fn, hooks=train_hooks)

     # Evaluate the model
-    eval_cycle_begin = time.time()
-    hr, ndcg = evaluate_model(
-        estimator, FLAGS.batch_size, num_gpus, ncf_dataset.eval_true_items,
-        ncf_dataset.eval_all_items, FLAGS.num_parallel_calls)
-    eval_cycle_end = time.time()
+    eval_results = evaluate_model(
+        estimator, FLAGS.batch_size, num_gpus, ncf_dataset)

-    # Log the train time, evaluation time, and HR and NDCG results.
+    # Benchmark the evaluation results
+    benchmark_logger.log_evaluation_result(eval_results)
+
+    # Log the HR and NDCG results.
+    hr = eval_results[_HR_KEY]
+    ndcg = eval_results[_NDCG_KEY]
     tf.logging.info(
-        "Iteration {} [{:.1f} s]: HR = {:.4f}, NDCG = {:.4f}, [{:.1f} s]"
-        .format(cycle_index, train_cycle_end - train_cycle_begin, hr, ndcg,
-                eval_cycle_end - eval_cycle_begin))
+        "Iteration {}: HR = {:.4f}, NDCG = {:.4f}".format(
+            cycle_index + 1, hr, ndcg))

-  # Remove temporary files
-  os.remove(constants.TRAIN_DATA)
-  os.remove(constants.TEST_DATA)
+    # If some evaluation threshold is met
+    if model_helpers.past_stop_threshold(FLAGS.hr_threshold, hr):
+      break
+
+  # Clear the session explicitly to avoid session delete error
+  tf.keras.backend.clear_session()
+def define_ncf_flags():
+  """Add flags for running ncf_main."""
+  # Add common flags
+  flags_core.define_base(export_dir=False)
+  flags_core.define_performance(
+      num_parallel_calls=False,
+      inter_op=False,
+      intra_op=False,
+      synthetic_data=False,
+      max_train_steps=False,
+      dtype=False
+  )
+  flags_core.define_benchmark()
+
+  flags.adopt_module_key_flags(flags_core)
+
+  flags_core.set_defaults(
+      model_dir="/tmp/ncf/",
+      data_dir="/tmp/movielens-data/",
+      train_epochs=2,
+      batch_size=256,
+      hooks="ProfilerHook")
+
+  # Add ncf-specific flags
+  flags.DEFINE_enum(
+      name="dataset", default="ml-1m",
+      enum_values=["ml-1m", "ml-20m"], case_sensitive=False,
+      help=flags_core.help_wrap(
+          "Dataset to be trained and evaluated."))
+
+  flags.DEFINE_integer(
+      name="num_factors", default=8,
+      help=flags_core.help_wrap("The embedding size of the MF model."))
+
+  # Set the default as a list of strings to be consistent with input arguments
+  flags.DEFINE_list(
+      name="layers", default=["64", "32", "16", "8"],
+      help=flags_core.help_wrap(
+          "The sizes of hidden layers for MLP. Example "
+          "to specify different sizes of MLP layers: --layers=32,16,8,4"))
+
+  flags.DEFINE_float(
+      name="mf_regularization", default=0.,
+      help=flags_core.help_wrap(
+          "The regularization factor for MF embeddings. The factor is used by "
+          "the regularizer, which applies penalties on layer parameters or "
+          "layer activity during optimization."))
+
+  flags.DEFINE_list(
+      name="mlp_regularization", default=["0.", "0.", "0.", "0."],
+      help=flags_core.help_wrap(
+          "The regularization factor for each MLP layer. See mf_regularization "
+          "help for more info about the regularization factor."))
+
+  flags.DEFINE_integer(
+      name="num_neg", default=4,
+      help=flags_core.help_wrap(
+          "The number of negative instances to pair with a positive instance."))
+
+  flags.DEFINE_float(
+      name="learning_rate", default=0.001,
+      help=flags_core.help_wrap("The learning rate."))
+
+  flags.DEFINE_float(
+      name="hr_threshold", default=None,
+      help=flags_core.help_wrap(
+          "If passed, training will stop when the evaluation metric HR is "
+          "greater than or equal to hr_threshold. For dataset ml-1m, the "
+          "desired hr_threshold is 0.68 which is the result from the paper; "
+          "for dataset ml-20m, the threshold can be set as 0.95 which is "
+          "achieved by the MLPerf implementation."))
+
+
 if __name__ == "__main__":
   tf.logging.set_verbosity(tf.logging.INFO)
-  parser = argparse.ArgumentParser()
-
-  parser.add_argument(
-      "--model_dir", nargs="?", default="/tmp/ncf/",
-      help="Model directory.")
-  parser.add_argument(
-      "--data_dir", nargs="?", default="/tmp/movielens-data/",
-      help="Input data directory. Should be the same as downloaded data dir.")
-  parser.add_argument(
-      "--dataset", nargs="?", default="ml-1m", choices=["ml-1m", "ml-20m"],
-      help="Choose a dataset.")
-  parser.add_argument(
-      "--train_epochs", type=int, default=20,
-      help="Number of epochs.")
-  parser.add_argument(
-      "--batch_size", type=int, default=256,
-      help="Batch size.")
-  parser.add_argument(
-      "--num_factors", type=int, default=8,
-      help="Embedding size of MF model.")
-  parser.add_argument(
-      "--layers", nargs="?", default="[64,32,16,8]",
-      help="Size of hidden layers for MLP.")
-  parser.add_argument(
-      "--mf_regularization", type=float, default=0,
-      help="Regularization for MF embeddings.")
-  parser.add_argument(
-      "--num_neg", type=int, default=4,
-      help="Number of negative instances to pair with a positive instance.")
-  parser.add_argument(
-      "--num_parallel_calls", type=int, default=8,
-      help="Number of parallel calls.")
-  parser.add_argument(
-      "--epochs_between_evals", type=int, default=1,
-      help="Number of epochs between model evaluation.")
-  parser.add_argument(
-      "--learning_rate", type=float, default=0.001,
-      help="Learning rate.")
-  parser.add_argument(
-      "--num_gpus", type=int, default=1 if tf.test.is_gpu_available() else 0,
-      help="How many GPUs to use with the DistributionStrategies API. The "
-           "default is 1 if TensorFlow can detect a GPU, and 0 otherwise.")
-  FLAGS, unparsed = parser.parse_known_args()
-
-  with tf.Graph().as_default():
-    tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)
+  define_ncf_flags()
+  FLAGS = flags.FLAGS
+  absl_app.run(main)
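
For reference, the HR and NDCG values that evaluate_model averages are the standard NCF ranking metrics. The helpers _get_hr and _get_ndcg are referenced above but their bodies are outside this diff, so the sketch below uses the usual formulas rather than the file's exact code; the ranked list and item ids are made up:

import math

def hit_ratio(ranklist, true_item):
  # HR@K: 1 if the ground-truth item appears in the top-K list, else 0.
  return 1 if true_item in ranklist else 0

def ndcg_at_k(ranklist, true_item):
  # NDCG@K with a single relevant item: log(2) / log(rank + 2) at 0-based rank.
  for rank, item in enumerate(ranklist):
    if item == true_item:
      return math.log(2) / math.log(rank + 2)
  return 0.0

print(hit_ratio([12, 7, 3], 7))   # 1
print(ndcg_at_k([12, 7, 3], 7))   # ~0.631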
@@ -43,7 +43,7 @@ class NeuMF(tf.keras.models.Model):
   """Neural matrix factorization (NeuMF) model for recommendations."""

   def __init__(self, num_users, num_items, mf_dim, model_layers, batch_size,
-               mf_regularization=0):
+               mf_regularization, mlp_reg_layers):
     """Initialize NeuMF model.

     Args:
@@ -54,8 +54,10 @@ class NeuMF(tf.keras.models.Model):
         Note that the first layer is the concatenation of user and item
         embeddings. So model_layers[0]//2 is the embedding size for MLP.
       batch_size: An integer for the batch size.
-      mf_regularization: A floating number, the regularization for MF
+      mf_regularization: A floating number, the regularization factor for MF
         embeddings.
+      mlp_reg_layers: A list of floating numbers, the regularization factors
+        for each layer in MLP.

     Raises:
       ValueError: if the first model layer is not even.
@@ -89,13 +91,13 @@ class NeuMF(tf.keras.models.Model):
         num_users,
         model_layers[0]//2,
         embeddings_initializer=embedding_initializer,
-        embeddings_regularizer=tf.keras.regularizers.l2(model_layers[0]),
+        embeddings_regularizer=tf.keras.regularizers.l2(mlp_reg_layers[0]),
         input_length=1)
     mlp_embedding_item = tf.keras.layers.Embedding(
         num_items,
         model_layers[0]//2,
         embeddings_initializer=embedding_initializer,
-        embeddings_regularizer=tf.keras.regularizers.l2(model_layers[0]),
+        embeddings_regularizer=tf.keras.regularizers.l2(mlp_reg_layers[0]),
         input_length=1)

     # GMF part
@@ -113,9 +115,10 @@ class NeuMF(tf.keras.models.Model):
     mlp_vector = tf.keras.layers.concatenate([mlp_user_latent, mlp_item_latent])

     num_layer = len(model_layers)  # Number of layers in the MLP
-    for idx in xrange(1, num_layer):
+    for layer in xrange(1, num_layer):
       model_layer = tf.keras.layers.Dense(
-          model_layers[idx],
+          model_layers[layer],
+          kernel_regularizer=tf.keras.regularizers.l2(mlp_reg_layers[layer]),
           activation="relu")
       mlp_vector = model_layer(mlp_vector)
......
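
The NeuMF change above threads a per-layer regularization list (mlp_reg_layers) into both the MLP embeddings and the Dense layers of the MLP tower. A standalone sketch of that wiring with plain Keras layers, using the flag defaults for the sizes and factors; this is illustrative only, not the NeuMF class itself:

import tensorflow as tf

model_layers = [64, 32, 16, 8]
mlp_reg_layers = [0., 0., 0., 0.]

# The first "layer" is the concatenated user/item embedding width; the rest
# are Dense layers, each with its own L2 factor.
inputs = tf.keras.layers.Input(shape=(model_layers[0],))
vector = inputs
for i in range(1, len(model_layers)):
  vector = tf.keras.layers.Dense(
      model_layers[i],
      kernel_regularizer=tf.keras.regularizers.l2(mlp_reg_layers[i]),
      activation="relu")(vector)
model = tf.keras.models.Model(inputs=inputs, outputs=vector)
model.summary()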
2784 2659 1705 3316 887 815 1783 3483 1085 329 1830 1880 2699 3194 3596 2700 3520 2414 757 3610 2651 2187 2274 2949 1753 589 2945 2172 2877 1992 2215 1028 807 2098 1923 3555 2548 2950 151 2060 3301 807 849 711 3271 3010 475 3412 3389 2797 691 596 2643 766 2344 203 2775 2610 3583 2982 1259 2128 854 2228 2228 2008 3177 1977 3674 3612 808 325 2435 440 1693 3166 1518 2643 940 309 1397 2157 2391 3243 2879 482 3206 143 1972 2498 2711 1641 3008 2733 136 2303 376 826 3064 1123
767 2164 1283 1100 3044 1332 2152 1295 1812 3427 3130 2967 2895 3085 501 2005 688 1457 1733 2345 1827 1600 3295 3397 384 2033 1444 2082 944 2563 1762 1101 496 2151 3093 329 2559 1664 2058 2546 683 1082 3583 2199 1851 258 2553 2274 1059 2910 2299 2115 2770 2094 2915 3348 337 2738 1563 2958 3241 3258 2881 3236 2954 214 3243 1000 2187 2946 435 1232 2208 1334 1280 2982 403 1326 1706 1523 1336 2620 2664 2462 1432 765 898 222 3426 2027 3469 2032 2472 1480 3219 735 1562 2626 1400 308
0 21 1
0 41 1
0 8 1
0 23 1
0 43 1
0 10 1
0 45 1
0 12 1
0 47 1
0 14 1
0 33 1
0 0 1
0 35 1
0 2 1
0 37 1
0 4 1
0 39 1
0 6 1
0 24 1
0 26 1
0 28 1
0 30 1
0 49 1
0 16 1
0 51 1
0 18 1
0 20 1
0 40 1
0 22 1
0 42 1
0 9 1
0 44 1
0 11 1
0 46 1
0 13 1
0 32 1
0 15 1
0 34 1
0 1 1
0 36 1
0 3 1
0 38 1
0 5 1
0 7 1
0 27 1
0 29 1
0 48 1
0 31 1
0 50 1
0 17 1
0 52 1
0 19 1
1 146 1
1 61 1
1 127 1
1 88 1
1 144 1
1 125 1
1 94 1
1 150 1
1 18 1
1 115 1
1 92 1
1 170 1
1 139 1
1 148 1
1 55 1
1 113 1
1 82 1
1 168 1
1 137 1
1 53 1
1 119 1
1 80 1
1 174 1
1 143 1
1 42 1
1 20 1
1 117 1
1 86 1
1 172 1
1 141 1
1 106 1
1 75 1
1 84 1
1 162 1
1 131 1
1 104 1
1 73 1
1 160 1
1 129 1
1 110 1
1 79 1
1 166 1
1 135 1
1 108 1
1 77 1
1 164 1
1 133 1
1 98 1
1 67 1
1 155 1
1 96 1
1 65 1
1 153 1
1 102 1
1 71 1
1 159 1
1 58 1
1 100 1
1 69 1
1 157 1
1 56 1
1 122 1
1 91 1
1 147 1
1 62 1
1 120 1
1 89 1
1 145 1
1 60 1
1 126 1
1 95 1
1 151 1
1 124 1
1 93 1
1 171 1
1 149 1
1 48 1
1 114 1
1 83 1
1 169 1
1 138 1
1 54 1
1 112 1
1 81 1
1 136 1
1 52 1
1 118 1
1 87 1
1 173 1
1 142 1
1 107 1
1 116 1
1 85 1
1 163 1
1 140 1
1 47 1
1 105 1
1 74 1
1 161 1
1 130 1
1 111 1
1 72 1
1 167 1
1 128 1
1 109 1
1 78 1
1 165 1
1 134 1
1 99 1
1 76 1
1 132 1
1 0 1
1 97 1
1 154 1
1 103 1
1 64 1
1 152 1
1 59 1
1 101 1
1 70 1
1 158 1
1 57 1
1 123 1
1 68 1
1 156 1
1 63 1
1 121 1
1 90 1