Commit cb913691 authored by Reed Wanderman-Milne, committed by A. Unique TensorFlower

Add support for the tf.keras.mixed_precision API in NCF

To test, I did 50 fp32 runs and 50 fp16 runs. I used the following command:

python ncf_keras_main.py --dataset=ml-20m --num_gpus=1 --train_epochs=10 --clean --batch_size=99000 --learning_rate=0.00382059 --beta1=0.783529 --beta2=0.909003 --epsilon=1.45439e-7 --layers=256,256,128,64 --num_factors=64 --hr_threshold=0.635 --ml_perf --nouse_synthetic_data --data_dir ~/ncf_data_dir_python3 --model_dir ~/tmp_model_dir --keras_use_ctl

For the fp16 runs, I added --dtype=fp16. The average hit-rate for both fp16 and fp32 was 0.6365. I also did 50 runs with the mixed precision graph rewrite, and the average hit-rate was 0.6363. The difference is likely due to noise.

PiperOrigin-RevId: 275059871
parent e97e22df
@@ -152,13 +152,9 @@ def define_ncf_flags():
       run_eagerly=True, stop_threshold=True, num_gpu=True,
       hooks=True, distribution_strategy=True)
   flags_core.define_performance(
-      num_parallel_calls=False,
-      inter_op=False,
-      intra_op=False,
       synthetic_data=True,
-      max_train_steps=False,
       dtype=True,
-      all_reduce_alg=False,
+      fp16_implementation=True,
       loss_scale=True,
       dynamic_loss_scale=True,
       enable_xla=True,
...
@@ -282,6 +282,25 @@ class NCFKerasAccuracy(NCFKerasBenchmarkBase):
     FLAGS.loss_scale = 8192
     self._run_and_report_benchmark_mlperf_like()
 
+  def benchmark_1_gpu_ctl_fp16_graph_rewrite_mlperf_like(self):
+    """1 GPU using CTL and FP16 graph rewrite."""
+    self._setup()
+    FLAGS.keras_use_ctl = True
+    FLAGS.train_epochs = 7
+    FLAGS.dtype = 'fp16'
+    FLAGS.fp16_implementation = 'graph_rewrite'
+    FLAGS.loss_scale = 8192
+    self._run_and_report_benchmark_mlperf_like()
+
+  def benchmark_1_gpu_fp16_graph_rewrite_mlperf_like(self):
+    """1 GPU using FP16 graph rewrite."""
+    self._setup()
+    FLAGS.train_epochs = 7
+    FLAGS.dtype = 'fp16'
+    FLAGS.fp16_implementation = 'graph_rewrite'
+    FLAGS.loss_scale = 8192
+    self._run_and_report_benchmark_mlperf_like()
+
   def benchmark_1_gpu_ctl_run_eagerly_mlperf_like(self):
     """1 GPU using CTL with eager and distribution strategy."""
     self._setup()
@@ -412,6 +431,30 @@ class NCFKerasAccuracy(NCFKerasBenchmarkBase):
     FLAGS.input_meta_data_path = os.path.join(NCF_TF_DATA_1M_BATCH_DIR_NAME,
                                               "meta_data.json")
     self._run_and_report_benchmark_mlperf_like()
 
+  def benchmark_8_gpu_tf_data_ctl_fp16_graph_rewrite_mlperf_like(self):
+    """8 GPU FP16 graph rewrite using CTL."""
+    self._setup()
+    FLAGS.keras_use_ctl = True
+    FLAGS.num_gpus = 8
+    FLAGS.train_epochs = 17
+    FLAGS.batch_size = 1048576
+    FLAGS.eval_batch_size = 1048000
+    FLAGS.learning_rate = 0.0045
+    FLAGS.beta1 = 0.25
+    FLAGS.beta2 = 0.5
+    FLAGS.epsilon = 1e-8
+    FLAGS.dtype = 'fp16'
+    FLAGS.fp16_implementation = 'graph_rewrite'
+    FLAGS.loss_scale = 8192
+    FLAGS.train_dataset_path = os.path.join(NCF_TF_DATA_1M_BATCH_DIR_NAME,
+                                            'training_cycle_*/*')
+    FLAGS.eval_dataset_path = os.path.join(NCF_TF_DATA_1M_BATCH_DIR_NAME,
+                                           'eval_data/*')
+    FLAGS.input_meta_data_path = os.path.join(NCF_TF_DATA_1M_BATCH_DIR_NAME,
+                                              'meta_data.json')
+    self._run_and_report_benchmark_mlperf_like()
+
 
 class NCFKerasSynth(NCFKerasBenchmarkBase):
   """Benchmark NCF model using synthetic data."""
...
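For context, here is a minimal standalone sketch of the graph-rewrite path these new benchmarks exercise. The wrapper call is the one the diff below uses; the surrounding lines are illustrative, assuming a TF 2.0-era build where this API is available:

    import tensorflow as tf

    opt = tf.keras.optimizers.Adam()
    # Enables the automatic fp16 graph rewrite and wraps the optimizer;
    # 'dynamic' selects dynamic loss scaling.
    opt = tf.compat.v1.train.experimental.enable_mixed_precision_graph_rewrite(
        opt, loss_scale='dynamic')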
@@ -85,7 +85,8 @@ class LossLayer(tf.keras.layers.Layer):
   """Pass-through loss layer for NCF model."""
 
   def __init__(self, loss_normalization_factor):
-    super(LossLayer, self).__init__()
+    # The loss may overflow in float16, so we use float32 instead.
+    super(LossLayer, self).__init__(dtype="float32")
     self.loss_normalization_factor = loss_normalization_factor
     self.loss = tf.keras.losses.SparseCategoricalCrossentropy(
         from_logits=True, reduction="sum")
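The dtype="float32" override matters because float16 tops out around 65504, and a cross-entropy summed over a very large batch can easily exceed that. A toy sketch of the same pattern (hypothetical layer, not the NCF code; under the experimental mixed precision API, Keras casts a layer's floating-point inputs to the layer's dtype):

    import tensorflow as tf

    class SumLossLayer(tf.keras.layers.Layer):
      """Sums a cross-entropy loss; pinned to float32."""

      def __init__(self):
        # dtype="float32" opts this layer out of the global mixed_float16
        # policy, so the sum below accumulates in float32.
        super(SumLossLayer, self).__init__(dtype="float32")

      def call(self, labels, logits):
        losses = tf.keras.losses.sparse_categorical_crossentropy(
            labels, logits, from_logits=True)
        return tf.reduce_sum(losses)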
@@ -208,6 +209,12 @@ def run_ncf(_):
   params = ncf_common.parse_flags(FLAGS)
   model_helpers.apply_clean(flags.FLAGS)
 
+  if FLAGS.dtype == "fp16" and FLAGS.fp16_implementation == "keras":
+    policy = tf.keras.mixed_precision.experimental.Policy(
+        "mixed_float16",
+        loss_scale=flags_core.get_loss_scale(FLAGS, default_for_fp16="dynamic"))
+    tf.keras.mixed_precision.experimental.set_policy(policy)
+
   strategy = distribution_utils.get_distribution_strategy(
       distribution_strategy=FLAGS.distribution_strategy,
       num_gpus=FLAGS.num_gpus,
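This is the TF 2.0-era experimental spelling (later releases renamed it to tf.keras.mixed_precision.set_global_policy). A standalone sketch of what the new branch sets up, showing the variable/compute dtype split the policy implies:

    import tensorflow as tf

    # Under "mixed_float16", layers compute in float16 but keep their
    # variables in float32; the experimental policy also carries the
    # loss scale.
    policy = tf.keras.mixed_precision.experimental.Policy(
        "mixed_float16", loss_scale="dynamic")
    tf.keras.mixed_precision.experimental.set_policy(policy)

    dense = tf.keras.layers.Dense(4)
    dense.build((None, 8))
    print(dense.kernel.dtype)            # float32: variables stay full precision
    print(dense(tf.ones((1, 8))).dtype)  # float16: computation is half precision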
@@ -266,12 +273,18 @@ def run_ncf(_):
         beta_1=params["beta1"],
         beta_2=params["beta2"],
         epsilon=params["epsilon"])
-    if FLAGS.dtype == "fp16":
+    if FLAGS.fp16_implementation == "graph_rewrite":
       optimizer = \
         tf.compat.v1.train.experimental.enable_mixed_precision_graph_rewrite(
             optimizer,
             loss_scale=flags_core.get_loss_scale(FLAGS,
                                                  default_for_fp16="dynamic"))
+    elif FLAGS.dtype == "fp16" and params["keras_use_ctl"]:
+      # When keras_use_ctl is False, Model.fit() automatically applies
+      # loss scaling, so we do not need to create a LossScaleOptimizer.
+      optimizer = tf.keras.mixed_precision.experimental.LossScaleOptimizer(
+          optimizer,
+          tf.keras.mixed_precision.experimental.global_policy().loss_scale)
 
     if params["keras_use_ctl"]:
       train_loss, eval_results = run_ncf_custom_training(
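A minimal sketch of what this wrapping does; the point of passing global_policy().loss_scale is that the optimizer reuses the loss-scale object the policy already gave the model, so the two stay in sync (illustrative, not the NCF code):

    import tensorflow as tf

    policy = tf.keras.mixed_precision.experimental.global_policy()
    # Wrap any Keras optimizer; the loss scale may be fixed or dynamic.
    optimizer = tf.keras.mixed_precision.experimental.LossScaleOptimizer(
        tf.keras.optimizers.Adam(), policy.loss_scale)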
@@ -370,6 +383,8 @@ def run_ncf_custom_training(params,
     """Computes loss and applied gradient per replica."""
     with tf.GradientTape() as tape:
       softmax_logits = keras_model(features)
+      # The loss can overflow in float16, so we cast to float32.
+      softmax_logits = tf.cast(softmax_logits, "float32")
       labels = features[rconst.TRAIN_LABEL_KEY]
       loss = loss_object(
           labels,
...
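Because the graph rewrite is not active on this path, the custom training loop has to apply loss scaling explicitly. A generic sketch of how a LossScaleOptimizer is typically driven from a GradientTape loop, including the float32 cast from the hunk above (illustrative names, not the NCF code):

    import tensorflow as tf

    def train_step(model, optimizer, loss_object, features, labels):
      with tf.GradientTape() as tape:
        logits = model(features)
        # Outputs are float16 under mixed_float16; compute the loss in
        # float32 to avoid overflow.
        logits = tf.cast(logits, "float32")
        loss = loss_object(labels, logits)
        # Scale the loss up so small fp16 gradients do not underflow to zero.
        scaled_loss = optimizer.get_scaled_loss(loss)
      scaled_grads = tape.gradient(scaled_loss, model.trainable_variables)
      # Divide the scale back out before applying the update.
      grads = optimizer.get_unscaled_gradients(scaled_grads)
      optimizer.apply_gradients(zip(grads, model.trainable_variables))
      return loss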
@@ -231,7 +231,7 @@ class NcfTest(tf.test.TestCase):
   @mock.patch.object(rconst, "SYNTHETIC_BATCHES_PER_EPOCH", 100)
   @unittest.skipUnless(keras_utils.is_v2_0(), 'TF 2.0 only test.')
-  def test_end_to_end_keras_1_gpu_dist_strat(self):
+  def test_end_to_end_keras_1_gpu_dist_strat_fp16(self):
     if context.num_gpus() < 1:
       self.skipTest(
           "{} GPUs are not available for this test. {} GPUs are available".
           format(1, context.num_gpus()))
@@ -239,11 +239,26 @@ class NcfTest(tf.test.TestCase):
     integration.run_synthetic(
         ncf_keras_main.main, tmp_root=self.get_temp_dir(),
-        extra_flags=self._BASE_END_TO_END_FLAGS + ['-num_gpus', '1'])
+        extra_flags=self._BASE_END_TO_END_FLAGS + ['-num_gpus', '1',
+                                                   '--dtype', 'fp16'])
 
   @mock.patch.object(rconst, "SYNTHETIC_BATCHES_PER_EPOCH", 100)
   @unittest.skipUnless(keras_utils.is_v2_0(), 'TF 2.0 only test.')
-  def test_end_to_end_keras_2_gpu(self):
+  def test_end_to_end_keras_1_gpu_dist_strat_ctl_fp16(self):
+    if context.num_gpus() < 1:
+      self.skipTest(
+          '{} GPUs are not available for this test. {} GPUs are available'.
+          format(1, context.num_gpus()))
+
+    integration.run_synthetic(
+        ncf_keras_main.main, tmp_root=self.get_temp_dir(),
+        extra_flags=self._BASE_END_TO_END_FLAGS + ['-num_gpus', '1',
+                                                   '--dtype', 'fp16',
+                                                   '--keras_use_ctl'])
+
+  @mock.patch.object(rconst, 'SYNTHETIC_BATCHES_PER_EPOCH', 100)
+  @unittest.skipUnless(keras_utils.is_v2_0(), 'TF 2.0 only test.')
+  def test_end_to_end_keras_2_gpu_fp16(self):
     if context.num_gpus() < 2:
       self.skipTest(
           "{} GPUs are not available for this test. {} GPUs are available".
           format(2, context.num_gpus()))
@@ -251,7 +266,8 @@ class NcfTest(tf.test.TestCase):
     integration.run_synthetic(
         ncf_keras_main.main, tmp_root=self.get_temp_dir(),
-        extra_flags=self._BASE_END_TO_END_FLAGS + ['-num_gpus', '2'])
+        extra_flags=self._BASE_END_TO_END_FLAGS + ['-num_gpus', '2',
+                                                   '--dtype', 'fp16'])
 
 
 if __name__ == "__main__":
   tf.test.main()
@@ -427,7 +427,7 @@ def compute_top_k_and_ndcg(logits,  # type: tf.Tensor
   logits_by_user = tf.reshape(logits, (-1, rconst.NUM_EVAL_NEGATIVES + 1))
   duplicate_mask_by_user = tf.cast(
       tf.reshape(duplicate_mask, (-1, rconst.NUM_EVAL_NEGATIVES + 1)),
-      tf.float32)
+      logits_by_user.dtype)
 
   if match_mlperf:
     # Set duplicate logits to the min value for that dtype. The MLPerf
...
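This change lets eval run in float16 end to end: the mask now matches the logits' dtype, and the MLPerf masking above picks the min value of that same dtype, which matters because float32's minimum is far outside float16's range. A quick check of the two minima:

    import tensorflow as tf

    print(tf.float16.min)  # -65504.0, the most negative finite float16
    print(tf.float32.min)  # about -3.4e38, not representable in float16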
@@ -191,16 +191,15 @@ def define_performance(num_parallel_calls=False, inter_op=False, intra_op=False,
       return loss_scale > 0
 
   if fp16_implementation:
-    # Currently, this flag is only defined for the estimator resnet and transformer models.
     flags.DEFINE_enum(
-        name="fp16_implementation", default="casting",
-        enum_values=("casting", "graph_rewrite"),
+        name="fp16_implementation", default="keras",
+        enum_values=("keras", "graph_rewrite"),
         help=help_wrap(
             "When --dtype=fp16, how fp16 should be implemented. This has no "
-            "impact on correctness. 'casting' will cause manual tf.casts to "
-            "be inserted in the model. 'graph_rewrite' means "
-            "tf.train.experimental.enable_mixed_precision_graph_rewrite will "
-            "be used to automatically use fp16 without any manual casts."))
+            "impact on correctness. 'keras' uses the "
+            "tf.keras.mixed_precision API. 'graph_rewrite' uses the "
+            "tf.train.experimental.enable_mixed_precision_graph_rewrite "
+            "API."))
 
   @flags.multi_flags_validator(["fp16_implementation", "dtype",
                                 "loss_scale"])