Unverified Commit 048e5bff authored by Shining Sun's avatar Shining Sun Committed by GitHub
Browse files

Keras-fy NCF Model (#6092)

* tmp commit

* tmp commit

* first attempt (without eval)

* Bug fixes

* bug fixes

* training done

* Loss NAN, no eval

* Loss weight problem solved

* resolve the NAN loss problem

* Problem solved. Clean up needed

* Added a todo

* Remove debug prints

* Extract get_optimizer to ncf_common

* Move metrics computation back to neumf; use DS.scope api

* Extract DS.scope code to utils

* lint fixes

* Move obtaining DS above producer.start to avoid race condition

* move pt 1

* move pt 2

* Update the run script

* Wrap keras_model related code into functions

* Update the doc for softmax_logitfy and change the method name

* Resolve PR comments

* working version with: eager, DS, batch and no masks

* Remove git conflict indicator

* move reshape to neumf_model

* working version, not converge

* converged

* fix a test

* more lint fix

* more lint fix

* more lint fixes

* more lint fix

* Removed unused imports

* fix test

* dummy commit for kicking off checks

* fix lint issue

* dummy input to kick off checks

* dummy input to kick off checks

* add collective to dist strat

* addressed review comments

* add a doc string
parent fa9ed456
......@@ -235,6 +235,8 @@ class DatasetManager(object):
self._result_reuse.append(result)
yield result
def increment_request_epoch(self):
  """Record that one additional epoch of data has been requested."""
  self._epochs_requested += 1
def get_dataset(self, batch_size, epochs_between_evals):
"""Construct the dataset to be used for training and eval.
......@@ -248,7 +250,7 @@ class DatasetManager(object):
epochs_between_evals: How many epochs worth of data to yield.
(Generator mode only.)
"""
self._epochs_requested += 1
self.increment_request_epoch()
if self._stream_files:
if epochs_between_evals > 1:
raise ValueError("epochs_between_evals > 1 not supported for file "
......@@ -626,6 +628,9 @@ class BaseDataConstructor(threading.Thread):
self._train_dataset.make_input_fn(self.train_batch_size) if is_training
else self._eval_dataset.make_input_fn(self.eval_batch_size))
def increment_request_epoch(self):
  """Forward the epoch-increment request to the training dataset manager."""
  self._train_dataset.increment_request_epoch()
class DummyConstructor(threading.Thread):
"""Class for running with synthetic data."""
......
......@@ -12,131 +12,62 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""NCF framework to train and evaluate the NeuMF model.
The NeuMF model assembles both MF and MLP models under the NCF framework. Check
`neumf_model.py` for more details about the models.
"""Common functionalities used by both Keras and Estimator implementations.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import contextlib
import heapq
import json
import logging
import math
import multiprocessing
import os
import signal
import typing
# pylint: disable=g-bad-import-order
import numpy as np
from absl import app as absl_app
from absl import flags
import tensorflow as tf
# pylint: enable=g-bad-import-order
from tensorflow.contrib.compiler import xla
from official.datasets import movielens
from official.recommendation import constants as rconst
from official.recommendation import data_pipeline
from official.recommendation import data_preprocessing
from official.recommendation import neumf_model
from official.utils.flags import core as flags_core
from official.utils.logs import hooks_helper
from official.utils.logs import logger
from official.utils.logs import mlperf_helper
from official.utils.misc import distribution_utils
from official.utils.misc import model_helpers
FLAGS = flags.FLAGS
def construct_estimator(model_dir, params):
"""Construct either an Estimator or TPUEstimator for NCF.
Args:
model_dir: The model directory for the estimator
params: The params dict for the estimator
Returns:
An Estimator or TPUEstimator.
"""
if params["use_tpu"]:
# Some of the networking libraries are quite chatty.
for name in ["googleapiclient.discovery", "googleapiclient.discovery_cache",
"oauth2client.transport"]:
logging.getLogger(name).setLevel(logging.ERROR)
tpu_cluster_resolver = tf.contrib.cluster_resolver.TPUClusterResolver(
tpu=params["tpu"],
zone=params["tpu_zone"],
project=params["tpu_gcp_project"],
coordinator_name="coordinator"
)
tf.logging.info("Issuing reset command to TPU to ensure a clean state.")
tf.Session.reset(tpu_cluster_resolver.get_master())
# Estimator looks at the master it connects to for MonitoredTrainingSession
# by reading the `TF_CONFIG` environment variable, and the coordinator
# is used by StreamingFilesDataset.
tf_config_env = {
"session_master": tpu_cluster_resolver.get_master(),
"eval_session_master": tpu_cluster_resolver.get_master(),
"coordinator": tpu_cluster_resolver.cluster_spec()
.as_dict()["coordinator"]
}
os.environ['TF_CONFIG'] = json.dumps(tf_config_env)
def get_inputs(params):
"""Returns some parameters used by the model."""
if FLAGS.download_if_missing and not FLAGS.use_synthetic_data:
movielens.download(FLAGS.dataset, FLAGS.data_dir)
distribution = tf.contrib.distribute.TPUStrategy(
tpu_cluster_resolver, steps_per_run=100)
if FLAGS.seed is not None:
np.random.seed(FLAGS.seed)
if FLAGS.use_synthetic_data:
producer = data_pipeline.DummyConstructor()
num_users, num_items = data_preprocessing.DATASET_TO_NUM_USERS_AND_ITEMS[
FLAGS.dataset]
num_train_steps = rconst.SYNTHETIC_BATCHES_PER_EPOCH
num_eval_steps = rconst.SYNTHETIC_BATCHES_PER_EPOCH
else:
distribution = distribution_utils.get_distribution_strategy(
num_gpus=params["num_gpus"])
num_users, num_items, producer = data_preprocessing.instantiate_pipeline(
dataset=FLAGS.dataset, data_dir=FLAGS.data_dir, params=params,
constructor_type=FLAGS.constructor_type,
deterministic=FLAGS.seed is not None)
run_config = tf.estimator.RunConfig(train_distribute=distribution,
eval_distribute=distribution)
model_fn = neumf_model.neumf_model_fn
if params["use_xla_for_gpu"]:
tf.logging.info("Using XLA for GPU for training and evaluation.")
model_fn = xla.estimator_model_fn(model_fn)
estimator = tf.estimator.Estimator(model_fn=model_fn, model_dir=model_dir,
config=run_config, params=params)
return estimator
def log_and_get_hooks(eval_batch_size):
"""Convenience function for hook and logger creation."""
# Create hooks that log information about the training and metric values
train_hooks = hooks_helper.get_train_hooks(
FLAGS.hooks,
model_dir=FLAGS.model_dir,
batch_size=FLAGS.batch_size, # for ExamplesPerSecondHook
tensors_to_log={"cross_entropy": "cross_entropy"}
)
run_params = {
"batch_size": FLAGS.batch_size,
"eval_batch_size": eval_batch_size,
"number_factors": FLAGS.num_factors,
"hr_threshold": FLAGS.hr_threshold,
"train_epochs": FLAGS.train_epochs,
}
benchmark_logger = logger.get_benchmark_logger()
benchmark_logger.log_run_info(
model_name="recommendation",
dataset_name=FLAGS.dataset,
run_params=run_params,
test_id=FLAGS.benchmark_test_id)
num_train_steps = (producer.train_batches_per_epoch //
params["batches_per_step"])
num_eval_steps = (producer.eval_batches_per_epoch //
params["batches_per_step"])
assert not producer.train_batches_per_epoch % params["batches_per_step"]
assert not producer.eval_batches_per_epoch % params["batches_per_step"]
return benchmark_logger, train_hooks
return num_users, num_items, num_train_steps, num_eval_steps, producer
def parse_flags(flags_obj):
......@@ -174,112 +105,62 @@ def parse_flags(flags_obj):
"match_mlperf": flags_obj.ml_perf,
"use_xla_for_gpu": flags_obj.use_xla_for_gpu,
"epochs_between_evals": FLAGS.epochs_between_evals,
"turn_off_distribution_strategy": FLAGS.turn_off_distribution_strategy,
}
def main(_):
with logger.benchmark_context(FLAGS), \
mlperf_helper.LOGGER(FLAGS.output_ml_perf_compliance_logging):
mlperf_helper.set_ncf_root(os.path.split(os.path.abspath(__file__))[0])
run_ncf(FLAGS)
def run_ncf(_):
"""Run NCF training and eval loop."""
if FLAGS.download_if_missing and not FLAGS.use_synthetic_data:
movielens.download(FLAGS.dataset, FLAGS.data_dir)
if FLAGS.seed is not None:
np.random.seed(FLAGS.seed)
params = parse_flags(FLAGS)
total_training_cycle = FLAGS.train_epochs // FLAGS.epochs_between_evals
if FLAGS.use_synthetic_data:
producer = data_pipeline.DummyConstructor()
num_users, num_items = data_preprocessing.DATASET_TO_NUM_USERS_AND_ITEMS[
FLAGS.dataset]
num_train_steps = rconst.SYNTHETIC_BATCHES_PER_EPOCH
num_eval_steps = rconst.SYNTHETIC_BATCHES_PER_EPOCH
else:
num_users, num_items, producer = data_preprocessing.instantiate_pipeline(
dataset=FLAGS.dataset, data_dir=FLAGS.data_dir, params=params,
constructor_type=FLAGS.constructor_type,
deterministic=FLAGS.seed is not None)
num_train_steps = (producer.train_batches_per_epoch //
params["batches_per_step"])
num_eval_steps = (producer.eval_batches_per_epoch //
params["batches_per_step"])
assert not producer.train_batches_per_epoch % params["batches_per_step"]
assert not producer.eval_batches_per_epoch % params["batches_per_step"]
producer.start()
params["num_users"], params["num_items"] = num_users, num_items
model_helpers.apply_clean(flags.FLAGS)
estimator = construct_estimator(model_dir=FLAGS.model_dir, params=params)
benchmark_logger, train_hooks = log_and_get_hooks(params["eval_batch_size"])
target_reached = False
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.TRAIN_LOOP)
for cycle_index in range(total_training_cycle):
assert FLAGS.epochs_between_evals == 1 or not mlperf_helper.LOGGER.enabled
tf.logging.info("Starting a training cycle: {}/{}".format(
cycle_index + 1, total_training_cycle))
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.TRAIN_EPOCH,
value=cycle_index)
train_input_fn = producer.make_input_fn(is_training=True)
estimator.train(input_fn=train_input_fn, hooks=train_hooks,
steps=num_train_steps)
def get_optimizer(params):
optimizer = tf.train.AdamOptimizer(
learning_rate=params["learning_rate"],
beta1=params["beta1"],
beta2=params["beta2"],
epsilon=params["epsilon"])
if params["use_tpu"]:
optimizer = tf.contrib.tpu.CrossShardOptimizer(optimizer)
tf.logging.info("Beginning evaluation.")
eval_input_fn = producer.make_input_fn(is_training=False)
return optimizer
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.EVAL_START,
value=cycle_index)
eval_results = estimator.evaluate(eval_input_fn, steps=num_eval_steps)
tf.logging.info("Evaluation complete.")
hr = float(eval_results[rconst.HR_KEY])
ndcg = float(eval_results[rconst.NDCG_KEY])
loss = float(eval_results["loss"])
def get_distribution_strategy(params):
"""Returns the distribution strategy to use."""
if params["turn_off_distribution_strategy"]:
return None
mlperf_helper.ncf_print(
key=mlperf_helper.TAGS.EVAL_TARGET,
value={"epoch": cycle_index, "value": FLAGS.hr_threshold})
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.EVAL_ACCURACY,
value={"epoch": cycle_index, "value": hr})
mlperf_helper.ncf_print(
key=mlperf_helper.TAGS.EVAL_HP_NUM_NEG,
value={"epoch": cycle_index, "value": rconst.NUM_EVAL_NEGATIVES})
if params["use_tpu"]:
# Some of the networking libraries are quite chatty.
for name in ["googleapiclient.discovery", "googleapiclient.discovery_cache",
"oauth2client.transport"]:
logging.getLogger(name).setLevel(logging.ERROR)
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.EVAL_STOP, value=cycle_index)
tpu_cluster_resolver = tf.contrib.cluster_resolver.TPUClusterResolver(
tpu=params["tpu"],
zone=params["tpu_zone"],
project=params["tpu_gcp_project"],
coordinator_name="coordinator"
)
# Benchmark the evaluation results
benchmark_logger.log_evaluation_result(eval_results)
# Log the HR and NDCG results.
tf.logging.info(
"Iteration {}: HR = {:.4f}, NDCG = {:.4f}, Loss = {:.4f}".format(
cycle_index + 1, hr, ndcg, loss))
tf.logging.info("Issuing reset command to TPU to ensure a clean state.")
tf.Session.reset(tpu_cluster_resolver.get_master())
# If some evaluation threshold is met
if model_helpers.past_stop_threshold(FLAGS.hr_threshold, hr):
target_reached = True
break
# Estimator looks at the master it connects to for MonitoredTrainingSession
# by reading the `TF_CONFIG` environment variable, and the coordinator
# is used by StreamingFilesDataset.
tf_config_env = {
"session_master": tpu_cluster_resolver.get_master(),
"eval_session_master": tpu_cluster_resolver.get_master(),
"coordinator": tpu_cluster_resolver.cluster_spec()
.as_dict()["coordinator"]
}
os.environ['TF_CONFIG'] = json.dumps(tf_config_env)
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.RUN_STOP,
value={"success": target_reached})
producer.stop_loop()
producer.join()
distribution = tf.contrib.distribute.TPUStrategy(
tpu_cluster_resolver, steps_per_run=100)
# Clear the session explicitly to avoid session delete error
tf.keras.backend.clear_session()
else:
distribution = distribution_utils.get_distribution_strategy(
num_gpus=params["num_gpus"])
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.RUN_FINAL)
return distribution
def define_ncf_flags():
......@@ -421,6 +302,12 @@ def define_ncf_flags():
name="seed", default=None, help=flags_core.help_wrap(
"This value will be used to seed both NumPy and TensorFlow."))
flags.DEFINE_boolean(
name="turn_off_distribution_strategy",
default=False,
help=flags_core.help_wrap(
"If set, do not use any distribution strategy."))
@flags.validator("eval_batch_size", "eval_batch_size must be at least {}"
.format(rconst.NUM_EVAL_NEGATIVES + 1))
def eval_size_check(eval_batch_size):
......@@ -438,7 +325,10 @@ def define_ncf_flags():
return not flag_dict["use_xla_for_gpu"] or not flag_dict["tpu"]
if __name__ == "__main__":
tf.logging.set_verbosity(tf.logging.INFO)
define_ncf_flags()
absl_app.run(main)
def convert_to_softmax_logits(logits):
  """Convert the logits returned by the base model to softmax logits.

  Softmax over a column of zeros prepended to the original logits is
  equivalent to a sigmoid on those logits.
  """
  zeros_column = logits * 0
  return tf.concat([zeros_column, logits], axis=1)
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""NCF framework to train and evaluate the NeuMF model.
The NeuMF model assembles both MF and MLP models under the NCF framework. Check
`neumf_model.py` for more details about the models.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import contextlib
import heapq
import json
import logging
import math
import multiprocessing
import os
import signal
import typing
# pylint: disable=g-bad-import-order
import numpy as np
from absl import app as absl_app
from absl import flags
import tensorflow as tf
# pylint: enable=g-bad-import-order
from tensorflow.contrib.compiler import xla
from official.datasets import movielens
from official.recommendation import constants as rconst
from official.recommendation import data_pipeline
from official.recommendation import data_preprocessing
from official.recommendation import ncf_common
from official.recommendation import neumf_model
from official.utils.flags import core as flags_core
from official.utils.logs import hooks_helper
from official.utils.logs import logger
from official.utils.logs import mlperf_helper
from official.utils.misc import distribution_utils
from official.utils.misc import model_helpers
FLAGS = flags.FLAGS
def construct_estimator(model_dir, params):
  """Construct either an Estimator or TPUEstimator for NCF.

  Args:
    model_dir: The model directory for the estimator.
    params: The params dict for the estimator.

  Returns:
    An Estimator or TPUEstimator.
  """
  strategy = ncf_common.get_distribution_strategy(params)
  config = tf.estimator.RunConfig(
      train_distribute=strategy, eval_distribute=strategy)

  base_model_fn = neumf_model.neumf_model_fn
  if params["use_xla_for_gpu"]:
    tf.logging.info("Using XLA for GPU for training and evaluation.")
    base_model_fn = xla.estimator_model_fn(base_model_fn)

  return tf.estimator.Estimator(
      model_fn=base_model_fn, model_dir=model_dir, config=config,
      params=params)
def log_and_get_hooks(eval_batch_size):
  """Convenience function for hook and logger creation."""
  # Hooks that log information about training progress and metric values.
  hooks = hooks_helper.get_train_hooks(
      FLAGS.hooks,
      model_dir=FLAGS.model_dir,
      batch_size=FLAGS.batch_size,  # for ExamplesPerSecondHook
      tensors_to_log={"cross_entropy": "cross_entropy"}
  )

  benchmark_logger = logger.get_benchmark_logger()
  benchmark_logger.log_run_info(
      model_name="recommendation",
      dataset_name=FLAGS.dataset,
      run_params={
          "batch_size": FLAGS.batch_size,
          "eval_batch_size": eval_batch_size,
          "number_factors": FLAGS.num_factors,
          "hr_threshold": FLAGS.hr_threshold,
          "train_epochs": FLAGS.train_epochs,
      },
      test_id=FLAGS.benchmark_test_id)

  return benchmark_logger, hooks
def main(_):
  """Entry point: set up benchmark/MLPerf logging contexts and run NCF."""
  with logger.benchmark_context(FLAGS), \
      mlperf_helper.LOGGER(FLAGS.output_ml_perf_compliance_logging):
    # MLPerf helper needs the directory containing this file as its root.
    mlperf_helper.set_ncf_root(os.path.split(os.path.abspath(__file__))[0])
    run_ncf(FLAGS)
def run_ncf(_):
  """Run NCF training and eval loop with the Estimator API.

  Args:
    _: Unused; all configuration is read from the global FLAGS.
  """
  params = ncf_common.parse_flags(FLAGS)

  num_users, num_items, num_train_steps, num_eval_steps, producer = (
      ncf_common.get_inputs(params))

  params["num_users"], params["num_items"] = num_users, num_items
  producer.start()
  model_helpers.apply_clean(flags.FLAGS)

  estimator = construct_estimator(model_dir=FLAGS.model_dir, params=params)

  benchmark_logger, train_hooks = log_and_get_hooks(params["eval_batch_size"])
  total_training_cycle = FLAGS.train_epochs // FLAGS.epochs_between_evals

  target_reached = False
  mlperf_helper.ncf_print(key=mlperf_helper.TAGS.TRAIN_LOOP)
  for cycle_index in range(total_training_cycle):
    # MLPerf compliance logging assumes exactly one epoch per cycle.
    assert FLAGS.epochs_between_evals == 1 or not mlperf_helper.LOGGER.enabled
    tf.logging.info("Starting a training cycle: {}/{}".format(
        cycle_index + 1, total_training_cycle))

    mlperf_helper.ncf_print(key=mlperf_helper.TAGS.TRAIN_EPOCH,
                            value=cycle_index)

    train_input_fn = producer.make_input_fn(is_training=True)
    estimator.train(input_fn=train_input_fn, hooks=train_hooks,
                    steps=num_train_steps)

    tf.logging.info("Beginning evaluation.")
    eval_input_fn = producer.make_input_fn(is_training=False)

    mlperf_helper.ncf_print(key=mlperf_helper.TAGS.EVAL_START,
                            value=cycle_index)
    eval_results = estimator.evaluate(eval_input_fn, steps=num_eval_steps)
    tf.logging.info("Evaluation complete.")

    # Extract scalar metrics for logging and stop-threshold checks.
    hr = float(eval_results[rconst.HR_KEY])
    ndcg = float(eval_results[rconst.NDCG_KEY])
    loss = float(eval_results["loss"])

    mlperf_helper.ncf_print(
        key=mlperf_helper.TAGS.EVAL_TARGET,
        value={"epoch": cycle_index, "value": FLAGS.hr_threshold})
    mlperf_helper.ncf_print(key=mlperf_helper.TAGS.EVAL_ACCURACY,
                            value={"epoch": cycle_index, "value": hr})
    mlperf_helper.ncf_print(
        key=mlperf_helper.TAGS.EVAL_HP_NUM_NEG,
        value={"epoch": cycle_index, "value": rconst.NUM_EVAL_NEGATIVES})

    mlperf_helper.ncf_print(key=mlperf_helper.TAGS.EVAL_STOP, value=cycle_index)

    # Benchmark the evaluation results
    benchmark_logger.log_evaluation_result(eval_results)

    # Log the HR and NDCG results.
    tf.logging.info(
        "Iteration {}: HR = {:.4f}, NDCG = {:.4f}, Loss = {:.4f}".format(
            cycle_index + 1, hr, ndcg, loss))

    # If some evaluation threshold is met
    if model_helpers.past_stop_threshold(FLAGS.hr_threshold, hr):
      target_reached = True
      break

  mlperf_helper.ncf_print(key=mlperf_helper.TAGS.RUN_STOP,
                          value={"success": target_reached})
  producer.stop_loop()
  producer.join()

  # Clear the session explicitly to avoid session delete error
  tf.keras.backend.clear_session()

  mlperf_helper.ncf_print(key=mlperf_helper.TAGS.RUN_FINAL)
if __name__ == "__main__":
  tf.logging.set_verbosity(tf.logging.INFO)
  # Flag definitions live in ncf_common so both NCF entry points share them.
  ncf_common.define_ncf_flags()
  absl_app.run(main)
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""NCF framework to train and evaluate the NeuMF model.
The NeuMF model assembles both MF and MLP models under the NCF framework. Check
`neumf_model.py` for more details about the models.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
# pylint: disable=g-bad-import-order
from absl import app as absl_app
from absl import flags
import tensorflow as tf
# pylint: enable=g-bad-import-order
from official.datasets import movielens
from official.recommendation import ncf_common
from official.recommendation import neumf_model
from official.recommendation import constants as rconst
from official.utils.logs import logger
from official.utils.logs import mlperf_helper
from official.utils.misc import model_helpers
FLAGS = flags.FLAGS
def _keras_loss(y_true, y_pred):
  """Sparse softmax cross entropy, matching the loss used by the estimator."""
  # Here we are using the exact same loss used by the estimator
  return tf.losses.sparse_softmax_cross_entropy(
      labels=tf.cast(y_true, tf.int32),
      logits=y_pred)
def _get_metric_fn(params):
  """Get the metric fn used by model compile.

  Args:
    params: Dict of hyperparameters; reads "batch_size", "num_neg",
      "match_mlperf" and "use_xla_for_gpu".

  Returns:
    A Keras-compatible metric function computing the in_top_k (hit rate)
    metric at eval time.
  """
  batch_size = params["batch_size"]

  def metric_fn(y_true, y_pred):
    """Returns the in_top_k metric."""
    del y_true  # Labels are not needed for the ranking metric.
    softmax_logits = y_pred
    logits = tf.slice(softmax_logits, [0, 1], [batch_size, 1])

    # The dup mask should be obtained from input data, but we did not yet find
    # a good way of getting it with keras, so we set it to zeros to neglect the
    # repetition correction
    dup_mask = tf.zeros([batch_size, 1])

    # Only in_top_k is reported; discard the other outputs. (Previously the
    # second output was bound to a local named `metric_fn`, shadowing this
    # function's own name.)
    _, _, in_top_k, _, _ = (
        neumf_model.compute_eval_loss_and_metrics_helper(
            logits,
            softmax_logits,
            dup_mask,
            params["num_neg"],
            params["match_mlperf"],
            params["use_xla_for_gpu"]))

    # The metric is only meaningful at eval time; report zeros while training.
    in_top_k = tf.cond(
        tf.keras.backend.learning_phase(),
        lambda: tf.zeros(shape=in_top_k.shape, dtype=in_top_k.dtype),
        lambda: in_top_k)

    return in_top_k

  return metric_fn
def _get_train_and_eval_data(producer, params):
  """Returns the datasets for training and evaluating."""
  train_dataset = producer.make_input_fn(is_training=True)(params)

  def preprocess_eval_input(features):
    # Keras expects (features, labels) pairs; eval labels are unused, so
    # supply zeros shaped like the user column.
    labels = tf.zeros_like(features[movielens.USER_COLUMN])
    return features, labels

  eval_dataset = producer.make_input_fn(is_training=False)(params).map(
      preprocess_eval_input)

  return train_dataset, eval_dataset
class IncrementEpochCallback(tf.keras.callbacks.Callback):
  """A callback to increase the requested epoch for the data producer.

  The reason why we need this is because we can only buffer a limited amount of
  data. So we keep a moving window to represent the buffer. This is to move
  one of the window's boundaries for each epoch.
  """

  def __init__(self, producer):
    # Initialize the base Callback so Keras-managed attributes (e.g. `model`,
    # `params`) are set up correctly; the original omitted this call.
    super(IncrementEpochCallback, self).__init__()
    self._producer = producer

  def on_epoch_begin(self, epoch, logs=None):
    del epoch, logs  # Unused; the producer only needs the increment signal.
    self._producer.increment_request_epoch()
def _get_keras_model(params):
  """Constructs and returns the model."""
  batch_size = params["batch_size"]

  user_input = tf.keras.layers.Input(
      shape=(),
      batch_size=batch_size,
      name=movielens.USER_COLUMN,
      dtype=rconst.USER_DTYPE)
  item_input = tf.keras.layers.Input(
      shape=(),
      batch_size=batch_size,
      name=movielens.ITEM_COLUMN,
      dtype=rconst.ITEM_DTYPE)

  logits = neumf_model.construct_model(user_input, item_input, params).output

  # A zero column concatenated before the logits makes softmax equivalent to
  # a sigmoid over the raw logits.
  zeros = tf.keras.layers.Lambda(lambda x: x * 0)(logits)
  softmax_logits = tf.keras.layers.concatenate([zeros, logits], axis=-1)

  model = tf.keras.Model(
      inputs=[user_input, item_input],
      outputs=softmax_logits)
  model.summary()
  return model
def run_ncf(_):
  """Run NCF training and eval with Keras.

  Args:
    _: Unused; all configuration is read from the global FLAGS.

  Returns:
    The result of keras_model.evaluate on the eval dataset.
  """
  params = ncf_common.parse_flags(FLAGS)

  # NOTE(review): num_train_steps is unpacked but never passed to fit();
  # presumably the training dataset is already sized per epoch — confirm.
  num_users, num_items, num_train_steps, num_eval_steps, producer = (
      ncf_common.get_inputs(params))

  params["num_users"], params["num_items"] = num_users, num_items
  producer.start()
  model_helpers.apply_clean(flags.FLAGS)

  keras_model = _get_keras_model(params)
  optimizer = ncf_common.get_optimizer(params)

  keras_model.compile(
      loss=_keras_loss,
      metrics=[_get_metric_fn(params)],
      optimizer=optimizer)

  train_input_dataset, eval_input_dataset = _get_train_and_eval_data(
      producer, params)

  # The callback advances the producer's buffered-data window each epoch.
  keras_model.fit(
      train_input_dataset,
      epochs=FLAGS.train_epochs,
      callbacks=[IncrementEpochCallback(producer)],
      verbose=2)

  tf.logging.info("Training done. Start evaluating")

  eval_results = keras_model.evaluate(
      eval_input_dataset,
      steps=num_eval_steps,
      verbose=2)

  tf.logging.info("Keras evaluation is done.")

  return eval_results
def main(_):
  """Entry point: reject unsupported flags for the Keras path, then run NCF."""
  with logger.benchmark_context(FLAGS), \
      mlperf_helper.LOGGER(FLAGS.output_ml_perf_compliance_logging):
    # MLPerf helper needs the directory containing this file as its root.
    mlperf_helper.set_ncf_root(os.path.split(os.path.abspath(__file__))[0])
    if FLAGS.tpu:
      raise ValueError("NCF in Keras does not support TPU for now")
    if FLAGS.num_gpus > 1:
      raise ValueError("NCF in Keras does not support distribution strategies. "
                       "Please set num_gpus to 1")
    run_ncf(FLAGS)
if __name__ == "__main__":
  tf.logging.set_verbosity(tf.logging.INFO)
  # Flags are shared with the estimator implementation via ncf_common.
  ncf_common.define_ncf_flags()
  absl_app.run(main)
......@@ -28,7 +28,8 @@ from absl.testing import flagsaver
from official.recommendation import constants as rconst
from official.recommendation import data_pipeline
from official.recommendation import neumf_model
from official.recommendation import ncf_main
from official.recommendation import ncf_common
from official.recommendation import ncf_estimator_main
NUM_TRAIN_NEG = 4
......@@ -39,7 +40,7 @@ class NcfTest(tf.test.TestCase):
@classmethod
def setUpClass(cls): # pylint: disable=invalid-name
super(NcfTest, cls).setUpClass()
ncf_main.define_ncf_flags()
ncf_common.define_ncf_flags()
def setUp(self):
self.top_k_old = rconst.TOP_K
......@@ -70,7 +71,7 @@ class NcfTest(tf.test.TestCase):
logits], axis=1)
duplicate_mask = tf.convert_to_tensor(duplicate_mask, tf.float32)
metric_ops = neumf_model.compute_eval_loss_and_metrics(
metric_ops = neumf_model._get_estimator_spec_with_metrics(
logits=logits, softmax_logits=softmax_logits,
duplicate_mask=duplicate_mask, num_training_neg=NUM_TRAIN_NEG,
match_mlperf=match_mlperf).eval_metric_ops
......@@ -192,12 +193,12 @@ class NcfTest(tf.test.TestCase):
@flagsaver.flagsaver(**_BASE_END_TO_END_FLAGS)
@mock.patch.object(rconst, "SYNTHETIC_BATCHES_PER_EPOCH", 100)
def test_end_to_end(self):
ncf_main.main(None)
ncf_estimator_main.main(None)
@flagsaver.flagsaver(ml_perf=True, **_BASE_END_TO_END_FLAGS)
@mock.patch.object(rconst, "SYNTHETIC_BATCHES_PER_EPOCH", 100)
def test_end_to_end_mlperf(self):
ncf_main.main(None)
ncf_estimator_main.main(None)
if __name__ == "__main__":
......
......@@ -34,13 +34,13 @@ from __future__ import division
from __future__ import print_function
import sys
import typing
from six.moves import xrange # pylint: disable=redefined-builtin
import tensorflow as tf
from official.datasets import movielens # pylint: disable=g-bad-import-order
from official.recommendation import constants as rconst
from official.recommendation import ncf_common
from official.recommendation import stat_utils
from official.utils.logs import mlperf_helper
......@@ -78,16 +78,20 @@ def neumf_model_fn(features, labels, mode, params):
users = features[movielens.USER_COLUMN]
items = features[movielens.ITEM_COLUMN]
logits = construct_model(users, items, params).output
user_input = tf.keras.layers.Input(tensor=users)
item_input = tf.keras.layers.Input(tensor=items)
logits = construct_model(user_input, item_input, params).output
# Softmax with the first column of zeros is equivalent to sigmoid.
softmax_logits = tf.concat([tf.zeros(logits.shape, dtype=logits.dtype),
logits], axis=1)
softmax_logits = ncf_common.convert_to_softmax_logits(logits)
if mode == tf.estimator.ModeKeys.EVAL:
duplicate_mask = tf.cast(features[rconst.DUPLICATE_MASK], tf.float32)
return compute_eval_loss_and_metrics(
logits, softmax_logits, duplicate_mask, params["num_neg"],
return _get_estimator_spec_with_metrics(
logits,
softmax_logits,
duplicate_mask,
params["num_neg"],
params["match_mlperf"],
use_tpu_spec=params["use_xla_for_gpu"])
......@@ -105,14 +109,11 @@ def neumf_model_fn(features, labels, mode, params):
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.OPT_HP_ADAM_EPSILON,
value=params["epsilon"])
optimizer = tf.train.AdamOptimizer(
learning_rate=params["learning_rate"], beta1=params["beta1"],
beta2=params["beta2"], epsilon=params["epsilon"])
if params["use_tpu"]:
optimizer = tf.contrib.tpu.CrossShardOptimizer(optimizer)
optimizer = ncf_common.get_optimizer(params)
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.MODEL_HP_LOSS_FN,
value=mlperf_helper.TAGS.BCE)
loss = tf.losses.sparse_softmax_cross_entropy(
labels=labels,
logits=softmax_logits,
......@@ -138,13 +139,17 @@ def neumf_model_fn(features, labels, mode, params):
raise NotImplementedError
def construct_model(users, items, params):
def _strip_first_and_last_dimension(x, batch_size):
  """Select index 0 of the leading dim of x and reshape to (batch_size,)."""
  leading_slice = x[0, :]
  return tf.reshape(leading_slice, (batch_size,))
def construct_model(user_input, item_input, params, need_strip=False):
# type: (tf.Tensor, tf.Tensor, dict) -> tf.keras.Model
"""Initialize NeuMF model.
Args:
users: Tensor of user ids.
items: Tensor of item ids.
user_input: keras input layer for users
item_input: keras input layer for items
params: Dict of hyperparameters.
Raises:
ValueError: if the first model layer is not even.
......@@ -168,13 +173,20 @@ def construct_model(users, items, params):
if model_layers[0] % 2 != 0:
raise ValueError("The first layer size should be multiple of 2!")
# Input variables
user_input = tf.keras.layers.Input(tensor=users, name="user_input")
item_input = tf.keras.layers.Input(tensor=items, name="item_input")
# Initializer for embedding layers
embedding_initializer = "glorot_uniform"
if need_strip:
batch_size = params["batch_size"]
user_input_reshaped = tf.keras.layers.Lambda(
lambda x: _strip_first_and_last_dimension(
x, batch_size))(user_input)
item_input_reshaped = tf.keras.layers.Lambda(
lambda x: _strip_first_and_last_dimension(
x, batch_size))(item_input)
# It turns out to be significantly more effecient to store the MF and MLP
# embedding portions in the same table, and then slice as needed.
mf_slice_fn = lambda x: x[:, :mf_dim]
......@@ -183,13 +195,15 @@ def construct_model(users, items, params):
num_users, mf_dim + model_layers[0] // 2,
embeddings_initializer=embedding_initializer,
embeddings_regularizer=tf.keras.regularizers.l2(mf_regularization),
input_length=1, name="embedding_user")(user_input)
input_length=1, name="embedding_user")(
user_input_reshaped if need_strip else user_input)
embedding_item = tf.keras.layers.Embedding(
num_items, mf_dim + model_layers[0] // 2,
embeddings_initializer=embedding_initializer,
embeddings_regularizer=tf.keras.regularizers.l2(mf_regularization),
input_length=1, name="embedding_item")(item_input)
input_length=1, name="embedding_item")(
item_input_reshaped if need_strip else item_input)
# GMF part
mf_user_latent = tf.keras.layers.Lambda(
......@@ -233,14 +247,46 @@ def construct_model(users, items, params):
return model
def compute_eval_loss_and_metrics(logits, # type: tf.Tensor
softmax_logits, # type: tf.Tensor
duplicate_mask, # type: tf.Tensor
num_training_neg, # type: int
match_mlperf=False, # type: bool
use_tpu_spec=False # type: bool
):
# type: (...) -> tf.estimator.EstimatorSpec
def _get_estimator_spec_with_metrics(logits,  # type: tf.Tensor
                                     softmax_logits,  # type: tf.Tensor
                                     duplicate_mask,  # type: tf.Tensor
                                     num_training_neg,  # type: int
                                     match_mlperf=False,  # type: bool
                                     use_tpu_spec=False  # type: bool
                                    ):
  """Returns an EstimatorSpec that includes the metrics.

  Thin wrapper around `compute_eval_loss_and_metrics_helper`: computes the
  eval cross-entropy loss plus the HR/NDCG metric tensors, then packages
  them into the spec type the runtime expects.

  Args:
    logits: Predicted logits tensor. (Shape/semantics are defined by
      `compute_eval_loss_and_metrics_helper` — see its docstring.)
    softmax_logits: Logits tensor used for the softmax cross-entropy loss.
    duplicate_mask: Mask tensor marking duplicate items so they can be
      excluded from the metrics.
    num_training_neg: Number of negatives per positive used in training.
    match_mlperf: If True, compute metrics using the MLPerf convention.
    use_tpu_spec: If True, build a `tf.contrib.tpu.TPUEstimatorSpec`
      (which takes a deferred `(metric_fn, tensors)` pair); otherwise build
      a regular `tf.estimator.EstimatorSpec` with eagerly constructed
      `eval_metric_ops`.

  Returns:
    A `TPUEstimatorSpec` or `EstimatorSpec` in EVAL mode carrying the loss
    and the HR/NDCG metrics.
  """
  # Parenthesized unpacking instead of backslash continuations (PEP 8).
  (cross_entropy, metric_fn, in_top_k, ndcg,
   metric_weights) = compute_eval_loss_and_metrics_helper(
       logits,
       softmax_logits,
       duplicate_mask,
       num_training_neg,
       match_mlperf,
       use_tpu_spec)

  if use_tpu_spec:
    # TPUEstimatorSpec defers metric construction: it receives the metric
    # function and its argument tensors separately.
    return tf.contrib.tpu.TPUEstimatorSpec(
        mode=tf.estimator.ModeKeys.EVAL,
        loss=cross_entropy,
        eval_metrics=(metric_fn, [in_top_k, ndcg, metric_weights]))

  return tf.estimator.EstimatorSpec(
      mode=tf.estimator.ModeKeys.EVAL,
      loss=cross_entropy,
      eval_metric_ops=metric_fn(in_top_k, ndcg, metric_weights)
  )
def compute_eval_loss_and_metrics_helper(logits, # type: tf.Tensor
softmax_logits, # type: tf.Tensor
duplicate_mask, # type: tf.Tensor
num_training_neg, # type: int
match_mlperf=False, # type: bool
use_tpu_spec=False # type: bool
):
"""Model evaluation with HR and NDCG metrics.
The evaluation protocol is to rank the test interacted item (truth items)
......@@ -302,7 +348,11 @@ def compute_eval_loss_and_metrics(logits, # type: tf.Tensor
name, TPUEstimatorSpecs work with GPUs
Returns:
An EstimatorSpec for evaluation.
cross_entropy: the loss
metric_fn: the metrics function
in_top_k: hit rate metric
ndcg: ndcg metric
metric_weights: metric weights
"""
in_top_k, ndcg, metric_weights, logits_by_user = compute_top_k_and_ndcg(
logits, duplicate_mask, match_mlperf)
......@@ -342,16 +392,7 @@ def compute_eval_loss_and_metrics(logits, # type: tf.Tensor
name=rconst.NDCG_METRIC_NAME),
}
if use_tpu_spec:
return tf.contrib.tpu.TPUEstimatorSpec(
mode=tf.estimator.ModeKeys.EVAL, loss=cross_entropy,
eval_metrics=(metric_fn, [in_top_k, ndcg, metric_weights]))
return tf.estimator.EstimatorSpec(
mode=tf.estimator.ModeKeys.EVAL,
loss=cross_entropy,
eval_metric_ops=metric_fn(in_top_k, ndcg, metric_weights)
)
return cross_entropy, metric_fn, in_top_k, ndcg, metric_weights
def compute_top_k_and_ndcg(logits, # type: tf.Tensor
......
......@@ -8,6 +8,7 @@ fi
SCRIPT_DIR=`dirname "$BASH_SOURCE"`
export PYTHONPATH="${SCRIPT_DIR}/../../"
MAIN_SCRIPT="ncf_estimator_main.py"
DATASET="ml-20m"
......@@ -38,6 +39,15 @@ fi
DATA_DIR="${ROOT_DIR}/movielens_data"
python "${SCRIPT_DIR}/../datasets/movielens.py" --data_dir ${DATA_DIR} --dataset ${DATASET}
if [ "$1" == "keras" ]
then
MAIN_SCRIPT="ncf_keras_main.py"
BATCH_SIZE=160000
DEVICE_FLAG="--num_gpus 1"
else
BATCH_SIZE=98340
fi
{
for i in `seq 0 4`;
......@@ -58,14 +68,14 @@ do
# To reduce variation set the seed flag:
# --seed ${i}
python -u "${SCRIPT_DIR}/ncf_main.py" \
python -u "${SCRIPT_DIR}/${MAIN_SCRIPT}" \
--model_dir ${MODEL_DIR} \
--data_dir ${DATA_DIR} \
--dataset ${DATASET} --hooks "" \
${DEVICE_FLAG} \
--clean \
--train_epochs 14 \
--batch_size 98304 \
--batch_size ${BATCH_SIZE} \
--eval_batch_size 160000 \
--learning_rate 0.00382059 \
--beta1 0.783529 \
......
#!/bin/bash
# Abort immediately if any command exits with a non-zero status.
set -e
# Invoke the NCF benchmark driver in its Keras variant: run.sh switches
# to ncf_keras_main.py (and the Keras batch size) when passed "keras".
./run.sh keras
......@@ -150,7 +150,7 @@ def run(flags_obj):
distribution_strategy=flags_obj.distribution_strategy,
num_gpus=flags_obj.num_gpus)
strategy_scope = keras_common.get_strategy_scope(strategy)
strategy_scope = distribution_utils.MaybeDistributionScope(strategy)
with strategy_scope:
optimizer = keras_common.get_optimizer()
......
......@@ -144,7 +144,7 @@ def run(flags_obj):
distribution_strategy=flags_obj.distribution_strategy,
num_gpus=flags_obj.num_gpus)
strategy_scope = keras_common.get_strategy_scope(strategy)
strategy_scope = distribution_utils.MaybeDistributionScope(strategy)
with strategy_scope:
optimizer = keras_common.get_optimizer()
......
......@@ -92,6 +92,10 @@ def get_distribution_strategy(distribution_strategy="default",
if distribution_strategy == "parameter_server":
return tf.distribute.experimental.ParameterServerStrategy()
if distribution_strategy == "collective":
return tf.contrib.distribute.CollectiveAllReduceStrategy(
num_gpus_per_worker=num_gpus)
raise ValueError(
"Unrecognized Distribution Strategy: %r" % distribution_strategy)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment