ctl_imagenet_main.py 13.4 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Runs a ResNet model on the ImageNet dataset using custom training loops."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from absl import app as absl_app
from absl import flags
from absl import logging
import tensorflow as tf

26
from official.resnet.ctl import ctl_common
Hongkun Yu's avatar
Hongkun Yu committed
27
28
29
from official.vision.image_classification import imagenet_preprocessing
from official.vision.image_classification import common
from official.vision.image_classification import resnet_model
30
31
32
33
from official.utils.flags import core as flags_core
from official.utils.logs import logger
from official.utils.misc import distribution_utils
from official.utils.misc import keras_utils
34
from official.utils.misc import model_helpers
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51


def build_stats(train_result, eval_result, time_callback):
  """Normalizes and returns dictionary of stats.

  Args:
    train_result: The final loss at training time.
    eval_result: Output of the eval step. Assumes first value is eval_loss and
      second value is accuracy_top_1.
    time_callback: Time tracking callback instance.

  Returns:
    Dictionary of normalized results.
  """
  stats = {}

  if eval_result:
52
53
    stats['eval_loss'] = eval_result[0]
    stats['eval_acc'] = eval_result[1]
54
55
56
57
58
59

    stats['train_loss'] = train_result[0]
    stats['train_acc'] = train_result[1]

  if time_callback:
    timestamp_log = time_callback.timestamp_log
60
61
    stats['step_timestamp_log'] = timestamp_log
    stats['train_finish_time'] = time_callback.train_finish_time
62
    if len(timestamp_log) > 1:
63
      stats['avg_exp_per_second'] = (
64
65
66
67
68
69
70
71
72
73
          time_callback.batch_size * time_callback.log_steps *
          (len(time_callback.timestamp_log) - 1) /
          (timestamp_log[-1].timestamp - timestamp_log[0].timestamp))

  return stats


def get_input_dataset(flags_obj, strategy):
  """Returns the test and train input datasets."""
  dtype = flags_core.get_tf_dtype(flags_obj)
Jing Li's avatar
Jing Li committed
74
75
76
77
78
79
80
81
82
83
84
85
86
87
  use_dataset_fn = isinstance(strategy, tf.distribute.experimental.TPUStrategy)
  batch_size = flags_obj.batch_size
  if use_dataset_fn:
    if batch_size % strategy.num_replicas_in_sync != 0:
      raise ValueError(
          'Batch size must be divisible by number of replicas : {}'.format(
              strategy.num_replicas_in_sync))

    # As auto rebatching is not supported in
    # `experimental_distribute_datasets_from_function()` API, which is
    # required when cloning dataset to multiple workers in eager mode,
    # we use per-replica batch size.
    batch_size = int(batch_size / strategy.num_replicas_in_sync)

88
  if flags_obj.use_synthetic_data:
Hongkun Yu's avatar
Hongkun Yu committed
89
    input_fn = common.get_synth_input_fn(
90
91
92
93
        height=imagenet_preprocessing.DEFAULT_IMAGE_SIZE,
        width=imagenet_preprocessing.DEFAULT_IMAGE_SIZE,
        num_channels=imagenet_preprocessing.NUM_CHANNELS,
        num_classes=imagenet_preprocessing.NUM_CLASSES,
94
95
96
        dtype=dtype,
        drop_remainder=True)
  else:
97
    input_fn = imagenet_preprocessing.input_fn
98

Jing Li's avatar
Jing Li committed
99
100
101
102
103
104
105
106
107
108
109
  def _train_dataset_fn(ctx=None):
    train_ds = input_fn(
        is_training=True,
        data_dir=flags_obj.data_dir,
        batch_size=batch_size,
        parse_record_fn=imagenet_preprocessing.parse_record,
        datasets_num_private_threads=flags_obj.datasets_num_private_threads,
        dtype=dtype,
        input_context=ctx,
        drop_remainder=True)
    return train_ds
110
111

  if strategy:
Jing Li's avatar
Jing Li committed
112
113
114
115
116
117
    if isinstance(strategy, tf.distribute.experimental.TPUStrategy):
      train_ds = strategy.experimental_distribute_datasets_from_function(_train_dataset_fn)
    else:
      train_ds = strategy.experimental_distribute_dataset(_train_dataset_fn())
  else:
    train_ds = _train_dataset_fn()
118

119
120
  test_ds = None
  if not flags_obj.skip_eval:
Jing Li's avatar
Jing Li committed
121
122
123
124
125
126
127
128
129
    def _test_data_fn(ctx=None):
      test_ds = input_fn(
          is_training=False,
          data_dir=flags_obj.data_dir,
          batch_size=batch_size,
          parse_record_fn=imagenet_preprocessing.parse_record,
          dtype=dtype,
          input_context=ctx)
      return test_ds
130

Jing Li's avatar
Jing Li committed
131
132
133
134
135
136
    if strategy:
      if isinstance(strategy, tf.distribute.experimental.TPUStrategy):
        test_ds = strategy.experimental_distribute_datasets_from_function(
            _test_data_fn)
      else:
        test_ds = strategy.experimental_distribute_dataset(_test_data_fn())
Jing Li's avatar
Jing Li committed
137
    else:
Jing Li's avatar
Jing Li committed
138
      test_ds = _test_data_fn()
139
140
141
142
143

  return train_ds, test_ds


def get_num_train_iterations(flags_obj):
Jing Li's avatar
Jing Li committed
144
  """Returns the number of training steps, train and test epochs."""
145
146
  train_steps = (
      imagenet_preprocessing.NUM_IMAGES['train'] // flags_obj.batch_size)
147
148
149
150
151
152
  train_epochs = flags_obj.train_epochs

  if flags_obj.train_steps:
    train_steps = min(flags_obj.train_steps, train_steps)
    train_epochs = 1

153
154
  eval_steps = (
      imagenet_preprocessing.NUM_IMAGES['validation'] // flags_obj.batch_size)
155
156
157
158

  return train_steps, train_epochs, eval_steps


Jing Li's avatar
Jing Li committed
159
160
161
162
163
164
165
166
167
def _steps_to_run(steps_in_current_epoch, steps_per_epoch, steps_per_loop):
  """Calculates steps to run on device."""
  if steps_per_loop <= 0:
    raise ValueError('steps_per_loop should be positive integer.')
  if steps_per_loop == 1:
    return steps_per_loop
  return min(steps_per_loop, steps_per_epoch - steps_in_current_epoch)


168
169
170
171
172
173
174
175
176
177
178
179
def run(flags_obj):
  """Run ResNet ImageNet training and eval loop using custom training loops.

  Args:
    flags_obj: An object containing parsed flag values.

  Raises:
    ValueError: If fp16 is passed as it is not currently supported.

  Returns:
    Dictionary of training and eval stats.
  """
Zongwei Zhou's avatar
Zongwei Zhou committed
180
181
182
183
  keras_utils.set_session_config(
      enable_eager=flags_obj.enable_eager,
      enable_xla=flags_obj.enable_xla)

184
185
186
187
188
189
190
191
192
193
194
195
  # TODO(anj-s): Set data_format without using Keras.
  data_format = flags_obj.data_format
  if data_format is None:
    data_format = ('channels_first'
                   if tf.test.is_built_with_cuda() else 'channels_last')
  tf.keras.backend.set_image_data_format(data_format)

  strategy = distribution_utils.get_distribution_strategy(
      distribution_strategy=flags_obj.distribution_strategy,
      num_gpus=flags_obj.num_gpus,
      num_workers=distribution_utils.configure_cluster(),
      all_reduce_alg=flags_obj.all_reduce_alg,
Jing Li's avatar
Jing Li committed
196
197
      num_packs=flags_obj.num_packs,
      tpu_address=flags_obj.tpu)
198
199

  train_ds, test_ds = get_input_dataset(flags_obj, strategy)
Jing Li's avatar
Jing Li committed
200
201
202
203
204
205
206
  per_epoch_steps, train_epochs, eval_steps = get_num_train_iterations(
      flags_obj)
  steps_per_loop = min(flags_obj.steps_per_loop, per_epoch_steps)
  logging.info("Training %d epochs, each epoch has %d steps, "
               "total steps: %d; Eval %d steps",
               train_epochs, per_epoch_steps, train_epochs * per_epoch_steps,
               eval_steps)
207
208
209
210

  time_callback = keras_utils.TimeHistory(flags_obj.batch_size,
                                          flags_obj.log_steps)

Jing Li's avatar
Jing Li committed
211
  with distribution_utils.get_strategy_scope(strategy):
212
213
    model = resnet_model.resnet50(
        num_classes=imagenet_preprocessing.NUM_CLASSES,
214
        batch_size=flags_obj.batch_size,
Zongwei Zhou's avatar
Zongwei Zhou committed
215
        use_l2_regularizer=not flags_obj.single_l2_loss_op)
216

Jing Li's avatar
Jing Li committed
217
218
219
220
221
222
223
224
225
226
    lr_schedule = common.PiecewiseConstantDecayWithWarmup(
        batch_size=flags_obj.batch_size,
        epoch_size=imagenet_preprocessing.NUM_IMAGES['train'],
        warmup_epochs=common.LR_SCHEDULE[0][1],
        boundaries=list(p[1] for p in common.LR_SCHEDULE[1:]),
        multipliers=list(p[0] for p in common.LR_SCHEDULE),
        compute_lr_on_cpu=True)
    optimizer = common.get_optimizer(lr_schedule)

    if flags_obj.fp16_implementation == 'graph_rewrite':
227
      if not flags_obj.use_tf_function:
Jing Li's avatar
Jing Li committed
228
229
        raise ValueError('--fp16_implementation=graph_rewrite requires '
                         '--use_tf_function to be true')
230
      loss_scale = flags_core.get_loss_scale(flags_obj, default_for_fp16=128)
Kaixi Hou's avatar
Kaixi Hou committed
231
      optimizer = tf.train.experimental.enable_mixed_precision_graph_rewrite(
Kaixi Hou's avatar
Kaixi Hou committed
232
          optimizer, loss_scale)
233

Jing Li's avatar
Jing Li committed
234
    train_loss = tf.keras.metrics.Mean('train_loss', dtype=tf.float32)
235
236
237
238
239
240
    training_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
        'training_accuracy', dtype=tf.float32)
    test_loss = tf.keras.metrics.Mean('test_loss', dtype=tf.float32)
    test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(
        'test_accuracy', dtype=tf.float32)

Zongwei Zhou's avatar
Zongwei Zhou committed
241
242
    trainable_variables = model.trainable_variables

Jing Li's avatar
Jing Li committed
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
    def step_fn(inputs):
      """Per-Replica StepFn."""
      images, labels = inputs
      with tf.GradientTape() as tape:
        logits = model(images, training=True)

        prediction_loss = tf.keras.losses.sparse_categorical_crossentropy(
            labels, logits)
        loss = tf.reduce_sum(prediction_loss) * (1.0/ flags_obj.batch_size)
        num_replicas = tf.distribute.get_strategy().num_replicas_in_sync

        if flags_obj.single_l2_loss_op:
          filtered_variables = [
              tf.reshape(v, (-1,))
              for v in trainable_variables
              if 'bn' not in v.name
          ]
          l2_loss = resnet_model.L2_WEIGHT_DECAY * 2 * tf.nn.l2_loss(
              tf.concat(filtered_variables, axis=0))
          loss += (l2_loss / num_replicas)
        else:
          loss += (tf.reduce_sum(model.losses) / num_replicas)

        # Scale the loss
Kaixi Hou's avatar
Kaixi Hou committed
267
        if flags_obj.dtype == "fp16":
Jing Li's avatar
Jing Li committed
268
          loss = optimizer.get_scaled_loss(loss)
269

Jing Li's avatar
Jing Li committed
270
      grads = tape.gradient(loss, trainable_variables)
271

Jing Li's avatar
Jing Li committed
272
273
274
      # Unscale the grads
      if flags_obj.dtype == "fp16":
        grads = optimizer.get_unscaled_gradients(grads)
275

Jing Li's avatar
Jing Li committed
276
277
278
279
280
281
282
283
284
285
286
      optimizer.apply_gradients(zip(grads, trainable_variables))
      train_loss.update_state(loss)
      training_accuracy.update_state(labels, logits)

    @tf.function
    def train_steps(iterator, steps):
      """Performs distributed training steps in a loop."""
      for _ in tf.range(steps):
        strategy.experimental_run_v2(step_fn, args=(next(iterator),))

    def train_single_step(iterator):
287
      if strategy:
Jing Li's avatar
Jing Li committed
288
        strategy.experimental_run_v2(step_fn, args=(next(iterator),))
289
      else:
Jing Li's avatar
Jing Li committed
290
        return step_fn(next(iterator))
291

Jing Li's avatar
Jing Li committed
292
    def test_step(iterator):
293
294
295
296
297
298
299
300
301
302
303
      """Evaluation StepFn."""
      def step_fn(inputs):
        images, labels = inputs
        logits = model(images, training=False)
        loss = tf.keras.losses.sparse_categorical_crossentropy(labels,
                                                               logits)
        loss = tf.reduce_sum(loss) * (1.0/ flags_obj.batch_size)
        test_loss.update_state(loss)
        test_accuracy.update_state(labels, logits)

      if strategy:
Jing Li's avatar
Jing Li committed
304
        strategy.experimental_run_v2(step_fn, args=(next(iterator),))
305
      else:
Jing Li's avatar
Jing Li committed
306
        step_fn(next(iterator))
307
308

    if flags_obj.use_tf_function:
Jing Li's avatar
Jing Li committed
309
      train_single_step = tf.function(train_single_step)
310
311
      test_step = tf.function(test_step)

Jing Li's avatar
Jing Li committed
312
    train_iter = iter(train_ds)
313
314
    time_callback.on_train_begin()
    for epoch in range(train_epochs):
Jing Li's avatar
Jing Li committed
315
      train_loss.reset_states()
316
317
      training_accuracy.reset_states()

Jing Li's avatar
Jing Li committed
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
      steps_in_current_epoch = 0
      while steps_in_current_epoch < per_epoch_steps:
        time_callback.on_batch_begin(
            steps_in_current_epoch+epoch*per_epoch_steps)
        steps = _steps_to_run(steps_in_current_epoch, per_epoch_steps,
                              steps_per_loop)
        if steps == 1:
          train_single_step(train_iter)
        else:
          # Converts steps to a Tensor to avoid tf.function retracing.
          train_steps(train_iter, tf.convert_to_tensor(steps, dtype=tf.int32))
        time_callback.on_batch_end(
            steps_in_current_epoch+epoch*per_epoch_steps)
        steps_in_current_epoch += steps

      logging.info('Training loss: %s, accuracy: %s%% at epoch %d',
                   train_loss.result().numpy(),
335
                   training_accuracy.result().numpy(),
Jing Li's avatar
Jing Li committed
336
                   epoch + 1)
337
338
339
340
341
342
343
344

      if (not flags_obj.skip_eval and
          (epoch + 1) % flags_obj.epochs_between_evals == 0):
        test_loss.reset_states()
        test_accuracy.reset_states()

        test_iter = iter(test_ds)
        for _ in range(eval_steps):
Jing Li's avatar
Jing Li committed
345
          test_step(test_iter)
346
347
348
349

        logging.info('Test loss: %s, accuracy: %s%% at epoch: %d',
                     test_loss.result().numpy(),
                     test_accuracy.result().numpy(),
Jing Li's avatar
Jing Li committed
350
                     epoch + 1)
351
352
353
354
355
356
357
358

    time_callback.on_train_end()

    eval_result = None
    train_result = None
    if not flags_obj.skip_eval:
      eval_result = [test_loss.result().numpy(),
                     test_accuracy.result().numpy()]
Jing Li's avatar
Jing Li committed
359
      train_result = [train_loss.result().numpy(),
360
361
362
363
364
365
366
367
368
                      training_accuracy.result().numpy()]

    stats = build_stats(train_result, eval_result, time_callback)
    return stats


def main(_):
  model_helpers.apply_clean(flags.FLAGS)
  with logger.benchmark_context(flags.FLAGS):
Jing Li's avatar
Jing Li committed
369
370
    stats = run(flags.FLAGS)
  logging.info('Run stats:\n%s', stats)
371
372
373
374


if __name__ == '__main__':
  logging.set_verbosity(logging.INFO)
Hongkun Yu's avatar
Hongkun Yu committed
375
  common.define_keras_flags()
376
  ctl_common.define_ctl_flags()
377
  flags.adopt_module_key_flags(ctl_common)
378
  absl_app.run(main)