Commit 4c11b84b authored by Shining Sun, committed by Toby Boyd

Added benchmark test and convergence test for the NCF model (#6318)

* initial commit

* bug fix

* Move build_stats from common to keras main, because it is only applicable in keras

* remove trailing blank line

* add test for synth data

* add kwargs to init

* add kwargs to function invocation

* correctly pass kwargs

* debug

* debug

* debug

* fix super init

* bug fix

* fix local_flags

* fix import

* bug fix

* fix log_steps flag

* bug fix

* bug fix: add missing return value

* resolve double-defined flags

* lint fix

* move log_steps flag to benchmark flags

* fix lint

* lint fix

* lint fix

* try flag core default values

* bug fix

* bug fix

* bug fix

* debug

* debug

* remove debug prints

* rename benchmark methods

* flag bug fix for synth benchmark
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Executes Keras benchmarks and accuracy tests."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import time
import json

from absl import flags
from absl.testing import flagsaver
import tensorflow as tf  # pylint: disable=g-bad-import-order

from official.recommendation import ncf_common
from official.recommendation import ncf_keras_main
from official.utils.flags import core

FLAGS = flags.FLAGS

class KerasNCFBenchmarkBase(tf.test.Benchmark):
  """Base class for NCF model benchmark."""

  local_flags = None

  def __init__(self,
               output_dir=None,
               root_data_dir=None,
               default_flags=None,
               **kwargs):
    self.output_dir = output_dir
    self.default_flags = default_flags or {}

    ncf_common.define_ncf_flags()
    if root_data_dir:
      FLAGS.data_dir = os.path.join(root_data_dir, 'movielens_data')

  def _setup(self):
    """Sets up and resets flags before each test."""
    tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.DEBUG)
    if KerasNCFBenchmarkBase.local_flags is None:
      # Loads flags to get defaults to then override. List cannot be empty.
      flags.FLAGS(['foo'])
      core.set_defaults(**self.default_flags)
      saved_flag_values = flagsaver.save_flag_values()
      KerasNCFBenchmarkBase.local_flags = saved_flag_values
    else:
      flagsaver.restore_flag_values(KerasNCFBenchmarkBase.local_flags)

  def _run_and_report_benchmark(self):
    start_time_sec = time.time()
    stats = ncf_keras_main.run_ncf(FLAGS)
    wall_time_sec = time.time() - start_time_sec

    extras = self._extract_benchmark_report_extras(stats)
    self.report_benchmark(iters=-1, wall_time=wall_time_sec, extras=extras)

  def _extract_benchmark_report_extras(self, stats):
    raise NotImplementedError("Not implemented")
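Note: _setup() above uses a cache-and-restore pattern: on first use it parses flags (absl requires a non-empty argv), applies the benchmark's default_flags as new defaults, and snapshots the parsed flag state; every later test restores that snapshot instead of re-parsing. A minimal standalone sketch of the same pattern follows; my_flag and run_test are hypothetical names for illustration, not part of this commit.

from absl import flags
from absl.testing import flagsaver

flags.DEFINE_integer('my_flag', 10, 'A hypothetical flag, illustration only.')
FLAGS = flags.FLAGS

_saved_flags = None

def run_test():
  global _saved_flags
  if _saved_flags is None:
    FLAGS(['prog'])        # Parse once so every flag holds its default value.
    FLAGS.my_flag = 42     # Apply test-specific overrides.
    _saved_flags = flagsaver.save_flag_values()  # Snapshot overridden state.
  else:
    # Later tests reset any flag mutations back to the snapshot.
    flagsaver.restore_flag_values(_saved_flags)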
class KerasNCFRealData(KerasNCFBenchmarkBase):
  """Benchmark NCF model using real data."""

  def __init__(self,
               output_dir=None,
               default_flags=None,
               **kwargs):
    default_flags = {}
    default_flags['dataset'] = 'ml-20m'
    default_flags['num_gpus'] = 1
    default_flags['train_epochs'] = 14
    default_flags['batch_size'] = 16000
    default_flags['learning_rate'] = 0.00382059
    default_flags['beta1'] = 0.783529
    default_flags['beta2'] = 0.909003
    default_flags['epsilon'] = 1.45439e-07
    default_flags['layers'] = [256, 256, 128, 64]
    default_flags['num_factors'] = 64
    default_flags['hr_threshold'] = 0.635
    default_flags['use_synthetic_data'] = False

    super(KerasNCFRealData, self).__init__(
        output_dir=output_dir,
        default_flags=default_flags,
        **kwargs)

  def _extract_benchmark_report_extras(self, stats):
    extras = {}
    extras['train_loss'] = stats['loss']
    extras['eval_hit_rate'] = stats['eval_hit_rate']
    extras['examples_per_second'] = stats['avg_exp_per_second']
    return extras

  def benchmark_1_gpu(self):
    self._setup()
    self._run_and_report_benchmark()


class KerasNCFSyntheticData(KerasNCFBenchmarkBase):
  """Benchmark NCF model using synthetic data."""

  def __init__(self,
               output_dir=None,
               default_flags=None,
               **kwargs):
    default_flags = {}
    default_flags['dataset'] = 'ml-20m'
    default_flags['num_gpus'] = 1
    default_flags['train_epochs'] = 14
    default_flags['batch_size'] = 16000
    default_flags['learning_rate'] = 0.00382059
    default_flags['beta1'] = 0.783529
    default_flags['beta2'] = 0.909003
    default_flags['epsilon'] = 1.45439e-07
    default_flags['layers'] = [256, 256, 128, 64]
    default_flags['num_factors'] = 64
    default_flags['hr_threshold'] = 0.635
    default_flags['use_synthetic_data'] = True

    super(KerasNCFSyntheticData, self).__init__(
        output_dir=output_dir,
        default_flags=default_flags,
        **kwargs)

  def _extract_benchmark_report_extras(self, stats):
    extras = {}
    extras['examples_per_second'] = stats['avg_exp_per_second']
    return extras

  def benchmark_1_gpu(self):
    self._setup()
    self._run_and_report_benchmark()
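For reference, a minimal sketch of driving one of these classes by hand; benchmark harnesses normally discover and invoke the benchmark_* methods themselves, and the output_dir value here is hypothetical:

# Runs _setup() to reset flags, then trains and reports via report_benchmark().
benchmark = KerasNCFSyntheticData(output_dir='/tmp/ncf_benchmark')
benchmark.benchmark_1_gpu()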
@@ -36,6 +36,7 @@ from official.recommendation import neumf_model
 from official.recommendation import constants as rconst
 from official.utils.logs import logger
 from official.utils.logs import mlperf_helper
+from official.utils.misc import keras_utils
 from official.utils.misc import model_helpers
@@ -161,6 +162,7 @@ def run_ncf(_):
     FLAGS.eval_batch_size = FLAGS.batch_size
 
   params = ncf_common.parse_flags(FLAGS)
+  batch_size = params["batch_size"]
 
   # ncf_common rounds eval_batch_size (this is needed due to a reshape during
   # eval). This carries over that rounding to batch_size as well.
@@ -176,6 +178,8 @@ def run_ncf(_):
   keras_model = _get_keras_model(params)
   optimizer = ncf_common.get_optimizer(params)
 
+  time_callback = keras_utils.TimeHistory(batch_size, FLAGS.log_steps)
+
   keras_model.compile(
       loss=_keras_loss,
       metrics=[_get_metric_fn(params)],
@@ -184,10 +188,12 @@ def run_ncf(_):
   train_input_dataset, eval_input_dataset = _get_train_and_eval_data(
       producer, params)
 
-  keras_model.fit(
+  history = keras_model.fit(
       train_input_dataset,
       epochs=FLAGS.train_epochs,
-      callbacks=[IncrementEpochCallback(producer)],
+      callbacks=[
+          IncrementEpochCallback(producer),
+          time_callback],
       verbose=2)
 
   tf.logging.info("Training done. Start evaluating")
@@ -199,7 +205,42 @@ def run_ncf(_):
   tf.logging.info("Keras evaluation is done.")
 
-  return eval_results
+  stats = build_stats(history, eval_results, time_callback)
+  return stats
+
+
+def build_stats(history, eval_result, time_callback):
+  """Normalizes and returns dictionary of stats.
+
+  Args:
+    history: Results of the training step. Supports both categorical_accuracy
+      and sparse_categorical_accuracy.
+    eval_result: Output of the eval step. Assumes first value is eval_loss and
+      second value is accuracy_top_1.
+    time_callback: Time tracking callback likely used during keras.fit.
+
+  Returns:
+    Dictionary of normalized results.
+  """
+  stats = {}
+  if history and history.history:
+    train_history = history.history
+    stats['loss'] = train_history['loss'][-1]
+
+  if eval_result:
+    stats['eval_loss'] = eval_result[0]
+    stats['eval_hit_rate'] = eval_result[1]
+
+  if time_callback:
+    timestamp_log = time_callback.timestamp_log
+    stats['step_timestamp_log'] = timestamp_log
+    stats['train_finish_time'] = time_callback.train_finish_time
+    if len(timestamp_log) > 1:
+      stats['avg_exp_per_second'] = (
+          time_callback.batch_size * time_callback.log_steps *
+          (len(time_callback.timestamp_log) - 1) /
+          (timestamp_log[-1].timestamp - timestamp_log[0].timestamp))
+
+  return stats
def main(_):
......
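A note on the avg_exp_per_second computation in build_stats above: the timestamp log holds the start of batch 0 plus one entry per log_steps batches, so len(timestamp_log) - 1 intervals of batch_size * log_steps examples each elapse between the first and last timestamps. A worked example with assumed numbers:

# Assumed values, for illustration only.
batch_size = 16000
log_steps = 100
timestamps = [0.0, 50.0, 100.0]   # Three entries => two logged intervals.

examples = batch_size * log_steps * (len(timestamps) - 1)   # 3,200,000
avg_exp_per_second = examples / (timestamps[-1] - timestamps[0])
print(avg_exp_per_second)   # 32000.0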
@@ -204,14 +204,16 @@ class NcfTest(tf.test.TestCase):
     integration.run_synthetic(
         ncf_keras_main.main, tmp_root=self.get_temp_dir(), max_train=None,
         extra_flags=self._BASE_END_TO_END_FLAGS +
-        ['-distribution_strategy', 'off'])
+        ['-distribution_strategy', 'off', '-log_steps', '100'])
 
   @mock.patch.object(rconst, "SYNTHETIC_BATCHES_PER_EPOCH", 100)
   def test_end_to_end_keras_mlperf(self):
     integration.run_synthetic(
         ncf_keras_main.main, tmp_root=self.get_temp_dir(), max_train=None,
         extra_flags=self._BASE_END_TO_END_FLAGS +
-        ['-ml_perf', 'True', '-distribution_strategy', 'off'])
+        ['-ml_perf', 'True',
+         '-distribution_strategy', 'off',
+         '-log_steps', '100'])
if __name__ == "__main__":
......
@@ -20,7 +20,6 @@ from __future__ import print_function
 import multiprocessing
 import os
 import time
 
 import numpy as np
@@ -28,6 +27,8 @@ import numpy as np
 from absl import flags
 import tensorflow as tf
 
+from official.utils.misc import keras_utils
+# pylint: disable=ungrouped-imports
 from tensorflow.core.protobuf import rewriter_config_pb2
 from tensorflow.python.keras.optimizer_v2 import (gradient_descent as
                                                   gradient_descent_v2)
@@ -37,60 +38,6 @@ BASE_LEARNING_RATE = 0.1  # This matches Jing's version.
 TRAIN_TOP_1 = 'training_accuracy_top_1'
 
-class BatchTimestamp(object):
-  """A structure to store batch time stamp."""
-
-  def __init__(self, batch_index, timestamp):
-    self.batch_index = batch_index
-    self.timestamp = timestamp
-
-
-class TimeHistory(tf.keras.callbacks.Callback):
-  """Callback for Keras models."""
-
-  def __init__(self, batch_size, log_steps):
-    """Callback for logging performance (# image/second).
-
-    Args:
-      batch_size: Total batch size.
-      log_steps: Interval of time history logs.
-    """
-    self.batch_size = batch_size
-    super(TimeHistory, self).__init__()
-    self.log_steps = log_steps
-
-    # Logs start of step 0 then end of each step based on log_steps interval.
-    self.timestamp_log = []
-
-  def on_train_begin(self, logs=None):
-    self.record_batch = True
-
-  def on_train_end(self, logs=None):
-    self.train_finish_time = time.time()
-
-  def on_batch_begin(self, batch, logs=None):
-    if self.record_batch:
-      timestamp = time.time()
-      self.start_time = timestamp
-      self.record_batch = False
-      if batch == 0:
-        self.timestamp_log.append(BatchTimestamp(batch, timestamp))
-
-  def on_batch_end(self, batch, logs=None):
-    if batch % self.log_steps == 0:
-      timestamp = time.time()
-      elapsed_time = timestamp - self.start_time
-      examples_per_second = (self.batch_size * self.log_steps) / elapsed_time
-      if batch != 0:
-        self.record_batch = True
-        self.timestamp_log.append(BatchTimestamp(batch, timestamp))
-        tf.compat.v1.logging.info(
-            "BenchmarkMetric: {'num_batches':%d, 'time_taken': %f,"
-            "'images_per_second': %f}" %
-            (batch, elapsed_time, examples_per_second))
 
class LearningRateBatchScheduler(tf.keras.callbacks.Callback):
"""Callback to update learning rate on every batch (not epoch boundaries).
@@ -195,7 +142,7 @@ def get_optimizer():
 def get_callbacks(learning_rate_schedule_fn, num_images):
   """Returns common callbacks."""
-  time_callback = TimeHistory(FLAGS.batch_size, FLAGS.log_steps)
+  time_callback = keras_utils.TimeHistory(FLAGS.batch_size, FLAGS.log_steps)
 
   tensorboard_callback = tf.keras.callbacks.TensorBoard(
       log_dir=FLAGS.model_dir)
@@ -251,6 +198,7 @@ def build_stats(history, eval_output, time_callback):
 def define_keras_flags():
   """Define flags for Keras models."""
   flags.DEFINE_boolean(name='enable_eager', default=False, help='Enable eager?')
   flags.DEFINE_boolean(name='skip_eval', default=False, help='Skip evaluation?')
   flags.DEFINE_boolean(name='use_trivial_model', default=False,
@@ -267,11 +215,6 @@ def define_keras_flags():
       help='The number of steps to run for training. If it is larger than '
       '# batches per epoch, then use # batches per epoch. When this flag is '
       'set, only one epoch is going to run for training.')
-  flags.DEFINE_integer(
-      name='log_steps', default=100,
-      help='For every log_steps, we log the timing information such as '
-      'examples per second. Besides, for every log_steps, we store the '
-      'timestamp of a batch end.')
def get_synth_input_fn(height, width, num_channels, num_classes,
......
@@ -52,6 +52,12 @@ def define_benchmark(benchmark_log_dir=True, bigquery_uploader=True):
           "human consumption, and does not have any impact within "
           "the system."))
 
+  flags.DEFINE_integer(
+      name='log_steps', default=100,
+      help='For every log_steps, we log the timing information such as '
+      'examples per second. Besides, for every log_steps, we store the '
+      'timestamp of a batch end.')
+
   if benchmark_log_dir:
     flags.DEFINE_string(
         name="benchmark_log_dir", short_name="bld", default=None,
......
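Because log_steps now lives in the shared benchmark flag group, any model that defines that group picks it up. A minimal sketch of defining and overriding the flag, assuming official.utils.flags.core re-exports define_benchmark as it does the other flag groups:

from absl import flags
from official.utils.flags import core

core.define_benchmark()
flags.FLAGS(['prog', '--log_steps=50'])   # Override the default of 100.
print(flags.FLAGS.log_steps)              # 50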
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Helper functions for the Keras implementations of models."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import time

from absl import flags
import tensorflow as tf


class BatchTimestamp(object):
  """A structure to store batch time stamp."""

  def __init__(self, batch_index, timestamp):
    self.batch_index = batch_index
    self.timestamp = timestamp


class TimeHistory(tf.keras.callbacks.Callback):
  """Callback for Keras models."""

  def __init__(self, batch_size, log_steps):
    """Callback for logging performance (# examples/second).

    Args:
      batch_size: Total batch size.
      log_steps: Interval of time history logs.
    """
    self.batch_size = batch_size
    super(TimeHistory, self).__init__()
    self.log_steps = log_steps

    # Logs start of step 0 then end of each step based on log_steps interval.
    self.timestamp_log = []

  def on_train_begin(self, logs=None):
    self.record_batch = True

  def on_train_end(self, logs=None):
    self.train_finish_time = time.time()

  def on_batch_begin(self, batch, logs=None):
    if self.record_batch:
      timestamp = time.time()
      self.start_time = timestamp
      self.record_batch = False
      if batch == 0:
        self.timestamp_log.append(BatchTimestamp(batch, timestamp))

  def on_batch_end(self, batch, logs=None):
    if batch % self.log_steps == 0:
      timestamp = time.time()
      elapsed_time = timestamp - self.start_time
      examples_per_second = (self.batch_size * self.log_steps) / elapsed_time
      if batch != 0:
        self.record_batch = True
        self.timestamp_log.append(BatchTimestamp(batch, timestamp))
        tf.compat.v1.logging.info(
            "BenchmarkMetric: {'num_batches':%d, 'time_taken': %f,"
            "'examples_per_second': %f}" %
            (batch, elapsed_time, examples_per_second))
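A minimal sketch of wiring TimeHistory into an arbitrary Keras model; the toy model and data are illustrative, not part of this commit:

import numpy as np
import tensorflow as tf

model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(4,))])
model.compile(optimizer='sgd', loss='mse')

x = np.random.rand(1024, 4)
y = np.random.rand(1024, 1)

# 32 batches of 32 examples; logs throughput every 10 batches.
time_callback = TimeHistory(batch_size=32, log_steps=10)
model.fit(x, y, batch_size=32, epochs=1, callbacks=[time_callback], verbose=0)

# timestamp_log holds the start of batch 0 plus every log_steps-th batch end.
for ts in time_callback.timestamp_log:
  print(ts.batch_index, ts.timestamp)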