# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Helper functions for the Keras implementations of models."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import multiprocessing
import os
import time

from absl import logging
import tensorflow as tf


class BatchTimestamp(object):
  """A structure to store batch time stamp."""

  def __init__(self, batch_index, timestamp):
    self.batch_index = batch_index
    self.timestamp = timestamp

  def __repr__(self):
    return "'BatchTimestamp<batch_index: {}, timestamp: {}>'".format(
        self.batch_index, self.timestamp)


class TimeHistory(tf.keras.callbacks.Callback):
  """Callback for Keras models."""

  def __init__(self, batch_size, log_steps, logdir=None):
    """Callback for logging performance.

    Args:
      batch_size: Total batch size.
      log_steps: Interval of steps between logging of batch level stats.
      logdir: Optional directory to write TensorBoard summaries.
    """
    # TODO(wcromar): remove this parameter and rely on `logs` parameter of
    # on_train_batch_end()
    self.batch_size = batch_size
    super(TimeHistory, self).__init__()
    self.log_steps = log_steps
    self.last_log_step = 0
    self.steps_before_epoch = 0
    self.steps_in_epoch = 0
    self.start_time = None

    if logdir:
      self.summary_writer = tf.summary.create_file_writer(logdir)
    else:
      self.summary_writer = None

    # Logs start of step 1 then end of each step based on log_steps interval.
    self.timestamp_log = []

    # Records the time each epoch takes to run from start to finish of epoch.
    self.epoch_runtime_log = []

  @property
  def global_steps(self):
    """The current 1-indexed global step."""
    return self.steps_before_epoch + self.steps_in_epoch

  @property
  def average_steps_per_second(self):
    """The average training steps per second across all epochs."""
    return self.global_steps / sum(self.epoch_runtime_log)

  @property
  def average_examples_per_second(self):
    """The average number of training examples per second across all epochs."""
    return self.average_steps_per_second * self.batch_size

  def on_train_end(self, logs=None):
    self.train_finish_time = time.time()

    if self.summary_writer:
      self.summary_writer.flush()

  def on_epoch_begin(self, epoch, logs=None):
    self.epoch_start = time.time()

  def on_batch_begin(self, batch, logs=None):
    if not self.start_time:
      self.start_time = time.time()

    # Record the timestamp of the first global step
    if not self.timestamp_log:
      self.timestamp_log.append(BatchTimestamp(self.global_steps,
                                               self.start_time))

  def on_batch_end(self, batch, logs=None):
    """Records elapse time of the batch and calculates examples per second."""
    self.steps_in_epoch = batch + 1
    steps_since_last_log = self.global_steps - self.last_log_step
    if steps_since_last_log >= self.log_steps:
      now = time.time()
      elapsed_time = now - self.start_time
      steps_per_second = steps_since_last_log / elapsed_time
      examples_per_second = steps_per_second * self.batch_size

      self.timestamp_log.append(BatchTimestamp(self.global_steps, now))
      logging.info(
          'TimeHistory: %.2f seconds, %.2f examples/second between steps %d '
          'and %d', elapsed_time, examples_per_second, self.last_log_step,
          self.global_steps)

      if self.summary_writer:
        with self.summary_writer.as_default():
          tf.summary.scalar('global_step/sec', steps_per_second,
                            self.global_steps)
          tf.summary.scalar('examples/sec', examples_per_second,
                            self.global_steps)

      self.last_log_step = self.global_steps
      self.start_time = None

  def on_epoch_end(self, epoch, logs=None):
    epoch_run_time = time.time() - self.epoch_start
    self.epoch_runtime_log.append(epoch_run_time)

    self.steps_before_epoch += self.steps_in_epoch
    self.steps_in_epoch = 0


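# Example use of TimeHistory (a minimal sketch; `model`, `train_ds`, the batch
# size of 128, and the log directory below are illustrative assumptions, not
# part of this module):
#
#   time_callback = TimeHistory(batch_size=128, log_steps=100,
#                               logdir='/tmp/keras_logs')
#   model.fit(train_ds, epochs=2, callbacks=[time_callback])
#   logging.info('Average examples/second: %f',
#                time_callback.average_examples_per_second)
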
class SimpleCheckpoint(tf.keras.callbacks.Callback):
  """Keras callback to save tf.train.Checkpoints."""

  def __init__(self, checkpoint_manager):
    super(SimpleCheckpoint, self).__init__()
    self.checkpoint_manager = checkpoint_manager

  def on_epoch_end(self, epoch, logs=None):
    step_counter = self.checkpoint_manager._step_counter.numpy()  # pylint: disable=protected-access
    self.checkpoint_manager.save(checkpoint_number=step_counter)


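# Example use of SimpleCheckpoint (a sketch; `model`, `optimizer`, `train_ds`,
# and the checkpoint directory are illustrative assumptions). The
# CheckpointManager must be constructed with a `step_counter`, since the
# callback reads it when saving:
#
#   checkpoint = tf.train.Checkpoint(model=model, optimizer=optimizer)
#   manager = tf.train.CheckpointManager(
#       checkpoint, directory='/tmp/ckpts', max_to_keep=3,
#       step_counter=optimizer.iterations)
#   model.fit(train_ds, callbacks=[SimpleCheckpoint(manager)])
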
def set_session_config(enable_xla=False):
  """Sets the session config."""
  if enable_xla:
    tf.config.optimizer.set_jit(True)

# TODO(hongkuny): remove set_config_v2 globally.
set_config_v2 = set_session_config
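# Example (a sketch): enable XLA JIT compilation before building the model:
#
#   set_session_config(enable_xla=True)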


def set_gpu_thread_mode_and_count(gpu_thread_mode,
                                  datasets_num_private_threads,
                                  num_gpus, per_gpu_thread_count):
  """Set GPU thread mode and count, and adjust dataset threads count."""
  cpu_count = multiprocessing.cpu_count()
  logging.info('Logical CPU cores: %s', cpu_count)

  # Allocate private thread pool for each GPU to schedule and launch kernels
  per_gpu_thread_count = per_gpu_thread_count or 2
  os.environ['TF_GPU_THREAD_MODE'] = gpu_thread_mode
  os.environ['TF_GPU_THREAD_COUNT'] = str(per_gpu_thread_count)
  logging.info('TF_GPU_THREAD_COUNT: %s',
               os.environ['TF_GPU_THREAD_COUNT'])
  logging.info('TF_GPU_THREAD_MODE: %s',
               os.environ['TF_GPU_THREAD_MODE'])

  # Limit data preprocessing threadpool to CPU cores minus number of total GPU
  # private threads and memory copy threads.
  total_gpu_thread_count = per_gpu_thread_count * num_gpus
  num_runtime_threads = num_gpus
  if not datasets_num_private_threads:
    datasets_num_private_threads = min(
        cpu_count - total_gpu_thread_count - num_runtime_threads,
        num_gpus * 8)
    logging.info('Set datasets_num_private_threads to %s',
                 datasets_num_private_threads)
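

# Example use of set_gpu_thread_mode_and_count (a sketch; the argument values
# below are illustrative assumptions):
#
#   set_gpu_thread_mode_and_count(gpu_thread_mode='gpu_private',
#                                 datasets_num_private_threads=None,
#                                 num_gpus=2, per_gpu_thread_count=2)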