keras_utils.py 8.27 KB
Newer Older
Hongkun Yu's avatar
Hongkun Yu committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Copyright 2021 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Helper functions for the Keras implementations of models."""

31
32
import multiprocessing
import os
33
34
import time

35
from absl import logging
36
import tensorflow as tf
37

38
39
40
41
42
from tensorflow.python.eager import monitoring

# Integer gauge registered under '/tensorflow/training/global_batch_size'.
# TimeHistory.__init__ writes the `batch_size` it was constructed with into
# this gauge's cell.
global_batch_size_gauge = monitoring.IntGauge(
    '/tensorflow/training/global_batch_size', 'TF training global batch size')

43
44
45
46
47
48
49
# Integer gauge registered under '/tensorflow/training/first_batch'. The
# trailing 'type' argument is presumably the gauge's label name -- confirm
# against the monitoring.IntGauge signature.
first_batch_time_gauge = monitoring.IntGauge(
    '/tensorflow/training/first_batch',
    'TF training start/end time for first batch (unix epoch time in us.',
    'type')

# Cells for the 'start' and 'end' values. TimeHistory fills each one at most
# once (only while the cell's value is still falsy) with
# int(time.time() * 1000000), i.e. microseconds since the unix epoch.
first_batch_start_time = first_batch_time_gauge.get_cell('start')
first_batch_end_time = first_batch_time_gauge.get_cell('end')
A. Unique TensorFlower's avatar
A. Unique TensorFlower committed
50

51

52
53
54
55
56
57
58
class BatchTimestamp(object):
  """Pairs a batch index with the timestamp recorded for it."""

  def __init__(self, batch_index, timestamp):
    """Stores both values unchanged as public attributes.

    Args:
      batch_index: Index of the batch (step) the timestamp belongs to.
      timestamp: Time value recorded for that batch.
    """
    self.timestamp = timestamp
    self.batch_index = batch_index

  def __repr__(self):
    # The surrounding single quotes are part of the returned text.
    template = "'BatchTimestamp<batch_index: {index}, timestamp: {stamp}>'"
    return template.format(index=self.batch_index, stamp=self.timestamp)

63
64
65
66

class TimeHistory(tf.keras.callbacks.Callback):
  """Callback for Keras models that records step timing and throughput.

  Every time at least `log_steps` global steps have passed since the last
  log, the callback logs the elapsed wall time and examples/second, appends a
  `BatchTimestamp` to `timestamp_log` and, when a `logdir` was supplied,
  writes `steps_per_second` and `examples_per_second` scalar summaries. The
  wall time of every epoch is appended to `epoch_runtime_log`.
  """

  def __init__(self, batch_size, log_steps, initial_step=0, logdir=None):
    """Callback for logging performance.

    Args:
      batch_size: Total batch size.
      log_steps: Interval of steps between logging of batch level stats.
      initial_step: Optional, initial step.
      logdir: Optional directory to write TensorBoard summaries.
    """
    # TODO(wcromar): remove this parameter and rely on `logs` parameter of
    # on_train_batch_end()
    self.batch_size = batch_size
    super(TimeHistory, self).__init__()
    self.log_steps = log_steps
    # Steps counted before this callback started timing anything. They are
    # part of `global_steps` but must not count towards measured throughput.
    self._initial_step = initial_step
    self.last_log_step = initial_step
    self.steps_before_epoch = initial_step
    self.steps_in_epoch = 0
    self.start_time = None
    # Filled in by on_epoch_begin() / on_train_end(). Defined up front so that
    # reading them before those hooks ran yields None instead of raising
    # AttributeError.
    self.epoch_start = None
    self.train_finish_time = None

    global_batch_size_gauge.get_cell().set(batch_size)

    if logdir:
      self.summary_writer = tf.summary.create_file_writer(logdir)
    else:
      self.summary_writer = None

    # Logs start of step 1 then end of each step based on log_steps interval.
    self.timestamp_log = []

    # Records the time each epoch takes to run from start to finish of epoch.
    self.epoch_runtime_log = []

  @property
  def global_steps(self):
    """The current 1-indexed global step."""
    return self.steps_before_epoch + self.steps_in_epoch

  @property
  def average_steps_per_second(self):
    """The average training steps per second across all epochs.

    Only steps run under this callback are counted: `initial_step` is
    subtracted because `epoch_runtime_log` holds no time for those steps.
    Returns 0.0 while no epoch runtime has been recorded yet.
    """
    total_epoch_seconds = sum(self.epoch_runtime_log)
    if not total_epoch_seconds:
      return 0.0
    return (self.global_steps - self._initial_step) / total_epoch_seconds

  @property
  def average_examples_per_second(self):
    """The average number of training examples per second across all epochs."""
    return self.average_steps_per_second * self.batch_size

  def get_examples_per_sec(self, warmup=1):
    """Calculates examples/sec through timestamp_log and skip warmup period.

    Args:
      warmup: Index into `timestamp_log` of the entry used as the start of
        the measured interval; earlier entries are ignored.

    Returns:
      `batch_size` times the number of steps between the warmup entry and the
      last entry, divided by the seconds between them.

    Raises:
      IndexError: If `timestamp_log` has no entry at index `warmup`.
      ZeroDivisionError: If the warmup entry and the last entry carry the
        same timestamp (e.g. they are the same entry).
    """
    # First entry in timestamp_log is the start of the step 1. The rest of the
    # entries are the end of each step recorded.
    time_log = self.timestamp_log
    seconds = time_log[-1].timestamp - time_log[warmup].timestamp
    steps = time_log[-1].batch_index - time_log[warmup].batch_index
    return self.batch_size * steps / seconds

  def get_startup_time(self, start_time_sec):
    """Returns seconds between `start_time_sec` and the first logged timestamp."""
    return self.timestamp_log[0].timestamp - start_time_sec

  def on_train_end(self, logs=None):
    """Records the training finish time and flushes pending summaries."""
    self.train_finish_time = time.time()

    if self.summary_writer:
      self.summary_writer.flush()

  def on_epoch_begin(self, epoch, logs=None):
    """Records the wall time at which the epoch started."""
    self.epoch_start = time.time()

  def on_batch_begin(self, batch, logs=None):
    """Starts the timer of a logging interval if it is not running yet."""
    if not self.start_time:
      self.start_time = time.time()
      # The gauge cell is written only once, while its value is still falsy.
      if not first_batch_start_time.value():
        first_batch_start_time.set(int(self.start_time * 1000000))

    # Record the timestamp of the first global step
    if not self.timestamp_log:
      self.timestamp_log.append(
          BatchTimestamp(self.global_steps, self.start_time))

  def on_batch_end(self, batch, logs=None):
    """Records elapsed time of the batch and calculates examples per second."""
    if not first_batch_end_time.value():
      first_batch_end_time.set(int(time.time() * 1000000))

    # `batch` is the 0-based index within the epoch, hence the +1.
    self.steps_in_epoch = batch + 1
    steps_since_last_log = self.global_steps - self.last_log_step
    if steps_since_last_log >= self.log_steps:
      now = time.time()
      elapsed_time = now - self.start_time
      steps_per_second = steps_since_last_log / elapsed_time
      examples_per_second = steps_per_second * self.batch_size

      self.timestamp_log.append(BatchTimestamp(self.global_steps, now))
      logging.info(
          'TimeHistory: %.2f seconds, %.2f examples/second between steps %d '
          'and %d', elapsed_time, examples_per_second, self.last_log_step,
          self.global_steps)

      if self.summary_writer:
        with self.summary_writer.as_default():
          tf.summary.scalar('steps_per_second', steps_per_second,
                            self.global_steps)
          tf.summary.scalar('examples_per_second', examples_per_second,
                            self.global_steps)

      self.last_log_step = self.global_steps
      # Cleared so the next on_batch_begin() starts a fresh interval timer.
      self.start_time = None

  def on_epoch_end(self, epoch, logs=None):
    """Logs the epoch runtime and folds the epoch's steps into the total."""
    epoch_run_time = time.time() - self.epoch_start
    self.epoch_runtime_log.append(epoch_run_time)

    self.steps_before_epoch += self.steps_in_epoch
    self.steps_in_epoch = 0
180

181

182
183
184
185
186
187
188
189
190
191
192
193
class SimpleCheckpoint(tf.keras.callbacks.Callback):
  """Keras callback that saves a checkpoint at the end of every epoch."""

  def __init__(self, checkpoint_manager):
    """Keeps a reference to the manager used for saving.

    Args:
      checkpoint_manager: Object exposing `save(checkpoint_number=...)` and a
        `_step_counter` with a `.numpy()` method; presumably a
        `tf.train.CheckpointManager` -- confirm against callers.
    """
    super(SimpleCheckpoint, self).__init__()
    self.checkpoint_manager = checkpoint_manager

  def on_epoch_end(self, epoch, logs=None):
    """Saves a checkpoint numbered with the manager's current step counter."""
    manager = self.checkpoint_manager
    current_step = manager._step_counter.numpy()  # pylint: disable=protected-access
    manager.save(checkpoint_number=current_step)


194
def set_session_config(enable_xla=False):
  """Sets the session config.

  Args:
    enable_xla: When truthy, JIT compilation is switched on through
      `tf.config.optimizer.set_jit(True)`. When falsy, nothing is changed.
  """
  if not enable_xla:
    return
  tf.config.optimizer.set_jit(True)

Hongkun Yu's avatar
Hongkun Yu committed
199

200
201
# `set_config_v2` is an alias bound to the very same function object as
# `set_session_config`; both names behave identically.
# TODO(hongkuny): remove set_config_v2 globally.
set_config_v2 = set_session_config
202
203


Hongkun Yu's avatar
Hongkun Yu committed
204
def set_gpu_thread_mode_and_count(gpu_thread_mode, datasets_num_private_threads,
                                  num_gpus, per_gpu_thread_count):
  """Exports GPU thread settings to the environment and sizes dataset threads.

  Writes `TF_GPU_THREAD_MODE` and `TF_GPU_THREAD_COUNT` into `os.environ` and
  logs them. When no dataset thread count was requested, one is derived from
  the CPU count and logged; the derived value is not returned or stored.

  Args:
    gpu_thread_mode: Value stored in the `TF_GPU_THREAD_MODE` environment
      variable.
    datasets_num_private_threads: Requested number of dataset threads. A
      falsy value triggers the derivation described above.
    num_gpus: Number of GPUs, used to scale the thread budgets.
    per_gpu_thread_count: Threads per GPU; a falsy value falls back to 2.
  """
  logical_cores = multiprocessing.cpu_count()
  logging.info('Logical CPU cores: %s', logical_cores)

  # Every GPU gets a private thread pool to schedule and launch kernels.
  if not per_gpu_thread_count:
    per_gpu_thread_count = 2
  os.environ['TF_GPU_THREAD_MODE'] = gpu_thread_mode
  os.environ['TF_GPU_THREAD_COUNT'] = str(per_gpu_thread_count)
  logging.info('TF_GPU_THREAD_COUNT: %s', os.environ['TF_GPU_THREAD_COUNT'])
  logging.info('TF_GPU_THREAD_MODE: %s', os.environ['TF_GPU_THREAD_MODE'])

  gpu_private_threads = per_gpu_thread_count * num_gpus
  if datasets_num_private_threads:
    # An explicit request was made, so there is nothing to derive.
    return

  # Cap the data preprocessing thread pool at the CPU cores left after
  # reserving the GPU private threads and one runtime (memory copy) thread
  # per GPU, and at no more than 8 threads per GPU.
  spare_cores = logical_cores - gpu_private_threads - num_gpus
  datasets_num_private_threads = min(spare_cores, num_gpus * 8)
  logging.info('Set datasets_num_private_threads to %s',
               datasets_num_private_threads)