Unverified Commit 64d6c094 authored by Hongjun Choi, committed by GitHub

Merged commit includes the following changes: (#7281)

* Merged commit includes the following changes:
259442882  by hongkuny<hongkuny@google.com>:

    Internal

--
259377621  by A. Unique TensorFlower<gardener@tensorflow.org>:

    Fix NCF serialization/de-serialization logic in NCF input pipeline to use tf.FixedLenFeature instead of raw string/binary decoding.

--
259373183  by A. Unique TensorFlower<gardener@tensorflow.org>:

    Create binary to generate NCF training/evaluation dataset offline.

--
259026454  by isaprykin<isaprykin@google.com>:

    Internal change

258871624  by hongkuny<hongkuny@google.com>:

    Internal change

257285772  by haoyuzhang<haoyuzhang@google.com>:

    Internal change

256202287  by A. Unique TensorFlower<gardener@tensorflow.org>:

    Internal change.

--
254069984  by hongkuny<hongkuny@google.com>:
    Automated rollback of changelist 254060732.

254060732  by yifeif<yifeif@google.com>:
    Automated rollback of changelist 254027750.

254027750  by hongkuny<hongkuny@google.com>:

    Internal change

253118910  by hongkuny<hongkuny@google.com>:

    Internal change

251906769  by hongkuny<hongkuny@google.com>:

    Internal change

251303452  by haoyuzhang<haoyuzhang@google.com>:

    Internal change

PiperOrigin-RevId: 259442882

* Update ncf_keras_main.py
parent 609260cd
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Binary to generate training/evaluation dataset for NCF model."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import json

# pylint: disable=g-bad-import-order
from absl import app
from absl import flags
import tensorflow.compat.v2 as tf
# pylint: enable=g-bad-import-order

from official.datasets import movielens
from official.recommendation import data_preprocessing

flags.DEFINE_string(
    "data_dir", None,
    "The input data dir at which training and evaluation tf record files "
    "will be saved.")
flags.DEFINE_string("meta_data_file_path", None,
                    "The path in which input meta data will be written.")
flags.DEFINE_enum("dataset", "ml-20m", ["ml-1m", "ml-20m"],
                  "Dataset to be trained/evaluated.")
flags.DEFINE_enum(
    "constructor_type", "bisection", ["bisection", "materialized"],
    "Strategy to use for generating false negatives. materialized has a "
    "precompute that scales badly, but a faster per-epoch construction "
    "time and can be faster on very large systems.")
flags.DEFINE_integer("num_train_epochs", 14,
                     "Total number of training epochs to generate.")
flags.DEFINE_integer(
    "num_negative_samples", 4,
    "Number of negative instances to pair with positive instance.")
flags.DEFINE_integer("prebatch_size", 99000,
                     "Batch size to be used for prebatching the dataset.")

FLAGS = flags.FLAGS


def prepare_raw_data(flag_obj):
  """Downloads and prepares raw data for data generation."""
  movielens.download(flag_obj.dataset, flag_obj.data_dir)

  data_processing_params = {
      "train_epochs": flag_obj.num_train_epochs,
      "batch_size": flag_obj.prebatch_size,
      "eval_batch_size": flag_obj.prebatch_size,
      "batches_per_step": 1,
      "stream_files": True,
      "num_neg": flag_obj.num_negative_samples,
  }

  num_users, num_items, producer = data_preprocessing.instantiate_pipeline(
      dataset=flag_obj.dataset,
      data_dir=flag_obj.data_dir,
      params=data_processing_params,
      constructor_type=flag_obj.constructor_type,
      epoch_dir=flag_obj.data_dir,
      generate_data_offline=True)

  # pylint: disable=protected-access
  input_metadata = {
      "num_users": num_users,
      "num_items": num_items,
      "constructor_type": flag_obj.constructor_type,
      "num_train_elements": producer._elements_in_epoch,
      "num_eval_elements": producer._eval_elements_in_epoch,
      "num_train_epochs": flag_obj.num_train_epochs,
      "prebatch_size": flag_obj.prebatch_size,
  }
  # pylint: enable=protected-access

  return producer, input_metadata


def generate_data():
  """Creates NCF train/eval dataset and writes input metadata as a file."""
  producer, input_metadata = prepare_raw_data(FLAGS)
  producer.run()

  with tf.io.gfile.GFile(FLAGS.meta_data_file_path, "w") as writer:
    writer.write(json.dumps(input_metadata, indent=4) + "\n")


def main(_):
  generate_data()


if __name__ == "__main__":
  flags.mark_flag_as_required("data_dir")
  flags.mark_flag_as_required("meta_data_file_path")
  app.run(main)
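For reference, the metadata written by generate_data() can be read back by a downstream training job. The snippet below is only a sketch: the key names mirror the input_metadata dict above, but the file path is a placeholder.

# Hypothetical consumer of the metadata JSON written by generate_data().
# Only the key names come from the code above; the path is an example.
import json

import tensorflow.compat.v2 as tf

with tf.io.gfile.GFile("/tmp/ncf_data/metadata.json", "r") as reader:
  input_metadata = json.loads(reader.read())

print("num_users:", input_metadata["num_users"])
print("num_items:", input_metadata["num_items"])
print("train elements per epoch:", input_metadata["num_train_elements"])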
@@ -56,21 +56,6 @@ Eval:
 {spacer}Batch count per epoch: {eval_batch_ct}"""

-_TRAIN_FEATURE_MAP = {
-    movielens.USER_COLUMN: tf.io.FixedLenFeature([], dtype=tf.string),
-    movielens.ITEM_COLUMN: tf.io.FixedLenFeature([], dtype=tf.string),
-    rconst.MASK_START_INDEX: tf.io.FixedLenFeature([1], dtype=tf.string),
-    "labels": tf.io.FixedLenFeature([], dtype=tf.string),
-}
-
-_EVAL_FEATURE_MAP = {
-    movielens.USER_COLUMN: tf.io.FixedLenFeature([], dtype=tf.string),
-    movielens.ITEM_COLUMN: tf.io.FixedLenFeature([], dtype=tf.string),
-    rconst.DUPLICATE_MASK: tf.io.FixedLenFeature([], dtype=tf.string)
-}
-

 class DatasetManager(object):
   """Helper class for handling TensorFlow specific data tasks.

@@ -78,9 +63,15 @@ class DatasetManager(object):
   constructor classes and handles the TensorFlow specific portions (TFRecord
   management, tf.Dataset creation, etc.).
   """

-  def __init__(self, is_training, stream_files, batches_per_epoch,
-               shard_root=None, deterministic=False):
-    # type: (bool, bool, int, typing.Optional[str], bool) -> None
+  def __init__(self,
+               is_training,
+               stream_files,
+               batches_per_epoch,
+               shard_root=None,
+               deterministic=False,
+               num_train_epochs=None):
+    # type: (bool, bool, int, typing.Optional[str], bool, int) -> None
     """Constructs a `DatasetManager` instance.

     Args:
       is_training: Boolean of whether the data provided is training or
@@ -92,6 +83,8 @@ class DatasetManager(object):
       batches_per_epoch: The number of batches in a single epoch.
       shard_root: The base directory to be used when stream_files=True.
       deterministic: Forgo non-deterministic speedups. (i.e. sloppy=True)
+      num_train_epochs: Number of epochs to generate. If None, then each
+        call to `get_dataset()` increments the number of epochs requested.
     """
     self._is_training = is_training
     self._deterministic = deterministic
@@ -101,7 +94,7 @@ class DatasetManager(object):
                      range(rconst.NUM_FILE_SHARDS)] if stream_files else []
     self._batches_per_epoch = batches_per_epoch
     self._epochs_completed = 0
-    self._epochs_requested = 0
+    self._epochs_requested = num_train_epochs if num_train_epochs else 0
     self._shard_root = shard_root

     self._result_queue = queue.Queue()
@@ -119,17 +112,20 @@ class DatasetManager(object):
             rconst.CYCLES_TO_BUFFER and self._is_training)

   @staticmethod
-  def _serialize(data):
+  def serialize(data):
     """Convert NumPy arrays into a TFRecords entry."""
+
+    def create_int_feature(values):
+      return tf.train.Feature(int64_list=tf.train.Int64List(value=list(values)))
+
     feature_dict = {
-        k: tf.train.Feature(bytes_list=tf.train.BytesList(
-            value=[memoryview(v).tobytes()])) for k, v in data.items()}
+        k: create_int_feature(v.astype(np.int64)) for k, v in data.items()
+    }
     return tf.train.Example(
         features=tf.train.Features(feature=feature_dict)).SerializeToString()

-  def _deserialize(self, serialized_data, batch_size):
+  def deserialize(self, serialized_data, batch_size):
     """Convert serialized TFRecords into tensors.

     Args:
@@ -137,36 +133,58 @@ class DatasetManager(object):
       batch_size: The data arrives pre-batched, so batch size is needed to
         deserialize the data.
     """
-    feature_map = _TRAIN_FEATURE_MAP if self._is_training else _EVAL_FEATURE_MAP
-    features = tf.parse_single_example(serialized_data, feature_map)
-
-    users = tf.reshape(tf.decode_raw(
-        features[movielens.USER_COLUMN], rconst.USER_DTYPE), (batch_size,))
-    items = tf.reshape(tf.decode_raw(
-        features[movielens.ITEM_COLUMN], rconst.ITEM_DTYPE), (batch_size,))
-
-    def decode_binary(data_bytes):
-      # tf.decode_raw does not support bool as a decode type. As a result it is
-      # necessary to decode to int8 (7 of the bits will be ignored) and then
-      # cast to bool.
-      return tf.reshape(tf.cast(tf.decode_raw(data_bytes, tf.int8), tf.bool),
-                        (batch_size,))
+
+    def _get_feature_map(batch_size, is_training=True):
+      """Returns data format of the serialized tf record file."""
+
+      if is_training:
+        return {
+            movielens.USER_COLUMN:
+                tf.io.FixedLenFeature([batch_size], dtype=tf.int64),
+            movielens.ITEM_COLUMN:
+                tf.io.FixedLenFeature([batch_size], dtype=tf.int64),
+            rconst.VALID_POINT_MASK:
+                tf.io.FixedLenFeature([batch_size], dtype=tf.int64),
+            "labels":
+                tf.io.FixedLenFeature([batch_size], dtype=tf.int64)
+        }
+      else:
+        return {
+            movielens.USER_COLUMN:
+                tf.io.FixedLenFeature([batch_size], dtype=tf.int64),
+            movielens.ITEM_COLUMN:
+                tf.io.FixedLenFeature([batch_size], dtype=tf.int64),
+            rconst.DUPLICATE_MASK:
+                tf.io.FixedLenFeature([batch_size], dtype=tf.int64)
+        }
+
+    features = tf.parse_single_example(
+        serialized_data, _get_feature_map(batch_size, self._is_training))
+
+    users = tf.reshape(
+        tf.cast(features[movielens.USER_COLUMN], rconst.USER_DTYPE),
+        (batch_size,))
+    items = tf.reshape(
+        tf.cast(features[movielens.ITEM_COLUMN], rconst.ITEM_DTYPE),
+        (batch_size,))

     if self._is_training:
-      mask_start_index = tf.decode_raw(
-          features[rconst.MASK_START_INDEX], tf.int32)[0]
-      valid_point_mask = tf.less(tf.range(batch_size), mask_start_index)
-
+      valid_point_mask = tf.reshape(
+          tf.cast(features[movielens.ITEM_COLUMN], tf.bool), (batch_size,))
       return {
           movielens.USER_COLUMN: users,
           movielens.ITEM_COLUMN: items,
           rconst.VALID_POINT_MASK: valid_point_mask,
-      }, decode_binary(features["labels"])
+      }, tf.reshape(tf.cast(features["labels"], tf.bool), (batch_size,))

     return {
-        movielens.USER_COLUMN: users,
-        movielens.ITEM_COLUMN: items,
-        rconst.DUPLICATE_MASK: decode_binary(features[rconst.DUPLICATE_MASK]),
+        movielens.USER_COLUMN:
+            users,
+        movielens.ITEM_COLUMN:
+            items,
+        rconst.DUPLICATE_MASK:
+            tf.reshape(
+                tf.cast(features[rconst.DUPLICATE_MASK], tf.bool),
+                (batch_size,))
     }

   def put(self, index, data):
@@ -183,20 +201,20 @@ class DatasetManager(object):
       data: A dict of the data to be stored. This method mutates data, and
         therefore expects to be the only consumer.
     """
+    if self._is_training:
+      mask_start_index = data.pop(rconst.MASK_START_INDEX)
+      batch_size = data[movielens.ITEM_COLUMN].shape[0]
+      data[rconst.VALID_POINT_MASK] = np.less(
+          np.arange(batch_size), mask_start_index)
+
     if self._stream_files:
-      example_bytes = self._serialize(data)
+      example_bytes = self.serialize(data)
       with self._write_locks[index % rconst.NUM_FILE_SHARDS]:
         self._writers[index % rconst.NUM_FILE_SHARDS].write(example_bytes)

     else:
-      if self._is_training:
-        mask_start_index = data.pop(rconst.MASK_START_INDEX)
-        batch_size = data[movielens.ITEM_COLUMN].shape[0]
-        data[rconst.VALID_POINT_MASK] = np.less(np.arange(batch_size),
-                                                mask_start_index)
-
-        data = (data, data.pop("labels"))
-      self._result_queue.put(data)
+      self._result_queue.put((
+          data, data.pop("labels")) if self._is_training else data)

   def start_construction(self):
     if self._stream_files:
@@ -269,7 +287,7 @@ class DatasetManager(object):
           files=file_pattern, worker_job=popen_helper.worker_job(),
           num_parallel_reads=rconst.NUM_FILE_SHARDS, num_epochs=1,
          sloppy=not self._deterministic)
-      map_fn = functools.partial(self._deserialize, batch_size=batch_size)
+      map_fn = functools.partial(self.deserialize, batch_size=batch_size)
       dataset = dataset.map(map_fn, num_parallel_calls=16)

     else:
@@ -330,7 +348,9 @@ class BaseDataConstructor(threading.Thread):
     self.lookup_negative_items
   """

-  def __init__(self,
+  def __init__(
+      self,
               maximum_number_epochs,    # type: int
               num_users,                # type: int
               num_items,                # type: int
@@ -347,7 +367,9 @@ class BaseDataConstructor(threading.Thread):
               batches_per_eval_step,    # type: int
               stream_files,             # type: bool
               deterministic=False,      # type: bool
-               epoch_dir=None            # type: str
+               epoch_dir=None,           # type: str
+               num_train_epochs=None,    # type: int
+               create_data_offline=False  # type: bool
              ):
     # General constants
     self._maximum_number_epochs = maximum_number_epochs
@@ -363,6 +385,8 @@ class BaseDataConstructor(threading.Thread):
     self._eval_pos_users = eval_pos_users
     self._eval_pos_items = eval_pos_items
     self.eval_batch_size = eval_batch_size
+    self.num_train_epochs = num_train_epochs
+    self.create_data_offline = create_data_offline

     # Training
     if self._train_pos_users.shape != self._train_pos_items.shape:
@@ -396,12 +420,14 @@ class BaseDataConstructor(threading.Thread):
     else:
       self._shard_root = None

-    self._train_dataset = DatasetManager(
-        True, stream_files, self.train_batches_per_epoch, self._shard_root,
-        deterministic)
-    self._eval_dataset = DatasetManager(
-        False, stream_files, self.eval_batches_per_epoch, self._shard_root,
-        deterministic)
+    self._train_dataset = DatasetManager(True, stream_files,
+                                         self.train_batches_per_epoch,
+                                         self._shard_root, deterministic,
+                                         num_train_epochs)
+    self._eval_dataset = DatasetManager(False, stream_files,
+                                        self.eval_batches_per_epoch,
+                                        self._shard_root, deterministic,
+                                        num_train_epochs)

     # Threading details
     super(BaseDataConstructor, self).__init__()
@@ -526,7 +552,9 @@ class BaseDataConstructor(threading.Thread):

   def _construct_training_epoch(self):
     """Loop to construct a batch of training data."""
-    self._wait_to_construct_train_epoch()
+    if not self.create_data_offline:
+      self._wait_to_construct_train_epoch()
+
     start_time = timeit.default_timer()
     if self._stop_loop:
       return
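To make the serialization change above concrete, the sketch below round-trips a pre-batched example the way the new serialize/deserialize pair does: integer arrays are stored as int64 features and parsed back with tf.io.FixedLenFeature, instead of raw byte strings decoded with tf.decode_raw. The feature names and batch size here are illustrative, not the pipeline's actual constants.

import numpy as np
import tensorflow as tf

batch_size = 4
# Illustrative pre-batched data; the real pipeline stores user/item/mask/label
# arrays under its own column names.
data = {
    "user_id": np.array([1, 2, 3, 4]),
    "item_id": np.array([10, 20, 30, 40]),
    "labels": np.array([1, 0, 0, 1]),
}


def create_int_feature(values):
  return tf.train.Feature(int64_list=tf.train.Int64List(value=list(values)))


# Serialize: every array becomes a fixed-length int64 feature.
example_bytes = tf.train.Example(
    features=tf.train.Features(feature={
        k: create_int_feature(v.astype(np.int64)) for k, v in data.items()
    })).SerializeToString()

# Deserialize: parse with FixedLenFeature, then cast labels/masks back to bool.
feature_map = {
    k: tf.io.FixedLenFeature([batch_size], dtype=tf.int64) for k in data
}
features = tf.io.parse_single_example(example_bytes, feature_map)
labels = tf.cast(features["labels"], tf.bool)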
@@ -149,8 +149,8 @@ def _filter_index_sort(raw_rating_path, cache_path):
     df.sort_values([movielens.USER_COLUMN, movielens.TIMESTAMP_COLUMN],
                    inplace=True, kind="mergesort")

-    df = df.reset_index()  # The dataframe does not reconstruct indices in the
-    # sort or filter steps.
+    # The dataframe does not reconstruct indices in the sort or filter steps.
+    df = df.reset_index()

     grouped = df.groupby(movielens.USER_COLUMN, group_keys=False)
     eval_df, train_df = grouped.tail(1), grouped.apply(lambda x: x.iloc[:-1])
@@ -177,9 +177,14 @@ def _filter_index_sort(raw_rating_path, cache_path):
   return data, valid_cache


-def instantiate_pipeline(dataset, data_dir, params, constructor_type=None,
-                         deterministic=False, epoch_dir=None):
-  # type: (str, str, dict, typing.Optional[str], bool, typing.Optional[str]) -> (int, int, data_pipeline.BaseDataConstructor)
+def instantiate_pipeline(dataset,
+                         data_dir,
+                         params,
+                         constructor_type=None,
+                         deterministic=False,
+                         epoch_dir=None,
+                         generate_data_offline=False):
+  # type: (str, str, dict, typing.Optional[str], bool, typing.Optional[str], bool) -> (int, int, data_pipeline.BaseDataConstructor)
   """Load and digest data CSV into a usable form.

   Args:
@@ -190,6 +195,8 @@ def instantiate_pipeline(dataset, data_dir, params, constructor_type=None,
       for the input pipeline.
     deterministic: Tell the data constructor to produce deterministically.
     epoch_dir: Directory in which to store the training epochs.
+    generate_data_offline: Boolean, whether current pipeline is done offline
+      or while training.
   """
   logging.info("Beginning data preprocessing.")
@@ -223,10 +230,10 @@ def instantiate_pipeline(dataset, data_dir, params, constructor_type=None,
       eval_pos_items=raw_data[rconst.EVAL_ITEM_KEY],
       eval_batch_size=params["eval_batch_size"],
       batches_per_eval_step=params["batches_per_step"],
-      stream_files=params["use_tpu"],
+      stream_files=params["stream_files"],
       deterministic=deterministic,
-      epoch_dir=epoch_dir
-  )
+      epoch_dir=epoch_dir,
+      create_data_offline=generate_data_offline)

   run_time = timeit.default_timer() - st
   logging.info("Data preprocessing complete. Time: {:.1f} sec."
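As a quick illustration of the new instantiate_pipeline keyword arguments, an offline-generation call might look like the sketch below. It mirrors prepare_raw_data() in create_ncf_data.py above; the directory paths are placeholders.

# Sketch only: parameter values follow create_ncf_data.py's defaults, and the
# paths are placeholders.
from official.recommendation import data_preprocessing

params = {
    "train_epochs": 14,
    "batch_size": 99000,
    "eval_batch_size": 99000,
    "batches_per_step": 1,
    "stream_files": True,  # now read from params instead of params["use_tpu"]
    "num_neg": 4,
}
num_users, num_items, producer = data_preprocessing.instantiate_pipeline(
    dataset="ml-20m",
    data_dir="/tmp/movielens",   # placeholder
    params=params,
    constructor_type="bisection",
    epoch_dir="/tmp/movielens",  # placeholder
    generate_data_offline=True)
producer.run()  # materializes the training/eval TFRecord shards offline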
@@ -109,6 +109,7 @@ class BaseTest(tf.test.TestCase):
         "match_mlperf": True,
         "use_tpu": False,
         "use_xla_for_gpu": False,
+        "stream_files": False,
     }

   def test_preprocessing(self):
@@ -108,6 +108,7 @@ def parse_flags(flags_obj):
       "epochs_between_evals": FLAGS.epochs_between_evals,
       "keras_use_ctl": flags_obj.keras_use_ctl,
       "hr_threshold": flags_obj.hr_threshold,
+      "stream_files": flags_obj.tpu is not None,
   }