Commit ce03903f authored by David M. Chen's avatar David M. Chen Committed by saberkun
Browse files

Merged commit includes the following changes: (#6998)

252697519 by dmchen<dmchen@google.com>:

        BERT SQuAD accuracy test

25266352 by hongjunchoi<hongjunchoi@google.com>:

        Internal change

252647871 by hongjunchoi<hongjunchoi@google.com>:

        Enable multi worker TPU training for BERT pretraining.
parent e7b4d364
......@@ -30,13 +30,16 @@ import tensorflow as tf
from official.bert import run_squad
from official.bert.benchmark import benchmark_utils
from official.bert.benchmark import squad_evaluate_v1_1
from official.utils.misc import distribution_utils
# pylint: disable=line-too-long
PRETRAINED_CHECKPOINT_PATH = 'gs://cloud-tpu-checkpoints/bert/tf_20/uncased_L-24_H-1024_A-16/bert_model.ckpt'
SQUAD_TRAIN_DATA_PATH = 'gs://tf-perfzero-data/bert/squad/squad_train.tf_record'
SQUAD_PREDICT_FILE = 'gs://tf-perfzero-data/bert/squad/dev-v1.1.json'
SQUAD_VOCAB_FILE = 'gs://tf-perfzero-data/bert/squad/vocab.txt'
SQUAD_SMALL_INPUT_META_DATA_PATH = 'gs://tf-perfzero-data/bert/squad/squad_small_meta_data'
SQUAD_FULL_INPUT_META_DATA_PATH = 'gs://tf-perfzero-data/bert/squad/squad_full_meta_data'
MODEL_CONFIG_FILE_PATH = 'gs://cloud-tpu-checkpoints/bert/tf_20/uncased_L-24_H-1024_A-16/bert_config'
# pylint: enable=line-too-long
......@@ -46,9 +49,37 @@ FLAGS = flags.FLAGS
class BertSquadBenchmarkBase(benchmark_utils.BertBenchmarkBase):
"""Base class to hold methods common to test classes in the module."""
def _run_and_report_benchmark(self, training_summary_path, min_accuracy,
max_accuracy):
"""Runs the benchmark and reports various metrics."""
start_time_sec = time.time()
self._run_bert_squad()
wall_time_sec = time.time() - start_time_sec
with tf.io.gfile.GFile(training_summary_path, 'rb') as reader:
summary = json.loads(reader.read().decode('utf-8'))
summary['eval_metrics'] = self.eval_metrics
super(BertSquadBenchmarkBase, self)._report_benchmark(
stats=summary,
wall_time_sec=wall_time_sec,
min_accuracy=min_accuracy,
max_accuracy=max_accuracy)
def _evaluate_squad(self, predictions_file):
"""Evaluates a predictions file."""
with tf.io.gfile.GFile(SQUAD_PREDICT_FILE, 'r') as reader:
dataset_json = json.load(reader)
dataset = dataset_json['data']
with tf.io.gfile.GFile(predictions_file, 'r') as reader:
predictions = json.load(reader)
return squad_evaluate_v1_1.evaluate(dataset, predictions)
@flagsaver.flagsaver
def _run_bert_squad(self):
"""Starts BERT SQuAD task."""
"""Starts BERT SQuAD training and evaluation tasks."""
with tf.io.gfile.GFile(FLAGS.input_meta_data_path, 'rb') as reader:
input_meta_data = json.loads(reader.read().decode('utf-8'))
......@@ -59,9 +90,14 @@ class BertSquadBenchmarkBase(benchmark_utils.BertBenchmarkBase):
strategy=strategy,
input_meta_data=input_meta_data,
custom_callbacks=[self.timer_callback])
run_squad.predict_squad(strategy=strategy, input_meta_data=input_meta_data)
predictions_file = os.path.join(FLAGS.model_dir, 'predictions.json')
eval_metrics = self._evaluate_squad(predictions_file)
# Use F1 score as reported evaluation metric.
self.eval_metrics = eval_metrics['f1']
class BertSquadBenchmark(BertSquadBenchmarkBase):
class BertSquadBenchmarkReal(BertSquadBenchmarkBase):
"""Short benchmark performance tests for BERT SQuAD model.
Tests BERT SQuAD performance in different GPU configurations.
......@@ -70,10 +106,11 @@ class BertSquadBenchmark(BertSquadBenchmarkBase):
"""
def __init__(self, output_dir=None, **kwargs):
super(BertSquadBenchmark, self).__init__(output_dir=output_dir)
super(BertSquadBenchmarkReal, self).__init__(output_dir=output_dir)
def _setup(self):
super(BertSquadBenchmark, self)._setup()
"""Sets up the benchmark and SQuAD flags."""
super(BertSquadBenchmarkReal, self)._setup()
FLAGS.train_data_path = SQUAD_TRAIN_DATA_PATH
FLAGS.predict_file = SQUAD_PREDICT_FILE
FLAGS.vocab_file = SQUAD_VOCAB_FILE
......@@ -81,27 +118,13 @@ class BertSquadBenchmark(BertSquadBenchmarkBase):
FLAGS.bert_config_file = MODEL_CONFIG_FILE_PATH
FLAGS.num_train_epochs = 1
def _run_and_report_benchmark(self,
training_summary_path,
min_accuracy=0,
max_accuracy=1):
"""Starts BERT SQuAD performance benchmark test."""
start_time_sec = time.time()
self._run_bert_squad()
wall_time_sec = time.time() - start_time_sec
with tf.io.gfile.GFile(training_summary_path, 'rb') as reader:
summary = json.loads(reader.read().decode('utf-8'))
super(BertSquadBenchmark, self)._report_benchmark(
stats=summary,
wall_time_sec=wall_time_sec,
min_accuracy=min_accuracy,
max_accuracy=max_accuracy)
def _run_and_report_benchmark(self, training_summary_path):
"""Runs the benchmark and reports various metrics."""
super(BertSquadBenchmarkReal, self)._run_and_report_benchmark(
training_summary_path, min_accuracy=0, max_accuracy=1)
def benchmark_1_gpu(self):
"""Test BERT SQuAD model performance with 1 GPU."""
"""Tests BERT SQuAD model performance with 1 GPU."""
self._setup()
self.num_gpus = 1
......@@ -112,7 +135,7 @@ class BertSquadBenchmark(BertSquadBenchmarkBase):
self._run_and_report_benchmark(summary_path)
def benchmark_2_gpu(self):
"""Test BERT SQuAD model performance with 2 GPUs."""
"""Tests BERT SQuAD model performance with 2 GPUs."""
self._setup()
self.num_gpus = 2
......@@ -123,7 +146,7 @@ class BertSquadBenchmark(BertSquadBenchmarkBase):
self._run_and_report_benchmark(summary_path)
def benchmark_4_gpu(self):
"""Test BERT SQuAD model performance with 4 GPUs."""
"""Tests BERT SQuAD model performance with 4 GPUs."""
self._setup()
self.num_gpus = 4
......@@ -134,7 +157,45 @@ class BertSquadBenchmark(BertSquadBenchmarkBase):
self._run_and_report_benchmark(summary_path)
def benchmark_8_gpu(self):
"""Test BERT SQuAD model performance with 8 GPUs."""
"""Tests BERT SQuAD model performance with 8 GPUs."""
self._setup()
self.num_gpus = 8
FLAGS.model_dir = self._get_model_dir('benchmark_8_gpu_squad')
FLAGS.train_batch_size = 32
summary_path = os.path.join(FLAGS.model_dir, 'training_summary.txt')
self._run_and_report_benchmark(summary_path)
class BertSquadAccuracy(BertSquadBenchmarkBase):
"""Short accuracy test for BERT SQuAD model.
Tests BERT SQuAD accuracy. The naming convention of below test cases follow
`benchmark_(number of gpus)_gpu` format.
"""
def __init__(self, output_dir=None, **kwargs):
super(BertSquadAccuracy, self).__init__(output_dir=output_dir)
def _setup(self):
"""Sets up the benchmark and SQuAD flags."""
super(BertSquadAccuracy, self)._setup()
FLAGS.train_data_path = SQUAD_TRAIN_DATA_PATH
FLAGS.predict_file = SQUAD_PREDICT_FILE
FLAGS.vocab_file = SQUAD_VOCAB_FILE
FLAGS.input_meta_data_path = SQUAD_FULL_INPUT_META_DATA_PATH
FLAGS.bert_config_file = MODEL_CONFIG_FILE_PATH
FLAGS.init_checkpoint = PRETRAINED_CHECKPOINT_PATH
FLAGS.num_train_epochs = 2
def _run_and_report_benchmark(self, training_summary_path):
"""Runs the benchmark and reports various metrics."""
super(BertSquadAccuracy, self)._run_and_report_benchmark(
training_summary_path, min_accuracy=0.902, max_accuracy=0.909)
def benchmark_8_gpu(self):
"""Tests BERT SQuAD model accuracy with 8 GPUs."""
self._setup()
self.num_gpus = 8
......
# Copyright 2019 Pranav Rajpurkar, Jian Zhang, Konstantin Lopyrev and
# Percy Liang. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Evaluation of SQuAD predictions (version 1.1).
The functions are copied from
https://worksheets.codalab.org/rest/bundles/0xbcd57bee090b421c982906709c8c27e1/contents/blob/.
The SQuAD dataset is described in this paper:
SQuAD: 100,000+ Questions for Machine Comprehension of Text
Pranav Rajpurkar, Jian Zhang, Konstantin Lopyrev, Percy Liang
https://nlp.stanford.edu/pubs/rajpurkar2016squad.pdf
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import collections
import re
import string
# pylint: disable=g-bad-import-order
from absl import logging
# pylint: enable=g-bad-import-order
def _normalize_answer(s):
"""Lowers text and remove punctuation, articles and extra whitespace."""
def remove_articles(text):
return re.sub(r"\b(a|an|the)\b", " ", text)
def white_space_fix(text):
return " ".join(text.split())
def remove_punc(text):
exclude = set(string.punctuation)
return "".join(ch for ch in text if ch not in exclude)
def lower(text):
return text.lower()
return white_space_fix(remove_articles(remove_punc(lower(s))))
def _f1_score(prediction, ground_truth):
"""Computes F1 score by comparing prediction to ground truth."""
prediction_tokens = _normalize_answer(prediction).split()
ground_truth_tokens = _normalize_answer(ground_truth).split()
prediction_counter = collections.Counter(prediction_tokens)
ground_truth_counter = collections.Counter(ground_truth_tokens)
common = prediction_counter & ground_truth_counter
num_same = sum(common.values())
if num_same == 0:
return 0
precision = 1.0 * num_same / len(prediction_tokens)
recall = 1.0 * num_same / len(ground_truth_tokens)
f1 = (2 * precision * recall) / (precision + recall)
return f1
def _exact_match_score(prediction, ground_truth):
"""Checks if predicted answer exactly matches ground truth answer."""
return _normalize_answer(prediction) == _normalize_answer(ground_truth)
def _metric_max_over_ground_truths(metric_fn, prediction, ground_truths):
"""Computes the max over all metric scores."""
scores_for_ground_truths = []
for ground_truth in ground_truths:
score = metric_fn(prediction, ground_truth)
scores_for_ground_truths.append(score)
return max(scores_for_ground_truths)
def evaluate(dataset, predictions):
"""Evaluates predictions for a dataset."""
f1 = exact_match = total = 0
for article in dataset:
for paragraph in article["paragraphs"]:
for qa in paragraph["qas"]:
total += 1
if qa["id"] not in predictions:
message = "Unanswered question " + qa["id"] + " will receive score 0."
logging.error(message)
continue
ground_truths = [entry["text"] for entry in qa["answers"]]
prediction = predictions[qa["id"]]
exact_match += _metric_max_over_ground_truths(_exact_match_score,
prediction, ground_truths)
f1 += _metric_max_over_ground_truths(_f1_score, prediction,
ground_truths)
exact_match = exact_match / total
f1 = f1 / total
return {"exact_match": exact_match, "f1": f1}
......@@ -43,6 +43,21 @@ def _save_checkpoint(checkpoint, model_dir, checkpoint_prefix):
return
def _get_input_iterator(input_fn, strategy):
"""Returns distributed dataset iterator."""
# When training with TPU pods, datasets needs to be cloned across
# workers. Since Dataset instance cannot be cloned in eager mode, we instead
# pass callable that returns a dataset.
input_data = input_fn()
if callable(input_data):
iterator = iter(
strategy.experimental_distribute_datasets_from_function(input_data))
else:
iterator = iter(strategy.experimental_distribute_dataset(input_data))
return iterator
def run_customized_training_loop(
# pylint: disable=invalid-name
_sentinel=None,
......@@ -125,8 +140,8 @@ def run_customized_training_loop(
# To reduce unnecessary send/receive input pipeline operation, we place input
# pipeline ops in worker task.
with tf.device(get_primary_cpu_task(use_remote_tpu)):
train_iterator = iter(
strategy.experimental_distribute_dataset(train_input_fn()))
train_iterator = _get_input_iterator(train_input_fn, strategy)
with strategy.scope():
total_training_steps = steps_per_epoch * epochs
......@@ -192,9 +207,8 @@ def run_customized_training_loop(
strategy.experimental_run_v2(_test_step_fn, args=(next(iterator),))
def _run_evaluation(current_training_step, test_dataset):
def _run_evaluation(current_training_step, test_iterator):
"""Runs validation steps and aggregate metrics."""
test_iterator = iter(test_dataset)
for _ in range(eval_steps):
test_step(test_iterator)
......@@ -260,9 +274,8 @@ def run_customized_training_loop(
if eval_input_fn:
logging.info('Running evaluation after step: %s.', current_step)
_run_evaluation(
current_step,
strategy.experimental_distribute_dataset(eval_input_fn()))
_run_evaluation(current_step,
_get_input_iterator(eval_input_fn, strategy))
# Re-initialize evaluation metric, except the last step.
if metric and current_step < total_training_steps:
......@@ -275,8 +288,7 @@ def run_customized_training_loop(
if eval_input_fn:
logging.info('Running final evaluation after training is complete.')
eval_metric_result = _run_evaluation(
current_step,
strategy.experimental_distribute_dataset(eval_input_fn()))
current_step, _get_input_iterator(eval_input_fn, strategy))
training_summary = {
'total_training_steps': total_training_steps,
......
......@@ -204,7 +204,7 @@ def run_bert(strategy, input_meta_data):
logging.info('Training using customized training loop TF 2.0 with distrubuted'
'strategy.')
use_remote_tpu = (FLAGS.strategy_type == 'tpu' and FLAGS.tpu)
run_customized_training(
return run_customized_training(
strategy,
bert_config,
input_meta_data,
......
......@@ -68,16 +68,37 @@ FLAGS = flags.FLAGS
def get_pretrain_input_data(input_file_pattern, seq_length,
max_predictions_per_seq, batch_size):
max_predictions_per_seq, batch_size, strategy):
"""Returns input dataset from input file string."""
input_files = []
for input_pattern in input_file_pattern.split(','):
input_files.extend(tf.io.gfile.glob(input_pattern))
# When using TPU pods, we need to clone dataset across
# workers and need to pass in function that returns the dataset rather
# than passing dataset instance itself.
use_dataset_fn = isinstance(strategy, tf.distribute.experimental.TPUStrategy)
if use_dataset_fn:
if batch_size % strategy.num_replicas_in_sync != 0:
raise ValueError(
'Batch size must be divisible by number of replicas : {}'.format(
strategy.num_replicas_in_sync))
train_dataset = input_pipeline.create_pretrain_dataset(
input_files, seq_length, max_predictions_per_seq, batch_size)
return train_dataset
# As auto rebatching is not supported in
# `experimental_distribute_datasets_from_function()` API, which is
# required when cloning dataset to multiple workers in eager mode,
# we use per-replica batch size.
batch_size = int(batch_size / strategy.num_replicas_in_sync)
def _dataset_fn(ctx=None):
del ctx
input_files = []
for input_pattern in input_file_pattern.split(','):
input_files.extend(tf.io.gfile.glob(input_pattern))
train_dataset = input_pipeline.create_pretrain_dataset(
input_files, seq_length, max_predictions_per_seq, batch_size)
return train_dataset
return _dataset_fn if use_dataset_fn else _dataset_fn()
def get_loss_fn(loss_scale=1.0):
......@@ -105,7 +126,7 @@ def run_customized_training(strategy,
train_input_fn = functools.partial(get_pretrain_input_data, input_files,
max_seq_length, max_predictions_per_seq,
train_batch_size)
train_batch_size, strategy)
def _get_pretrain_model():
pretrain_model, core_model = bert_models.pretrain_model(
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment