Commit 6307bf83 authored by A. Unique TensorFlower's avatar A. Unique TensorFlower
Browse files

Internal change

PiperOrigin-RevId: 263401952
parent b7c8dab7
...@@ -117,7 +117,10 @@ def create_dataset_from_data_producer(producer, params): ...@@ -117,7 +117,10 @@ def create_dataset_from_data_producer(producer, params):
return train_input_dataset, eval_input_dataset return train_input_dataset, eval_input_dataset
def create_ncf_input_data(params, producer=None, input_meta_data=None): def create_ncf_input_data(params,
producer=None,
input_meta_data=None,
strategy=None):
"""Creates NCF training/evaluation dataset. """Creates NCF training/evaluation dataset.
Args: Args:
...@@ -128,6 +131,9 @@ def create_ncf_input_data(params, producer=None, input_meta_data=None): ...@@ -128,6 +131,9 @@ def create_ncf_input_data(params, producer=None, input_meta_data=None):
input_meta_data: A dictionary of input metadata to be used when reading data input_meta_data: A dictionary of input metadata to be used when reading data
from tf record files. Must be specified when params["train_input_dataset"] from tf record files. Must be specified when params["train_input_dataset"]
is specified. is specified.
strategy: Distribution strategy used for distributed training. If specified,
used to assert that evaluation batch size is correctly a multiple of
total number of devices used.
Returns: Returns:
(training dataset, evaluation dataset, train steps per epoch, (training dataset, evaluation dataset, train steps per epoch,
...@@ -136,6 +142,17 @@ def create_ncf_input_data(params, producer=None, input_meta_data=None): ...@@ -136,6 +142,17 @@ def create_ncf_input_data(params, producer=None, input_meta_data=None):
Raises: Raises:
ValueError: If data is being generated online for when using TPU's. ValueError: If data is being generated online for when using TPU's.
""" """
# NCF evaluation metric calculation logic assumes that evaluation data
# sample size are in multiples of (1 + number of negative samples in
# evaluation) for each device. As so, evaluation batch size must be a
# multiple of (number of replicas * (1 + number of negative samples)).
num_devices = strategy.num_replicas_in_sync if strategy else 1
if (params["eval_batch_size"] % (num_devices *
(1 + rconst.NUM_EVAL_NEGATIVES))):
raise ValueError("Evaluation batch size must be divisible by {} "
"times {}".format(num_devices,
(1 + rconst.NUM_EVAL_NEGATIVES)))
if params["train_dataset_path"]: if params["train_dataset_path"]:
assert params["eval_dataset_path"] assert params["eval_dataset_path"]
......
...@@ -199,6 +199,7 @@ class NCFKerasAccuracy(NCFKerasBenchmarkBase): ...@@ -199,6 +199,7 @@ class NCFKerasAccuracy(NCFKerasBenchmarkBase):
self._setup() self._setup()
FLAGS.early_stopping = True FLAGS.early_stopping = True
FLAGS.num_gpus = 2 FLAGS.num_gpus = 2
FLAGS.eval_batch_size = 160000
self._run_and_report_benchmark() self._run_and_report_benchmark()
def benchmark_2_gpus_ctl_early_stop(self): def benchmark_2_gpus_ctl_early_stop(self):
...@@ -207,6 +208,7 @@ class NCFKerasAccuracy(NCFKerasBenchmarkBase): ...@@ -207,6 +208,7 @@ class NCFKerasAccuracy(NCFKerasBenchmarkBase):
FLAGS.keras_use_ctl = True FLAGS.keras_use_ctl = True
FLAGS.early_stopping = True FLAGS.early_stopping = True
FLAGS.num_gpus = 2 FLAGS.num_gpus = 2
FLAGS.eval_batch_size = 160000
self._run_and_report_benchmark() self._run_and_report_benchmark()
############################################# #############################################
...@@ -283,6 +285,7 @@ class NCFKerasAccuracy(NCFKerasBenchmarkBase): ...@@ -283,6 +285,7 @@ class NCFKerasAccuracy(NCFKerasBenchmarkBase):
FLAGS.num_gpus = 8 FLAGS.num_gpus = 8
FLAGS.train_epochs = 17 FLAGS.train_epochs = 17
FLAGS.batch_size = 1048576 FLAGS.batch_size = 1048576
FLAGS.eval_batch_size = 160000
FLAGS.learning_rate = 0.0045 FLAGS.learning_rate = 0.0045
FLAGS.beta1 = 0.25 FLAGS.beta1 = 0.25
FLAGS.beta2 = 0.5 FLAGS.beta2 = 0.5
...@@ -295,6 +298,7 @@ class NCFKerasAccuracy(NCFKerasBenchmarkBase): ...@@ -295,6 +298,7 @@ class NCFKerasAccuracy(NCFKerasBenchmarkBase):
FLAGS.num_gpus = 8 FLAGS.num_gpus = 8
FLAGS.train_epochs = 17 FLAGS.train_epochs = 17
FLAGS.batch_size = 1048576 FLAGS.batch_size = 1048576
FLAGS.eval_batch_size = 160000
FLAGS.learning_rate = 0.0045 FLAGS.learning_rate = 0.0045
FLAGS.beta1 = 0.25 FLAGS.beta1 = 0.25
FLAGS.beta2 = 0.5 FLAGS.beta2 = 0.5
...@@ -309,6 +313,7 @@ class NCFKerasAccuracy(NCFKerasBenchmarkBase): ...@@ -309,6 +313,7 @@ class NCFKerasAccuracy(NCFKerasBenchmarkBase):
FLAGS.num_gpus = 8 FLAGS.num_gpus = 8
FLAGS.train_epochs = 17 FLAGS.train_epochs = 17
FLAGS.batch_size = 1048576 FLAGS.batch_size = 1048576
FLAGS.eval_batch_size = 160000
FLAGS.learning_rate = 0.0045 FLAGS.learning_rate = 0.0045
FLAGS.beta1 = 0.25 FLAGS.beta1 = 0.25
FLAGS.beta2 = 0.5 FLAGS.beta2 = 0.5
...@@ -329,6 +334,7 @@ class NCFKerasSynth(NCFKerasBenchmarkBase): ...@@ -329,6 +334,7 @@ class NCFKerasSynth(NCFKerasBenchmarkBase):
default_flags['num_gpus'] = 1 default_flags['num_gpus'] = 1
default_flags['train_epochs'] = 8 default_flags['train_epochs'] = 8
default_flags['batch_size'] = 99000 default_flags['batch_size'] = 99000
default_flags['eval_batch_size'] = 160000
default_flags['learning_rate'] = 0.00382059 default_flags['learning_rate'] = 0.00382059
default_flags['beta1'] = 0.783529 default_flags['beta1'] = 0.783529
default_flags['beta2'] = 0.909003 default_flags['beta2'] = 0.909003
......
...@@ -64,12 +64,20 @@ class MetricLayer(tf.keras.layers.Layer): ...@@ -64,12 +64,20 @@ class MetricLayer(tf.keras.layers.Layer):
def __init__(self, params):
  """Initializes the hit-rate metric layer.

  Args:
    params: Dict of NCF hyperparameters; stored and later forwarded to
      metric_fn when the layer is called.
  """
  super(MetricLayer, self).__init__()
  self.params = params
def call(self, inputs, training=False):
  """Computes hit-rate metric components and passes logits through.

  Args:
    inputs: A tuple of (logits, dup_mask) tensors. dup_mask flags duplicate
      evaluation candidates so they are excluded from the metric.
    training: Bool. When True, metric computation is skipped (zeros are
      reported) because the hit-rate metric is only meaningful for
      evaluation batches.

  Returns:
    The logits tensor unchanged, so this layer is transparent in the model.
  """
  logits, dup_mask = inputs

  if training:
    # Hit rate is an evaluation-only metric; report zeros during training
    # so the metric tensors always exist with a consistent structure.
    hr_sum = 0.0
    hr_count = 0.0
  else:
    # metric_fn returns per-example top-k hit indicators and their weights.
    metric, metric_weights = metric_fn(logits, dup_mask, self.params)
    hr_sum = tf.reduce_sum(metric * metric_weights)
    hr_count = tf.reduce_sum(metric_weights)

  # Expose the sum and count as separate metrics so the final hit rate can
  # be computed as hr_sum / hr_count after aggregation across replicas.
  self.add_metric(hr_sum, name="hr_sum", aggregation="mean")
  self.add_metric(hr_count, name="hr_count", aggregation="mean")
  return logits
...@@ -249,7 +257,7 @@ def run_ncf(_): ...@@ -249,7 +257,7 @@ def run_ncf(_):
(train_input_dataset, eval_input_dataset, (train_input_dataset, eval_input_dataset,
num_train_steps, num_eval_steps) = \ num_train_steps, num_eval_steps) = \
(ncf_input_pipeline.create_ncf_input_data( (ncf_input_pipeline.create_ncf_input_data(
params, producer, input_meta_data)) params, producer, input_meta_data, strategy))
steps_per_epoch = None if generate_input_online else num_train_steps steps_per_epoch = None if generate_input_online else num_train_steps
with distribution_utils.get_strategy_scope(strategy): with distribution_utils.get_strategy_scope(strategy):
...@@ -295,11 +303,19 @@ def run_ncf(_): ...@@ -295,11 +303,19 @@ def run_ncf(_):
logging.info("Training done. Start evaluating") logging.info("Training done. Start evaluating")
eval_results = keras_model.evaluate( eval_loss_and_metrics = keras_model.evaluate(
eval_input_dataset, steps=num_eval_steps, verbose=2) eval_input_dataset, steps=num_eval_steps, verbose=2)
logging.info("Keras evaluation is done.") logging.info("Keras evaluation is done.")
# Keras evaluate() API returns scalar loss and metric values from
# evaluation as a list. Here, the returned list would contain
# [evaluation loss, hr sum, hr count].
eval_hit_rate = eval_loss_and_metrics[1] / eval_loss_and_metrics[2]
# Format evaluation result into [eval loss, eval hit accuracy].
eval_results = [eval_loss_and_metrics[0], eval_hit_rate]
if history and history.history: if history and history.history:
train_history = history.history train_history = history.history
train_loss = train_history["loss"][-1] train_loss = train_history["loss"][-1]
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment