# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Helper functions for the Keras implementations of models."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import multiprocessing
import os
import time

from absl import logging
import tensorflow.compat.v2 as tf
from tensorflow.python import tf2
from tensorflow.python.profiler import profiler_v2 as profiler


class BatchTimestamp(object):
  """A structure to store batch time stamp."""

  def __init__(self, batch_index, timestamp):
    self.batch_index = batch_index
    self.timestamp = timestamp

  def __repr__(self):
    return "'BatchTimestamp<batch_index: {}, timestamp: {}>'".format(
        self.batch_index, self.timestamp)


class TimeHistory(tf.keras.callbacks.Callback):
  """Callback for Keras models."""

  def __init__(self, batch_size, log_steps, logdir=None):
    """Callback for logging performance.

    Args:
      batch_size: Total batch size.
      log_steps: Interval of steps between logging of batch level stats.
      logdir: Optional directory to write TensorBoard summaries.
    """
    super(TimeHistory, self).__init__()
    # TODO(wcromar): remove this parameter and rely on `logs` parameter of
    # on_train_batch_end()
    self.batch_size = batch_size
    self.log_steps = log_steps
    self.last_log_step = 0
    self.steps_before_epoch = 0
    self.steps_in_epoch = 0
    self.start_time = None

    if logdir:
      self.summary_writer = tf.summary.create_file_writer(logdir)
    else:
      self.summary_writer = None

    # Logs start of step 1 then end of each step based on log_steps interval.
    self.timestamp_log = []

    # Records the time each epoch takes to run from start to finish of epoch.
    self.epoch_runtime_log = []

  @property
  def global_steps(self):
    """The current 1-indexed global step."""
    return self.steps_before_epoch + self.steps_in_epoch

  @property
  def average_steps_per_second(self):
    """The average training steps per second across all epochs."""
    return self.global_steps / sum(self.epoch_runtime_log)

  @property
  def average_examples_per_second(self):
    """The average number of training examples per second across all epochs."""
    return self.average_steps_per_second * self.batch_size

  def on_train_end(self, logs=None):
    self.train_finish_time = time.time()

    if self.summary_writer:
      self.summary_writer.flush()

  def on_epoch_begin(self, epoch, logs=None):
    self.epoch_start = time.time()

  def on_batch_begin(self, batch, logs=None):
    if not self.start_time:
      self.start_time = time.time()

    # Record the timestamp of the first global step
    if not self.timestamp_log:
      self.timestamp_log.append(BatchTimestamp(self.global_steps,
                                               self.start_time))

  def on_batch_end(self, batch, logs=None):
    """Records elapse time of the batch and calculates examples per second."""
    self.steps_in_epoch = batch + 1
    steps_since_last_log = self.global_steps - self.last_log_step
    if steps_since_last_log >= self.log_steps:
      now = time.time()
      elapsed_time = now - self.start_time
      steps_per_second = steps_since_last_log / elapsed_time
      examples_per_second = steps_per_second * self.batch_size

      self.timestamp_log.append(BatchTimestamp(self.global_steps, now))
      logging.info(
          'TimeHistory: %.2f seconds, %.2f examples/second between steps %d '
          'and %d', elapsed_time, examples_per_second, self.last_log_step,
          self.global_steps)

      if self.summary_writer:
        with self.summary_writer.as_default():
          tf.summary.scalar('global_step/sec', steps_per_second,
                            self.global_steps)
          tf.summary.scalar('examples/sec', examples_per_second,
                            self.global_steps)

      self.last_log_step = self.global_steps
      self.start_time = None

  def on_epoch_end(self, epoch, logs=None):
    epoch_run_time = time.time() - self.epoch_start
    self.epoch_runtime_log.append(epoch_run_time)

    self.steps_before_epoch += self.steps_in_epoch
    self.steps_in_epoch = 0
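
# Example usage (a minimal sketch; `model` and `dataset` are hypothetical
# stand-ins for a compiled Keras model and a tf.data.Dataset):
#
#   time_callback = TimeHistory(batch_size=256, log_steps=100,
#                               logdir='/tmp/summaries')
#   model.fit(dataset, epochs=2, callbacks=[time_callback])
#   logging.info('Average examples/second: %f',
#                time_callback.average_examples_per_second)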


def get_profiler_callback(model_dir, profile_steps, enable_tensorboard,
                          steps_per_epoch):
  """Validate profile_steps flag value and return profiler callback."""
  profile_steps_error_message = (
      'profile_steps must be a comma separated pair of positive integers, '
      'specifying the first and last steps to be profiled.'
  )
  try:
    profile_steps = [int(i) for i in profile_steps.split(',')]
  except ValueError:
    raise ValueError(profile_steps_error_message)
  if len(profile_steps) != 2:
    raise ValueError(profile_steps_error_message)
  start_step, stop_step = profile_steps
  if start_step < 0 or start_step > stop_step:
    raise ValueError(profile_steps_error_message)
  if enable_tensorboard:
    logging.warning(
        'Both TensorBoard and profiler callbacks are used. Note that the '
        'TensorBoard callback profiles the 2nd step (unless otherwise '
        'specified). Please make sure the steps profiled by the two callbacks '
        'do not overlap.')
  return ProfilerCallback(model_dir, start_step, stop_step, steps_per_epoch)
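
# Example usage (a minimal sketch; the argument values are hypothetical):
#
#   profiler_callback = get_profiler_callback(
#       model_dir='/tmp/model', profile_steps='20,40',
#       enable_tensorboard=False, steps_per_epoch=100)
#   model.fit(dataset, epochs=2, callbacks=[profiler_callback])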


class SimpleCheckpoint(tf.keras.callbacks.Callback):
  """Keras callback to save tf.train.Checkpoints."""

  def __init__(self, checkpoint_manager):
    super(SimpleCheckpoint, self).__init__()
    self.checkpoint_manager = checkpoint_manager

  def on_epoch_end(self, epoch, logs=None):
    step_counter = self.checkpoint_manager._step_counter.numpy()  # pylint: disable=protected-access
    self.checkpoint_manager.save(checkpoint_number=step_counter)
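
# Example usage (a minimal sketch; `model`, `optimizer`, and `dataset` are
# hypothetical):
#
#   checkpoint = tf.train.Checkpoint(model=model, optimizer=optimizer)
#   manager = tf.train.CheckpointManager(
#       checkpoint, directory='/tmp/ckpt', max_to_keep=3,
#       step_counter=optimizer.iterations)
#   model.fit(dataset, epochs=2, callbacks=[SimpleCheckpoint(manager)])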


class ProfilerCallback(tf.keras.callbacks.Callback):
  """Save profiles in specified step range to log directory."""

  def __init__(self, log_dir, start_step, stop_step, steps_per_epoch):
    super(ProfilerCallback, self).__init__()
    self.log_dir = log_dir
    self.start_step = start_step
    self.stop_step = stop_step
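    # Translate global step boundaries into (epoch, step-within-epoch)
    # coordinates, since Keras reports batch indices relative to each epoch.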
    self.start_epoch = start_step // steps_per_epoch
    self.stop_epoch = stop_step // steps_per_epoch
    self.start_step_in_epoch = start_step % steps_per_epoch
    self.stop_step_in_epoch = stop_step % steps_per_epoch
    self.should_start = False
    self.should_stop = False

  def on_epoch_begin(self, epoch, logs=None):
    if epoch == self.start_epoch:
      self.should_start = True
    if epoch == self.stop_epoch:
      self.should_stop = True

  def on_batch_begin(self, batch, logs=None):
    if batch == self.start_step_in_epoch and self.should_start:
      self.should_start = False
      profiler.start(self.log_dir)
      logging.info('Profiler started at Step %s', self.start_step)

  def on_batch_end(self, batch, logs=None):
    if batch == self.stop_step_in_epoch and self.should_stop:
      self.should_stop = False
      profiler.stop()
      logging.info('Profiler saved profiles for steps between %s and %s to %s',
                   self.start_step, self.stop_step, self.log_dir)


def set_session_config(enable_eager=False, enable_xla=False):
  """Sets the session config."""
  if is_v2_0():
    set_config_v2(enable_xla=enable_xla)
  else:
    config = get_config_proto_v1(enable_xla=enable_xla)
    if enable_eager:
      tf.compat.v1.enable_eager_execution(config=config)
    else:
      sess = tf.compat.v1.Session(config=config)
      tf.compat.v1.keras.backend.set_session(sess)
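
# Example usage (a minimal sketch; call before the model is built so the
# session/eager settings take effect):
#
#   set_session_config(enable_eager=True, enable_xla=False)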


def get_config_proto_v1(enable_xla=False):
  """Return config proto according to flag settings, or None to use default."""
  config = None
  if enable_xla:
    config = tf.compat.v1.ConfigProto()
    config.graph_options.optimizer_options.global_jit_level = (
        tf.compat.v1.OptimizerOptions.ON_2)
  return config


def set_config_v2(enable_xla=False):
  """Config eager context according to flag values using TF 2.0 API."""
  if enable_xla:
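    # set_jit(True) turns on XLA auto-clustering process-wide.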
    tf.config.optimizer.set_jit(True)


def is_v2_0():
  """Returns true if using tf 2.0."""
  return tf2.enabled()


def set_gpu_thread_mode_and_count(gpu_thread_mode,
                                  datasets_num_private_threads,
                                  num_gpus, per_gpu_thread_count):
  """Set GPU thread mode and count, and adjust dataset threads count."""
  cpu_count = multiprocessing.cpu_count()
  logging.info('Logical CPU cores: %s', cpu_count)

  # Allocate private thread pool for each GPU to schedule and launch kernels
  per_gpu_thread_count = per_gpu_thread_count or 2
  os.environ['TF_GPU_THREAD_MODE'] = gpu_thread_mode
  os.environ['TF_GPU_THREAD_COUNT'] = str(per_gpu_thread_count)
  logging.info('TF_GPU_THREAD_COUNT: %s',
               os.environ['TF_GPU_THREAD_COUNT'])
  logging.info('TF_GPU_THREAD_MODE: %s',
               os.environ['TF_GPU_THREAD_MODE'])

  # Limit data preprocessing threadpool to CPU cores minus number of total GPU
  # private threads and memory copy threads.
  total_gpu_thread_count = per_gpu_thread_count * num_gpus
  num_runtime_threads = num_gpus
  if not datasets_num_private_threads:
    datasets_num_private_threads = min(
        cpu_count - total_gpu_thread_count - num_runtime_threads,
        num_gpus * 8)
    logging.info('Set datasets_num_private_threads to %s',
                 datasets_num_private_threads)
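
# Example usage (a minimal sketch; the argument values are hypothetical and
# should be applied before the input pipeline and model are built):
#
#   set_gpu_thread_mode_and_count(
#       gpu_thread_mode='gpu_private', datasets_num_private_threads=None,
#       num_gpus=8, per_gpu_thread_count=2)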