Unverified Commit 20070ca4 authored by Taylor Robie, committed by GitHub

Wide Deep refactor and deep movies (#4506)

* begin branch

* finish download script

* rename download to dataset

* intermediate commit

* intermediate commit

* misc tweaks

* intermediate commit

* intermediate commit

* intermediate commit

* delint and update census test.

* add movie tests

* delint

* fix py2 issue

* address PR comments

* intermediate commit

* intermediate commit

* intermediate commit

* finish wide deep transition to vanilla movielens

* delint

* intermediate commit

* intermediate commit

* intermediate commit

* intermediate commit

* fix import

* add default ncf csv construction

* change default on download_if_missing

* shard and vectorize example serialization

* fix import

* update ncf data unittests

* delint

* delint

* more delinting

* fix wide-deep movielens serialization

* address PR comments

* add file_io tests

* investigate wide-deep test failure

* remove hard coded path and properly use flags.

* address file_io test PR comments

* missed a hash_bucket_size
parent 713228fd
......@@ -258,7 +258,8 @@ def main(_):
def define_train_higgs_flags():
"""Add tree related flags as well as training/eval configuration."""
flags_core.define_base(stop_threshold=False, batch_size=False, num_gpu=False)
flags_core.define_base(clean=False, stop_threshold=False, batch_size=False,
num_gpu=False)
flags_core.define_benchmark()
flags.adopt_module_key_flags(flags_core)
......
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Download and extract the MovieLens dataset from GroupLens website.
Download the dataset, and perform basic preprocessing.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import sys
import tempfile
import zipfile
# pylint: disable=g-bad-import-order
import numpy as np
import pandas as pd
import six
from six.moves import urllib # pylint: disable=redefined-builtin
from absl import app as absl_app
from absl import flags
import tensorflow as tf
# pylint: enable=g-bad-import-order
from official.utils.flags import core as flags_core
ML_1M = "ml-1m"
ML_20M = "ml-20m"
DATASETS = [ML_1M, ML_20M]
RATINGS_FILE = "ratings.csv"
MOVIES_FILE = "movies.csv"
# URL to download dataset
_DATA_URL = "http://files.grouplens.org/datasets/movielens/"
GENRE_COLUMN = "genres"
ITEM_COLUMN = "item_id" # movies
RATING_COLUMN = "rating"
TIMESTAMP_COLUMN = "timestamp"
TITLE_COLUMN = "titles"
USER_COLUMN = "user_id"
GENRES = [
"Action", "Adventure", "Animation", "Children", "Comedy", "Crime",
"Documentary", "Drama", "Fantasy", "Film-Noir", "Horror", "IMAX", "Musical",
"Mystery", "Romance", "Sci-Fi", "Thriller", "War", "Western"
]
N_GENRE = len(GENRES)
RATING_COLUMNS = [USER_COLUMN, ITEM_COLUMN, RATING_COLUMN, TIMESTAMP_COLUMN]
MOVIE_COLUMNS = [ITEM_COLUMN, TITLE_COLUMN, GENRE_COLUMN]
# Note: Users are indexed [1, k], not [0, k-1]
NUM_USER_IDS = {
ML_1M: 6040,
ML_20M: 138493,
}
# Note: Movies are indexed [1, k], not [0, k-1]
# Both the 1m and 20m datasets use the same movie set.
NUM_ITEM_IDS = 3952
MAX_RATING = 5
NUM_RATINGS = {
ML_1M: 1000209,
ML_20M: 20000263
}
def _download_and_clean(dataset, data_dir):
"""Download MovieLens dataset in a standard format.
This function downloads the specified MovieLens format and coerces it into a
standard format. The only difference between the ml-1m and ml-20m datasets
after this point (other than size, of course) is that the 1m dataset uses
whole number ratings while the 20m dataset allows half integer ratings.
"""
if dataset not in DATASETS:
raise ValueError("dataset {} is not in {{{}}}".format(
dataset, ",".join(DATASETS)))
data_subdir = os.path.join(data_dir, dataset)
expected_files = ["{}.zip".format(dataset), RATINGS_FILE, MOVIES_FILE]
tf.gfile.MakeDirs(data_subdir)
if set(expected_files).intersection(
tf.gfile.ListDirectory(data_subdir)) == set(expected_files):
tf.logging.info("Dataset {} has already been downloaded".format(dataset))
return
url = "{}{}.zip".format(_DATA_URL, dataset)
temp_dir = tempfile.mkdtemp()
try:
zip_path = os.path.join(temp_dir, "{}.zip".format(dataset))
def _progress(count, block_size, total_size):
sys.stdout.write("\r>> Downloading {} {:.1f}%".format(
zip_path, 100.0 * count * block_size / total_size))
sys.stdout.flush()
zip_path, _ = urllib.request.urlretrieve(url, zip_path, _progress)
statinfo = os.stat(zip_path)
# A new line to clear the carriage return from download progress
# tf.logging.info is not applicable here
print()
tf.logging.info(
"Successfully downloaded {} {} bytes".format(
zip_path, statinfo.st_size))
zipfile.ZipFile(zip_path, "r").extractall(temp_dir)
if dataset == ML_1M:
_regularize_1m_dataset(temp_dir)
else:
_regularize_20m_dataset(temp_dir)
for fname in tf.gfile.ListDirectory(temp_dir):
tf.gfile.Copy(os.path.join(temp_dir, fname),
os.path.join(data_subdir, fname))
finally:
tf.gfile.DeleteRecursively(temp_dir)
def _transform_csv(input_path, output_path, names, skip_first, separator=","):
"""Transform csv to a regularized format.
Args:
input_path: The path of the raw csv.
output_path: The path of the cleaned csv.
names: The csv column names.
skip_first: Boolean of whether to skip the first line of the raw csv.
separator: Character used to separate fields in the raw csv.
"""
if six.PY2:
names = [n.decode("utf-u") for n in names]
with tf.gfile.Open(output_path, "wb") as f_out, \
tf.gfile.Open(input_path, "rb") as f_in:
# Write column names to the csv.
f_out.write(",".join(names).encode("utf-8"))
f_out.write(b"\n")
for i, line in enumerate(f_in):
if i == 0 and skip_first:
continue # ignore existing labels in the csv
line = line.decode("utf-8", errors="ignore")
fields = line.split(separator)
if separator != ",":
fields = ['"{}"'.format(field) if "," in field else field
for field in fields]
f_out.write(",".join(fields).encode("utf-8"))
def _regularize_1m_dataset(temp_dir):
"""
ratings.dat
The file has no header row, and each line is in the following format:
UserID::MovieID::Rating::Timestamp
- UserIDs range between 1 and 6040
- MovieIDs range between 1 and 3952
- Ratings are made on a 5-star scale (whole-star ratings only)
- Timestamp is represented in seconds since midnight Coordinated Universal
Time (UTC) of January 1, 1970.
- Each user has at least 20 ratings
movies.dat
Each line has the following format:
MovieID::Title::Genres
- MovieIDs range between 1 and 3952
"""
working_dir = os.path.join(temp_dir, ML_1M)
_transform_csv(
input_path=os.path.join(working_dir, "ratings.dat"),
output_path=os.path.join(temp_dir, RATINGS_FILE),
names=RATING_COLUMNS, skip_first=False, separator="::")
_transform_csv(
input_path=os.path.join(working_dir, "movies.dat"),
output_path=os.path.join(temp_dir, MOVIES_FILE),
names=MOVIE_COLUMNS, skip_first=False, separator="::")
tf.gfile.DeleteRecursively(working_dir)
def _regularize_20m_dataset(temp_dir):
"""
ratings.csv
Each line of this file after the header row represents one rating of one
movie by one user, and has the following format:
userId,movieId,rating,timestamp
- The lines within this file are ordered first by userId, then, within user,
by movieId.
- Ratings are made on a 5-star scale, with half-star increments
(0.5 stars - 5.0 stars).
- Timestamps represent seconds since midnight Coordinated Universal Time
(UTC) of January 1, 1970.
- All the users had rated at least 20 movies.
movies.csv
Each line has the following format:
MovieID,Title,Genres
- MovieIDs range between 1 and 3952
"""
working_dir = os.path.join(temp_dir, ML_20M)
_transform_csv(
input_path=os.path.join(working_dir, "ratings.csv"),
output_path=os.path.join(temp_dir, RATINGS_FILE),
names=RATING_COLUMNS, skip_first=True, separator=",")
_transform_csv(
input_path=os.path.join(working_dir, "movies.csv"),
output_path=os.path.join(temp_dir, MOVIES_FILE),
names=MOVIE_COLUMNS, skip_first=True, separator=",")
tf.gfile.DeleteRecursively(working_dir)
def download(dataset, data_dir):
if dataset:
_download_and_clean(dataset, data_dir)
else:
_ = [_download_and_clean(d, data_dir) for d in DATASETS]
def ratings_csv_to_dataframe(data_dir, dataset):
with tf.gfile.Open(os.path.join(data_dir, dataset, RATINGS_FILE)) as f:
return pd.read_csv(f, encoding="utf-8")
def csv_to_joint_dataframe(data_dir, dataset):
ratings = ratings_csv_to_dataframe(data_dir, dataset)
with tf.gfile.Open(os.path.join(data_dir, dataset, MOVIES_FILE)) as f:
movies = pd.read_csv(f, encoding="utf-8")
df = ratings.merge(movies, on=ITEM_COLUMN)
df[RATING_COLUMN] = df[RATING_COLUMN].astype(np.float32)
return df
def integerize_genres(dataframe):
"""Replace genre string with a binary vector.
Args:
dataframe: a pandas dataframe of movie data.
Returns:
The transformed dataframe.
"""
def _map_fn(entry):
entry.replace("Children's", "Children") # naming difference.
movie_genres = entry.split("|")
output = np.zeros((len(GENRES),), dtype=np.int64)
for i, genre in enumerate(GENRES):
if genre in movie_genres:
output[i] = 1
return output
dataframe[GENRE_COLUMN] = dataframe[GENRE_COLUMN].apply(_map_fn)
return dataframe
def define_data_download_flags():
"""Add flags specifying data download arguments."""
flags.DEFINE_string(
name="data_dir", default="/tmp/movielens-data/",
help=flags_core.help_wrap(
"Directory to download and extract data."))
flags.DEFINE_enum(
name="dataset", default=None,
enum_values=DATASETS, case_sensitive=False,
help=flags_core.help_wrap("Dataset to be trained and evaluated."))
def main(_):
"""Download and extract the data from GroupLens website."""
download(flags.FLAGS.dataset, flags.FLAGS.data_dir)
if __name__ == "__main__":
tf.logging.set_verbosity(tf.logging.INFO)
define_data_download_flags()
FLAGS = flags.FLAGS
absl_app.run(main)
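For reference, a minimal usage sketch of the dataset module above (not part of this diff). It assumes the module is importable as official.datasets.movielens, the import path used elsewhere in this change, and that /tmp/movielens-data/ is a writable local directory:

from official.datasets import movielens

data_dir = "/tmp/movielens-data/"  # hypothetical local path
movielens.download(movielens.ML_1M, data_dir)
df = movielens.csv_to_joint_dataframe(data_dir, movielens.ML_1M)
df = movielens.integerize_genres(df)  # genre strings -> binary indicator vectors
print(df[[movielens.USER_COLUMN, movielens.ITEM_COLUMN,
          movielens.RATING_COLUMN, movielens.GENRE_COLUMN]].head())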
......@@ -159,6 +159,7 @@ def run_mnist(flags_obj):
Args:
flags_obj: An object containing parsed flag values.
"""
model_helpers.apply_clean(flags_obj)
model_function = model_fn
# Get number of GPUs as defined by the --num_gpus flags and the number of
......@@ -210,7 +211,8 @@ def run_mnist(flags_obj):
# Set up hook that outputs training logs every 100 steps.
train_hooks = hooks_helper.get_train_hooks(
flags_obj.hooks, batch_size=flags_obj.batch_size)
flags_obj.hooks, model_dir=flags_obj.model_dir,
batch_size=flags_obj.batch_size)
# Train and evaluate model.
for _ in range(flags_obj.train_epochs // flags_obj.epochs_between_evals):
......
......@@ -39,6 +39,7 @@ import tensorflow.contrib.eager as tfe
from official.mnist import dataset as mnist_dataset
from official.mnist import mnist
from official.utils.flags import core as flags_core
from official.utils.misc import model_helpers
def loss(logits, labels):
......@@ -104,6 +105,7 @@ def run_mnist_eager(flags_obj):
flags_obj: An object containing parsed flag values.
"""
tf.enable_eager_execution()
model_helpers.apply_clean(flags.FLAGS)
# Automatically determine device and data_format
(device, data_format) = ('/gpu:0', 'channels_first')
......
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""NCF Constants."""
TRAIN_RATINGS_FILENAME = 'train-ratings.csv'
TEST_RATINGS_FILENAME = 'test-ratings.csv'
TEST_NEG_FILENAME = 'test-negative.csv'
USER = "user_id"
ITEM = "item_id"
RATING = "rating"
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Download and extract the MovieLens dataset from GroupLens website.
Download the dataset, and perform data-preprocessing to convert the raw dataset
into csv file to be used in model training and evaluation.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import collections
import os
import sys
import time
import zipfile
# pylint: disable=g-bad-import-order
import numpy as np
import pandas as pd
from six.moves import urllib # pylint: disable=redefined-builtin
from absl import app as absl_app
from absl import flags
import tensorflow as tf
# pylint: enable=g-bad-import-order
from official.recommendation import constants
from official.utils.flags import core as flags_core
# URL to download dataset
_DATA_URL = "http://files.grouplens.org/datasets/movielens/"
_USER_COLUMN = "user_id"
_ITEM_COLUMN = "item_id"
_RATING_COLUMN = "rating"
_TIMESTAMP_COLUMN = "timestamp"
# The number of negative examples attached with a positive example
# in training dataset. It is set as 100 in the paper.
_NUMBER_NEGATIVES = 100
# In both datasets, each user has at least 20 ratings.
_MIN_NUM_RATINGS = 20
RatingData = collections.namedtuple(
"RatingData", ["items", "users", "ratings", "min_date", "max_date"])
def _print_ratings_description(ratings):
"""Describe the rating dataset information.
Args:
ratings: A pandas DataFrame of the rating dataset.
"""
info = RatingData(items=len(ratings[_ITEM_COLUMN].unique()),
users=len(ratings[_USER_COLUMN].unique()),
ratings=len(ratings),
min_date=ratings[_TIMESTAMP_COLUMN].min(),
max_date=ratings[_TIMESTAMP_COLUMN].max())
tf.logging.info("{ratings} ratings on {items} items from {users} users"
" from {min_date} to {max_date}".format(**(info._asdict())))
def process_movielens(ratings, sort=True):
"""Sort and convert timestamp of the MovieLens dataset.
Args:
ratings: A pandas DataFrame of the rating dataset.
sort: A boolean to indicate whether to sort the data based on timestamp.
Returns:
ratings: The processed pandas DataFrame.
"""
ratings[_TIMESTAMP_COLUMN] = pd.to_datetime(
ratings[_TIMESTAMP_COLUMN], unit="s")
if sort:
ratings.sort_values(by=_TIMESTAMP_COLUMN, inplace=True)
_print_ratings_description(ratings)
return ratings
def load_movielens_1_million(file_name, sort=True):
"""Load the MovieLens 1 million dataset.
The file has no header row, and each line is in the following format:
UserID::MovieID::Rating::Timestamp
- UserIDs range between 1 and 6040
- MovieIDs range between 1 and 3952
- Ratings are made on a 5-star scale (whole-star ratings only)
- Timestamp is represented in seconds since midnight Coordinated Universal
Time (UTC) of January 1, 1970.
- Each user has at least 20 ratings
Args:
file_name: A string of the file name to be loaded.
sort: A boolean to indicate whether to sort the data based on timestamp.
Returns:
A processed pandas DataFrame of the rating dataset.
"""
names = [_USER_COLUMN, _ITEM_COLUMN, _RATING_COLUMN, _TIMESTAMP_COLUMN]
ratings = pd.read_csv(file_name, sep="::", names=names, engine="python")
return process_movielens(ratings, sort=sort)
def load_movielens_20_million(file_name, sort=True):
"""Load the MovieLens 20 million dataset.
Each line of this file after the header row represents one rating of one movie
by one user, and has the following format:
userId,movieId,rating,timestamp
- The lines within this file are ordered first by userId, then, within user,
by movieId.
- Ratings are made on a 5-star scale, with half-star increments
(0.5 stars - 5.0 stars).
- Timestamps represent seconds since midnight Coordinated Universal Time
(UTC) of January 1, 1970.
- All the users had rated at least 20 movies.
Args:
file_name: A string of the file name to be loaded.
sort: A boolean to indicate whether to sort the data based on timestamp.
Returns:
A processed pandas DataFrame of the rating dataset.
"""
ratings = pd.read_csv(file_name)
names = {"userId": _USER_COLUMN, "movieId": _ITEM_COLUMN}
ratings.rename(columns=names, inplace=True)
return process_movielens(ratings, sort=sort)
def load_file_to_df(file_name, sort=True):
"""Load rating dataset into DataFrame.
Two data loading functions are defined to handle dataset ml-1m and ml-20m,
as they are provided with different formats.
Args:
file_name: A string of the file name to be loaded.
sort: A boolean to indicate whether to sort the data based on timestamp.
Returns:
A pandas DataFrame of the rating dataset.
"""
dataset_name = os.path.basename(file_name).split(".")[0]
# ml-1m with extension .dat
file_extension = ".dat"
func = load_movielens_1_million
if dataset_name == "ml-20m":
file_extension = ".csv"
func = load_movielens_20_million
ratings_file = os.path.join(file_name, "ratings" + file_extension)
return func(ratings_file, sort=sort)
def generate_train_eval_data(df, original_users, original_items):
"""Generate the dataset for model training and evaluation.
Given all user and item interaction information, for each user, first sort
the interactions based on timestamp. Then the latest one is taken out as
Test ratings (leave-one-out evaluation) and the remaining data for training.
The Test negatives are randomly sampled from all non-interacted items, and the
number of Test negatives is 100 by default (defined as _NUMBER_NEGATIVES).
Args:
df: The DataFrame of ratings data.
original_users: A list of the original unique user ids in the dataset.
original_items: A list of the original unique item ids in the dataset.
Returns:
all_ratings: A list of the [user_id, item_id] with interactions.
test_ratings: A list of [user_id, item_id], and each line is the latest
user_item interaction for the user.
test_negs: A list of item ids with shape [num_users, 100].
Each line consists of 100 item ids for the user with no interactions.
"""
# Need to sort before popping to get last item
tf.logging.info("Sorting user_item_map by timestamp...")
df.sort_values(by=_TIMESTAMP_COLUMN, inplace=True)
all_ratings = set(zip(df[_USER_COLUMN], df[_ITEM_COLUMN]))
user_to_items = collections.defaultdict(list)
# Generate user_item rating matrix for training
t1 = time.time()
row_count = 0
for row in df.itertuples():
user_to_items[getattr(row, _USER_COLUMN)].append(getattr(row, _ITEM_COLUMN))
row_count += 1
if row_count % 50000 == 0:
tf.logging.info("Processing user_to_items row: {}".format(row_count))
tf.logging.info(
"Process {} rows in [{:.1f}]s".format(row_count, time.time() - t1))
# Generate test ratings and test negatives
t2 = time.time()
test_ratings = []
test_negs = []
# Generate the 0-based index for each item, and put it into a set
all_items = set(range(len(original_items)))
for user in range(len(original_users)):
test_item = user_to_items[user].pop() # Get the latest item id
all_ratings.remove((user, test_item)) # Remove the test item
all_negs = all_items.difference(user_to_items[user])
all_negs = sorted(list(all_negs)) # determinism
test_ratings.append((user, test_item))
test_negs.append(list(np.random.choice(all_negs, _NUMBER_NEGATIVES)))
if user % 1000 == 0:
tf.logging.info("Processing user: {}".format(user))
tf.logging.info("Process {} users in {:.1f}s".format(
len(original_users), time.time() - t2))
all_ratings = list(all_ratings) # convert set to list
return all_ratings, test_ratings, test_negs
def parse_file_to_csv(data_dir, dataset_name):
"""Parse the raw data to csv file to be used in model training and evaluation.
The ml-1m dataset is small (~25 MB), while ml-20m is large (~500 MB). It may
take several minutes to process the ml-20m dataset.
Args:
data_dir: A string, the directory with the unzipped dataset.
dataset_name: A string, the dataset name to be processed.
"""
# Use random seed as parameter
np.random.seed(0)
# Load the file as DataFrame
file_path = os.path.join(data_dir, dataset_name)
df = load_file_to_df(file_path, sort=False)
# Get the info of users who have more than 20 ratings on items
grouped = df.groupby(_USER_COLUMN)
df = grouped.filter(lambda x: len(x) >= _MIN_NUM_RATINGS)
original_users = df[_USER_COLUMN].unique()
original_items = df[_ITEM_COLUMN].unique()
# Map the ids of user and item to 0 based index for following processing
tf.logging.info("Generating user_map and item_map...")
user_map = {user: index for index, user in enumerate(original_users)}
item_map = {item: index for index, item in enumerate(original_items)}
df[_USER_COLUMN] = df[_USER_COLUMN].apply(lambda user: user_map[user])
df[_ITEM_COLUMN] = df[_ITEM_COLUMN].apply(lambda item: item_map[item])
assert df[_USER_COLUMN].max() == len(original_users) - 1
assert df[_ITEM_COLUMN].max() == len(original_items) - 1
# Generate data for train and test
all_ratings, test_ratings, test_negs = generate_train_eval_data(
df, original_users, original_items)
# Serialize to csv file. Each csv file contains three columns
# (user_id, item_id, interaction)
# As there are only two fields (user_id, item_id) in all_ratings and
# test_ratings, we need to add a fake rating to make three columns
df_train_ratings = pd.DataFrame(all_ratings)
df_train_ratings["fake_rating"] = 1
train_ratings_file = os.path.join(
FLAGS.data_dir, dataset_name + "-" + constants.TRAIN_RATINGS_FILENAME)
df_train_ratings.to_csv(
train_ratings_file,
index=False, header=False, sep="\t")
tf.logging.info("Train ratings is {}".format(train_ratings_file))
df_test_ratings = pd.DataFrame(test_ratings)
df_test_ratings["fake_rating"] = 1
test_ratings_file = os.path.join(
FLAGS.data_dir, dataset_name + "-" + constants.TEST_RATINGS_FILENAME)
df_test_ratings.to_csv(
test_ratings_file,
index=False, header=False, sep="\t")
tf.logging.info("Test ratings is {}".format(test_ratings_file))
df_test_negs = pd.DataFrame(test_negs)
test_negs_file = os.path.join(
FLAGS.data_dir, dataset_name + "-" + constants.TEST_NEG_FILENAME)
df_test_negs.to_csv(
test_negs_file,
index=False, header=False, sep="\t")
tf.logging.info("Test negatives is {}".format(test_negs_file))
def make_dir(file_dir):
if not tf.gfile.Exists(file_dir):
tf.logging.info("Creating directory {}".format(file_dir))
tf.gfile.MakeDirs(file_dir)
def main(_):
"""Download and extract the data from GroupLens website."""
tf.logging.set_verbosity(tf.logging.INFO)
make_dir(FLAGS.data_dir)
assert FLAGS.dataset, (
"Please specify which dataset to download. "
"Two datasets are available: ml-1m and ml-20m.")
# Download the zip dataset
dataset_zip = FLAGS.dataset + ".zip"
file_path = os.path.join(FLAGS.data_dir, dataset_zip)
if not tf.gfile.Exists(file_path):
def _progress(count, block_size, total_size):
sys.stdout.write("\r>> Downloading {} {:.1f}%".format(
file_path, 100.0 * count * block_size / total_size))
sys.stdout.flush()
file_path, _ = urllib.request.urlretrieve(
_DATA_URL + dataset_zip, file_path, _progress)
statinfo = os.stat(file_path)
# A new line to clear the carriage return from download progress
# tf.logging.info is not applicable here
print()
tf.logging.info(
"Successfully downloaded {} {} bytes".format(
file_path, statinfo.st_size))
# Unzip the dataset
if not tf.gfile.Exists(os.path.join(FLAGS.data_dir, FLAGS.dataset)):
zipfile.ZipFile(file_path, "r").extractall(FLAGS.data_dir)
# Preprocess and parse the dataset to csv
train_ratings = FLAGS.dataset + "-" + constants.TRAIN_RATINGS_FILENAME
if not tf.gfile.Exists(os.path.join(FLAGS.data_dir, train_ratings)):
parse_file_to_csv(FLAGS.data_dir, FLAGS.dataset)
def define_data_download_flags():
"""Add flags specifying data download arguments."""
flags.DEFINE_string(
name="data_dir", default="/tmp/movielens-data/",
help=flags_core.help_wrap(
"Directory to download and extract data."))
flags.DEFINE_enum(
name="dataset", default=None,
enum_values=["ml-1m", "ml-20m"], case_sensitive=False,
help=flags_core.help_wrap(
"Dataset to be trained and evaluated. Two datasets are available "
": ml-1m and ml-20m."))
if __name__ == "__main__":
tf.logging.set_verbosity(tf.logging.INFO)
define_data_download_flags()
FLAGS = flags.FLAGS
absl_app.run(main)
......@@ -18,25 +18,43 @@ from __future__ import division
from __future__ import print_function
import os
import shutil
import numpy as np
import tensorflow as tf # pylint: disable=g-bad-import-order
from official.recommendation import dataset
# from official.recommendation import dataset
from official.datasets import movielens
from official.recommendation import movielens_dataset
_TRAIN_FNAME = os.path.join(
os.path.dirname(__file__), "unittest_data/test_train_ratings.csv")
_TEST_FNAME = os.path.join(
os.path.dirname(__file__), "unittest_data/test_eval_ratings.csv")
_TEST_NEG_FNAME = os.path.join(
os.path.dirname(__file__), "unittest_data/test_eval_negative.csv")
_NUM_NEG = 4
class DatasetTest(tf.test.TestCase):
def setUp(self):
# Create temporary CSV file
self.temp_dir = self.get_temp_dir()
tf.gfile.MakeDirs(os.path.join(self.temp_dir,
movielens_dataset._BUFFER_SUBDIR))
path_map = {
"test_train_ratings.csv": "ml-1m-train-ratings.csv",
"test_eval_ratings.csv": "ml-1m-test-ratings.csv",
"test_eval_negative.csv": "ml-1m-test-negative.csv"
}
for src, dest in path_map.items():
src = os.path.join(os.path.dirname(__file__), "unittest_data", src)
dest = os.path.join(self.temp_dir, movielens_dataset._BUFFER_SUBDIR, dest)
with tf.gfile.Open(src, "r") as f_in, tf.gfile.Open(dest, "w") as f_out:
f_out.write(f_in.read())
def test_load_data(self):
data = dataset.load_data(_TEST_FNAME)
data = movielens_dataset.load_data(_TEST_FNAME)
self.assertEqual(len(data), 2)
self.assertEqual(data[0][0], 0)
......@@ -46,8 +64,8 @@ class DatasetTest(tf.test.TestCase):
self.assertEqual(data[-1][2], 1)
def test_data_preprocessing(self):
ncf_dataset = dataset.data_preprocessing(
_TRAIN_FNAME, _TEST_FNAME, _TEST_NEG_FNAME, _NUM_NEG)
ncf_dataset = movielens_dataset.data_preprocessing(
self.temp_dir, movielens.ML_1M, _NUM_NEG)
# Check train data preprocessing
self.assertAllEqual(np.array(ncf_dataset.train_data)[:, 2],
......@@ -75,10 +93,10 @@ class DatasetTest(tf.test.TestCase):
def test_generate_train_dataset(self):
# Check train dataset
ncf_dataset = dataset.data_preprocessing(
_TRAIN_FNAME, _TEST_FNAME, _TEST_NEG_FNAME, _NUM_NEG)
ncf_dataset = movielens_dataset.data_preprocessing(
self.temp_dir, movielens.ML_1M, _NUM_NEG)
train_dataset = dataset.generate_train_dataset(
train_dataset = movielens_dataset.generate_train_dataset(
ncf_dataset.train_data, ncf_dataset.num_items, _NUM_NEG)
# Each user has 1 positive instance followed by _NUM_NEG negative instances
......
......@@ -12,20 +12,210 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Prepare dataset for NCF.
"""Prepare MovieLens dataset for NCF recommendation model."""
Load the training dataset and evaluation dataset from csv file into memory.
Prepare input for model training and evaluation.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import collections
import functools
import os
import tempfile
import time
# pylint: disable=wrong-import-order
from absl import app as absl_app
from absl import flags
import numpy as np
from six.moves import xrange # pylint: disable=redefined-builtin
import pandas as pd
from six.moves import xrange
import tensorflow as tf
# pylint: enable=wrong-import-order
from official.datasets import movielens
from official.utils.data import file_io
from official.utils.flags import core as flags_core
_BUFFER_SUBDIR = "ncf_recommendation_buffer"
_TRAIN_RATINGS_FILENAME = 'train-ratings.csv'
_TEST_RATINGS_FILENAME = 'test-ratings.csv'
_TEST_NEG_FILENAME = 'test-negative.csv'
# The number of negative examples attached with a positive example
# in training dataset. It is set as 100 in the paper.
_NUMBER_NEGATIVES = 100
from official.recommendation import constants # pylint: disable=g-bad-import-order
# In both datasets, each user has at least 20 ratings.
_MIN_NUM_RATINGS = 20
# The buffer size for shuffling train dataset.
_SHUFFLE_BUFFER_SIZE = 1024
_FEATURE_MAP = {
movielens.USER_COLUMN: tf.FixedLenFeature([1], dtype=tf.int64),
movielens.ITEM_COLUMN: tf.FixedLenFeature([1], dtype=tf.int64),
movielens.RATING_COLUMN: tf.FixedLenFeature([1], dtype=tf.int64),
}
_FEATURE_MAP_EVAL = {
movielens.USER_COLUMN: tf.FixedLenFeature([1], dtype=tf.int64),
movielens.ITEM_COLUMN: tf.FixedLenFeature([1], dtype=tf.int64),
}
_COLUMNS = [movielens.USER_COLUMN, movielens.ITEM_COLUMN,
movielens.RATING_COLUMN]
_EVAL_COLUMNS = _COLUMNS[:2]
_EVAL_BUFFER_SIZE = {
movielens.ML_1M: 34130690,
movielens.ML_20M: 800961490,
}
def generate_train_eval_data(df, original_users, original_items):
"""Generate the dataset for model training and evaluation.
Given all user and item interaction information, for each user, first sort
the interactions based on timestamp. Then the latest one is taken out as
Test ratings (leave-one-out evaluation) and the remaining data for training.
The Test negatives are randomly sampled from all non-interacted items, and the
number of Test negatives is 100 by default (defined as _NUMBER_NEGATIVES).
Args:
df: The DataFrame of ratings data.
original_users: A list of the original unique user ids in the dataset.
original_items: A list of the original unique item ids in the dataset.
Returns:
all_ratings: A list of the [user_id, item_id] with interactions.
test_ratings: A list of [user_id, item_id], and each line is the latest
user_item interaction for the user.
test_negs: A list of item ids with shape [num_users, 100].
Each line consists of 100 item ids for the user with no interactions.
"""
# Need to sort before popping to get last item
tf.logging.info("Sorting user_item_map by timestamp...")
df.sort_values(by=movielens.TIMESTAMP_COLUMN, inplace=True)
all_ratings = set(zip(df[movielens.USER_COLUMN], df[movielens.ITEM_COLUMN]))
user_to_items = collections.defaultdict(list)
# Generate user_item rating matrix for training
t1 = time.time()
row_count = 0
for row in df.itertuples():
user_to_items[getattr(row, movielens.USER_COLUMN)].append(
getattr(row, movielens.ITEM_COLUMN))
row_count += 1
if row_count % 50000 == 0:
tf.logging.info("Processing user_to_items row: {}".format(row_count))
tf.logging.info(
"Process {} rows in [{:.1f}]s".format(row_count, time.time() - t1))
# Generate test ratings and test negatives
t2 = time.time()
test_ratings = []
test_negs = []
# Generate the 0-based index for each item, and put it into a set
all_items = set(range(len(original_items)))
for user in range(len(original_users)):
test_item = user_to_items[user].pop() # Get the latest item id
all_ratings.remove((user, test_item)) # Remove the test item
all_negs = all_items.difference(user_to_items[user])
all_negs = sorted(list(all_negs)) # determinism
test_ratings.append((user, test_item))
test_negs.append(list(np.random.choice(all_negs, _NUMBER_NEGATIVES)))
if user % 1000 == 0:
tf.logging.info("Processing user: {}".format(user))
tf.logging.info("Process {} users in {:.1f}s".format(
len(original_users), time.time() - t2))
all_ratings = list(all_ratings) # convert set to list
return all_ratings, test_ratings, test_negs
def _csv_buffer_paths(data_dir, dataset):
buffer_dir = os.path.join(data_dir, _BUFFER_SUBDIR)
return (
os.path.join(buffer_dir, dataset + "-" + _TRAIN_RATINGS_FILENAME),
os.path.join(buffer_dir, dataset + "-" + _TEST_RATINGS_FILENAME),
os.path.join(buffer_dir, dataset + "-" + _TEST_NEG_FILENAME)
)
def construct_train_eval_csv(data_dir, dataset):
"""Parse the raw data to csv file to be used in model training and evaluation.
The ml-1m dataset is small (~25 MB), while ml-20m is large (~500 MB). It may
take several minutes to process the ml-20m dataset.
Args:
data_dir: A string, the root directory of the movielens dataset.
dataset: A string, the dataset name to be processed.
"""
assert dataset in movielens.DATASETS
if all([tf.gfile.Exists(i) for i in _csv_buffer_paths(data_dir, dataset)]):
return
# Use random seed as parameter
np.random.seed(0)
df = movielens.ratings_csv_to_dataframe(data_dir=data_dir, dataset=dataset)
# Get the info of users who have more than 20 ratings on items
grouped = df.groupby(movielens.USER_COLUMN)
df = grouped.filter(lambda x: len(x) >= _MIN_NUM_RATINGS)
original_users = df[movielens.USER_COLUMN].unique()
original_items = df[movielens.ITEM_COLUMN].unique()
# Map the ids of user and item to 0 based index for following processing
tf.logging.info("Generating user_map and item_map...")
user_map = {user: index for index, user in enumerate(original_users)}
item_map = {item: index for index, item in enumerate(original_items)}
df[movielens.USER_COLUMN] = df[movielens.USER_COLUMN].apply(
lambda user: user_map[user])
df[movielens.ITEM_COLUMN] = df[movielens.ITEM_COLUMN].apply(
lambda item: item_map[item])
assert df[movielens.USER_COLUMN].max() == len(original_users) - 1
assert df[movielens.ITEM_COLUMN].max() == len(original_items) - 1
# Generate data for train and test
all_ratings, test_ratings, test_negs = generate_train_eval_data(
df, original_users, original_items)
# Serialize to csv file. Each csv file contains three columns
# (user_id, item_id, interaction)
tf.gfile.MakeDirs(os.path.join(data_dir, _BUFFER_SUBDIR))
train_ratings_file, test_ratings_file, test_negs_file = _csv_buffer_paths(
data_dir, dataset)
# As there are only two fields (user_id, item_id) in all_ratings and
# test_ratings, we need to add a fake rating to make three columns
df_train_ratings = pd.DataFrame(all_ratings)
df_train_ratings["fake_rating"] = 1
with tf.gfile.Open(train_ratings_file, "w") as f:
df_train_ratings.to_csv(f, index=False, header=False, sep="\t")
tf.logging.info("Train ratings is {}".format(train_ratings_file))
df_test_ratings = pd.DataFrame(test_ratings)
df_test_ratings["fake_rating"] = 1
with tf.gfile.Open(test_ratings_file, "w") as f:
df_test_ratings.to_csv(f, index=False, header=False, sep="\t")
tf.logging.info("Test ratings is {}".format(test_ratings_file))
df_test_negs = pd.DataFrame(test_negs)
with tf.gfile.Open(test_negs_file, "w") as f:
df_test_negs.to_csv(f, index=False, header=False, sep="\t")
tf.logging.info("Test negatives is {}".format(test_negs_file))
class NCFDataSet(object):
"""A class containing data information for model training and evaluation."""
......@@ -67,7 +257,7 @@ def load_data(file_name):
return data
def data_preprocessing(train_fname, test_fname, test_neg_fname, num_negatives):
def data_preprocessing(data_dir, dataset, num_negatives):
"""Preprocess the train and test dataset.
In data preprocessing, the training positive instances are loaded into memory
......@@ -75,11 +265,8 @@ def data_preprocessing(train_fname, test_fname, test_neg_fname, num_negatives):
dataset are generated from test positive and negative instances.
Args:
train_fname: A string, the file name of training positive dataset.
test_fname: A string, the file name of test positive dataset. Each user has
one positive instance.
test_neg_fname: A string, the file name of test negative dataset. Each user
has 100 negative instances by default.
data_dir: A string, the root directory of the movielens dataset.
dataset: A string, the dataset name to be processed.
num_negatives: An integer, the number of negative instances for each user
in train dataset.
......@@ -87,6 +274,9 @@ def data_preprocessing(train_fname, test_fname, test_neg_fname, num_negatives):
ncf_dataset: A NCFDataset object containing information about training and
evaluation/test dataset.
"""
train_fname, test_fname, test_neg_fname = _csv_buffer_paths(
data_dir, dataset)
# Load training positive instances into memory for later train data generation
train_data = load_data(train_fname)
# Get total number of users in the dataset
......@@ -158,7 +348,22 @@ def generate_train_dataset(train_data, num_items, num_negatives):
return np.asarray(all_train_data)
def input_fn(training, batch_size, ncf_dataset, repeat=1):
def _deserialize_train(examples_serialized):
features = tf.parse_example(examples_serialized, _FEATURE_MAP)
train_features = {
movielens.USER_COLUMN: features[movielens.USER_COLUMN],
movielens.ITEM_COLUMN: features[movielens.ITEM_COLUMN],
}
return train_features, features[movielens.RATING_COLUMN]
def _deserialize_eval(examples_serialized):
features = tf.parse_example(examples_serialized, _FEATURE_MAP_EVAL)
return features
def get_input_fn(training, batch_size, ncf_dataset, data_dir, dataset,
repeat=1):
"""Input function for model training and evaluation.
The train input consists of 1 positive instance (user and item have
......@@ -179,35 +384,56 @@ def input_fn(training, batch_size, ncf_dataset, repeat=1):
"""
# Generate random negative instances for training in each epoch
if training:
tf.logging.info("Generating training data.")
train_data = generate_train_dataset(
ncf_dataset.train_data, ncf_dataset.num_items,
ncf_dataset.num_negatives)
# Get train features and labels
train_features = [
(constants.USER, np.expand_dims(train_data[:, 0], axis=1)),
(constants.ITEM, np.expand_dims(train_data[:, 1], axis=1))
]
train_labels = [
(constants.RATING, np.expand_dims(train_data[:, 2], axis=1))]
dataset = tf.data.Dataset.from_tensor_slices(
(dict(train_features), dict(train_labels))
)
dataset = dataset.shuffle(buffer_size=_SHUFFLE_BUFFER_SIZE)
df = pd.DataFrame(data=train_data, columns=_COLUMNS)
if data_dir.startswith("gs://"):
buffer_dir = os.path.join(data_dir, _BUFFER_SUBDIR)
else:
buffer_dir = None
buffer_path = file_io.write_to_temp_buffer(df, buffer_dir, _COLUMNS)
map_fn = _deserialize_train
else:
# Create eval/test dataset
test_user = ncf_dataset.all_eval_data[:, 0]
test_item = ncf_dataset.all_eval_data[:, 1]
test_features = [
(constants.USER, np.expand_dims(test_user, axis=1)),
(constants.ITEM, np.expand_dims(test_item, axis=1))]
dataset = tf.data.Dataset.from_tensor_slices(dict(test_features))
# Repeat and batch the dataset
dataset = dataset.repeat(repeat)
dataset = dataset.batch(batch_size)
# Prefetch to improve speed of input pipeline.
dataset = dataset.prefetch(1)
return dataset
df = pd.DataFrame(ncf_dataset.all_eval_data, columns=_EVAL_COLUMNS)
buffer_path = os.path.join(
data_dir, _BUFFER_SUBDIR, dataset + "_eval_buffer")
file_io.write_to_buffer(
dataframe=df, buffer_path=buffer_path, columns=_EVAL_COLUMNS,
expected_size=_EVAL_BUFFER_SIZE[dataset])
map_fn = _deserialize_eval
def input_fn(): # pylint: disable=missing-docstring
dataset = tf.data.TFRecordDataset(buffer_path)
if training:
dataset = dataset.shuffle(buffer_size=_SHUFFLE_BUFFER_SIZE)
dataset = dataset.batch(batch_size)
dataset = dataset.map(map_fn, num_parallel_calls=16)
dataset = dataset.repeat(repeat)
# Prefetch to improve speed of input pipeline.
dataset = dataset.prefetch(buffer_size=tf.contrib.data.AUTOTUNE)
return dataset
return input_fn
def main(_):
movielens.download(dataset=flags.FLAGS.dataset, data_dir=flags.FLAGS.data_dir)
construct_train_eval_csv(flags.FLAGS.data_dir, flags.FLAGS.dataset)
if __name__ == "__main__":
tf.logging.set_verbosity(tf.logging.INFO)
movielens.define_data_download_flags()
flags.adopt_module_key_flags(movielens)
flags_core.set_defaults(dataset="ml-1m")
absl_app.run(main)
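A hypothetical standalone smoke test of the new input pipeline (ncf_main.py below wires the same calls into an Estimator); the path, batch size, and negative count here are illustrative only:

from official.datasets import movielens
from official.recommendation import movielens_dataset

data_dir, dataset = "/tmp/movielens-data/", movielens.ML_1M
movielens.download(dataset, data_dir)
movielens_dataset.construct_train_eval_csv(data_dir, dataset)
ncf_dataset = movielens_dataset.data_preprocessing(data_dir, dataset, num_negatives=4)
train_input_fn = movielens_dataset.get_input_fn(
    True, batch_size=256, ncf_dataset=ncf_dataset,
    data_dir=data_dir, dataset=dataset, repeat=1)
features, labels = train_input_fn().make_one_shot_iterator().get_next()
# features is a dict keyed by movielens.USER_COLUMN and movielens.ITEM_COLUMN;
# labels is the 0/1 rating tensor parsed back from the TFRecord buffer.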
......@@ -32,8 +32,8 @@ from absl import flags
import tensorflow as tf
# pylint: enable=g-bad-import-order
from official.recommendation import constants
from official.recommendation import dataset
from official.datasets import movielens
from official.recommendation import movielens_dataset
from official.recommendation import neumf_model
from official.utils.flags import core as flags_core
from official.utils.logs import hooks_helper
......@@ -47,7 +47,7 @@ _HR_KEY = "HR"
_NDCG_KEY = "NDCG"
def evaluate_model(estimator, batch_size, num_gpus, ncf_dataset):
def evaluate_model(estimator, batch_size, num_gpus, ncf_dataset, pred_input_fn):
"""Model evaluation with HR and NDCG metrics.
The evaluation protocol is to rank the test interacted item (truth items)
......@@ -71,6 +71,7 @@ def evaluate_model(estimator, batch_size, num_gpus, ncf_dataset):
NDCG calculation. Each item is for one user.
eval_all_items, which is a nested list. Each entry is the 101 items
(1 ground truth item and 100 negative items) for one user.
pred_input_fn: The input function for the test data.
Returns:
eval_results: A dict of evaluation results for benchmark logging.
......@@ -83,15 +84,10 @@ def evaluate_model(estimator, batch_size, num_gpus, ncf_dataset):
ndcg is an integer representing the average NDCG scores across all users,
and global_step is the global step
"""
# Define prediction input function
def pred_input_fn():
return dataset.input_fn(
False, distribution_utils.per_device_batch_size(batch_size, num_gpus),
ncf_dataset)
# Get predictions
predictions = estimator.predict(input_fn=pred_input_fn)
all_predicted_scores = [p[constants.RATING] for p in predictions]
all_predicted_scores = [p[movielens.RATING_COLUMN] for p in predictions]
# Calculate HR score
def _get_hr(ranklist, true_item):
......@@ -174,22 +170,16 @@ def main(_):
def run_ncf(_):
"""Run NCF training and eval loop."""
# Data preprocessing
# The file name of training and test dataset
train_fname = os.path.join(
FLAGS.data_dir, FLAGS.dataset + "-" + constants.TRAIN_RATINGS_FILENAME)
test_fname = os.path.join(
FLAGS.data_dir, FLAGS.dataset + "-" + constants.TEST_RATINGS_FILENAME)
neg_fname = os.path.join(
FLAGS.data_dir, FLAGS.dataset + "-" + constants.TEST_NEG_FILENAME)
assert os.path.exists(train_fname), (
"Run data_download.py first to download and extract {} dataset".format(
FLAGS.dataset))
if FLAGS.download_if_missing:
movielens.download(FLAGS.dataset, FLAGS.data_dir)
movielens_dataset.construct_train_eval_csv(
data_dir=FLAGS.data_dir, dataset=FLAGS.dataset)
tf.logging.info("Data preprocessing...")
ncf_dataset = dataset.data_preprocessing(
train_fname, test_fname, neg_fname, FLAGS.num_neg)
ncf_dataset = movielens_dataset.data_preprocessing(
FLAGS.data_dir, FLAGS.dataset, FLAGS.num_neg)
model_helpers.apply_clean(flags.FLAGS)
# Create NeuMF model and convert it to Estimator
tf.logging.info("Creating Estimator from Keras model...")
......@@ -205,6 +195,7 @@ def run_ncf(_):
# Create hooks that log information about the training and metric values
train_hooks = hooks_helper.get_train_hooks(
FLAGS.hooks,
model_dir=FLAGS.model_dir,
batch_size=FLAGS.batch_size # for ExamplesPerSecondHook
)
run_params = {
......@@ -221,11 +212,17 @@ def run_ncf(_):
test_id=FLAGS.benchmark_test_id)
# Training and evaluation cycle
def train_input_fn():
return dataset.input_fn(
def get_train_input_fn():
return movielens_dataset.get_input_fn(
True,
distribution_utils.per_device_batch_size(FLAGS.batch_size, num_gpus),
ncf_dataset, FLAGS.epochs_between_evals)
ncf_dataset, FLAGS.data_dir, FLAGS.dataset, FLAGS.epochs_between_evals)
def get_pred_input_fn():
return movielens_dataset.get_input_fn(
False,
distribution_utils.per_device_batch_size(FLAGS.batch_size, num_gpus),
ncf_dataset, FLAGS.data_dir, FLAGS.dataset, 1)
total_training_cycle = FLAGS.train_epochs // FLAGS.epochs_between_evals
......@@ -234,11 +231,11 @@ def run_ncf(_):
cycle_index + 1, total_training_cycle))
# Train the model
estimator.train(input_fn=train_input_fn, hooks=train_hooks)
estimator.train(input_fn=get_train_input_fn(), hooks=train_hooks)
# Evaluate the model
eval_results = evaluate_model(
estimator, FLAGS.batch_size, num_gpus, ncf_dataset)
estimator, FLAGS.batch_size, num_gpus, ncf_dataset, get_pred_input_fn())
# Benchmark the evaluation results
benchmark_logger.log_evaluation_result(eval_results)
......@@ -288,6 +285,10 @@ def define_ncf_flags():
help=flags_core.help_wrap(
"Dataset to be trained and evaluated."))
flags.DEFINE_boolean(
name="download_if_missing", default=True, help=flags_core.help_wrap(
"Download data to data_dir if it is not already present."))
flags.DEFINE_integer(
name="num_factors", default=8,
help=flags_core.help_wrap("The Embedding size of MF model."))
......
......@@ -36,7 +36,7 @@ from __future__ import print_function
from six.moves import xrange # pylint: disable=redefined-builtin
import tensorflow as tf
from official.recommendation import constants # pylint: disable=g-bad-import-order
from official.datasets import movielens # pylint: disable=g-bad-import-order
class NeuMF(tf.keras.models.Model):
......@@ -67,9 +67,9 @@ class NeuMF(tf.keras.models.Model):
# Input variables
user_input = tf.keras.layers.Input(
shape=(1,), dtype=tf.int32, name=constants.USER)
shape=(1,), dtype=tf.int32, name=movielens.USER_COLUMN)
item_input = tf.keras.layers.Input(
shape=(1,), dtype=tf.int32, name=constants.ITEM)
shape=(1,), dtype=tf.int32, name=movielens.ITEM_COLUMN)
# Initializer for embedding layer
embedding_initializer = tf.keras.initializers.RandomNormal(stddev=0.01)
......@@ -128,7 +128,7 @@ class NeuMF(tf.keras.models.Model):
# Final prediction layer
prediction = tf.keras.layers.Dense(
1, activation="sigmoid", kernel_initializer="lecun_uniform",
name=constants.RATING)(predict_vector)
name=movielens.RATING_COLUMN)(predict_vector)
super(NeuMF, self).__init__(
inputs=[user_input, item_input], outputs=prediction)
google-api-python-client>=1.6.7
google-cloud-bigquery>=0.31.0
kaggle>=1.3.9
numpy
oauth2client>=4.1.2
pandas
psutil>=5.4.3
py-cpuinfo>=3.3.0
google-api-python-client>=1.6.7
google-cloud-bigquery>=0.31.0
oauth2client>=4.1.2
......@@ -339,6 +339,8 @@ def resnet_main(
This is only used if flags_obj.export_dir is passed.
"""
model_helpers.apply_clean(flags.FLAGS)
# Using the Winograd non-fused algorithms provides a small performance boost.
os.environ['TF_ENABLE_WINOGRAD_NONFUSED'] = '1'
......@@ -385,6 +387,7 @@ def resnet_main(
train_hooks = hooks_helper.get_train_hooks(
flags_obj.hooks,
model_dir=flags_obj.model_dir,
batch_size=flags_obj.batch_size)
def input_fn_train():
......
......@@ -576,9 +576,12 @@ def run_transformer(flags_obj):
params["repeat_dataset"] = schedule_manager.repeat_dataset
model_helpers.apply_clean(flags.FLAGS)
# Create hooks that log information about the training and metric values
train_hooks = hooks_helper.get_train_hooks(
flags_obj.hooks,
model_dir=flags_obj.model_dir,
tensors_to_log=TENSORS_TO_LOG, # used for logging hooks
batch_size=schedule_manager.batch_size, # for ExamplesPerSecondHook
use_tpu=params["use_tpu"] # Not all hooks can run with TPUs
......
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Convenience functions for managing dataset file buffers."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import atexit
import multiprocessing
import os
import tempfile
import uuid
import numpy as np
import six
import tensorflow as tf
class _GarbageCollector(object):
"""Deletes temporary buffer files at exit.
Certain tasks (such as NCF Recommendation) require writing buffers to
temporary files. (Which may be local or distributed.) It is not generally safe
to delete these files during operation, but they should be cleaned up. This
class keeps track of temporary files created, and deletes them at exit.
"""
def __init__(self):
self.temp_buffers = []
def register(self, filepath):
self.temp_buffers.append(filepath)
def purge(self):
try:
for i in self.temp_buffers:
if tf.gfile.Exists(i):
tf.gfile.Remove(i)
tf.logging.info("Buffer file {} removed".format(i))
except Exception as e:
tf.logging.error("Failed to cleanup buffer files: {}".format(e))
_GARBAGE_COLLECTOR = _GarbageCollector()
atexit.register(_GARBAGE_COLLECTOR.purge)
_ROWS_PER_CORE = 50000
def write_to_temp_buffer(dataframe, buffer_folder, columns):
if buffer_folder is None:
_, buffer_path = tempfile.mkstemp()
else:
tf.gfile.MakeDirs(buffer_folder)
buffer_path = os.path.join(buffer_folder, str(uuid.uuid4()))
_GARBAGE_COLLECTOR.register(buffer_path)
return write_to_buffer(dataframe, buffer_path, columns)
def iter_shard_dataframe(df, rows_per_core=1000):
"""Two way shard of a dataframe.
This function evenly shards a dataframe so that it can be mapped efficiently.
It yields a list of dataframes with length equal to the number of CPU cores,
with each dataframe having rows_per_core rows. (Except for the last batch
which may have fewer rows in the dataframes.) Passing vectorized inputs to
a multiprocessing pool is much more effecient than iterating through a
dataframe in serial and passing a list of inputs to the pool.
Args:
df: Pandas dataframe to be sharded.
rows_per_core: Number of rows in each shard.
Yields:
A list of dataframe shards, one shard per (reported) CPU core.
"""
n = len(df)
num_cores = min([multiprocessing.cpu_count(), n])
num_blocks = int(np.ceil(n / num_cores / rows_per_core))
max_batch_size = num_cores * rows_per_core
for i in range(num_blocks):
min_index = i * max_batch_size
max_index = min([(i + 1) * max_batch_size, n])
df_shard = df[min_index:max_index]
n_shard = len(df_shard)
boundaries = np.linspace(0, n_shard, num_cores + 1, dtype=np.int64)
yield [df_shard[boundaries[j]:boundaries[j+1]] for j in range(num_cores)]
def _shard_dict_to_examples(shard_dict):
"""Converts a dict of arrays into a list of example bytes."""
n = [i for i in shard_dict.values()][0].shape[0]
feature_list = [{} for _ in range(n)]
for column, values in shard_dict.items():
if len(values.shape) == 1:
values = np.reshape(values, values.shape + (1,))
if values.dtype.kind == "i":
feature_map = lambda x: tf.train.Feature(
int64_list=tf.train.Int64List(value=x))
elif values.dtype.kind == "f":
feature_map = lambda x: tf.train.Feature(
float_list=tf.train.FloatList(value=x))
else:
raise ValueError("Invalid dtype")
for i in range(n):
feature_list[i][column] = feature_map(values[i])
examples = [
tf.train.Example(features=tf.train.Features(feature=example_features))
for example_features in feature_list
]
return [e.SerializeToString() for e in examples]
def _serialize_shards(df_shards, columns, pool, writer):
"""Map sharded dataframes to bytes, and write them to a buffer.
Args:
df_shards: A list of pandas dataframes. (Should be of similar size)
columns: The dataframe columns to be serialized.
pool: A multiprocessing pool to serialize in parallel.
writer: A TFRecordWriter to write the serialized shards.
"""
# Pandas does not store columns of arrays as nd arrays. stack remedies this.
map_inputs = [{c: np.stack(shard[c].values, axis=0) for c in columns}
for shard in df_shards]
# Failure within pools is very irksome. Thus, it is better to thoroughly check
# inputs in the main process.
for inp in map_inputs:
# Check that all fields have the same number of rows.
assert len(set([v.shape[0] for v in inp.values()])) == 1
for val in inp.values():
assert hasattr(val, "dtype")
assert hasattr(val.dtype, "kind")
assert val.dtype.kind in ("i", "f")
assert len(val.shape) in (1, 2)
shard_bytes = pool.map(_shard_dict_to_examples, map_inputs)
for s in shard_bytes:
for example in s:
writer.write(example)
def write_to_buffer(dataframe, buffer_path, columns, expected_size=None):
"""Write a dataframe to a binary file for a dataset to consume.
Args:
dataframe: The pandas dataframe to be serialized.
buffer_path: The path where the serialized results will be written.
columns: The dataframe columns to be serialized.
expected_size: The size in bytes of the serialized results. This is used to
lazily construct the buffer.
Returns:
The path of the buffer.
"""
if tf.gfile.Exists(buffer_path) and tf.gfile.Stat(buffer_path).length > 0:
actual_size = tf.gfile.Stat(buffer_path).length
if expected_size == actual_size:
return buffer_path
tf.logging.warning(
"Existing buffer {} has size {}. Expected size {}. Deleting and "
"rebuilding buffer.".format(buffer_path, actual_size, expected_size))
tf.gfile.Remove(buffer_path)
if dataframe is None:
raise ValueError(
"dataframe was None but a valid existing buffer was not found.")
tf.gfile.MakeDirs(os.path.split(buffer_path)[0])
tf.logging.info("Constructing TFRecordDataset buffer: {}".format(buffer_path))
count = 0
pool = multiprocessing.Pool(multiprocessing.cpu_count())
try:
with tf.python_io.TFRecordWriter(buffer_path) as writer:
for df_shards in iter_shard_dataframe(df=dataframe,
rows_per_core=_ROWS_PER_CORE):
_serialize_shards(df_shards, columns, pool, writer)
count += sum([len(s) for s in df_shards])
tf.logging.info("{}/{} examples written."
.format(str(count).ljust(8), len(dataframe)))
finally:
pool.terminate()
tf.logging.info("Buffer write complete.")
return buffer_path
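A minimal round-trip sketch for the module above (assumed local temp paths; the unit test further down covers the same flow more thoroughly): serialize a small dataframe, then read it back through tf.data.

import pandas as pd
import tensorflow as tf
from official.utils.data import file_io

df = pd.DataFrame({"user_id": [0, 1, 2], "item_id": [10, 11, 12]})
buffer_path = file_io.write_to_temp_buffer(df, None, ["user_id", "item_id"])
feature_map = {
    "user_id": tf.FixedLenFeature([1], dtype=tf.int64),
    "item_id": tf.FixedLenFeature([1], dtype=tf.int64),
}
dataset = tf.data.TFRecordDataset(buffer_path).batch(2).map(
    lambda x: tf.parse_example(x, feature_map))
# Each element is a dict of dense int64 tensors; the temporary buffer file is
# removed at interpreter exit by the module's _GarbageCollector.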
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for binary data file utilities."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import contextlib
import multiprocessing
# pylint: disable=wrong-import-order
import numpy as np
import pandas as pd
import tensorflow as tf
# pylint: enable=wrong-import-order
from official.utils.data import file_io
_RAW_ROW = "raw_row"
_DUMMY_COL = "column_0"
_DUMMY_VEC_COL = "column_1"
_DUMMY_VEC_LEN = 4
_ROWS_PER_CORE = 4
_TEST_CASES = [
# One batch of one
dict(row_count=1, cpu_count=1, expected=[
[[0]]
]),
dict(row_count=10, cpu_count=1, expected=[
[[0, 1, 2, 3]], [[4, 5, 6, 7]], [[8, 9]]
]),
dict(row_count=21, cpu_count=1, expected=[
[[0, 1, 2, 3]], [[4, 5, 6, 7]], [[8, 9, 10, 11]],
[[12, 13, 14, 15]], [[16, 17, 18, 19]], [[20]]
]),
dict(row_count=1, cpu_count=4, expected=[
[[0]]
]),
dict(row_count=10, cpu_count=4, expected=[
[[0, 1], [2, 3, 4], [5, 6], [7, 8, 9]]
]),
dict(row_count=21, cpu_count=4, expected=[
[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]],
[[16], [17], [18], [19, 20]]
]),
dict(row_count=10, cpu_count=8, expected=[
[[0], [1], [2], [3, 4], [5], [6], [7], [8, 9]]
]),
dict(row_count=40, cpu_count=8, expected=[
[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15],
[16, 17, 18, 19], [20, 21, 22, 23], [24, 25, 26, 27],
[28, 29, 30, 31]],
[[32], [33], [34], [35], [36], [37], [38], [39]]
]),
]
_FEATURE_MAP = {
_RAW_ROW: tf.FixedLenFeature([1], dtype=tf.int64),
_DUMMY_COL: tf.FixedLenFeature([1], dtype=tf.int64),
_DUMMY_VEC_COL: tf.FixedLenFeature([_DUMMY_VEC_LEN], dtype=tf.float32)
}
@contextlib.contextmanager
def fixed_core_count(cpu_count):
"""Override CPU count.
file_io.py uses the cpu_count function to scale to the size of the instance.
However, this is not desirable for testing because it can make the test flaky.
Instead, this context manager fixes the count for more robust testing.
Args:
cpu_count: How many cores multiprocessing claims to have.
Yields:
Nothing. (for context manager only)
"""
old_count_fn = multiprocessing.cpu_count
multiprocessing.cpu_count = lambda: cpu_count
yield
multiprocessing.cpu_count = old_count_fn
class BaseTest(tf.test.TestCase):
def _test_sharding(self, row_count, cpu_count, expected):
df = pd.DataFrame({_DUMMY_COL: list(range(row_count))})
with fixed_core_count(cpu_count):
shards = list(file_io.iter_shard_dataframe(df, _ROWS_PER_CORE))
result = [[j[_DUMMY_COL].tolist() for j in i] for i in shards]
self.assertAllEqual(expected, result)
def test_tiny_rows_low_core(self):
self._test_sharding(**_TEST_CASES[0])
def test_small_rows_low_core(self):
self._test_sharding(**_TEST_CASES[1])
def test_large_rows_low_core(self):
self._test_sharding(**_TEST_CASES[2])
def test_tiny_rows_medium_core(self):
self._test_sharding(**_TEST_CASES[3])
def test_small_rows_medium_core(self):
self._test_sharding(**_TEST_CASES[4])
def test_large_rows_medium_core(self):
self._test_sharding(**_TEST_CASES[5])
def test_small_rows_large_core(self):
self._test_sharding(**_TEST_CASES[6])
def test_large_rows_large_core(self):
self._test_sharding(**_TEST_CASES[7])
def _serialize_deserialize(self, num_cores=1, num_rows=20):
np.random.seed(1)
df = pd.DataFrame({
# Serialization order is only deterministic for num_cores=1. raw_row is
# used in validation after the deserialization.
_RAW_ROW: np.array(range(num_rows), dtype=np.int64),
_DUMMY_COL: np.random.randint(0, 35, size=(num_rows,)),
_DUMMY_VEC_COL: [
np.array([np.random.random() for _ in range(_DUMMY_VEC_LEN)])
for i in range(num_rows) # pylint: disable=unused-variable
]
})
with fixed_core_count(num_cores):
buffer_path = file_io.write_to_temp_buffer(
df, self.get_temp_dir(), [_RAW_ROW, _DUMMY_COL, _DUMMY_VEC_COL])
with self.test_session(graph=tf.Graph()) as sess:
dataset = tf.data.TFRecordDataset(buffer_path)
dataset = dataset.batch(1).map(
lambda x: tf.parse_example(x, _FEATURE_MAP))
data_iter = dataset.make_one_shot_iterator()
seen_rows = set()
for i in range(num_rows+5):
row = data_iter.get_next()
try:
row_id, val_0, val_1 = sess.run(
[row[_RAW_ROW], row[_DUMMY_COL], row[_DUMMY_VEC_COL]])
row_id, val_0, val_1 = row_id[0][0], val_0[0][0], val_1[0]
assert row_id not in seen_rows
seen_rows.add(row_id)
self.assertEqual(val_0, df[_DUMMY_COL][row_id])
self.assertAllClose(val_1, df[_DUMMY_VEC_COL][row_id])
self.assertLess(i, num_rows, msg="Too many rows.")
except tf.errors.OutOfRangeError:
self.assertGreaterEqual(i, num_rows, msg="Too few rows.")
file_io._GARBAGE_COLLECTOR.purge()
assert not tf.gfile.Exists(buffer_path)
def test_serialize_deserialize_0(self):
self._serialize_deserialize(num_cores=1)
def test_serialize_deserialize_1(self):
self._serialize_deserialize(num_cores=2)
def test_serialize_deserialize_2(self):
self._serialize_deserialize(num_cores=8)
if __name__ == "__main__":
tf.test.main()
......@@ -25,7 +25,7 @@ from official.utils.flags._conventions import help_wrap
from official.utils.logs import hooks_helper
def define_base(data_dir=True, model_dir=True, train_epochs=True,
def define_base(data_dir=True, model_dir=True, clean=True, train_epochs=True,
epochs_between_evals=True, stop_threshold=True, batch_size=True,
num_gpu=True, hooks=True, export_dir=True):
"""Register base flags.
......@@ -59,6 +59,12 @@ def define_base(data_dir=True, model_dir=True, train_epochs=True,
help=help_wrap("The location of the model checkpoint files."))
key_flags.append("model_dir")
if clean:
flags.DEFINE_boolean(
name="clean", default=False,
help=help_wrap("If set, model_dir will be removed if it exists."))
key_flags.append("clean")
if train_epochs:
flags.DEFINE_integer(
name="train_epochs", short_name="te", default=1,
......
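The model_helpers.apply_clean calls added throughout this change consume the new --clean flag. That helper is not part of this diff; presumably it is along the lines of the following sketch (signature taken from the call sites above, body assumed):

def apply_clean(flags_obj):
  # Remove an existing model_dir when the user passes --clean.
  if flags_obj.clean and tf.gfile.Exists(flags_obj.model_dir):
    tf.logging.info("--clean flag set. Removing existing model dir: {}".format(
        flags_obj.model_dir))
    tf.gfile.DeleteRecursively(flags_obj.model_dir)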
......@@ -94,10 +94,11 @@ def get_logging_tensor_hook(every_n_iter=100, tensors_to_log=None, **kwargs): #
every_n_iter=every_n_iter)
def get_profiler_hook(save_steps=1000, **kwargs): # pylint: disable=unused-argument
def get_profiler_hook(model_dir, save_steps=1000, **kwargs): # pylint: disable=unused-argument
"""Function to get ProfilerHook.
Args:
model_dir: The directory to save the profile traces to.
save_steps: `int`, print profile traces every N steps.
**kwargs: a dictionary of arguments to ProfilerHook.
......@@ -105,7 +106,7 @@ def get_profiler_hook(save_steps=1000, **kwargs): # pylint: disable=unused-argu
Returns a ProfilerHook that writes out timelines that can be loaded into
profiling tools like chrome://tracing.
"""
return tf.train.ProfilerHook(save_steps=save_steps)
return tf.train.ProfilerHook(save_steps=save_steps, output_dir=model_dir)
def get_examples_per_second_hook(every_n_steps=100,
......
......@@ -31,18 +31,19 @@ class BaseTest(unittest.TestCase):
def test_raise_in_non_list_names(self):
with self.assertRaises(ValueError):
hooks_helper.get_train_hooks(
'LoggingTensorHook, ProfilerHook', batch_size=256)
'LoggingTensorHook, ProfilerHook', model_dir="", batch_size=256)
def test_raise_in_invalid_names(self):
invalid_names = ['StepCounterHook', 'StopAtStepHook']
with self.assertRaises(ValueError):
hooks_helper.get_train_hooks(invalid_names, batch_size=256)
hooks_helper.get_train_hooks(invalid_names, model_dir="", batch_size=256)
def validate_train_hook_name(self,
test_hook_name,
expected_hook_name,
**kwargs):
returned_hook = hooks_helper.get_train_hooks([test_hook_name], **kwargs)
returned_hook = hooks_helper.get_train_hooks(
[test_hook_name], model_dir="", **kwargs)
self.assertEqual(len(returned_hook), 1)
self.assertIsInstance(returned_hook[0], tf.train.SessionRunHook)
self.assertEqual(returned_hook[0].__class__.__name__.lower(),
......