run_pretraining.py 7.34 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Run masked LM/next sentence masked_lm pre-training for BERT in tf2.0."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import functools
22

23
24
25
26
27
from absl import app
from absl import flags
from absl import logging
import tensorflow as tf

28
# Import BERT model libraries.
29
from official.bert import bert_models
30
from official.bert import common_flags
31
from official.bert import input_pipeline
32
from official.bert import model_saving_utils
33
34
35
from official.bert import model_training_utils
from official.bert import modeling
from official.bert import optimization
36
from official.utils.misc import tpu_lib
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

flags.DEFINE_string('input_files', None,
                    'File path to retrieve training data for pre-training.')
# Model training specific flags.
flags.DEFINE_integer(
    'max_seq_length', 128,
    'The maximum total input sequence length after WordPiece tokenization. '
    'Sequences longer than this will be truncated, and sequences shorter '
    'than this will be padded.')
flags.DEFINE_integer('max_predictions_per_seq', 20,
                     'Maximum predictions per sequence_output.')
flags.DEFINE_integer('train_batch_size', 32, 'Total batch size for training.')
flags.DEFINE_integer('num_steps_per_epoch', 1000,
                     'Total number of training steps to run per epoch.')
flags.DEFINE_float('warmup_steps', 10000,
                   'Warmup steps for Adam weight decay optimizer.')

54
55
common_flags.define_common_bert_flags()

56
57
58
59
FLAGS = flags.FLAGS


def get_pretrain_input_data(input_file_pattern, seq_length,
60
                            max_predictions_per_seq, batch_size, strategy):
61
62
  """Returns input dataset from input file string."""

63
64
65
66
67
68
69
70
71
  # When using TPU pods, we need to clone dataset across
  # workers and need to pass in function that returns the dataset rather
  # than passing dataset instance itself.
  use_dataset_fn = isinstance(strategy, tf.distribute.experimental.TPUStrategy)
  if use_dataset_fn:
    if batch_size % strategy.num_replicas_in_sync != 0:
      raise ValueError(
          'Batch size must be divisible by number of replicas : {}'.format(
              strategy.num_replicas_in_sync))
72

73
74
75
76
77
78
79
    # As auto rebatching is not supported in
    # `experimental_distribute_datasets_from_function()` API, which is
    # required when cloning dataset to multiple workers in eager mode,
    # we use per-replica batch size.
    batch_size = int(batch_size / strategy.num_replicas_in_sync)

  def _dataset_fn(ctx=None):
80
    """Returns tf.data.Dataset for distributed BERT pretraining."""
81
82
83
84
85
    input_files = []
    for input_pattern in input_file_pattern.split(','):
      input_files.extend(tf.io.gfile.glob(input_pattern))

    train_dataset = input_pipeline.create_pretrain_dataset(
86
87
88
89
90
91
        input_files,
        seq_length,
        max_predictions_per_seq,
        batch_size,
        is_training=True,
        input_pipeline_context=ctx)
92
93
94
    return train_dataset

  return _dataset_fn if use_dataset_fn else _dataset_fn()
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111


def get_loss_fn(loss_scale=1.0):
  """Returns loss function for BERT pretraining."""

  def _bert_pretrain_loss_fn(unused_labels, losses, **unused_args):
    return tf.keras.backend.mean(losses) * loss_scale

  return _bert_pretrain_loss_fn


def run_customized_training(strategy,
                            bert_config,
                            max_seq_length,
                            max_predictions_per_seq,
                            model_dir,
                            steps_per_epoch,
112
                            steps_per_loop,
113
114
115
116
117
118
119
120
121
122
                            epochs,
                            initial_lr,
                            warmup_steps,
                            input_files,
                            train_batch_size,
                            use_remote_tpu=False):
  """Run BERT pretrain model training using low-level API."""

  train_input_fn = functools.partial(get_pretrain_input_data, input_files,
                                     max_seq_length, max_predictions_per_seq,
123
                                     train_batch_size, strategy)
124
125
126
127
128
129
130
131

  def _get_pretrain_model():
    pretrain_model, core_model = bert_models.pretrain_model(
        bert_config, max_seq_length, max_predictions_per_seq)
    pretrain_model.optimizer = optimization.create_optimizer(
        initial_lr, steps_per_epoch * epochs, warmup_steps)
    return pretrain_model, core_model

132
  trained_model = model_training_utils.run_customized_training_loop(
133
134
135
136
137
138
      strategy=strategy,
      model_fn=_get_pretrain_model,
      loss_fn=get_loss_fn(),
      model_dir=model_dir,
      train_input_fn=train_input_fn,
      steps_per_epoch=steps_per_epoch,
139
      steps_per_loop=steps_per_loop,
140
141
142
      epochs=epochs,
      use_remote_tpu=use_remote_tpu)

143
144
145
146
147
148
149
150
151
152
  # Creates the BERT core model outside distribution strategy scope.
  _, core_model = bert_models.pretrain_model(bert_config, max_seq_length,
                                             max_predictions_per_seq)

  # Restores the core model from model checkpoints and get a new checkpoint only
  # contains the core model.
  model_saving_utils.export_pretraining_checkpoint(
      checkpoint_dir=model_dir, model=core_model)
  return trained_model

153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172

def run_bert_pretrain(strategy):
  """Runs BERT pre-training."""

  bert_config = modeling.BertConfig.from_json_file(FLAGS.bert_config_file)
  if not strategy:
    raise ValueError('Distribution strategy is not specified.')

  # Runs customized training loop.
  logging.info('Training using customized training loop TF 2.0 with distrubuted'
               'strategy.')

  use_remote_tpu = (FLAGS.strategy_type == 'tpu' and FLAGS.tpu)
  return run_customized_training(
      strategy,
      bert_config,
      FLAGS.max_seq_length,
      FLAGS.max_predictions_per_seq,
      FLAGS.model_dir,
      FLAGS.num_steps_per_epoch,
173
      FLAGS.steps_per_loop,
174
175
176
177
178
179
180
181
182
183
184
      FLAGS.num_train_epochs,
      FLAGS.learning_rate,
      FLAGS.warmup_steps,
      FLAGS.input_files,
      FLAGS.train_batch_size,
      use_remote_tpu=use_remote_tpu)


def main(_):
  # Users should always run this script under TF 2.x
  assert tf.version.VERSION.startswith('2.')
185

186
187
188
  if not FLAGS.model_dir:
    FLAGS.model_dir = '/tmp/bert20/'
  strategy = None
189
190
191
192
193
  if FLAGS.strategy_type == 'mirror':
    strategy = tf.distribute.MirroredStrategy()
  elif FLAGS.strategy_type == 'tpu':
    # Initialize TPU System.
    cluster_resolver = tpu_lib.tpu_initialize(FLAGS.tpu)
194
    strategy = tf.distribute.experimental.TPUStrategy(cluster_resolver)
195
196
197
  else:
    raise ValueError('The distribution strategy type is not supported: %s' %
                     FLAGS.strategy_type)
198
199
200
  if strategy:
    print('***** Number of cores used : ', strategy.num_replicas_in_sync)

201
  run_bert_pretrain(strategy)
202
203
204
205


if __name__ == '__main__':
  app.run(main)