Unverified Commit 40543869 authored by Toby Boyd, committed by GitHub

Transformer instrumented for benchmarking (#6734)

* Add first benchmark and return stats.

* Remove print statements; update training steps.

* Revert print T: in print statement.

* Remove print(stats)

* add 2 gpu accuracy test for base.

* Fixed total_batch_size when using gpu + gFile deprecations.

* 8 GPU test name fix

* Add 4 and 8 GPU tests.

* typo fixes.

* Clean up test names and methods.

* bleu uncased.  docstring format fix.
parent 9d38e894
@@ -86,12 +86,13 @@ def bleu_tokenize(string):
def bleu_wrapper(ref_filename, hyp_filename, case_sensitive=False):
"""Compute BLEU for two files (reference and hypothesis translation)."""
- ref_lines = tf.gfile.Open(ref_filename).read().strip().splitlines()
- hyp_lines = tf.gfile.Open(hyp_filename).read().strip().splitlines()
+ ref_lines = tf.io.gfile.GFile(ref_filename).read().strip().splitlines()
+ hyp_lines = tf.io.gfile.GFile(hyp_filename).read().strip().splitlines()
if len(ref_lines) != len(hyp_lines):
raise ValueError("Reference and translation files have different number of "
"lines.")
"lines. If training only a few steps (100-200), the "
"translation may be empty.")
if not case_sensitive:
ref_lines = [x.lower() for x in ref_lines]
hyp_lines = [x.lower() for x in hyp_lines]
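For readers following the gFile deprecation cleanup that appears throughout the hunks below: the non-deprecated `tf.io.gfile` calls map one-to-one onto the old `tf.gfile` helpers. A minimal sketch, with a placeholder file path:

```python
import tensorflow as tf

path = "/tmp/example.txt"  # placeholder path

with tf.io.gfile.GFile(path, mode="w") as f:   # was: tf.gfile.Open(path, mode="w")
  f.write("hello world\n")

with tf.io.gfile.GFile(path) as f:             # was: tf.gfile.Open(path)
  lines = f.read().strip().splitlines()

print(tf.io.gfile.exists(path))                # was: tf.gfile.Exists(path)
print(tf.io.gfile.isdir("/tmp"))               # was: tf.gfile.IsDirectory("/tmp")
```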
@@ -220,7 +220,7 @@ def download_and_extract(path, url, input_filename, target_filename):
def txt_line_iterator(path):
"""Iterate through lines of file."""
- with tf.gfile.Open(path) as f:
+ with tf.io.gfile.GFile(path) as f:
for line in f:
yield line.strip()
@@ -244,8 +244,8 @@ def compile_files(raw_dir, raw_files, tag):
input_compiled_file = os.path.join(raw_dir, filename + ".lang1")
target_compiled_file = os.path.join(raw_dir, filename + ".lang2")
- with tf.gfile.Open(input_compiled_file, mode="w") as input_writer:
- with tf.gfile.Open(target_compiled_file, mode="w") as target_writer:
+ with tf.io.gfile.GFile(input_compiled_file, mode="w") as input_writer:
+ with tf.io.gfile.GFile(target_compiled_file, mode="w") as target_writer:
for i in range(len(raw_files["inputs"])):
input_file = raw_files["inputs"][i]
target_file = raw_files["targets"][i]
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Executes Estimator benchmarks and accuracy tests."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import time
from absl import flags
from absl.testing import flagsaver
import tensorflow as tf # pylint: disable=g-bad-import-order
from official.transformer import transformer_main as transformer_main
from official.utils.logs import hooks
TRANSFORMER_EN2DE_DATA_DIR_NAME = 'wmt32k-en2de-official'
EN2DE_2014_BLEU_DATA_DIR_NAME = 'newstest2014'
FLAGS = flags.FLAGS
class EstimatorBenchmark(tf.test.Benchmark):
"""Base class to hold methods common to test classes in the module.
Benchmarks for the Transformer Estimator models mostly report the same data
and require the same FLAG setup.
"""
local_flags = None
def __init__(self, output_dir=None, default_flags=None, flag_methods=None):
if not output_dir:
output_dir = '/tmp'
self.output_dir = output_dir
self.default_flags = default_flags or {}
self.flag_methods = flag_methods or {}
def _get_model_dir(self, folder_name):
"""Returns directory to store info, e.g. saved model and event log."""
return os.path.join(self.output_dir, folder_name)
def _setup(self):
"""Sets up and resets flags before each test."""
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.DEBUG)
if EstimatorBenchmark.local_flags is None:
for flag_method in self.flag_methods:
flag_method()
# Loads flags to get defaults to then override. List cannot be empty.
flags.FLAGS(['foo'])
# Overrides flag values with defaults for the class of tests.
for k, v in self.default_flags.items():
setattr(FLAGS, k, v)
saved_flag_values = flagsaver.save_flag_values()
EstimatorBenchmark.local_flags = saved_flag_values
else:
flagsaver.restore_flag_values(EstimatorBenchmark.local_flags)
def _report_benchmark(self,
stats,
wall_time_sec,
bleu_max=None,
bleu_min=None):
"""Report benchmark results by writing to local protobuf file.
Args:
stats: dict returned from estimator models with known entries.
wall_time_sec: duration of the benchmark execution in seconds.
bleu_max: highest passing level for bleu score.
bleu_min: lowest passing level for bleu score.
"""
examples_per_sec_hook = None
for hook in stats['train_hooks']:
if isinstance(hook, hooks.ExamplesPerSecondHook):
examples_per_sec_hook = hook
break
eval_results = stats['eval_results']
metrics = []
if 'bleu_uncased' in stats:
metrics.append({'name': 'bleu_uncased',
'value': stats['bleu_uncased'],
'min_value': bleu_min,
'max_value': bleu_max})
if examples_per_sec_hook:
exp_per_second_list = examples_per_sec_hook.current_examples_per_sec_list
# ExamplesPerSecondHook skips the first 10 steps.
exp_per_sec = sum(exp_per_second_list) / (len(exp_per_second_list))
metrics.append({'name': 'exp_per_second',
'value': exp_per_sec})
self.report_benchmark(
iters=eval_results['global_step'],
wall_time=wall_time_sec,
metrics=metrics)
class TransformerBaseEstimatorAccuracy(EstimatorBenchmark):
"""Benchmark accuracy tests for ResNet50 w/ Estimator."""
def __init__(self, output_dir=None, root_data_dir=None, **kwargs):
"""Benchmark accuracy tests for ResNet50 w/ Estimator.
Args:
output_dir: directory where to output e.g. log files
root_data_dir: directory under which to look for dataset
**kwargs: arbitrary named arguments. This is needed to make the
constructor forward compatible in case PerfZero provides more
named arguments before updating the constructor.
"""
flag_methods = [transformer_main.define_transformer_flags]
self.train_data_dir = os.path.join(root_data_dir,
TRANSFORMER_EN2DE_DATA_DIR_NAME)
self.vocab_file = os.path.join(root_data_dir,
TRANSFORMER_EN2DE_DATA_DIR_NAME,
'vocab.ende.32768')
self.bleu_source = os.path.join(root_data_dir,
EN2DE_2014_BLEU_DATA_DIR_NAME,
'newstest2014.en')
self.bleu_ref = os.path.join(root_data_dir,
EN2DE_2014_BLEU_DATA_DIR_NAME,
'newstest2014.de')
super(TransformerBaseEstimatorAccuracy, self).__init__(
output_dir=output_dir, flag_methods=flag_methods)
def benchmark_graph_1_gpu(self):
"""Benchmark graph mode 1 gpu.
The paper uses 8 GPUs and a much larger effective batch size; this will
not converge to the 27.3 BLEU (uncased) SOTA.
"""
self._setup()
FLAGS.num_gpus = 1
FLAGS.data_dir = self.train_data_dir
FLAGS.vocab_file = self.vocab_file
# Sets values directly to avoid validation check.
FLAGS['bleu_source'].value = self.bleu_source
FLAGS['bleu_ref'].value = self.bleu_ref
FLAGS.param_set = 'base'
FLAGS.batch_size = 4096
FLAGS.train_steps = 100000
FLAGS.steps_between_evals = 5000
FLAGS.model_dir = self._get_model_dir('benchmark_graph_1_gpu')
FLAGS.hooks = ['ExamplesPerSecondHook']
self._run_and_report_benchmark()
def benchmark_graph_2_gpu(self):
"""Benchmark graph mode 2 gpus.
The paper uses 8 GPUs and a much larger effective batch size; this will
not converge to the 27.3 BLEU (uncased) SOTA.
"""
self._setup()
FLAGS.num_gpus = 2
FLAGS.data_dir = self.train_data_dir
FLAGS.vocab_file = self.vocab_file
# Sets values directly to avoid validation check.
FLAGS['bleu_source'].value = self.bleu_source
FLAGS['bleu_ref'].value = self.bleu_ref
FLAGS.param_set = 'base'
FLAGS.batch_size = 4096 * 2
FLAGS.train_steps = 100000
FLAGS.steps_between_evals = 5000
FLAGS.model_dir = self._get_model_dir('benchmark_graph_2_gpu')
FLAGS.hooks = ['ExamplesPerSecondHook']
self._run_and_report_benchmark()
def benchmark_graph_8_gpu(self):
"""Benchmark graph mode 8 gpus.
SOTA is 27.3 BLEU (uncased).
"""
self._setup()
FLAGS.num_gpus = 8
FLAGS.data_dir = self.train_data_dir
FLAGS.vocab_file = self.vocab_file
# Sets values directly to avoid validation check.
FLAGS['bleu_source'].value = self.bleu_source
FLAGS['bleu_ref'].value = self.bleu_ref
FLAGS.param_set = 'base'
FLAGS.batch_size = 2048 * 8
FLAGS.train_steps = 100000
FLAGS.steps_between_evals = 5000
FLAGS.model_dir = self._get_model_dir('benchmark_graph_8_gpu')
FLAGS.hooks = ['ExamplesPerSecondHook']
self._run_and_report_benchmark()
def _run_and_report_benchmark(self):
start_time_sec = time.time()
stats = transformer_main.run_transformer(flags.FLAGS)
wall_time_sec = time.time() - start_time_sec
self._report_benchmark(stats,
wall_time_sec,
bleu_min=27.2,
bleu_max=28)
class TransformerBaseEstimatorBenchmark(EstimatorBenchmark):
"""Benchmarks for ResNet50 using Estimator."""
local_flags = None
def __init__(self, output_dir=None, default_flags=None):
flag_methods = [transformer_main.define_transformer_flags]
super(TransformerBaseEstimatorBenchmark, self).__init__(
output_dir=output_dir,
default_flags=default_flags,
flag_methods=flag_methods)
def benchmark_graph_1_gpu(self):
"""Benchmark graph 1 gpu."""
self._setup()
FLAGS.num_gpus = 1
FLAGS.batch_size = 2048
FLAGS.model_dir = self._get_model_dir('benchmark_graph_1_gpu')
self._run_and_report_benchmark()
def benchmark_graph_2_gpu(self):
"""Benchmark graph 2 gpus."""
self._setup()
FLAGS.num_gpus = 2
FLAGS.batch_size = 2048 * 2
FLAGS.model_dir = self._get_model_dir('benchmark_graph_2_gpu')
self._run_and_report_benchmark()
def benchmark_graph_4_gpu(self):
"""Benchmark graph 4 gpus."""
self._setup()
FLAGS.num_gpus = 4
FLAGS.batch_size = 2048 * 4
FLAGS.model_dir = self._get_model_dir('benchmark_graph_4_gpu')
self._run_and_report_benchmark()
def benchmark_graph_8_gpu(self):
"""Benchmark graph 8 gpus."""
self._setup()
FLAGS.num_gpus = 8
FLAGS.batch_size = 2048 * 8
FLAGS.model_dir = self._get_model_dir('benchmark_graph_8_gpu')
self._run_and_report_benchmark()
def _run_and_report_benchmark(self):
start_time_sec = time.time()
stats = transformer_main.run_transformer(flags.FLAGS)
wall_time_sec = time.time() - start_time_sec
self._report_benchmark(stats, wall_time_sec)
class TransformerBaseEstimatorBenchmarkSynth(TransformerBaseEstimatorBenchmark):
"""Transformer based version synthetic benchmark tests."""
def __init__(self, output_dir=None, root_data_dir=None, **kwargs):
def_flags = {}
def_flags['param_set'] = 'base'
def_flags['use_synthetic_data'] = True
def_flags['train_steps'] = 200
def_flags['steps_between_evals'] = 200
def_flags['hooks'] = ['ExamplesPerSecondHook']
super(TransformerBaseEstimatorBenchmarkSynth, self).__init__(
output_dir=output_dir, default_flags=def_flags)
class TransformerBaseEstimatorBenchmarkReal(TransformerBaseEstimatorBenchmark):
"""Transformer based version real data benchmark tests."""
def __init__(self, output_dir=None, root_data_dir=None, **kwargs):
train_data_dir = os.path.join(root_data_dir,
TRANSFORMER_EN2DE_DATA_DIR_NAME)
vocab_file = os.path.join(root_data_dir,
TRANSFORMER_EN2DE_DATA_DIR_NAME,
'vocab.ende.32768')
def_flags = {}
def_flags['param_set'] = 'base'
def_flags['vocab_file'] = vocab_file
def_flags['data_dir'] = train_data_dir
def_flags['train_steps'] = 200
def_flags['steps_between_evals'] = 200
def_flags['hooks'] = ['ExamplesPerSecondHook']
super(TransformerBaseEstimatorBenchmarkReal, self).__init__(
output_dir=output_dir, default_flags=def_flags)
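A rough sketch of how one of the classes above could be exercised directly, outside a benchmark harness such as PerfZero. The module path and output directory are assumptions, not part of this change:

```python
# Hypothetical direct invocation; normally a harness discovers and runs the
# benchmark_* methods.  Module name and paths below are assumptions.
from official.transformer import transformer_benchmark  # assumed module name

benchmark = transformer_benchmark.TransformerBaseEstimatorBenchmarkSynth(
    output_dir='/tmp/transformer_benchmark')
# Trains 200 steps on synthetic data with 1 GPU and reports exp_per_second
# via tf.test.Benchmark.report_benchmark().
benchmark.benchmark_graph_1_gpu()
```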
@@ -239,7 +239,7 @@ def evaluate_and_log_bleu(estimator, bleu_source, bleu_ref, vocab_file):
def _validate_file(filepath):
"""Make sure that file exists."""
- if not tf.gfile.Exists(filepath):
+ if not tf.io.gfile.exists(filepath):
raise tf.errors.NotFoundError(None, None, "File %s not found." % filepath)
@@ -279,6 +279,11 @@ def run_loop(
bleu_threshold: minimum BLEU score before training is stopped.
vocab_file: Path to vocab file that will be used to subtokenize bleu_source.
+ Returns:
+ Dict of results of the run. Contains the keys `eval_results`,
+ `train_hooks`, `bleu_cased`, and `bleu_uncased`. `train_hooks` is a list of
+ the hook instances used during training.
Raises:
ValueError: if both or none of single_iteration_train_steps and
single_iteration_train_epochs were defined.
@@ -321,6 +326,7 @@ def run_loop(
schedule_manager.train_eval_iterations = INF
# Loop training/evaluation/bleu cycles
+ stats = {}
for i in xrange(schedule_manager.train_eval_iterations):
tf.logging.info("Starting iteration %d" % (i + 1))
@@ -349,6 +355,9 @@ def run_loop(
uncased_score, cased_score = evaluate_and_log_bleu(
estimator, bleu_source, bleu_ref, vocab_file)
stats["bleu_uncased"] = uncased_score
stats["bleu_cased"] = cased_score
# Write actual bleu scores using summary writer and benchmark logger
global_step = get_global_step(estimator)
summary = tf.Summary(value=[
@@ -367,6 +376,11 @@ def run_loop(
bleu_writer.close()
break
stats["eval_results"] = eval_results
stats["train_hooks"] = train_hooks
return stats
def define_transformer_flags():
"""Add flags and flag validators for running transformer_main."""
@@ -535,6 +549,11 @@ def run_transformer(flags_obj):
Args:
flags_obj: Object containing parsed flag values.
+ Returns:
+ Dict of results of the run. Contains the keys `eval_results`,
+ `train_hooks`, `bleu_cased`, and `bleu_uncased`. `train_hooks` is a list of
+ the hook instances used during training.
"""
num_gpus = flags_core.get_num_gpus(flags_obj)
@@ -563,6 +582,7 @@ def run_transformer(flags_obj):
params["default_batch_size_tpu"] if params["use_tpu"]
else params["default_batch_size"]))
+ total_batch_size = params["batch_size"]
if not params["use_tpu"]:
params["batch_size"] = distribution_utils.per_replica_batch_size(
params["batch_size"], num_gpus)
@@ -588,7 +608,7 @@ def run_transformer(flags_obj):
flags_obj.hooks,
model_dir=flags_obj.model_dir,
tensors_to_log=TENSORS_TO_LOG, # used for logging hooks
- batch_size=schedule_manager.batch_size, # for ExamplesPerSecondHook
+ batch_size=total_batch_size, # for ExamplesPerSecondHook
use_tpu=params["use_tpu"] # Not all hooks can run with TPUs
)
benchmark_logger = logger.get_benchmark_logger()
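The hook change above is the `total_batch_size` fix mentioned in the commit message: with multiple GPUs, `params["batch_size"]` is divided into a per-replica size, while examples/sec should be counted against the global batch. A minimal sketch with illustrative numbers only:

```python
# Why ExamplesPerSecondHook now receives total_batch_size rather than the
# per-replica batch size.  Numbers below are illustrative placeholders.
num_gpus = 8
total_batch_size = 2048 * num_gpus                      # global batch requested
per_replica_batch_size = total_batch_size // num_gpus   # what each GPU sees

steps_per_sec = 1.5                                     # made-up measurement
examples_per_sec = steps_per_sec * total_batch_size     # count the global batch
print(per_replica_batch_size, examples_per_sec)
```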
@@ -600,7 +620,7 @@ def run_transformer(flags_obj):
# Train and evaluate transformer model
estimator = construct_estimator(flags_obj, params, schedule_manager)
- run_loop(
+ stats = run_loop(
estimator=estimator,
# Training arguments
schedule_manager=schedule_manager,
@@ -625,6 +645,7 @@ def run_transformer(flags_obj):
flags_obj.export_dir, serving_input_fn,
assets_extra={"vocab.txt": flags_obj.vocab_file},
strip_default_attrs=True)
+ return stats
def main(_):
@@ -44,7 +44,7 @@ def _get_sorted_inputs(filename):
Sorted list of inputs, and dictionary mapping original index->sorted index
of each element.
"""
- with tf.gfile.Open(filename) as f:
+ with tf.io.gfile.GFile(filename) as f:
records = f.read().split("\n")
inputs = [record.strip() for record in records]
if not inputs[-1]:
@@ -58,6 +58,7 @@ def _get_sorted_inputs(filename):
for i, (index, _) in enumerate(sorted_input_lens):
sorted_inputs[i] = inputs[index]
sorted_keys[index] = i
return sorted_inputs, sorted_keys
@@ -126,11 +127,11 @@ def translate_file(
# Write translations in the order they appeared in the original file.
if output_file is not None:
- if tf.gfile.IsDirectory(output_file):
+ if tf.io.gfile.isdir(output_file):
raise ValueError("File output is a directory, will not save outputs to "
"file.")
tf.logging.info("Writing to file %s" % output_file)
- with tf.gfile.Open(output_file, "w") as f:
+ with tf.io.gfile.GFile(output_file, "w") as f:
for i in sorted_keys:
f.write("%s\n" % translations[i])
@@ -179,7 +179,7 @@ class Subtokenizer(object):
def _save_vocab_file(vocab_file, subtoken_list):
"""Save subtokens to file."""
with tf.gfile.Open(vocab_file, mode="w") as f:
with tf.io.gfile.GFile(vocab_file, mode="w") as f:
for subtoken in subtoken_list:
f.write("'%s'\n" % _unicode_to_native(subtoken))
@@ -190,7 +190,7 @@ def _load_vocab_file(vocab_file, reserved_tokens=None):
reserved_tokens = RESERVED_TOKENS
subtoken_list = []
with tf.gfile.Open(vocab_file, mode="r") as f:
with tf.io.gfile.GFile(vocab_file, mode="r") as f:
for line in f:
subtoken = _native_to_unicode(line.strip())
subtoken = subtoken[1:-1] # Remove surrounding single-quotes
@@ -339,7 +339,7 @@ def _count_tokens(files, file_byte_limit=1e6):
token_counts = collections.defaultdict(int)
for filepath in files:
with tf.gfile.Open(filepath, mode="r") as reader:
with tf.io.gfile.GFile(filepath, mode="r") as reader:
file_byte_budget = file_byte_limit
counter = 0
lines_to_skip = int(reader.size() / (file_byte_budget * 2))