Commit 85cfe94d authored by Tianqi Liu, committed by A. Unique TensorFlower

Internal cleanup.

PiperOrigin-RevId: 316734574
parent c64cb01b
@@ -111,6 +111,7 @@ def run_customized_training_loop(
     model_dir=None,
     train_input_fn=None,
     steps_per_epoch=None,
+    num_eval_per_epoch=1,
     steps_per_loop=None,
     epochs=1,
     eval_input_fn=None,
@@ -144,6 +145,7 @@ def run_customized_training_loop(
     steps_per_epoch: Number of steps to run per epoch. At the end of each
       epoch, model checkpoint will be saved and evaluation will be conducted
       if evaluation dataset is provided.
+    num_eval_per_epoch: Number of evaluations per epoch.
     steps_per_loop: Number of steps per graph-mode loop. In order to reduce
       communication in eager context, training logs are printed every
       steps_per_loop.
@@ -166,8 +168,8 @@ def run_customized_training_loop(
     sub_model_export_name: If not None, will export `sub_model` returned by
       `model_fn` into checkpoint files. The name of intermediate checkpoint
       file is {sub_model_export_name}_step_{step}.ckpt and the last
-      checkpoint's name is {sub_model_export_name}.ckpt;
-      if None, `sub_model` will not be exported as checkpoint.
+      checkpoint's name is {sub_model_export_name}.ckpt; if None, `sub_model`
+      will not be exported as checkpoint.
     explicit_allreduce: Whether to explicitly perform gradient allreduce,
       instead of relying on implicit allreduce in optimizer.apply_gradients().
       Default is False. For now, if training using FP16 mixed precision,
@@ -177,10 +179,10 @@ def run_customized_training_loop(
     pre_allreduce_callbacks: A list of callback functions that take gradient
       and model variable pairs as input, manipulate them, and return new
       gradient and model variable pairs. The callback functions will be
-      invoked in the list order and before gradients are allreduced.
-      With mixed precision training, the pre_allreduce_callbacks will be
-      applied on scaled_gradients. Default is no callbacks.
-      Only used when explicit_allreduce=True.
+      invoked in the list order and before gradients are allreduced. With
+      mixed precision training, the pre_allreduce_callbacks will be applied
+      on scaled_gradients. Default is no callbacks. Only used when
+      explicit_allreduce=True.
     post_allreduce_callbacks: A list of callback functions that take
       gradient and model variable pairs as input, manipulate them, and
       return new gradient and model variable pairs. The callback
@@ -208,6 +210,8 @@ def run_customized_training_loop(
   required_arguments = [
       strategy, model_fn, loss_fn, model_dir, steps_per_epoch, train_input_fn
   ]
+  steps_between_evals = int(steps_per_epoch / num_eval_per_epoch)
   if [arg for arg in required_arguments if arg is None]:
     raise ValueError('`strategy`, `model_fn`, `loss_fn`, `model_dir`, '
                      '`steps_per_epoch` and `train_input_fn` are required '
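To make the new interval concrete, here is the arithmetic the added line performs, using the values from the test change at the bottom of this commit (a minimal worked example, not part of the commit itself):

# Worked example of the new evaluation interval, with the values used in
# the updated test below (steps_per_epoch=20, num_eval_per_epoch=4).
steps_per_epoch = 20
num_eval_per_epoch = 4
steps_between_evals = int(steps_per_epoch / num_eval_per_epoch)
assert steps_between_evals == 5  # evaluate and checkpoint every 5 steps

# With the default num_eval_per_epoch=1 the behavior is unchanged:
assert int(steps_per_epoch / 1) == steps_per_epoch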
@@ -216,17 +220,17 @@ def run_customized_training_loop(
   if tf.config.list_logical_devices('TPU'):
     # One can't fully utilize a TPU with steps_per_loop=1, so in this case
     # default users to a more useful value.
-    steps_per_loop = min(1000, steps_per_epoch)
+    steps_per_loop = min(1000, steps_between_evals)
   else:
     steps_per_loop = 1
   logging.info('steps_per_loop not specified. Using steps_per_loop=%d',
                steps_per_loop)
-  if steps_per_loop > steps_per_epoch:
+  if steps_per_loop > steps_between_evals:
     logging.warning(
         'steps_per_loop: %d is specified to be greater than '
-        ' steps_per_epoch: %d, we will use steps_per_epoch as'
-        ' steps_per_loop.', steps_per_loop, steps_per_epoch)
-    steps_per_loop = steps_per_epoch
+        ' steps_between_evals: %d, we will use steps_between_evals as'
+        ' steps_per_loop.', steps_per_loop, steps_between_evals)
+    steps_per_loop = steps_between_evals
   assert tf.executing_eagerly()
   if run_eagerly:
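As a standalone illustration of the defaulting and clamping above, here is a sketch; the has_tpu flag is a hypothetical stand-in for tf.config.list_logical_devices('TPU'), and resolve_steps_per_loop is not a function in this file:

def resolve_steps_per_loop(steps_per_loop, steps_between_evals, has_tpu):
  if steps_per_loop is None:
    # A TPU can't be fully utilized with steps_per_loop=1, so pick a
    # larger default; on other devices default to 1.
    return min(1000, steps_between_evals) if has_tpu else 1
  # Never let one host loop run past the next evaluation point.
  return min(steps_per_loop, steps_between_evals)

assert resolve_steps_per_loop(None, 5, has_tpu=True) == 5
assert resolve_steps_per_loop(None, 5, has_tpu=False) == 1
assert resolve_steps_per_loop(10, 5, has_tpu=False) == 5  # clamped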
@@ -246,8 +250,7 @@ def run_customized_training_loop(
   total_training_steps = steps_per_epoch * epochs
   train_iterator = _get_input_iterator(train_input_fn, strategy)
-  eval_loss_metric = tf.keras.metrics.Mean(
-      'training_loss', dtype=tf.float32)
+  eval_loss_metric = tf.keras.metrics.Mean('training_loss', dtype=tf.float32)
   with distribution_utils.get_strategy_scope(strategy):
     # To correctly place the model weights on accelerators,
@@ -270,8 +273,7 @@ def run_customized_training_loop(
       checkpoint.restore(init_checkpoint).assert_existing_objects_matched()
       logging.info('Loading from checkpoint file completed')
-    train_loss_metric = tf.keras.metrics.Mean(
-        'training_loss', dtype=tf.float32)
+    train_loss_metric = tf.keras.metrics.Mean('training_loss', dtype=tf.float32)
     eval_metrics = [metric_fn()] if metric_fn else []
     # If evaluation is required, make a copy of metric as it will be used by
     # both train and evaluation.
@@ -440,18 +442,19 @@ def run_customized_training_loop(
     latest_checkpoint_file = tf.train.latest_checkpoint(model_dir)
     if latest_checkpoint_file:
-      logging.info(
-          'Checkpoint file %s found and restoring from '
-          'checkpoint', latest_checkpoint_file)
+      logging.info('Checkpoint file %s found and restoring from '
+                   'checkpoint', latest_checkpoint_file)
       checkpoint.restore(latest_checkpoint_file)
       logging.info('Loading from checkpoint file completed')
       current_step = optimizer.iterations.numpy()
     checkpoint_name = 'ctl_step_{step}.ckpt'
+    logs = {}
     while current_step < total_training_steps:
       if current_step % steps_per_epoch == 0:
-        callback_list.on_epoch_begin(int(current_step / steps_per_epoch) + 1)
+        callback_list.on_epoch_begin(
+            int(current_step / steps_per_epoch) + 1)
       # Training loss/metric take the average over steps inside the micro
       # training loop. We reset their values before each round.
@@ -461,7 +464,7 @@ def run_customized_training_loop(
       callback_list.on_batch_begin(current_step)
       # Runs several steps in the host while loop.
-      steps = steps_to_run(current_step, steps_per_epoch, steps_per_loop)
+      steps = steps_to_run(current_step, steps_between_evals, steps_per_loop)
       if tf.config.list_physical_devices('GPU'):
         # TODO(zongweiz): merge with train_steps once tf.while_loop
@@ -470,11 +473,9 @@ def run_customized_training_loop(
           train_single_step(train_iterator)
       else:
         # Converts steps to a Tensor to avoid tf.function retracing.
-        train_steps(train_iterator,
-                    tf.convert_to_tensor(steps, dtype=tf.int32))
+        train_steps(train_iterator, tf.convert_to_tensor(steps, dtype=tf.int32))
       train_loss = _float_metric_value(train_loss_metric)
       current_step += steps
-      callback_list.on_batch_end(current_step - 1, {'loss': train_loss})
       # Updates training logging.
       training_status = 'Train Step: %d/%d / loss = %s' % (
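steps_to_run is defined elsewhere in this file and is not shown in this diff; a hypothetical sketch of the contract the loop now relies on (never run past the next evaluation boundary) could look like this:

def steps_to_run(current_step, steps_between_evals, steps_per_loop):
  # Hypothetical sketch, not the actual implementation: run at most
  # steps_per_loop steps, stopping at the next evaluation boundary.
  remainder = current_step % steps_between_evals
  if remainder != 0:
    return min(steps_between_evals - remainder, steps_per_loop)
  return min(steps_between_evals, steps_per_loop)

# With the test configuration (steps_between_evals=5, steps_per_loop
# clamped to 5), every call returns 5, so current_step advances
# 5 -> 10 -> ... -> 40 and batch ends land on 4, 9, 14, ..., 39.
assert steps_to_run(0, 5, 5) == 5
assert steps_to_run(7, 5, 5) == 3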
@@ -492,8 +493,7 @@ def run_customized_training_loop(
             'learning_rate',
             optimizer.learning_rate(current_step),
             step=current_step)
-        tf.summary.scalar(
-            train_loss_metric.name, train_loss, step=current_step)
+        tf.summary.scalar(train_loss_metric.name, train_loss, step=current_step)
         for metric in train_metrics + model.metrics:
           metric_value = _float_metric_value(metric)
           training_status += ' %s = %f' % (metric.name, metric_value)
@@ -501,7 +501,11 @@ def run_customized_training_loop(
         summary_writer.flush()
       logging.info(training_status)
-      if current_step % steps_per_epoch == 0:
+      # If there is no need for evaluation, only call on_batch_end with
+      # train_loss; this ensures a granular global_step/sec on TensorBoard.
+      if current_step % steps_between_evals:
+        callback_list.on_batch_end(current_step - 1, {'loss': train_loss})
+      else:
         # Save a submodel with the step in the file name after each epoch.
         if sub_model_export_name:
           _save_checkpoint(
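The effect of the new modulo branch, traced with small hypothetical numbers (an inner loop of 2 steps and an evaluation every 4 steps), purely for illustration:

steps_between_evals = 4
current_step = 0
events = []
for _ in range(4):
  current_step += 2  # steps executed by the inner host loop
  if current_step % steps_between_evals:
    events.append('on_batch_end(%d, train loss only)' % (current_step - 1))
  else:
    events.append('evaluate, then on_batch_end(%d, eval logs)' % (current_step - 1))
assert events == [
    'on_batch_end(1, train loss only)',
    'evaluate, then on_batch_end(3, eval logs)',
    'on_batch_end(5, train loss only)',
    'evaluate, then on_batch_end(7, eval logs)',
]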
@@ -514,7 +518,6 @@ def run_customized_training_loop(
         if current_step < total_training_steps:
           _save_checkpoint(strategy, checkpoint, model_dir,
                            checkpoint_name.format(step=current_step))
-          logs = None
           if eval_input_fn:
             logging.info('Running evaluation after step: %s.', current_step)
             logs = _run_evaluation(current_step,
@@ -523,8 +526,15 @@ def run_customized_training_loop(
             eval_loss_metric.reset_states()
             for metric in eval_metrics + model.metrics:
               metric.reset_states()
+        # We add train_loss here rather than calling on_batch_end twice to
+        # make sure that no duplicated values are generated.
+        logs['loss'] = train_loss
+        callback_list.on_batch_end(current_step - 1, logs)
-      callback_list.on_epoch_end(int(current_step / steps_per_epoch), logs)
+      # Call on_epoch_end only after a real epoch ends, to prevent
+      # miscalculation of training steps.
+      if current_step % steps_per_epoch == 0:
+        callback_list.on_epoch_end(int(current_step / steps_per_epoch), logs)
     if sub_model_export_name:
       _save_checkpoint(strategy, sub_model_checkpoint, model_dir,
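With evaluation now running several times per epoch, the new guard keeps on_epoch_end on real epoch boundaries only; a quick check with the test's numbers (not part of the commit):

steps_per_epoch, steps_between_evals, total_steps = 20, 5, 40
eval_steps = list(range(steps_between_evals, total_steps + 1,
                        steps_between_evals))
# Evaluation happens 8 times, but only two of those are epoch ends.
epoch_end_steps = [s for s in eval_steps if s % steps_per_epoch == 0]
assert eval_steps == [5, 10, 15, 20, 25, 30, 35, 40]
assert epoch_end_steps == [20, 40]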
@@ -532,14 +542,11 @@ def run_customized_training_loop(
     _save_checkpoint(strategy, checkpoint, model_dir,
                      checkpoint_name.format(step=current_step))
-    logs = None
     if eval_input_fn:
       logging.info('Running final evaluation after training is complete.')
       logs = _run_evaluation(current_step,
                              _get_input_iterator(eval_input_fn, strategy))
     callback_list.on_epoch_end(int(current_step / steps_per_epoch), logs)
     training_summary = {
         'total_training_steps': total_training_steps,
         'train_loss': _float_metric_value(train_loss_metric),
...
@@ -258,6 +258,7 @@ class ModelTrainingUtilsTest(tf.test.TestCase, parameterized.TestCase):
         loss_fn=tf.keras.losses.categorical_crossentropy,
         model_dir=model_dir,
         steps_per_epoch=20,
+        num_eval_per_epoch=4,
         steps_per_loop=10,
         epochs=2,
         train_input_fn=input_fn,
@@ -269,14 +270,15 @@ class ModelTrainingUtilsTest(tf.test.TestCase, parameterized.TestCase):
         run_eagerly=False)
     self.assertEqual(callback.epoch_begin, [(1, {}), (2, {})])
     epoch_ends, epoch_end_infos = zip(*callback.epoch_end)
-    self.assertEqual(list(epoch_ends), [1, 2])
+    self.assertEqual(list(epoch_ends), [1, 2, 2])
     for info in epoch_end_infos:
       self.assertIn('accuracy', info)
-    self.assertEqual(callback.batch_begin,
-                     [(0, {}), (10, {}), (20, {}), (30, {})])
+    self.assertEqual(callback.batch_begin, [(0, {}), (5, {}), (10, {}),
+                                            (15, {}), (20, {}), (25, {}),
+                                            (30, {}), (35, {})])
     batch_ends, batch_end_infos = zip(*callback.batch_end)
-    self.assertEqual(list(batch_ends), [9, 19, 29, 39])
+    self.assertEqual(list(batch_ends), [4, 9, 14, 19, 24, 29, 34, 39])
     for info in batch_end_infos:
       self.assertIn('loss', info)
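The updated expectations follow from the new parameters; as a sanity check of the arithmetic (not part of the test itself):

steps_per_epoch, num_eval_per_epoch, epochs = 20, 4, 2
steps_between_evals = steps_per_epoch // num_eval_per_epoch  # 5
total_steps = steps_per_epoch * epochs                       # 40

# steps_per_loop=10 is clamped to steps_between_evals, so each inner loop
# runs 5 steps: batches begin every 5 steps and end one step earlier.
batch_begins = list(range(0, total_steps, steps_between_evals))
batch_ends = [b + steps_between_evals - 1 for b in batch_begins]
assert batch_begins == [0, 5, 10, 15, 20, 25, 30, 35]
assert batch_ends == [4, 9, 14, 19, 24, 29, 34, 39]

# on_epoch_end fires at steps 20 and 40 inside the loop, plus once more
# after training completes, giving epoch_ends == [1, 2, 2].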
...