# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""NCF framework to train and evaluate the NeuMF model.

The NeuMF model combines the MF and MLP models under the NCF framework. See
`neumf_model.py` for more details about the models.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import ast
import heapq
import math
import os
import sys
import time

import numpy as np
import tensorflow as tf

# pylint: disable=g-bad-import-order
from official.recommendation import constants
from official.recommendation import dataset
from official.recommendation import neumf_model

_TOP_K = 10  # Top-k list for evaluation
_EVAL_BATCH_SIZE = 100


def evaluate_model(estimator, batch_size, num_gpus, true_items, all_items,
                   num_parallel_calls):
  """Model evaluation with HR and NDCG metrics.

  The evaluation protocol ranks each user's test item (the ground-truth item)
  among 100 randomly chosen items that the user has not interacted with.
  The performance of the ranked list is judged by Hit Ratio (HR) and Normalized
  Discounted Cumulative Gain (NDCG).

  For evaluation, the ranked list is truncated at 10 for both metrics. As such,
  the HR intuitively measures whether the test item is present on the top-10
  list, and the NDCG accounts for the position of the hit by assigning higher
  scores to hits at top ranks. Both metrics are calculated for each test user,
  and the average scores are reported.

  Args:
    estimator: The Estimator.
    batch_size: An integer, the batch size specified by the user.
    num_gpus: An integer, the number of GPUs specified by the user.
    true_items: A list with one ground-truth test item per user, used for the
      HR and NDCG calculation.
    all_items: A nested list. Each entry is the 101 items (1 ground-truth item
      and 100 negative items) for one user.
    num_parallel_calls: An integer, the number of CPU cores used for parallel
      input processing in input_fn.

  Returns:
    hr: A float, the average HR score over all users.
    ndcg: A float, the average NDCG score over all users.
  """
  # Define prediction input function
  def pred_input_fn():
    return dataset.input_fn(
        False, per_device_batch_size(batch_size, num_gpus),
        num_parallel_calls=num_parallel_calls)

  # Get predictions
  predictions = estimator.predict(input_fn=pred_input_fn)
  all_predicted_scores = [p[constants.RATING] for p in predictions]

  # Calculate HR score
  def _get_hr(ranklist, true_item):
    return 1 if true_item in ranklist else 0

  # Calculate NDCG score
  def _get_ndcg(ranklist, true_item):
    if true_item in ranklist:
      return math.log(2) / math.log(ranklist.index(true_item) + 2)
    return 0
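
  # Worked example (illustrative numbers, not from any dataset): if the true
  # item sits at index 2 of the top-10 ranklist (i.e. it is ranked 3rd), then
  # HR = 1 and NDCG = log(2) / log(2 + 2) = 0.5; a true item outside the
  # top-10 list scores 0 on both metrics for that user.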

  hits, ndcgs = [], []
  num_users = len(true_items)
  # Reshape the predicted scores so that each user takes one row
  predicted_scores_list = np.asarray(
      all_predicted_scores).reshape(num_users, -1)

  for i in range(num_users):
    items = all_items[i]
    predicted_scores = predicted_scores_list[i]
    # Map each candidate item to its predicted score for this user
    map_item_score = {}
    for j, item in enumerate(items):
      score = predicted_scores[j]
      map_item_score[item] = score

    # Evaluate top rank list with HR and NDCG
    ranklist = heapq.nlargest(_TOP_K, map_item_score, key=map_item_score.get)
    true_item = true_items[i]
    hr = _get_hr(ranklist, true_item)
    ndcg = _get_ndcg(ranklist, true_item)
    hits.append(hr)
    ndcgs.append(ndcg)

  # Get average HR and NDCG scores
  hr, ndcg = np.array(hits).mean(), np.array(ndcgs).mean()
  return hr, ndcg


def get_num_gpus(num_gpus):
  """Treat num_gpus=-1 as "use all"."""
  if num_gpus != -1:
    return num_gpus

  from tensorflow.python.client import device_lib  # pylint: disable=g-import-not-at-top
  local_device_protos = device_lib.list_local_devices()
  return sum([1 for d in local_device_protos if d.device_type == "GPU"])
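

# For example, on a machine with two visible GPUs, get_num_gpus(-1) returns 2;
# get_num_gpus(0) leaves num_gpus at 0, which selects the CPU strategy in
# convert_keras_to_estimator below.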


def convert_keras_to_estimator(keras_model, num_gpus, model_dir):
  """Configure and convert keras model to Estimator.

  Args:
    keras_model: A Keras model object.
    num_gpus: An integer, the number of gpus.
    model_dir: A string, the directory to save and restore checkpoints.

  Returns:
    estimator: The converted Estimator.
  """
  # TODO(b/79866338): replace GradientDescentOptimizer with AdamOptimizer
  optimizer = tf.train.GradientDescentOptimizer(
      learning_rate=FLAGS.learning_rate)
  keras_model.compile(optimizer=optimizer, loss="binary_crossentropy")

  if num_gpus == 0:
    distribution = tf.contrib.distribute.OneDeviceStrategy("device:CPU:0")
  elif num_gpus == 1:
    distribution = tf.contrib.distribute.OneDeviceStrategy("device:GPU:0")
  else:
    distribution = tf.contrib.distribute.MirroredStrategy(num_gpus=num_gpus)

  run_config = tf.estimator.RunConfig(train_distribute=distribution)

  estimator = tf.keras.estimator.model_to_estimator(
      keras_model=keras_model, model_dir=model_dir, config=run_config)

  return estimator
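

# For example, running with --num_gpus=2 selects MirroredStrategy above, which
# replicates training across two GPUs; --num_gpus=0 pins training to the CPU.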


def per_device_batch_size(batch_size, num_gpus):
  """For multi-gpu, batch-size must be a multiple of the number of GPUs.

  Note that this should eventually be handled by DistributionStrategies
  directly. Multi-GPU support is currently experimental, however,
  so doing the work here until that feature is in place.

  Args:
    batch_size: Global batch size to be divided among devices. This should be
      equal to num_gpus times the single-GPU batch_size for multi-gpu training.
    num_gpus: How many GPUs are used with DistributionStrategies.

  Returns:
    Batch size per device.

  Raises:
    ValueError: If batch_size is not divisible by the number of devices.
  """
  if num_gpus <= 1:
    return batch_size

  remainder = batch_size % num_gpus
  if remainder:
    err = ("When running with multiple GPUs, batch size "
           "must be a multiple of the number of available GPUs. Found {} "
           "GPUs with a batch size of {}; try --batch_size={} instead."
          ).format(num_gpus, batch_size, batch_size - remainder)
    raise ValueError(err)
  return int(batch_size / num_gpus)
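

# Example (hypothetical values): per_device_batch_size(256, num_gpus=4)
# returns 64, while per_device_batch_size(250, num_gpus=4) raises a ValueError
# suggesting --batch_size=248.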


def main(_):
  # Data preprocessing
  # The file names of the training, test, and test-negative datasets
  train_fname = os.path.join(
      FLAGS.data_dir, FLAGS.dataset + "-" + constants.TRAIN_RATINGS_FILENAME)
  test_fname = os.path.join(
      FLAGS.data_dir, FLAGS.dataset + "-" + constants.TEST_RATINGS_FILENAME)
  neg_fname = os.path.join(
      FLAGS.data_dir, FLAGS.dataset + "-" + constants.TEST_NEG_FILENAME)
  t1 = time.time()
  ncf_dataset = dataset.data_preprocessing(
      train_fname, test_fname, neg_fname, FLAGS.num_neg)
  tf.logging.info("Data preprocessing: {:.1f} s".format(time.time() - t1))

  # Create NeuMF model and convert it to Estimator
  tf.logging.info("Creating Estimator from Keras model...")
  keras_model = neumf_model.NeuMF(
      ncf_dataset.num_users, ncf_dataset.num_items, FLAGS.num_factors,
      ast.literal_eval(FLAGS.layers), FLAGS.batch_size, FLAGS.mf_regularization)
  num_gpus = get_num_gpus(FLAGS.num_gpus)
  estimator = convert_keras_to_estimator(keras_model, num_gpus, FLAGS.model_dir)

  # Training and evaluation cycle
  def train_input_fn():
    return dataset.input_fn(
        True, per_device_batch_size(FLAGS.batch_size, num_gpus),
        FLAGS.epochs_between_evals, ncf_dataset, FLAGS.num_parallel_calls)

  total_training_cycle = (FLAGS.train_epochs //
                          FLAGS.epochs_between_evals)
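  # For example, --train_epochs=20 with --epochs_between_evals=1 gives 20
  # train-then-evaluate cycles, while --epochs_between_evals=5 gives 4.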
  for cycle_index in range(total_training_cycle):
    tf.logging.info("Starting a training cycle: {}/{}".format(
        cycle_index, total_training_cycle - 1))

    # Train the model
    train_cycle_begin = time.time()
    estimator.train(input_fn=train_input_fn,
                    hooks=[tf.train.ProfilerHook(save_steps=10000)])
    train_cycle_end = time.time()

    # Evaluate the model
    eval_cycle_begin = time.time()
    hr, ndcg = evaluate_model(
        estimator, FLAGS.batch_size, num_gpus, ncf_dataset.eval_true_items,
        ncf_dataset.eval_all_items, FLAGS.num_parallel_calls)
    eval_cycle_end = time.time()

    # Log the training time, evaluation time, and HR and NDCG results.
    tf.logging.info(
        "Iteration {}: train time = {:.1f} s, HR = {:.4f}, NDCG = {:.4f}, "
        "eval time = {:.1f} s".format(
            cycle_index + 1, train_cycle_end - train_cycle_begin, hr, ndcg,
            eval_cycle_end - eval_cycle_begin))

  # Remove temporary files
  os.remove(constants.TRAIN_DATA)
  os.remove(constants.TEST_DATA)


if __name__ == "__main__":
  tf.logging.set_verbosity(tf.logging.INFO)
  parser = argparse.ArgumentParser()

  parser.add_argument(
      "--model_dir", nargs="?", default="/tmp/ncf/",
      help="Model directory.")
  parser.add_argument(
      "--data_dir", nargs="?", default="/tmp/movielens-data/",
      help="Input data directory. Should be the same as downloaded data dir.")
  parser.add_argument(
      "--dataset", nargs="?", default="ml-1m", choices=["ml-1m", "ml-20m"],
      help="Choose a dataset.")
  parser.add_argument(
      "--train_epochs", type=int, default=20,
      help="Number of epochs.")
  parser.add_argument(
      "--batch_size", type=int, default=256,
      help="Batch size.")
  parser.add_argument(
      "--num_factors", type=int, default=8,
      help="Embedding size of MF model.")
  parser.add_argument(
      "--layers", nargs="?", default="[64,32,16,8]",
      help="Size of hidden layers for MLP.")
  parser.add_argument(
      "--mf_regularization", type=float, default=0,
      help="Regularization for MF embeddings.")
  parser.add_argument(
      "--num_neg", type=int, default=4,
      help="Number of negative instances to pair with a positive instance.")
  parser.add_argument(
      "--num_parallel_calls", type=int, default=8,
      help="Number of parallel calls.")
  parser.add_argument(
      "--epochs_between_evals", type=int, default=1,
      help="Number of epochs between model evaluation.")
  parser.add_argument(
      "--learning_rate", type=float, default=0.001,
      help="Learning rate.")
  parser.add_argument(
      "--num_gpus", type=int, default=1 if tf.test.is_gpu_available() else 0,
      help="How many GPUs to use with the DistributionStrategies API. The "
           "default is 1 if TensorFlow can detect a GPU, and 0 otherwise.")

  FLAGS, unparsed = parser.parse_known_args()
  with tf.Graph().as_default():
    tf.app.run(main=main, argv=[sys.argv[0]] + unparsed)