"vscode:/vscode.git/clone" did not exist on "a588943eee06b8dace38d873a644d55d74927ff4"
Unverified Commit 6518c1c7 authored by Taylor Robie, committed by GitHub

NCF pipeline refactor (take 2) and initial TPU port. (#4935)

* intermediate commit

* ncf now working

* reorder pipeline

* allow batched decode for file backed dataset

* fix bug

* more tweaks

* parallelize false negative generation

* shared pool hack

* workers ignore sigint

* intermediate commit

* simplify buffer backed dataset creation to fixed length record approach only. (more cleanup needed)

* more tweaks

* simplify pipeline

* fix misplaced cleanup() calls. (validation works!)

* more tweaks

* sixify memoryview usage

* more sixification

* fix bug

* add future imports

* break up training input pipeline

* more pipeline tuning

* first pass at moving negative generation to async

* refactor async pipeline to use files instead of ipc

* refactor async pipeline

* move expansion and concatenation from reduce worker to generation workers

* abandon complete async due to interactions with the tensorflow threadpool

* cleanup

* remove performance_comparison.py

* experiment with rough generator + interleave pipeline

* yet more pipeline tuning

* update on-the-fly pipeline

* refactor preprocessing, and move train generation behind a GRPC server

* fix leftover call

* intermediate commit

* intermediate commit

* fix index error in data pipeline, and add logging to train data server

* make sharding more robust to imbalance

* correctly sample with replacement

* file buffers are no longer needed for this branch

* tweak sampling methods

* add README for data pipeline

* fix eval sampling, and vectorize eval metrics

* add spillover and static training batch sizes

* clean up cruft from earlier iterations

* rough delint

* delint 2 / n

* add type annotations

* update run script

* make run.sh a bit nicer

* change embedding initializer to match reference

* rough pass at pure estimator model_fn

* impose static shape hack (revisit later)

* refinements

* fix dir error in run.sh

* add documentation

* add more docs and fix an assert

* old data test is no longer valid. Keeping it around as reference for the new one

* rough draft of data pipeline validation script

* don't rely on shuffle default

* tweaks and documentation

* add separate eval batch size for performance

* initial commit

* terrible hacking

* mini hacks

* missed a bug

* messing about trying to get TPU running

* TFRecords based TPU attempt

* bug fixes

* don't log remotely

* more bug fixes

* TPU tweaks and bug fixes

* more tweaks

* more adjustments

* rework model definition

* tweak data pipeline

* refactor async TFRecords generation

* temp commit to run.sh

* update log behavior

* fix logging bug

* add check for subprocess start to avoid cryptic hangs

* unify deserialize and make it TPU compliant

* delint

* remove gRPC pipeline code

* fix logging bug

* delint and remove old test files

* add unit tests for NCF pipeline

* delint

* clean up run.sh, and add run_tpu.sh

* forgot the most important line

* fix run.sh bugs

* yet more bash debugging

* small tweak to add keras summaries to model_fn

* Clean up sixification issues

* address PR comments

* delinting is never over
parent a88b89be
@@ -133,8 +133,12 @@ def _download_and_clean(dataset, data_dir):
       _regularize_20m_dataset(temp_dir)
     for fname in tf.gfile.ListDirectory(temp_dir):
-      tf.gfile.Copy(os.path.join(temp_dir, fname),
-                    os.path.join(data_subdir, fname))
+      if not tf.gfile.Exists(os.path.join(data_subdir, fname)):
+        tf.gfile.Copy(os.path.join(temp_dir, fname),
+                      os.path.join(data_subdir, fname))
+      else:
+        tf.logging.info("Skipping copy of {}, as it already exists in the "
+                        "destination folder.".format(fname))
   finally:
     tf.gfile.DeleteRecursively(temp_dir)
@@ -43,7 +43,7 @@ In both datasets, the timestamp is represented in seconds since midnight Coordin
 ### Download and preprocess dataset
 To download the dataset, please install Pandas package first. Then issue the following command:
 ```
-python movielens_dataset.py
+python ../datasets/movielens.py
 ```
 Arguments:
 * `--data_dir`: Directory where to download and save the preprocessed data. By default, it is `/tmp/movielens-data/`.
@@ -51,7 +51,8 @@ Arguments:
 Use the `--help` or `-h` flag to get a full list of possible arguments.
-Note the ml-20m dataset is large (the rating file is ~500 MB), and it may take several minutes (~10 mins) for data preprocessing.
+Note the ml-20m dataset is large (the rating file is ~500 MB), and it may take several minutes (~2 mins) for data preprocessing.
+Both the ml-1m and ml-20m datasets will be coerced into a common format when downloaded.
 ### Train and evaluate model
 To train and evaluate the model, issue the following command:
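For reference, the same download-and-regularize step can also be driven from Python. This is a minimal sketch, assuming `official.datasets.movielens` is importable and using the `download(dataset, data_dir)` helper called elsewhere in this change:

```python
# Minimal sketch: fetch a MovieLens dataset from Python rather than the command
# line. Assumes the models repo root is on PYTHONPATH.
from official.datasets import movielens

DATA_DIR = "/tmp/movielens-data"  # matches the README default

# Downloads the dataset if needed; per the README above, both ml-1m and ml-20m
# are coerced into a common format.
movielens.download(dataset="ml-1m", data_dir=DATA_DIR)
```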
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Central location for NCF specific values."""
import os
import time
# ==============================================================================
# == Main Thread Data Processing ===============================================
# ==============================================================================
class Paths(object):
"""Container for various path information used while training NCF."""
def __init__(self, data_dir, cache_id=None):
self.cache_id = cache_id or int(time.time())
self.data_dir = data_dir
self.cache_root = os.path.join(
self.data_dir, "{}_ncf_recommendation_cache".format(self.cache_id))
self.train_shard_subdir = os.path.join(self.cache_root,
"raw_training_shards")
self.train_shard_template = os.path.join(self.train_shard_subdir,
"positive_shard_{}.pickle")
self.train_epoch_dir = os.path.join(self.cache_root, "training_epochs")
self.eval_data_subdir = os.path.join(self.cache_root, "eval_data")
self.eval_raw_file = os.path.join(self.eval_data_subdir, "raw.pickle")
self.eval_record_template_temp = os.path.join(self.eval_data_subdir,
"eval_records.temp")
self.eval_record_template = os.path.join(
self.eval_data_subdir, "padded_eval_batch_size_{}.tfrecords")
self.subproc_alive = os.path.join(self.cache_root, "subproc.alive")
APPROX_PTS_PER_TRAIN_SHARD = 128000
# In both datasets, each user has at least 20 ratings.
MIN_NUM_RATINGS = 20
# The number of negative examples attached with a positive example
# when performing evaluation.
NUM_EVAL_NEGATIVES = 999
# ==============================================================================
# == Subprocess Data Generation ================================================
# ==============================================================================
CYCLES_TO_BUFFER = 3 # The number of train cycles worth of data to "run ahead"
# of the main training loop.
READY_FILE = "ready.json"
TRAIN_RECORD_TEMPLATE = "train_{}.tfrecords"
TIMEOUT_SECONDS = 3600 * 2 # If the train loop goes more than two hours without
# consuming an epoch of data, this is a good
# indicator that the main thread is dead and the
# subprocess is orphaned.
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Asynchronously generate TFRecords files for NCF."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import atexit
import contextlib
import datetime
import gc
import functools
import logging
import multiprocessing
import json
import os
import pickle
import signal
import sys
import tempfile
import time
import timeit
import traceback
import typing
import numpy as np
import tensorflow as tf
from absl import app as absl_app
from absl import logging as absl_logging
from absl import flags
from official.datasets import movielens
from official.recommendation import constants as rconst
from official.recommendation import stat_utils
def log_msg(msg):
"""Include timestamp info when logging messages to a file."""
if flags.FLAGS.redirect_logs:
timestamp = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
absl_logging.info("[{}] {}".format(timestamp, msg))
else:
absl_logging.info(msg)
sys.stdout.flush()
sys.stderr.flush()
def get_cycle_folder_name(i):
return "cycle_{}".format(str(i).zfill(5))
def _process_shard(shard_path, num_items, num_neg):
# type: (str, int, int) -> (np.ndarray, np.ndarray, np.ndarray)
"""Read a shard of training data and return training vectors.
Args:
shard_path: The filepath of the positive instance training shard.
num_items: The cardinality of the item set.
num_neg: The number of negatives to generate per positive example.
"""
# The choice to store the training shards in files rather than in memory
# is motivated by the fact that multiprocessing serializes arguments,
# transmits them to map workers, and then deserializes them. By storing the
# training shards in files, the serialization work only needs to be done once.
#
# A similar effect could be achieved by simply holding pickled bytes in
# memory, however the processing is not I/O bound and is therefore
# unnecessary.
with tf.gfile.Open(shard_path, "rb") as f:
shard = pickle.load(f)
users = shard[movielens.USER_COLUMN]
items = shard[movielens.ITEM_COLUMN]
delta = users[1:] - users[:-1]
boundaries = ([0] + (np.argwhere(delta)[:, 0] + 1).tolist() +
[users.shape[0]])
user_blocks = []
item_blocks = []
label_blocks = []
for i in range(len(boundaries) - 1):
assert len(set(users[boundaries[i]:boundaries[i+1]])) == 1
positive_items = items[boundaries[i]:boundaries[i+1]]
positive_set = set(positive_items)
if positive_items.shape[0] != len(positive_set):
raise ValueError("Duplicate entries detected.")
n_pos = len(positive_set)
negatives = stat_utils.sample_with_exclusion(
num_items, positive_set, n_pos * num_neg)
user_blocks.append(users[boundaries[i]] * np.ones(
(n_pos * (1 + num_neg),), dtype=np.int32))
item_blocks.append(
np.array(list(positive_set) + negatives, dtype=np.uint16))
labels_for_user = np.zeros((n_pos * (1 + num_neg),), dtype=np.int8)
labels_for_user[:n_pos] = 1
label_blocks.append(labels_for_user)
users_out = np.concatenate(user_blocks)
items_out = np.concatenate(item_blocks)
labels_out = np.concatenate(label_blocks)
assert users_out.shape == items_out.shape == labels_out.shape
return users_out, items_out, labels_out
def _construct_record(users, items, labels=None):
"""Convert NumPy arrays into a TFRecords entry."""
feature_dict = {
movielens.USER_COLUMN: tf.train.Feature(
bytes_list=tf.train.BytesList(value=[memoryview(users).tobytes()])),
movielens.ITEM_COLUMN: tf.train.Feature(
bytes_list=tf.train.BytesList(value=[memoryview(items).tobytes()])),
}
if labels is not None:
feature_dict["labels"] = tf.train.Feature(
bytes_list=tf.train.BytesList(value=[memoryview(labels).tobytes()]))
return tf.train.Example(
features=tf.train.Features(feature=feature_dict)).SerializeToString()
def sigint_handler(signal, frame):
log_msg("Shutting down worker.")
def init_worker():
signal.signal(signal.SIGINT, sigint_handler)
def _construct_training_records(
train_cycle, # type: int
num_workers, # type: int
cache_paths, # type: rconst.Paths
num_readers, # type: int
num_neg, # type: int
num_train_positives, # type: int
num_items, # type: int
epochs_per_cycle, # type: int
train_batch_size, # type: int
training_shards, # type: typing.List[str]
spillover, # type: bool
carryover=None # type: typing.Union[typing.List[np.ndarray], None]
):
"""Generate false negatives and write TFRecords files.
Args:
train_cycle: Integer of which cycle the generated data is for.
num_workers: Number of multiprocessing workers to use for negative
generation.
cache_paths: Paths object with information of where to write files.
num_readers: The number of reader datasets in the train input_fn.
num_neg: The number of false negatives per positive example.
num_train_positives: The number of positive examples. This value is used
to pre-allocate arrays while the imap is still running. (NumPy does not
allow dynamic arrays.)
num_items: The cardinality of the item set.
epochs_per_cycle: The number of epochs worth of data to construct.
train_batch_size: The expected batch size used during training. This is used
to properly batch data when writing TFRecords.
training_shards: The picked positive examples from which to generate
negatives.
spillover: If the final batch is incomplete, push it to the next
cycle (True) or include a partial batch (False).
carryover: The data points to be spilled over to the next cycle.
"""
st = timeit.default_timer()
num_workers = min([num_workers, len(training_shards) * epochs_per_cycle])
carryover = carryover or [
np.zeros((0,), dtype=np.int32),
np.zeros((0,), dtype=np.uint16),
np.zeros((0,), dtype=np.int8),
]
num_carryover = carryover[0].shape[0]
num_pts = num_carryover + num_train_positives * (1 + num_neg)
map_args = [i for i in training_shards * epochs_per_cycle]
map_fn = functools.partial(_process_shard, num_neg=num_neg,
num_items=num_items)
with contextlib.closing(multiprocessing.Pool(
processes=num_workers, initializer=init_worker)) as pool:
data_generator = pool.imap_unordered(map_fn, map_args) # pylint: disable=no-member
data = [
np.zeros(shape=(num_pts,), dtype=np.int32) - 1,
np.zeros(shape=(num_pts,), dtype=np.uint16),
np.zeros(shape=(num_pts,), dtype=np.int8),
]
# The carryover data is always first.
for i in range(3):
data[i][:num_carryover] = carryover[i]
index_destinations = np.random.permutation(
num_train_positives * (1 + num_neg)) + num_carryover
start_ind = 0
for data_segment in data_generator:
n_in_segment = data_segment[0].shape[0]
dest = index_destinations[start_ind:start_ind + n_in_segment]
start_ind += n_in_segment
for i in range(3):
data[i][dest] = data_segment[i]
# Check that no points were dropped.
assert (num_pts - num_carryover) == start_ind
assert not np.sum(data[0] == -1)
record_dir = os.path.join(cache_paths.train_epoch_dir,
get_cycle_folder_name(train_cycle))
tf.gfile.MakeDirs(record_dir)
batches_per_file = np.ceil(num_pts / train_batch_size / num_readers)
current_file_id = -1
current_batch_id = -1
batches_by_file = [[] for _ in range(num_readers)]
output_carryover = [
np.zeros(shape=(0,), dtype=np.int32),
np.zeros(shape=(0,), dtype=np.uint16),
np.zeros(shape=(0,), dtype=np.int8),
]
while True:
current_batch_id += 1
if (current_batch_id % batches_per_file) == 0:
current_file_id += 1
end_ind = (current_batch_id + 1) * train_batch_size
if end_ind > num_pts:
if spillover:
output_carryover = [data[i][current_batch_id*train_batch_size:num_pts]
for i in range(3)]
break
else:
batches_by_file[current_file_id].append(current_batch_id)
break
batches_by_file[current_file_id].append(current_batch_id)
batch_count = 0
for i in range(num_readers):
fpath = os.path.join(record_dir, rconst.TRAIN_RECORD_TEMPLATE.format(i))
log_msg("Writing {}".format(fpath))
with tf.python_io.TFRecordWriter(fpath) as writer:
for j in batches_by_file[i]:
start_ind = j * train_batch_size
end_ind = start_ind + train_batch_size
batch_bytes = _construct_record(
users=data[0][start_ind:end_ind],
items=data[1][start_ind:end_ind],
labels=data[2][start_ind:end_ind],
)
writer.write(batch_bytes)
batch_count += 1
if spillover:
written_pts = output_carryover[0].shape[0] + batch_count * train_batch_size
if num_pts != written_pts:
raise ValueError("Error detected: point counts do not match: {} vs. {}"
.format(num_pts, written_pts))
with tf.gfile.Open(os.path.join(record_dir, rconst.READY_FILE), "w") as f:
json.dump({
"batch_size": train_batch_size,
"batch_count": batch_count,
}, f)
log_msg("Cycle {} complete. Total time: {:.1f} seconds"
.format(train_cycle, timeit.default_timer() - st))
return output_carryover
def _construct_eval_record(cache_paths, eval_batch_size):
"""Convert Eval data to a single TFRecords file."""
log_msg("Beginning construction of eval TFRecords file.")
raw_fpath = cache_paths.eval_raw_file
intermediate_fpath = cache_paths.eval_record_template_temp
dest_fpath = cache_paths.eval_record_template.format(eval_batch_size)
with tf.gfile.Open(raw_fpath, "rb") as f:
eval_data = pickle.load(f)
users = eval_data[0][movielens.USER_COLUMN]
items = eval_data[0][movielens.ITEM_COLUMN]
assert users.shape == items.shape
# eval_data[1] is the labels, but during evaluation they are inferred as they
# have a set structure. They are included in the data artifact for debugging
# purposes.
# This packaging assumes that the caller knows to drop the padded values.
n_pts = users.shape[0]
n_pad = eval_batch_size - (n_pts % eval_batch_size)
assert not (n_pts + n_pad) % eval_batch_size
users = np.concatenate([users, np.zeros(shape=(n_pad,), dtype=np.int32)])\
.reshape((-1, eval_batch_size))
items = np.concatenate([items, np.zeros(shape=(n_pad,), dtype=np.uint16)])\
.reshape((-1, eval_batch_size))
num_batches = users.shape[0]
with tf.python_io.TFRecordWriter(intermediate_fpath) as writer:
for i in range(num_batches):
batch_bytes = _construct_record(
users=users[i, :],
items=items[i, :]
)
writer.write(batch_bytes)
tf.gfile.Copy(intermediate_fpath, dest_fpath)
tf.gfile.Remove(intermediate_fpath)
log_msg("Eval TFRecords file successfully constructed.")
def _generation_loop(
num_workers, cache_paths, num_readers, num_neg, num_train_positives,
num_items, spillover, epochs_per_cycle, train_batch_size, eval_batch_size):
# type: (int, rconst.Paths, int, int, int, int, bool, int, int, int) -> None
"""Primary run loop for data file generation."""
log_msg("Signaling that I am alive.")
with tf.gfile.Open(cache_paths.subproc_alive, "w") as f:
f.write("Generation subproc has started.")
atexit.register(tf.gfile.Remove, filename=cache_paths.subproc_alive)
log_msg("Entering generation loop.")
tf.gfile.MakeDirs(cache_paths.train_epoch_dir)
training_shards = [os.path.join(cache_paths.train_shard_subdir, i) for i in
tf.gfile.ListDirectory(cache_paths.train_shard_subdir)]
# Training blocks on the creation of the first epoch, so the num_workers
# limit is not respected for this invocation
train_cycle = 0
carryover = _construct_training_records(
train_cycle=train_cycle, num_workers=multiprocessing.cpu_count(),
cache_paths=cache_paths, num_readers=num_readers, num_neg=num_neg,
num_train_positives=num_train_positives, num_items=num_items,
epochs_per_cycle=epochs_per_cycle, train_batch_size=train_batch_size,
training_shards=training_shards, spillover=spillover, carryover=None)
_construct_eval_record(cache_paths=cache_paths,
eval_batch_size=eval_batch_size)
wait_count = 0
start_time = time.time()
while True:
ready_epochs = tf.gfile.ListDirectory(cache_paths.train_epoch_dir)
if len(ready_epochs) >= rconst.CYCLES_TO_BUFFER:
wait_count += 1
sleep_time = max([0, wait_count * 5 - (time.time() - start_time)])
time.sleep(sleep_time)
if (wait_count % 10) == 0:
log_msg("Waited {} times for data to be consumed."
.format(wait_count))
if time.time() - start_time > rconst.TIMEOUT_SECONDS:
log_msg("Waited more than {} seconds. Concluding that this "
"process is orphaned and exiting gracefully."
.format(rconst.TIMEOUT_SECONDS))
sys.exit()
continue
train_cycle += 1
carryover = _construct_training_records(
train_cycle=train_cycle, num_workers=num_workers,
cache_paths=cache_paths, num_readers=num_readers, num_neg=num_neg,
num_train_positives=num_train_positives, num_items=num_items,
epochs_per_cycle=epochs_per_cycle, train_batch_size=train_batch_size,
training_shards=training_shards, spillover=spillover,
carryover=carryover)
wait_count = 0
start_time = time.time()
gc.collect()
def main(_):
redirect_logs = flags.FLAGS.redirect_logs
cache_paths = rconst.Paths(
data_dir=flags.FLAGS.data_dir, cache_id=flags.FLAGS.cache_id)
log_file_name = "data_gen_proc_{}.log".format(cache_paths.cache_id)
log_file = os.path.join(cache_paths.data_dir, log_file_name)
if log_file.startswith("gs://") and redirect_logs:
fallback_log_file = os.path.join(tempfile.gettempdir(), log_file_name)
print("Unable to log to {}. Falling back to {}"
.format(log_file, fallback_log_file))
log_file = fallback_log_file
# This server is generally run in a subprocess.
if redirect_logs:
print("Redirecting stdout and stderr to {}".format(log_file))
log_stream = open(log_file, "wt") # Note: not tf.gfile.Open().
stdout = log_stream
stderr = log_stream
try:
if redirect_logs:
absl_logging.get_absl_logger().addHandler(
hdlr=logging.StreamHandler(stream=stdout))
sys.stdout = stdout
sys.stderr = stderr
print("Logs redirected.")
try:
log_msg("sys.argv: {}".format(" ".join(sys.argv)))
if flags.FLAGS.seed is not None:
np.random.seed(flags.FLAGS.seed)
_generation_loop(
num_workers=flags.FLAGS.num_workers,
cache_paths=cache_paths,
num_readers=flags.FLAGS.num_readers,
num_neg=flags.FLAGS.num_neg,
num_train_positives=flags.FLAGS.num_train_positives,
num_items=flags.FLAGS.num_items,
spillover=flags.FLAGS.spillover,
epochs_per_cycle=flags.FLAGS.epochs_per_cycle,
train_batch_size=flags.FLAGS.train_batch_size,
eval_batch_size=flags.FLAGS.eval_batch_size,
)
except KeyboardInterrupt:
log_msg("KeyboardInterrupt registered.")
except:
traceback.print_exc()
raise
finally:
log_msg("Shutting down generation subprocess.")
sys.stdout.flush()
sys.stderr.flush()
if redirect_logs:
log_stream.close()
def define_flags():
"""Construct flags for the server.
This function does not use official.utils.flags, as these flags are not meant
to be used by humans. Rather, they should be passed as part of a subprocess
call.
"""
flags.DEFINE_integer(name="num_workers", default=multiprocessing.cpu_count(),
help="Size of the negative generation worker pool.")
flags.DEFINE_string(name="data_dir", default=None,
help="The data root. (used to construct cache paths.)")
flags.DEFINE_string(name="cache_id", default=None,
help="The cache_id generated in the main process.")
flags.DEFINE_integer(name="num_readers", default=4,
help="Number of reader datasets in training. This sets"
"how the epoch files are sharded.")
flags.DEFINE_integer(name="num_neg", default=None,
help="The Number of negative instances to pair with a "
"positive instance.")
flags.DEFINE_integer(name="num_train_positives", default=None,
help="The number of positive training examples.")
flags.DEFINE_integer(name="num_items", default=None,
help="Number of items from which to select negatives.")
flags.DEFINE_integer(name="epochs_per_cycle", default=1,
help="The number of epochs of training data to produce"
"at a time.")
flags.DEFINE_integer(name="train_batch_size", default=None,
help="The batch size with which training TFRecords will "
"be chunked.")
flags.DEFINE_integer(name="eval_batch_size", default=None,
help="The batch size with which evaluation TFRecords "
"will be chunked.")
flags.DEFINE_boolean(
name="spillover", default=True,
help="If a complete batch cannot be provided, return an empty batch and "
"start the next epoch from a non-empty buffer. This guarantees "
"fixed batch sizes.")
flags.DEFINE_boolean(name="redirect_logs", default=False,
help="Catch logs and write them to a file. "
"(Useful if this is run as a subprocess)")
flags.DEFINE_integer(name="seed", default=None,
help="NumPy random seed to set at startup. If not "
"specified, a seed will not be set.")
flags.mark_flags_as_required(
["data_dir", "cache_id", "num_neg", "num_train_positives", "num_items",
"train_batch_size", "eval_batch_size"])
if __name__ == "__main__":
define_flags()
absl_app.run(main)
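`stat_utils.sample_with_exclusion()` is used by `_process_shard()` above, but its implementation is not part of this diff. The sketch below is only a plausible stand-in for the contract it appears to satisfy (draw negatives uniformly, with replacement, while rejecting a user's positive items); it is not the pipeline's actual code:

```python
# Plausible stand-in for stat_utils.sample_with_exclusion(): rejection-sample
# negative item ids so that none of a user's positive items are returned.
# This is NOT the implementation used by the pipeline; it only illustrates the
# contract assumed by _process_shard() above.
import numpy as np


def sample_with_exclusion(num_items, positive_set, num_samples):
  negatives = []
  while len(negatives) < num_samples:
    # Over-sample, then reject ids that collide with the positive set.
    candidates = np.random.randint(
        low=0, high=num_items, size=2 * (num_samples - len(negatives)))
    negatives.extend(int(i) for i in candidates if i not in positive_set)
  return negatives[:num_samples]


# Example: 4 negatives per positive for a user with 3 positive items.
print(sample_with_exclusion(num_items=100, positive_set={1, 2, 3},
                            num_samples=3 * 4))
```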
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Test NCF data pipeline."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import time
import numpy as np
import tensorflow as tf
from official.datasets import movielens
from official.recommendation import constants as rconst
from official.recommendation import data_preprocessing
DATASET = "ml-test"
NUM_USERS = 1000
NUM_ITEMS = 2000
NUM_PTS = 50000
BATCH_SIZE = 2048
NUM_NEG = 4
def mock_download(*args, **kwargs):
return
class BaseTest(tf.test.TestCase):
def setUp(self):
self.temp_data_dir = self.get_temp_dir()
ratings_folder = os.path.join(self.temp_data_dir, DATASET)
tf.gfile.MakeDirs(ratings_folder)
np.random.seed(0)
raw_user_ids = np.arange(NUM_USERS * 3)
np.random.shuffle(raw_user_ids)
raw_user_ids = raw_user_ids[:NUM_USERS]
raw_item_ids = np.arange(NUM_ITEMS * 3)
np.random.shuffle(raw_item_ids)
raw_item_ids = raw_item_ids[:NUM_ITEMS]
users = np.random.choice(raw_user_ids, NUM_PTS)
items = np.random.choice(raw_item_ids, NUM_PTS)
scores = np.random.randint(low=0, high=5, size=NUM_PTS)
times = np.random.randint(low=1000000000, high=1200000000, size=NUM_PTS)
rating_file = os.path.join(ratings_folder, movielens.RATINGS_FILE)
self.seen_pairs = set()
self.holdout = {}
with tf.gfile.Open(rating_file, "w") as f:
f.write("user_id,item_id,rating,timestamp\n")
for usr, itm, scr, ts in zip(users, items, scores, times):
pair = (usr, itm)
if pair in self.seen_pairs:
continue
self.seen_pairs.add(pair)
if usr not in self.holdout or (ts, itm) > self.holdout[usr]:
self.holdout[usr] = (ts, itm)
f.write("{},{},{},{}\n".format(usr, itm, scr, ts))
movielens.download = mock_download
movielens.NUM_RATINGS[DATASET] = NUM_PTS
def test_preprocessing(self):
# For the most part the necessary checks are performed within
# construct_cache()
ncf_dataset = data_preprocessing.construct_cache(
dataset=DATASET, data_dir=self.temp_data_dir, num_data_readers=2)
assert ncf_dataset.num_users == NUM_USERS
assert ncf_dataset.num_items == NUM_ITEMS
def drain_dataset(self, dataset, g):
# type: (tf.data.Dataset, tf.Graph) -> list
with self.test_session(graph=g) as sess:
with g.as_default():
batch = dataset.make_one_shot_iterator().get_next()
output = []
while True:
try:
output.append(sess.run(batch))
except tf.errors.OutOfRangeError:
break
return output
def test_end_to_end(self):
ncf_dataset = data_preprocessing.instantiate_pipeline(
dataset=DATASET, data_dir=self.temp_data_dir,
batch_size=BATCH_SIZE, eval_batch_size=BATCH_SIZE, num_data_readers=2,
num_neg=NUM_NEG)
time.sleep(5) # allow `alive` file to be written
g = tf.Graph()
with g.as_default():
input_fn, record_dir, batch_count = \
data_preprocessing.make_train_input_fn(ncf_dataset)
dataset = input_fn({"batch_size": BATCH_SIZE, "use_tpu": False})
first_epoch = self.drain_dataset(dataset=dataset, g=g)
user_inv_map = {v: k for k, v in ncf_dataset.user_map.items()}
item_inv_map = {v: k for k, v in ncf_dataset.item_map.items()}
train_examples = {
True: set(),
False: set(),
}
for features, labels in first_epoch:
for u, i, l in zip(features[movielens.USER_COLUMN],
features[movielens.ITEM_COLUMN], labels):
u_raw = user_inv_map[u]
i_raw = item_inv_map[i]
if ((u_raw, i_raw) in self.seen_pairs) != l:
# The evaluation item is not considered during false negative
# generation, so it will occasionally appear as a negative example
# during training.
assert not l
assert i_raw == self.holdout[u_raw][1]
train_examples[l].add((u_raw, i_raw))
num_positives_seen = len(train_examples[True])
# The numbers don't match exactly because the last batch spills over into
# the next epoch
assert ncf_dataset.num_train_positives - num_positives_seen < BATCH_SIZE
# This check is more heuristic because negatives are sampled with
# replacement. It only checks that negative generation is reasonably random.
assert len(train_examples[False]) / NUM_NEG / num_positives_seen > 0.9
if __name__ == "__main__":
tf.logging.set_verbosity(tf.logging.INFO)
tf.test.main()
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Unit tests for dataset.py."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import shutil
import numpy as np
import tensorflow as tf # pylint: disable=g-bad-import-order
# from official.recommendation import dataset
from official.datasets import movielens
from official.recommendation import movielens_dataset
_TEST_FNAME = os.path.join(
os.path.dirname(__file__), "unittest_data/test_eval_ratings.csv")
_NUM_NEG = 4
class DatasetTest(tf.test.TestCase):
def setUp(self):
# Create temporary CSV file
self.temp_dir = self.get_temp_dir()
tf.gfile.MakeDirs(os.path.join(self.temp_dir,
movielens_dataset._BUFFER_SUBDIR))
path_map = {
"test_train_ratings.csv": "ml-1m-train-ratings.csv",
"test_eval_ratings.csv": "ml-1m-test-ratings.csv",
"test_eval_negative.csv": "ml-1m-test-negative.csv"
}
for src, dest in path_map.items():
src = os.path.join(os.path.dirname(__file__), "unittest_data", src)
dest = os.path.join(self.temp_dir, movielens_dataset._BUFFER_SUBDIR, dest)
with tf.gfile.Open(src, "r") as f_in, tf.gfile.Open(dest, "w") as f_out:
f_out.write(f_in.read())
def test_load_data(self):
data = movielens_dataset.load_data(_TEST_FNAME)
self.assertEqual(len(data), 2)
self.assertEqual(data[0][0], 0)
self.assertEqual(data[0][2], 1)
self.assertEqual(data[-1][0], 1)
self.assertEqual(data[-1][2], 1)
def test_data_preprocessing(self):
ncf_dataset = movielens_dataset.data_preprocessing(
self.temp_dir, movielens.ML_1M, _NUM_NEG)
# Check train data preprocessing
self.assertAllEqual(np.array(ncf_dataset.train_data)[:, 2],
np.full(len(ncf_dataset.train_data), 1))
self.assertEqual(ncf_dataset.num_users, 2)
self.assertEqual(ncf_dataset.num_items, 175)
# Check test dataset
test_dataset = ncf_dataset.all_eval_data
first_true_item = test_dataset[100]
self.assertEqual(first_true_item[1], ncf_dataset.eval_true_items[0])
self.assertEqual(first_true_item[1], ncf_dataset.eval_all_items[0][-1])
last_gt_item = test_dataset[-1]
self.assertEqual(last_gt_item[1], ncf_dataset.eval_true_items[-1])
self.assertEqual(last_gt_item[1], ncf_dataset.eval_all_items[-1][-1])
test_list = test_dataset.tolist()
first_test_items = [x[1] for x in test_list if x[0] == 0]
self.assertAllEqual(first_test_items, ncf_dataset.eval_all_items[0])
last_test_items = [x[1] for x in test_list if x[0] == 1]
self.assertAllEqual(last_test_items, ncf_dataset.eval_all_items[-1])
def test_generate_train_dataset(self):
# Check train dataset
ncf_dataset = movielens_dataset.data_preprocessing(
self.temp_dir, movielens.ML_1M, _NUM_NEG)
train_dataset = movielens_dataset.generate_train_dataset(
ncf_dataset.train_data, ncf_dataset.num_items, _NUM_NEG)
# Each user has 1 positive instance followed by _NUM_NEG negative instances
train_data_0 = train_dataset[0]
self.assertEqual(train_data_0[2], 1)
for i in range(1, _NUM_NEG + 1):
train_data = train_dataset[i]
self.assertEqual(train_data_0[0], train_data[0])
self.assertNotEqual(train_data_0[1], train_data[1])
self.assertEqual(0, train_data[2])
train_data_last = train_dataset[-1 - _NUM_NEG]
self.assertEqual(train_data_last[2], 1)
for i in range(-_NUM_NEG, 0):
train_data = train_dataset[i]
self.assertEqual(train_data_last[0], train_data[0])
self.assertNotEqual(train_data_last[1], train_data[1])
self.assertEqual(0, train_data[2])
if __name__ == "__main__":
tf.test.main()
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Prepare MovieLens dataset for NCF recommendation model."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import collections
import functools
import os
import tempfile
import time
# pylint: disable=wrong-import-order
from absl import app as absl_app
from absl import flags
import numpy as np
import pandas as pd
from six.moves import xrange
import tensorflow as tf
# pylint: enable=wrong-import-order
from official.datasets import movielens
from official.utils.data import file_io
from official.utils.flags import core as flags_core
_BUFFER_SUBDIR = "ncf_recommendation_buffer"
_TRAIN_RATINGS_FILENAME = 'train-ratings.csv'
_TEST_RATINGS_FILENAME = 'test-ratings.csv'
_TEST_NEG_FILENAME = 'test-negative.csv'
# The number of negative examples attached with a positive example
# in training dataset. It is set as 100 in the paper.
_NUMBER_NEGATIVES = 100
# In both datasets, each user has at least 20 ratings.
_MIN_NUM_RATINGS = 20
# The buffer size for shuffling train dataset.
_SHUFFLE_BUFFER_SIZE = 1024
_FEATURE_MAP = {
movielens.USER_COLUMN: tf.FixedLenFeature([1], dtype=tf.int64),
movielens.ITEM_COLUMN: tf.FixedLenFeature([1], dtype=tf.int64),
movielens.RATING_COLUMN: tf.FixedLenFeature([1], dtype=tf.int64),
}
_FEATURE_MAP_EVAL = {
movielens.USER_COLUMN: tf.FixedLenFeature([1], dtype=tf.int64),
movielens.ITEM_COLUMN: tf.FixedLenFeature([1], dtype=tf.int64),
}
_COLUMNS = [movielens.USER_COLUMN, movielens.ITEM_COLUMN,
movielens.RATING_COLUMN]
_EVAL_COLUMNS = _COLUMNS[:2]
_EVAL_BUFFER_SIZE = {
movielens.ML_1M: 34130690,
movielens.ML_20M: 800961490,
}
def generate_train_eval_data(df, original_users, original_items):
"""Generate the dataset for model training and evaluation.
Given all user and item interaction information, for each user, first sort
the interactions based on timestamp. Then the latest one is taken out as
Test ratings (leave-one-out evaluation) and the remaining data for training.
The Test negatives are randomly sampled from all non-interacted items, and the
number of Test negatives is 100 by default (defined as _NUMBER_NEGATIVES).
Args:
df: The DataFrame of ratings data.
original_users: A list of the original unique user ids in the dataset.
original_items: A list of the original unique item ids in the dataset.
Returns:
all_ratings: A list of the [user_id, item_id] with interactions.
test_ratings: A list of [user_id, item_id], and each line is the latest
user_item interaction for the user.
test_negs: A list of item ids with shape [num_users, 100].
Each line consists of 100 item ids for the user with no interactions.
"""
# Need to sort before popping to get last item
tf.logging.info("Sorting user_item_map by timestamp...")
df.sort_values(by=movielens.TIMESTAMP_COLUMN, inplace=True)
all_ratings = set(zip(df[movielens.USER_COLUMN], df[movielens.ITEM_COLUMN]))
user_to_items = collections.defaultdict(list)
# Generate user_item rating matrix for training
t1 = time.time()
row_count = 0
for row in df.itertuples():
user_to_items[getattr(row, movielens.USER_COLUMN)].append(
getattr(row, movielens.ITEM_COLUMN))
row_count += 1
if row_count % 50000 == 0:
tf.logging.info("Processing user_to_items row: {}".format(row_count))
tf.logging.info(
"Process {} rows in [{:.1f}]s".format(row_count, time.time() - t1))
# Generate test ratings and test negatives
t2 = time.time()
test_ratings = []
test_negs = []
# Generate the 0-based index for each item, and put it into a set
all_items = set(range(len(original_items)))
for user in range(len(original_users)):
test_item = user_to_items[user].pop() # Get the latest item id
all_ratings.remove((user, test_item)) # Remove the test item
all_negs = all_items.difference(user_to_items[user])
all_negs = sorted(list(all_negs)) # determinism
test_ratings.append((user, test_item))
test_negs.append(list(np.random.choice(all_negs, _NUMBER_NEGATIVES)))
if user % 1000 == 0:
tf.logging.info("Processing user: {}".format(user))
tf.logging.info("Process {} users in {:.1f}s".format(
len(original_users), time.time() - t2))
all_ratings = list(all_ratings) # convert set to list
return all_ratings, test_ratings, test_negs
def _csv_buffer_paths(data_dir, dataset):
buffer_dir = os.path.join(data_dir, _BUFFER_SUBDIR)
return (
os.path.join(buffer_dir, dataset + "-" + _TRAIN_RATINGS_FILENAME),
os.path.join(buffer_dir, dataset + "-" + _TEST_RATINGS_FILENAME),
os.path.join(buffer_dir, dataset + "-" + _TEST_NEG_FILENAME)
)
def construct_train_eval_csv(data_dir, dataset):
"""Parse the raw data to csv file to be used in model training and evaluation.
ml-1m dataset is small in size (~25M), while ml-20m is large (~500M). It may
take several minutes to process ml-20m dataset.
Args:
data_dir: A string, the root directory of the movielens dataset.
dataset: A string, the dataset name to be processed.
"""
assert dataset in movielens.DATASETS
if all([tf.gfile.Exists(i) for i in _csv_buffer_paths(data_dir, dataset)]):
return
# Use random seed as parameter
np.random.seed(0)
df = movielens.ratings_csv_to_dataframe(data_dir=data_dir, dataset=dataset)
# Get the info of users who have more than 20 ratings on items
grouped = df.groupby(movielens.USER_COLUMN)
df = grouped.filter(lambda x: len(x) >= _MIN_NUM_RATINGS)
original_users = df[movielens.USER_COLUMN].unique()
original_items = df[movielens.ITEM_COLUMN].unique()
# Map the ids of user and item to 0 based index for following processing
tf.logging.info("Generating user_map and item_map...")
user_map = {user: index for index, user in enumerate(original_users)}
item_map = {item: index for index, item in enumerate(original_items)}
df[movielens.USER_COLUMN] = df[movielens.USER_COLUMN].apply(
lambda user: user_map[user])
df[movielens.ITEM_COLUMN] = df[movielens.ITEM_COLUMN].apply(
lambda item: item_map[item])
assert df[movielens.USER_COLUMN].max() == len(original_users) - 1
assert df[movielens.ITEM_COLUMN].max() == len(original_items) - 1
# Generate data for train and test
all_ratings, test_ratings, test_negs = generate_train_eval_data(
df, original_users, original_items)
# Serialize to csv file. Each csv file contains three columns
# (user_id, item_id, interaction)
tf.gfile.MakeDirs(os.path.join(data_dir, _BUFFER_SUBDIR))
train_ratings_file, test_ratings_file, test_negs_file = _csv_buffer_paths(
data_dir, dataset)
# As there are only two fields (user_id, item_id) in all_ratings and
# test_ratings, we need to add a fake rating to make three columns
df_train_ratings = pd.DataFrame(all_ratings)
df_train_ratings["fake_rating"] = 1
with tf.gfile.Open(train_ratings_file, "w") as f:
df_train_ratings.to_csv(f, index=False, header=False, sep="\t")
tf.logging.info("Train ratings is {}".format(train_ratings_file))
df_test_ratings = pd.DataFrame(test_ratings)
df_test_ratings["fake_rating"] = 1
with tf.gfile.Open(test_ratings_file, "w") as f:
df_test_ratings.to_csv(f, index=False, header=False, sep="\t")
tf.logging.info("Test ratings is {}".format(test_ratings_file))
df_test_negs = pd.DataFrame(test_negs)
with tf.gfile.Open(test_negs_file, "w") as f:
df_test_negs.to_csv(f, index=False, header=False, sep="\t")
tf.logging.info("Test negatives is {}".format(test_negs_file))
class NCFDataSet(object):
"""A class containing data information for model training and evaluation."""
def __init__(self, train_data, num_users, num_items, num_negatives,
true_items, all_items, all_eval_data):
"""Initialize NCFDataset class.
Args:
train_data: A list containing the positive training instances.
num_users: An integer, the number of users in training dataset.
num_items: An integer, the number of items in training dataset.
num_negatives: An integer, the number of negative instances for each user
in train dataset.
true_items: A list, the ground truth (positive) items of users for
evaluation. Each entry is a latest positive instance for one user.
all_items: A nested list, all items for evaluation, and each entry is the
evaluation items for one user.
all_eval_data: A numpy array of eval/test dataset.
"""
self.train_data = train_data
self.num_users = num_users
self.num_items = num_items
self.num_negatives = num_negatives
self.eval_true_items = true_items
self.eval_all_items = all_items
self.all_eval_data = all_eval_data
def load_data(file_name):
"""Load data from a csv file which splits on tab key."""
lines = tf.gfile.Open(file_name, "r").readlines()
# Process the file line by line
def _process_line(line):
return [int(col) for col in line.split("\t")]
data = [_process_line(line) for line in lines]
return data
def data_preprocessing(data_dir, dataset, num_negatives):
"""Preprocess the train and test dataset.
In data preprocessing, the training positive instances are loaded into memory
for random negative instance generation in each training epoch. The test
dataset are generated from test positive and negative instances.
Args:
data_dir: A string, the root directory of the movielens dataset.
dataset: A string, the dataset name to be processed.
num_negatives: An integer, the number of negative instances for each user
in train dataset.
Returns:
ncf_dataset: A NCFDataset object containing information about training and
evaluation/test dataset.
"""
train_fname, test_fname, test_neg_fname = _csv_buffer_paths(
data_dir, dataset)
# Load training positive instances into memory for later train data generation
train_data = load_data(train_fname)
# Get total number of users in the dataset
num_users = len(np.unique(np.array(train_data)[:, 0]))
# Process test dataset to csv file
test_ratings = load_data(test_fname)
test_negatives = load_data(test_neg_fname)
# Get the total number of items in both train dataset and test dataset (the
# whole dataset)
num_items = len(
set(np.array(train_data)[:, 1]) | set(np.array(test_ratings)[:, 1]))
# Generate test instances for each user
true_items, all_items = [], []
all_test_data = []
for idx in range(num_users):
items = test_negatives[idx]
rating = test_ratings[idx]
user = rating[0] # User
true_item = rating[1] # Positive item as ground truth
# All items with first 100 as negative and last one positive
items.append(true_item)
users = np.full(len(items), user, dtype=np.int32)
users_items = list(zip(users, items)) # User-item list
true_items.append(true_item) # all ground truth items
all_items.append(items) # All items (including positive and negative items)
all_test_data.extend(users_items) # Generate test dataset
# Create NCFDataset object
ncf_dataset = NCFDataSet(
train_data, num_users, num_items, num_negatives, true_items, all_items,
np.asarray(all_test_data)
)
return ncf_dataset
def generate_train_dataset(train_data, num_items, num_negatives):
"""Generate train dataset for each epoch.
Given positive training instances, randomly generate negative instances to
form the training dataset.
Args:
train_data: A list of positive training instances.
num_items: An integer, the number of items in positive training instances.
num_negatives: An integer, the number of negative training instances
following positive training instances. It is 4 by default.
Returns:
A numpy array of training dataset.
"""
all_train_data = []
# A set with user-item tuples
train_data_set = set((u, i) for u, i, _ in train_data)
for u, i, _ in train_data:
# Positive instance
all_train_data.append([u, i, 1])
# Negative instances, randomly generated
for _ in xrange(num_negatives):
j = np.random.randint(num_items)
while (u, j) in train_data_set:
j = np.random.randint(num_items)
all_train_data.append([u, j, 0])
return np.asarray(all_train_data)
def _deserialize_train(examples_serialized):
features = tf.parse_example(examples_serialized, _FEATURE_MAP)
train_features = {
movielens.USER_COLUMN: features[movielens.USER_COLUMN],
movielens.ITEM_COLUMN: features[movielens.ITEM_COLUMN],
}
return train_features, features[movielens.RATING_COLUMN]
def _deserialize_eval(examples_serialized):
features = tf.parse_example(examples_serialized, _FEATURE_MAP_EVAL)
return features
def get_input_fn(training, batch_size, ncf_dataset, data_dir, dataset,
repeat=1):
"""Input function for model training and evaluation.
The train input consists of 1 positive instance (user and item have
interactions) followed by some number of negative instances in which the items
are randomly chosen. The number of negative instances is "num_negatives" which
is 4 by default. Note that for each epoch, we need to re-generate the negative
instances. Together with positive instances, they form a new train dataset.
Args:
training: A boolean flag for training mode.
batch_size: An integer, batch size for training and evaluation.
ncf_dataset: An NCFDataSet object, which contains the information about
training and test data.
data_dir: A string, the root directory of the movielens dataset.
dataset: A string, the dataset name to be processed.
repeat: An integer, how many times to repeat the dataset.
Returns:
dataset: A tf.data.Dataset object containing examples loaded from the files.
"""
# Generate random negative instances for training in each epoch
if training:
tf.logging.info("Generating training data.")
train_data = generate_train_dataset(
ncf_dataset.train_data, ncf_dataset.num_items,
ncf_dataset.num_negatives)
df = pd.DataFrame(data=train_data, columns=_COLUMNS)
if data_dir.startswith("gs://"):
buffer_dir = os.path.join(data_dir, _BUFFER_SUBDIR)
else:
buffer_dir = None
buffer_path = file_io.write_to_temp_buffer(df, buffer_dir, _COLUMNS)
map_fn = _deserialize_train
else:
df = pd.DataFrame(ncf_dataset.all_eval_data, columns=_EVAL_COLUMNS)
buffer_path = os.path.join(
data_dir, _BUFFER_SUBDIR, dataset + "_eval_buffer")
file_io.write_to_buffer(
dataframe=df, buffer_path=buffer_path, columns=_EVAL_COLUMNS,
expected_size=_EVAL_BUFFER_SIZE[dataset])
map_fn = _deserialize_eval
def input_fn(): # pylint: disable=missing-docstring
dataset = tf.data.TFRecordDataset(buffer_path)
if training:
dataset = dataset.shuffle(buffer_size=_SHUFFLE_BUFFER_SIZE)
dataset = dataset.batch(batch_size)
dataset = dataset.map(map_fn, num_parallel_calls=16)
dataset = dataset.repeat(repeat)
# Prefetch to improve speed of input pipeline.
dataset = dataset.prefetch(buffer_size=tf.contrib.data.AUTOTUNE)
return dataset
return input_fn
def main(_):
movielens.download(dataset=flags.FLAGS.dataset, data_dir=flags.FLAGS.data_dir)
construct_train_eval_csv(flags.FLAGS.data_dir, flags.FLAGS.dataset)
if __name__ == "__main__":
tf.logging.set_verbosity(tf.logging.INFO)
movielens.define_data_download_flags()
flags.adopt_module_key_flags(movielens)
flags_core.set_defaults(dataset="ml-1m")
absl_app.run(main)
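The `get_input_fn` docstring above describes each training epoch as one positive instance followed by `num_negatives` randomly generated negatives. A toy sketch of that layout using `generate_train_dataset`; the user and item ids below are invented for illustration:

```python
# Toy illustration of the epoch layout produced by generate_train_dataset():
# each positive (label 1) row is followed by num_negatives sampled rows with
# label 0 for the same user.
import numpy as np
from official.recommendation import movielens_dataset

np.random.seed(0)
toy_train_data = [[0, 5, 1], [0, 9, 1], [1, 2, 1]]  # (user, item, label)
epoch = movielens_dataset.generate_train_dataset(
    train_data=toy_train_data, num_items=20, num_negatives=4)

print(epoch.shape)  # (15, 3): 3 positives * (1 + 4) rows each
print(epoch[:5])    # first positive for user 0, then its 4 negatives
```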
...@@ -17,13 +17,19 @@ ...@@ -17,13 +17,19 @@
The NeuMF model assembles both MF and MLP models under the NCF framework. Check The NeuMF model assembles both MF and MLP models under the NCF framework. Check
`neumf_model.py` for more details about the models. `neumf_model.py` for more details about the models.
""" """
from __future__ import absolute_import from __future__ import absolute_import
from __future__ import division from __future__ import division
from __future__ import print_function from __future__ import print_function
import contextlib
import gc
import heapq import heapq
import math import math
import multiprocessing
import os import os
import signal
import typing
# pylint: disable=g-bad-import-order # pylint: disable=g-bad-import-order
import numpy as np import numpy as np
...@@ -33,7 +39,8 @@ import tensorflow as tf ...@@ -33,7 +39,8 @@ import tensorflow as tf
# pylint: enable=g-bad-import-order # pylint: enable=g-bad-import-order
from official.datasets import movielens from official.datasets import movielens
from official.recommendation import movielens_dataset from official.recommendation import constants as rconst
from official.recommendation import data_preprocessing
from official.recommendation import neumf_model from official.recommendation import neumf_model
from official.utils.flags import core as flags_core from official.utils.flags import core as flags_core
from official.utils.logs import hooks_helper from official.utils.logs import hooks_helper
...@@ -47,11 +54,12 @@ _HR_KEY = "HR" ...@@ -47,11 +54,12 @@ _HR_KEY = "HR"
_NDCG_KEY = "NDCG" _NDCG_KEY = "NDCG"
def evaluate_model(estimator, batch_size, num_gpus, ncf_dataset, pred_input_fn): def evaluate_model(estimator, ncf_dataset, pred_input_fn):
# type: (tf.estimator.Estimator, prepare.NCFDataset, typing.Callable) -> dict
"""Model evaluation with HR and NDCG metrics. """Model evaluation with HR and NDCG metrics.
The evaluation protocol is to rank the test interacted item (truth items) The evaluation protocol is to rank the test interacted item (truth items)
among the randomly chosen 100 items that are not interacted by the user. among the randomly chosen 999 items that are not interacted by the user.
The performance of the ranked list is judged by Hit Ratio (HR) and Normalized The performance of the ranked list is judged by Hit Ratio (HR) and Normalized
Discounted Cumulative Gain (NDCG). Discounted Cumulative Gain (NDCG).
...@@ -63,14 +71,11 @@ def evaluate_model(estimator, batch_size, num_gpus, ncf_dataset, pred_input_fn): ...@@ -63,14 +71,11 @@ def evaluate_model(estimator, batch_size, num_gpus, ncf_dataset, pred_input_fn):
Args: Args:
estimator: The Estimator. estimator: The Estimator.
batch_size: An integer, the batch size specified by user.
num_gpus: An integer, the number of gpus specified by user.
ncf_dataset: An NCFDataSet object, which contains the information about ncf_dataset: An NCFDataSet object, which contains the information about
test/eval dataset, such as: test/eval dataset, such as:
eval_true_items, which is a list of test items (true items) for HR and num_users: How many unique users are in the eval set.
NDCG calculation. Each item is for one user. test_data: The points which are used for consistent evaluation. These
eval_all_items, which is a nested list. Each entry is the 101 items are already included in the pred_input_fn.
(1 ground truth item and 100 negative items) for one user.
pred_input_fn: The input function for the test data. pred_input_fn: The input function for the test data.
Returns: Returns:
...@@ -85,82 +90,103 @@ def evaluate_model(estimator, batch_size, num_gpus, ncf_dataset, pred_input_fn): ...@@ -85,82 +90,103 @@ def evaluate_model(estimator, batch_size, num_gpus, ncf_dataset, pred_input_fn):
and global_step is the global step and global_step is the global step
""" """
# Get predictions tf.logging.info("Computing predictions for eval set...")
predictions = estimator.predict(input_fn=pred_input_fn)
all_predicted_scores = [p[movielens.RATING_COLUMN] for p in predictions]
# Calculate HR score # Get predictions
def _get_hr(ranklist, true_item): predictions = estimator.predict(input_fn=pred_input_fn,
return 1 if true_item in ranklist else 0 yield_single_examples=False)
# Calculate NDCG score prediction_batches = [p[movielens.RATING_COLUMN] for p in predictions]
def _get_ndcg(ranklist, true_item):
if true_item in ranklist:
return math.log(2) / math.log(ranklist.index(true_item) + 2)
return 0
hits, ndcgs = [], []
num_users = len(ncf_dataset.eval_true_items)
# Reshape the predicted scores and each user takes one row # Reshape the predicted scores and each user takes one row
predicted_scores_list = np.asarray( prediction_with_padding = np.concatenate(prediction_batches, axis=0)
all_predicted_scores).reshape(num_users, -1) predicted_scores_by_user = prediction_with_padding[
:ncf_dataset.num_users * (1 + rconst.NUM_EVAL_NEGATIVES)]\
for i in range(num_users): .reshape(ncf_dataset.num_users, -1)
items = ncf_dataset.eval_all_items[i]
predicted_scores = predicted_scores_list[i] tf.logging.info("Computing metrics...")
# Map item and score for each user
map_item_score = {} # NumPy has an np.argparition() method, however log(1000) is so small that
for j, item in enumerate(items): # sorting the whole array is simpler and fast enough.
score = predicted_scores[j] top_indicies = np.argsort(predicted_scores_by_user, axis=1)[:, -_TOP_K:]
map_item_score[item] = score top_indicies = np.flip(top_indicies, axis=1)
# Evaluate top rank list with HR and NDCG # Both HR and NDCG vectorized computation takes advantage of the fact that if
ranklist = heapq.nlargest(_TOP_K, map_item_score, key=map_item_score.get) # the positive example for a user is not in the top k, that index does not
true_item = ncf_dataset.eval_true_items[i] # appear. That is to say: hit_ind.shape[0] <= num_users
hr = _get_hr(ranklist, true_item) hit_ind = np.argwhere(np.equal(top_indicies, 0))
ndcg = _get_ndcg(ranklist, true_item) hr = hit_ind.shape[0] / ncf_dataset.num_users
hits.append(hr) ndcg = np.sum(np.log(2) / np.log(hit_ind[:, 1] + 2)) / ncf_dataset.num_users
ndcgs.append(ndcg)
# Get average HR and NDCG scores
hr, ndcg = np.array(hits).mean(), np.array(ndcgs).mean()
global_step = estimator.get_variable_value(tf.GraphKeys.GLOBAL_STEP) global_step = estimator.get_variable_value(tf.GraphKeys.GLOBAL_STEP)
eval_results = { eval_results = {
_HR_KEY: hr, _HR_KEY: hr,
_NDCG_KEY: ndcg, _NDCG_KEY: ndcg,
tf.GraphKeys.GLOBAL_STEP: global_step tf.GraphKeys.GLOBAL_STEP: global_step
} }
return eval_results return eval_results
def construct_estimator(num_gpus, model_dir, params, batch_size,
                        eval_batch_size):
  """Construct either an Estimator or TPUEstimator for NCF.

  Args:
    num_gpus: The number of gpus (used to select the distribution strategy).
    model_dir: The model directory for the estimator.
    params: The params dict for the estimator.
    batch_size: The mini-batch size for training.
    eval_batch_size: The batch size used during evaluation.

  Returns:
    An Estimator or TPUEstimator.
  """
  if params["use_tpu"]:
    tpu_cluster_resolver = tf.contrib.cluster_resolver.TPUClusterResolver(
        tpu=params["tpu"],
        zone=params["tpu_zone"],
        project=params["tpu_gcp_project"],
    )

    tpu_config = tf.contrib.tpu.TPUConfig(
        iterations_per_loop=100,
        num_shards=8)

    run_config = tf.contrib.tpu.RunConfig(
        cluster=tpu_cluster_resolver,
        model_dir=model_dir,
        session_config=tf.ConfigProto(
            allow_soft_placement=True, log_device_placement=False),
        tpu_config=tpu_config)

    tpu_params = {k: v for k, v in params.items() if k != "batch_size"}

    train_estimator = tf.contrib.tpu.TPUEstimator(
        model_fn=neumf_model.neumf_model_fn,
        use_tpu=True,
        train_batch_size=batch_size,
        params=tpu_params,
        config=run_config)

    eval_estimator = tf.contrib.tpu.TPUEstimator(
        model_fn=neumf_model.neumf_model_fn,
        use_tpu=False,
        train_batch_size=1,
        predict_batch_size=eval_batch_size,
        params=tpu_params,
        config=run_config)

    return train_estimator, eval_estimator

  distribution = distribution_utils.get_distribution_strategy(num_gpus=num_gpus)
  run_config = tf.estimator.RunConfig(train_distribute=distribution)
  params["eval_batch_size"] = eval_batch_size
  estimator = tf.estimator.Estimator(model_fn=neumf_model.neumf_model_fn,
                                     model_dir=model_dir, config=run_config,
                                     params=params)
  return estimator, estimator
def main(_):
...@@ -172,25 +198,35 @@ def run_ncf(_):
  """Run NCF training and eval loop."""
  if FLAGS.download_if_missing:
    movielens.download(FLAGS.dataset, FLAGS.data_dir)

  num_gpus = flags_core.get_num_gpus(FLAGS)
  batch_size = distribution_utils.per_device_batch_size(
      int(FLAGS.batch_size), num_gpus)
  eval_batch_size = int(FLAGS.eval_batch_size or FLAGS.batch_size)
  ncf_dataset = data_preprocessing.instantiate_pipeline(
      dataset=FLAGS.dataset, data_dir=FLAGS.data_dir,
      batch_size=batch_size,
      eval_batch_size=eval_batch_size,
      num_neg=FLAGS.num_neg,
      epochs_per_cycle=FLAGS.epochs_between_evals)

  model_helpers.apply_clean(flags.FLAGS)

  train_estimator, eval_estimator = construct_estimator(
      num_gpus=num_gpus, model_dir=FLAGS.model_dir, params={
          "batch_size": batch_size,
          "learning_rate": FLAGS.learning_rate,
          "num_users": ncf_dataset.num_users,
          "num_items": ncf_dataset.num_items,
          "mf_dim": FLAGS.num_factors,
          "model_layers": [int(layer) for layer in FLAGS.layers],
          "mf_regularization": FLAGS.mf_regularization,
          "mlp_reg_layers": [float(reg) for reg in FLAGS.mlp_regularization],
          "use_tpu": FLAGS.tpu is not None,
          "tpu": FLAGS.tpu,
          "tpu_zone": FLAGS.tpu_zone,
          "tpu_gcp_project": FLAGS.tpu_gcp_project,
      }, batch_size=flags.FLAGS.batch_size, eval_batch_size=eval_batch_size)

  # Create hooks that log information about the training and metric values
  train_hooks = hooks_helper.get_train_hooks(
...@@ -200,6 +236,7 @@ def run_ncf(_):
  )
  run_params = {
      "batch_size": FLAGS.batch_size,
      "eval_batch_size": eval_batch_size,
      "number_factors": FLAGS.num_factors,
      "hr_threshold": FLAGS.hr_threshold,
      "train_epochs": FLAGS.train_epochs,
...@@ -211,31 +248,31 @@ def run_ncf(_):
      run_params=run_params,
      test_id=FLAGS.benchmark_test_id)

  approx_train_steps = int(ncf_dataset.num_train_positives
                           * (1 + FLAGS.num_neg) // FLAGS.batch_size)
  pred_input_fn = data_preprocessing.make_pred_input_fn(ncf_dataset=ncf_dataset)

  total_training_cycle = FLAGS.train_epochs // FLAGS.epochs_between_evals
  for cycle_index in range(total_training_cycle):
    tf.logging.info("Starting a training cycle: {}/{}".format(
        cycle_index + 1, total_training_cycle))

    # Train the model
    train_input_fn, train_record_dir, batch_count = \
      data_preprocessing.make_train_input_fn(ncf_dataset=ncf_dataset)

    if np.abs(approx_train_steps - batch_count) > 1:
      tf.logging.warning(
          "Estimated ({}) and reported ({}) number of batches differ by more "
          "than one".format(approx_train_steps, batch_count))
    train_estimator.train(input_fn=train_input_fn, hooks=train_hooks,
                          steps=batch_count)
    tf.gfile.DeleteRecursively(train_record_dir)

    # Evaluate the model
    eval_results = evaluate_model(
        eval_estimator, ncf_dataset, pred_input_fn)

    # Benchmark the evaluation results
    benchmark_logger.log_evaluation_result(eval_results)
...@@ -246,6 +283,10 @@ def run_ncf(_):
        "Iteration {}: HR = {:.4f}, NDCG = {:.4f}".format(
            cycle_index + 1, hr, ndcg))

    # Some of the NumPy vector math can be quite large and likes to stay in
    # memory for a while.
    gc.collect()

    # If some evaluation threshold is met
    if model_helpers.past_stop_threshold(FLAGS.hr_threshold, hr):
      break
...@@ -267,6 +308,7 @@ def define_ncf_flags():
      dtype=False,
      all_reduce_alg=False
  )
  flags_core.define_device(tpu=True)
  flags_core.define_benchmark()

  flags.adopt_module_key_flags(flags_core)
...@@ -276,7 +318,9 @@ def define_ncf_flags():
      data_dir="/tmp/movielens-data/",
      train_epochs=2,
      batch_size=256,
      hooks="ProfilerHook",
      tpu=None
  )

  # Add ncf-specific flags
  flags.DEFINE_enum(
...@@ -289,6 +333,13 @@ def define_ncf_flags():
      name="download_if_missing", default=True, help=flags_core.help_wrap(
          "Download data to data_dir if it is not already present."))

  flags.DEFINE_string(
      name="eval_batch_size", default=None, help=flags_core.help_wrap(
          "The batch size used for evaluation. This should generally be larger "
          "than the training batch size as the lack of back propagation during "
          "evaluation can allow for larger batch sizes to fit in memory. If not "
          "specified, the training batch size (--batch_size) will be used."))

  flags.DEFINE_integer(
      name="num_factors", default=8,
      help=flags_core.help_wrap("The Embedding size of MF model."))
......
...@@ -33,102 +33,158 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import sys
import typing

from six.moves import xrange  # pylint: disable=redefined-builtin
import tensorflow as tf
from tensorflow.python.keras.utils import tf_utils

from official.datasets import movielens  # pylint: disable=g-bad-import-order
from official.utils.accelerator import tpu as tpu_utils


def neumf_model_fn(features, labels, mode, params):
  """Model Function for NeuMF estimator."""
  users = features[movielens.USER_COLUMN]
  items = tf.cast(features[movielens.ITEM_COLUMN], tf.int32)

  logits = construct_model(users=users, items=items, params=params)

  if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {
        movielens.RATING_COLUMN: logits,
    }

    if params["use_tpu"]:
      return tf.contrib.tpu.TPUEstimatorSpec(mode=mode, predictions=predictions)
    return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions)

  elif mode == tf.estimator.ModeKeys.TRAIN:
    labels = tf.cast(labels, tf.int32)
    optimizer = tf.train.AdamOptimizer(learning_rate=params["learning_rate"])
    if params["use_tpu"]:
      optimizer = tf.contrib.tpu.CrossShardOptimizer(optimizer)

    # Softmax with the first column of ones is equivalent to sigmoid.
    logits = tf.concat([tf.ones(logits.shape, dtype=logits.dtype), logits],
                       axis=1)

    loss = tf.losses.sparse_softmax_cross_entropy(
        labels=labels,
        logits=logits
    )

    global_step = tf.train.get_global_step()
    tvars = tf.trainable_variables()
    gradients = optimizer.compute_gradients(
        loss, tvars, colocate_gradients_with_ops=True)
    minimize_op = optimizer.apply_gradients(
        gradients, global_step=global_step, name="train")
    update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
    train_op = tf.group(minimize_op, update_ops)

    if params["use_tpu"]:
      return tf.contrib.tpu.TPUEstimatorSpec(
          mode=mode, loss=loss, train_op=train_op)
    return tf.estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op)

  else:
    raise NotImplementedError


def construct_model(users, items, params):
  # type: (tf.Tensor, tf.Tensor, dict) -> tf.Tensor
  """Initialize NeuMF model.

  Args:
    users: Tensor of user ids.
    items: Tensor of item ids.
    params: Dict of hyperparameters.

  Raises:
    ValueError: if the first model layer is not even.
  """
  num_users = params["num_users"]
  num_items = params["num_items"]

  model_layers = params["model_layers"]

  mf_regularization = params["mf_regularization"]
  mlp_reg_layers = params["mlp_reg_layers"]

  mf_dim = params["mf_dim"]

  if model_layers[0] % 2 != 0:
    raise ValueError("The first layer size should be multiple of 2!")

  # Input variables
  user_input = tf.keras.layers.Input(tensor=users)
  item_input = tf.keras.layers.Input(tensor=items)

  # Initializer for embedding layers
  embedding_initializer = "glorot_uniform"

  # Embedding layers of GMF and MLP
  mf_embedding_user = tf.keras.layers.Embedding(
      num_users,
      mf_dim,
      embeddings_initializer=embedding_initializer,
      embeddings_regularizer=tf.keras.regularizers.l2(mf_regularization),
      input_length=1)
  mf_embedding_item = tf.keras.layers.Embedding(
      num_items,
      mf_dim,
      embeddings_initializer=embedding_initializer,
      embeddings_regularizer=tf.keras.regularizers.l2(mf_regularization),
      input_length=1)

  mlp_embedding_user = tf.keras.layers.Embedding(
      num_users,
      model_layers[0]//2,
      embeddings_initializer=embedding_initializer,
      embeddings_regularizer=tf.keras.regularizers.l2(mlp_reg_layers[0]),
      input_length=1)
  mlp_embedding_item = tf.keras.layers.Embedding(
      num_items,
      model_layers[0]//2,
      embeddings_initializer=embedding_initializer,
      embeddings_regularizer=tf.keras.regularizers.l2(mlp_reg_layers[0]),
      input_length=1)

  # GMF part
  # Flatten the embedding vector as latent features in GMF
  mf_user_latent = tf.keras.layers.Flatten()(mf_embedding_user(user_input))
  mf_item_latent = tf.keras.layers.Flatten()(mf_embedding_item(item_input))
  # Element-wise multiply
  mf_vector = tf.keras.layers.multiply([mf_user_latent, mf_item_latent])

  # MLP part
  # Flatten the embedding vector as latent features in MLP
  mlp_user_latent = tf.keras.layers.Flatten()(mlp_embedding_user(user_input))
  mlp_item_latent = tf.keras.layers.Flatten()(mlp_embedding_item(item_input))
  # Concatenation of two latent features
  mlp_vector = tf.keras.layers.concatenate([mlp_user_latent, mlp_item_latent])

  num_layer = len(model_layers)  # Number of layers in the MLP
  for layer in xrange(1, num_layer):
    model_layer = tf.keras.layers.Dense(
        model_layers[layer],
        kernel_regularizer=tf.keras.regularizers.l2(mlp_reg_layers[layer]),
        activation="relu")
    mlp_vector = model_layer(mlp_vector)

  # Concatenate GMF and MLP parts
  predict_vector = tf.keras.layers.concatenate([mf_vector, mlp_vector])

  # Final prediction layer
  logits = tf.keras.layers.Dense(
      1, activation=None, kernel_initializer="lecun_uniform",
      name=movielens.RATING_COLUMN)(predict_vector)

  # Print model topology.
  tf.keras.models.Model([user_input, item_input], logits).summary()
  sys.stdout.flush()

  return logits
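The "softmax with a padded column" trick in the TRAIN branch above can be checked numerically. The sketch below is an editor's illustration, not part of the change: softmax over [c, x] assigns the positive class probability sigmoid(x - c), so padding with zeros reproduces the sigmoid exactly, while the tf.ones padding used above corresponds to a constant shift of the logit by one.

# Editor's sketch: numerical check of the padded-softmax / sigmoid relationship.
import numpy as np

def padded_softmax_positive_prob(logit, pad_value):
  """Probability of the positive class under softmax([pad_value, logit])."""
  z = np.array([pad_value, logit], dtype=np.float64)
  e = np.exp(z - z.max())  # numerically stable softmax
  return e[1] / e.sum()

def sigmoid(x):
  return 1.0 / (1.0 + np.exp(-x))

x = 0.7
assert np.isclose(padded_softmax_positive_prob(x, 0.0), sigmoid(x))        # zeros: exact sigmoid
assert np.isclose(padded_softmax_positive_prob(x, 1.0), sigmoid(x - 1.0))  # ones: shifted by one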
#!/bin/bash
set -e

DATASET="ml-20m"
BUCKET=${BUCKET:-""}
ROOT_DIR="${BUCKET:-/tmp}/MLPerf_NCF"
echo "Root directory: ${ROOT_DIR}"
if [[ -z ${BUCKET} ]]; then
LOCAL_ROOT=${ROOT_DIR}
else
LOCAL_ROOT="/tmp/MLPerf_NCF"
mkdir -p ${LOCAL_ROOT}
echo "Local root (for files which cannot use GCS): ${LOCAL_ROOT}"
fi
DATE=$(date '+%Y-%m-%d_%H:%M:%S')
TEST_DIR="${ROOT_DIR}/${DATE}"
LOCAL_TEST_DIR="${LOCAL_ROOT}/${DATE}"
mkdir -p ${LOCAL_TEST_DIR}
TPU=${TPU:-""}
if [[ -z ${TPU} ]]; then
DEVICE_FLAG="--num_gpus -1"
else
DEVICE_FLAG="--tpu ${TPU} --num_gpus 0"
fi
DATA_DIR="${ROOT_DIR}/movielens_data"
python ../datasets/movielens.py --data_dir ${DATA_DIR} --dataset ${DATASET}
{
for i in `seq 0 4`;
do
START_TIME=$(date +%s)
MODEL_DIR="${TEST_DIR}/model_dir_${i}"
RUN_LOG="${LOCAL_TEST_DIR}/run_${i}.log"
echo ""
echo "Beginning run ${i}"
echo " Complete logs are in ${RUN_LOG}"
# Note: The hit rate threshold has been set to 0.62 rather than the MLPerf 0.635
# The reason why the TF implementation does not reach 0.635 is still unknown.
python ncf_main.py --model_dir ${MODEL_DIR} \
--data_dir ${DATA_DIR} \
--dataset ${DATASET} --hooks "" \
${DEVICE_FLAG} \
--clean \
--train_epochs 100 \
--batch_size 2048 \
--eval_batch_size 65536 \
--learning_rate 0.0005 \
--layers 256,256,128,64 --num_factors 64 \
--hr_threshold 0.62 \
|& tee ${RUN_LOG} \
| grep --line-buffered -E --regexp="Iteration [0-9]+: HR = [0-9\.]+, NDCG = [0-9\.]+"
END_TIME=$(date +%s)
echo "Run ${i} complete: $(( $END_TIME - $START_TIME )) seconds."
done
} |& tee "${LOCAL_TEST_DIR}/summary.log"
#!/bin/bash
set -e
# Example settings:
# TPU="taylorrobie-tpu-0"
# BUCKET="gs://taylorrobie-tpu-test-bucket-2"
# Remove IDE "not assigned" warning highlights.
TPU=${TPU:-""}
BUCKET=${BUCKET:-""}
if [[ -z ${TPU} ]]; then
echo "Please set 'TPU' to the name of the TPU to be used."
exit 1
fi
if [[ -z ${BUCKET} ]]; then
echo "Please set 'BUCKET' to the GCS bucket to be used."
exit 1
fi
./run.sh
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Statistics utility functions of NCF."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
def random_int32():
return np.random.randint(low=0, high=np.iinfo(np.int32).max, dtype=np.int32)
def sample_with_exclusion(num_items, positive_set, n, replacement=True):
# type: (int, typing.Iterable, int, bool) -> list
"""Vectorized negative sampling.
This function samples from the complement of the positive set, both with
and without replacement.
Performance:
This algorithm generates a vector of candidate values based on the expected
number needed such that at least k are not in the positive set, where k
is the number of false negatives still needed. An additional factor of
safety of 1.2 is used during the generation to minimize the chance of having
to perform another generation cycle.
While this approach generates more values than needed and then discards some
of them, vectorized generation is inexpensive and turns out to be much
faster than generating points one at a time. (And it defers quite a bit
of work to NumPy which has much better multi-core utilization than native
Python.)
Args:
num_items: The cardinality of the entire set of items.
positive_set: The set of positive items which should not be included as
negatives.
n: The number of negatives to generate.
replacement: Whether to sample with (True) or without (False) replacement.
Returns:
A list of generated negatives.
"""
if not isinstance(positive_set, set):
positive_set = set(positive_set)
p = 1 - len(positive_set) / num_items
n_attempt = int(n * (1 / p) * 1.2) # factor of 1.2 for safety
# If sampling is performed with replacement, candidates are appended.
# Otherwise, they should be added with a set union to remove duplicates.
if replacement:
negatives = []
else:
negatives = set()
while len(negatives) < n:
negative_candidates = np.random.randint(
low=0, high=num_items, size=(n_attempt,))
if replacement:
negatives.extend(
[i for i in negative_candidates if i not in positive_set]
)
else:
negatives |= (set(negative_candidates) - positive_set)
if not replacement:
negatives = list(negatives)
np.random.shuffle(negatives) # list(set(...)) is not order guaranteed, but
# in practice tends to be quite ordered.
return negatives[:n]
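A minimal usage sketch for sample_with_exclusion follows (editor's illustration; the item counts and positive set below are hypothetical):

# Editor's sketch: sampling 5 negatives for a user with 3 known positives.
positives = {3, 7, 42}  # items the user has already interacted with
num_items = 100

negatives = sample_with_exclusion(num_items, positives, n=5, replacement=False)
assert len(negatives) == 5
assert not set(negatives) & positives          # no sampled "negative" is a positive
assert all(0 <= i < num_items for i in negatives)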
2784 2659 1705 3316 887 815 1783 3483 1085 329 1830 1880 2699 3194 3596 2700 3520 2414 757 3610 2651 2187 2274 2949 1753 589 2945 2172 2877 1992 2215 1028 807 2098 1923 3555 2548 2950 151 2060 3301 807 849 711 3271 3010 475 3412 3389 2797 691 596 2643 766 2344 203 2775 2610 3583 2982 1259 2128 854 2228 2228 2008 3177 1977 3674 3612 808 325 2435 440 1693 3166 1518 2643 940 309 1397 2157 2391 3243 2879 482 3206 143 1972 2498 2711 1641 3008 2733 136 2303 376 826 3064 1123
767 2164 1283 1100 3044 1332 2152 1295 1812 3427 3130 2967 2895 3085 501 2005 688 1457 1733 2345 1827 1600 3295 3397 384 2033 1444 2082 944 2563 1762 1101 496 2151 3093 329 2559 1664 2058 2546 683 1082 3583 2199 1851 258 2553 2274 1059 2910 2299 2115 2770 2094 2915 3348 337 2738 1563 2958 3241 3258 2881 3236 2954 214 3243 1000 2187 2946 435 1232 2208 1334 1280 2982 403 1326 1706 1523 1336 2620 2664 2462 1432 765 898 222 3426 2027 3469 2032 2472 1480 3219 735 1562 2626 1400 308
0 21 1
0 41 1
0 8 1
0 23 1
0 43 1
0 10 1
0 45 1
0 12 1
0 47 1
0 14 1
0 33 1
0 0 1
0 35 1
0 2 1
0 37 1
0 4 1
0 39 1
0 6 1
0 24 1
0 26 1
0 28 1
0 30 1
0 49 1
0 16 1
0 51 1
0 18 1
0 20 1
0 40 1
0 22 1
0 42 1
0 9 1
0 44 1
0 11 1
0 46 1
0 13 1
0 32 1
0 15 1
0 34 1
0 1 1
0 36 1
0 3 1
0 38 1
0 5 1
0 7 1
0 27 1
0 29 1
0 48 1
0 31 1
0 50 1
0 17 1
0 52 1
0 19 1
1 146 1
1 61 1
1 127 1
1 88 1
1 144 1
1 125 1
1 94 1
1 150 1
1 18 1
1 115 1
1 92 1
1 170 1
1 139 1
1 148 1
1 55 1
1 113 1
1 82 1
1 168 1
1 137 1
1 53 1
1 119 1
1 80 1
1 174 1
1 143 1
1 42 1
1 20 1
1 117 1
1 86 1
1 172 1
1 141 1
1 106 1
1 75 1
1 84 1
1 162 1
1 131 1
1 104 1
1 73 1
1 160 1
1 129 1
1 110 1
1 79 1
1 166 1
1 135 1
1 108 1
1 77 1
1 164 1
1 133 1
1 98 1
1 67 1
1 155 1
1 96 1
1 65 1
1 153 1
1 102 1
1 71 1
1 159 1
1 58 1
1 100 1
1 69 1
1 157 1
1 56 1
1 122 1
1 91 1
1 147 1
1 62 1
1 120 1
1 89 1
1 145 1
1 60 1
1 126 1
1 95 1
1 151 1
1 124 1
1 93 1
1 171 1
1 149 1
1 48 1
1 114 1
1 83 1
1 169 1
1 138 1
1 54 1
1 112 1
1 81 1
1 136 1
1 52 1
1 118 1
1 87 1
1 173 1
1 142 1
1 107 1
1 116 1
1 85 1
1 163 1
1 140 1
1 47 1
1 105 1
1 74 1
1 161 1
1 130 1
1 111 1
1 72 1
1 167 1
1 128 1
1 109 1
1 78 1
1 165 1
1 134 1
1 99 1
1 76 1
1 132 1
1 0 1
1 97 1
1 154 1
1 103 1
1 64 1
1 152 1
1 59 1
1 101 1
1 70 1
1 158 1
1 57 1
1 123 1
1 68 1
1 156 1
1 63 1
1 121 1
1 90 1
...@@ -6,4 +6,4 @@ oauth2client>=4.1.2
pandas
psutil>=5.4.3
py-cpuinfo>=3.3.0
typing
...@@ -32,12 +32,17 @@ lint() {
  local exit_code=0

  RC_FILE="utils/testing/pylint.rcfile"
  PROTO_SKIP="DO\sNOT\sEDIT!"

  echo "===========Running lint test============"
  for file in `find . -name '*.py' ! -name '*test.py' -print`
  do
    if grep ${PROTO_SKIP} ${file}; then
      echo "Linting ${file} (Skipped: Machine generated file)"
    else
      echo "Linting ${file}"
      pylint --rcfile="${RC_FILE}" "${file}" || exit_code=$?
    fi
  done

  # More lenient for test files.
......