"docs/source/vscode:/vscode.git/clone" did not exist on "540399f54054115b69097aaeb79b1f0b5f4f8e98"
Commit 0ae03c71 authored by sunxx1's avatar sunxx1
Browse files

Merge branch 'main' into 'main'

update TensorFlow and TensorFlow2x test code

See merge request dcutoolkit/deeplearing/dlexamples_new!58
parents f270c43a a7666964
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Setup the data drive with raid, RAM, or mount network drives."""
from __future__ import print_function
import logging
import perfzero.utils as utils
def create_drive_from_devices(data_dir, gce_nvme_raid):
"""Creates a drive at data directory."""
if not gce_nvme_raid:
return
devices = _get_nvme_devices()
cmd = 'mountpoint -q {}'.format(data_dir)
retcode, _ = utils.run_command(cmd)
if retcode:
if len(devices) > 1:
_create_drive_raid(data_dir, devices)
else:
_create_single_drive(data_dir, devices[0])
def _get_nvme_devices():
"""Returns list paths to nvme devices."""
devices = []
cmd = 'lsblk'
retcode, log = utils.run_command(cmd)
if retcode:
raise Exception('"{}" failed with code:{} and log:\n{}'.format(
cmd, retcode, log))
lines = log.splitlines()
if lines:
for line in lines:
if line.startswith('nvme'):
parts = line.split()
devices.append('/dev/' + parts[0].strip())
return devices
def _create_single_drive(data_dir, device):
"""Creates a data drive out of a single device."""
cmds = []
cmds.append('mkfs.ext4 -F {}'.format(device))
cmds.append('mkdir -p {}'.format(data_dir))
cmds.append('mount {} {}'.format(device, data_dir))
cmds.append('chmod a+w {}'.format(data_dir))
utils.run_commands(cmds)
logging.info('Created and mounted device %s at %s', device, data_dir)
def _create_drive_raid(data_dir, devices):
"""Creates a raid zero array of nvme drives."""
cmds = []
# Passing 'yes' because GCE NVMe drives are sometimes in an odd state and
# think they are already part of another RAID array; mdadm does not have a
# -y option. The Kokoro images may also have been left dirty, which would
# explain where that stale RAID metadata comes from.
cmds.append('yes | mdadm --create /dev/md0 --level=0 '
'--raid-devices={} {}'.format(
len(devices), ' '.join(devices)))
cmds.append('mkfs.ext4 -F /dev/md0')
cmds.append('mkdir -p {}'.format(data_dir))
cmds.append('mount /dev/md0 {}'.format(data_dir))
cmds.append('chmod a+w {}'.format(data_dir))
utils.run_commands(cmds)
logging.info('Created and mounted RAID array at %s', data_dir)
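# A minimal, hypothetical usage sketch (not part of the original module): mount
# the local NVMe devices under /data, building a RAID-0 array when more than
# one device is present. create_drive_from_devices() is a no-op when
# gce_nvme_raid is falsy or when data_dir is already a mountpoint. Requires
# root privileges and the perfzero package on the import path.
if __name__ == '__main__':
  logging.basicConfig(level=logging.INFO)
  create_drive_from_devices('/data', gce_nvme_raid='1')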
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""PerfZero configs provided by user."""
from __future__ import print_function
import json
import logging
import os
def add_setup_parser_arguments(parser):
"""Add arguments to the parser used by the setup.py."""
parser.add_argument(
'--dockerfile_path',
default='docker/Dockerfile_ubuntu_1804_tf_v1',
type=str,
help='''Build the docker image using the docker file located at ${pwd}/${dockerfile_path} if
it exists, where ${pwd} is the user's current working directory. Otherwise, build
the docker image using the docker file located at path_to_perfzero/${dockerfile_path}.
''')
parser.add_argument(
'--workspace',
default='workspace',
type=str,
help='''The gcloud key file will be downloaded under directory path_to_perfzero/${workspace}
''')
parser.add_argument(
'--gcloud_key_file_url',
default='',
type=str,
help='''DEPRECATED: Use --gcloud_key_file_url of benchmark.py instead.
The gcloud key file url. When specified, it will be downloaded to the
directory specified by the flag --workspace. Each url can start with file://, gcs://, http:// or https://.
''')
parser.add_argument(
'--root_data_dir',
default='/data',
type=str,
help='The directory which should contain the dataset required by the benchmark method.'
)
parser.add_argument(
'--gce_nvme_raid',
default=None,
type=str,
help='If set to non-empty string, create raid 0 array with devices at the directory specified by the flag --root_data_dir'
)
parser.add_argument(
'--tensorflow_pip_spec',
default=None,
type=str,
help='''The tensorflow pip package specification. The format can be either ${package_name} or ${package_name}==${package_version}.
Example values include tf-nightly-gpu, and tensorflow==1.12.0. If it is specified, the corresponding tensorflow pip package/version
will be installed. Otherwise, the default tensorflow pip package specified in the docker file will be installed.
''')
parser.add_argument(
'--extra_pip_specs',
default='',
type=str,
help='''Additional specifications to pass to `pip install`. (e.g. pinning certain dependencies)
Specifications should be semicolon separated: e.g. `numpy==1.16.4;scipy==1.3.1`
''')
parser.add_argument(
'--docker_tag',
default='perfzero/tensorflow',
type=str,
help='The docker tag to use if building a docker image.'
)
parser.add_argument(
'--site_package_downloads',
default='',
type=str,
help='''Comma separated list of dirs on the external VM to copy into the docker's site-packages dir.
Format: <absolute-path>/src/dir:new_base_dir_name,<absolute-path>/src/dir2:new_name,...
This will copy <absolute-path>/src/dir to <site-packages>/new_base_dir_name.
'''
)
parser.add_argument(
'--extra_docker_build_args',
nargs='*',
default='',
type=str,
help='''Additional build-args to pass to `docker build`.
Example: --extra_docker_build_args arg0 arg1=value1 "arg2=value with space" arg3=300.
Each string will be passed directly as a build-arg to docker, so the above example will be passed as follows:
--build-arg arg0 --build-arg arg1=value1 --build-arg "arg2=value with space" --build-arg arg3=300
'''
)
def add_benchmark_parser_arguments(parser):
"""Add arguments to the parser used by the benchmark.py."""
parser.add_argument(
'--use_cached_site_packages',
action='store_true',
help='If set, skip git pull for dependent git repositories if they already exist in path_to_perfzero/${workspace}/site-packages'
)
parser.add_argument(
'--gcs_downloads',
default=None,
type=str,
help='This flag is deprecated. Use the flag --data_downloads instead')
parser.add_argument(
'--git_repos',
default=None,
type=str,
help='''A string representing git repositories to checkout. The format is url_1;branch_1;hash_1,url_2;branch_2;hash_2,...
Git repositories will be checked-out under directory path_to_perfzero/${workspace}/site-packages,
where ${workspace} either defaults to 'workspace', or takes the value of the flag --workspace.
branch and hash can be skipped if the user wants to use the head of the master branch,
which shortens the format to url_1,url_2,...
''')
parser.add_argument(
'--benchmark_num_trials',
default=1,
type=int,
help='''Configures the number of times to run each benchmark method
after setup completion.''')
parser.add_argument(
'--benchmark_methods',
action='append',
default=[],
type=str,
help='''This string specifies the benchmark_method to be executed. The flag can be specified multiple times in which case
the union of methods matched by these flags will be executed. The format can be module_path.class_name.method_name in which
case the corresponding method is executed. The format can also be module_path.class_name.filter:regex_pattern, in which case all methods
of the given class whose method name matches the given regular expression are executed.
''')
parser.add_argument(
'--ml_framework_build_label',
default=None,
type=str,
help='A string that identifies the machine learning framework build, e.g. nightly-gpu-build'
)
parser.add_argument(
'--execution_label',
default=None,
type=str,
help='A string that identifies the benchmark execution type, e.g. test, prod'
)
parser.add_argument(
'--platform_name',
default=None,
type=str,
help='A string that identifies the computing platform, e.g. gcp, aws'
)
parser.add_argument(
'--system_name',
default=None,
type=str,
help='A string that identifies the hardware system, e.g. n1-standard-64-8xV100'
)
parser.add_argument(
'--output_gcs_url',
default=None,
type=str,
help='''If specified, log files generated by the benchmark execution will be uploaded to output_gcs_url/${execution_id},
where ${execution_id} is a string generated by PerfZero which uniquely identifies the execution of one benchmark method
''')
parser.add_argument(
'--scratch_gcs_url',
default=None,
type=str,
help='''If specified, intermediate files like model outputs will be stored in scratch_gcs_url/${execution_id}, where
${execution_id} is a string that is generated by PerfZero which uniquely identifies the execution of one benchmark method.
If not specified, intermediate files will be stored in a local folder on the host.
''')
parser.add_argument(
'--bigquery_project_name',
default=None,
type=str,
help='''If both --bigquery_project_name and --bigquery_dataset_table_name are specified, for each benchmark method, the benchmark
summary will be uploaded to the specified bigquery table whose schema is defined in perfzero/scripts/create_big_table.txt.
The value of each field can in turn be a json-formatted string. See README.md for example output.
''')
parser.add_argument(
'--bigquery_dataset_table_name',
default=None,
type=str,
help='''If both --bigquery_project_name and --bigquery_dataset_table_name are specified, for each benchmark method, the benchmark
summary will be uploaded to the specified bigquery table whose schema is defined in perfzero/scripts/create_big_table.txt.
The value of each field can in turn be a json-formatted string. See README.md for example output.
''')
parser.add_argument(
'--python_path',
default=None,
type=str,
help='''A string of format path_1,path_2,... For each ${path} specified in the string,
path_to_perfzero/${workspace}/site-packages/${path} will be added to the python path so that libraries downloaded by --git_repos can
be loaded and executed.
''')
parser.add_argument(
'--workspace',
default='workspace',
type=str,
help='''The log files, gcloud key file and git repositories will be generated and downloaded under the
directory path_to_perfzero/${workspace}
''')
parser.add_argument(
'--root_data_dir',
default='/data',
type=str,
help='The directory which should contain the dataset required by the benchmark method.'
)
parser.add_argument(
'--data_downloads',
default=None,
type=str,
help='''A string of format url_1;relative_path_1,url_2;relative_path_2,...
Data will be copied from ${url} to ${root_data_dir}/${relative_path}. ${relative_path} can be skipped if it is the same as the
base name of the url, which shortens the format to url_1,url_2,... ${root_data_dir} is specified by the flag --root_data_dir.
Files will be decompressed in ${root_data_dir} if the file name ends with '.gz'.
Each url can start with file://, gcs://, http:// or https://.
''')
parser.add_argument(
'--gcloud_key_file_url',
default='gs://tf-performance/auth_tokens/benchmark_upload_gce.json',
type=str,
help='''The gcloud key file url. When specified, it will be downloaded to the
directory specified by the flag --workspace. Each url can start with file://, gcs://, http:// or https://.
The key file will then be activated and used as gcloud authentication credential.
''')
parser.add_argument(
'--debug',
action='store_true',
help='If set, use debug level logging. Otherwise, use info level logging'
)
parser.add_argument(
'--profiler_enabled_time',
default=None,
type=str,
help='''A string of format begin_time_1:end_time_1,begin_time_2:end_time_2,.... PerfZero will start to collect profiler
data ${begin_time} sec after benchmark method execution starts. The data collection continues for ${end_time - begin_time}
sec or until the benchmark method execution finishes, whichever occurs first. If ${end_time} is not explicitly
specified, it is assumed to be MAX_LONG.
''')
parser.add_argument(
'--execution_id',
default=None,
type=str,
help='A string that uniquely identifies the benchmark execution.')
parser.add_argument(
'--result_upload_methods',
default=None,
type=str,
help='A comma separated list of class.method values to upload results.')
parser.add_argument(
'--tpu_parameters',
default=None,
type=str,
help='''A json dictionary of cloud tpu parameters. The format must look like the following:
{"name": "my-tpu-name", project": "my-gcp-project-id", "zone": "europe-west4-a", "size": "v3-8", "version": "nightly-2.x"}
It can have an optional key value pair "version_id" -> "nightly version" to change the tpu version id.
Example "version_id": "2.4.0-dev20200728".
''')
parser.add_argument(
'--perfzero_constructor_args',
nargs='*',
default='',
type=str,
help='''A json dictionary of additional args to pass to the perfzero
constructor.'''
)
parser.add_argument(
'--benchmark_class_type',
default=None,
type=str,
help='''The benchmark class type. If not set, perfzero_benchmark is assumed. Set to "tf_benchmark"
for tf.test.Benchmark benchmarks.''')
class PerfZeroConfig(object):
"""Creates and contains config for PerfZero."""
def __init__(self, mode, flags=None):
self.mode = mode
self.flags = flags
if mode == 'flags':
self.gcs_downloads_str = flags.gcs_downloads
self.data_downloads_str = flags.data_downloads
self.git_repos_str = flags.git_repos
self.benchmark_method_patterns = []
for value in flags.benchmark_methods:
self.benchmark_method_patterns.extend(value.split(','))
self.ml_framework_build_label = flags.ml_framework_build_label
self.execution_label = flags.execution_label
self.platform_name = flags.platform_name
self.system_name = flags.system_name
self.output_gcs_url = flags.output_gcs_url
self.scratch_gcs_url = flags.scratch_gcs_url
self.bigquery_project_name = flags.bigquery_project_name
self.bigquery_dataset_table_name = flags.bigquery_dataset_table_name
self.python_path_str = flags.python_path
self.workspace = flags.workspace
self.benchmark_class_type = flags.benchmark_class_type
self.use_cached_site_packages = flags.use_cached_site_packages
self.root_data_dir = flags.root_data_dir
self.gcloud_key_file_url = flags.gcloud_key_file_url
self.profiler_enabled_time_str = flags.profiler_enabled_time
self.execution_id = flags.execution_id
self.result_upload_methods = flags.result_upload_methods
self.perfzero_constructor_args = flags.perfzero_constructor_args
self.benchmark_num_trials = flags.benchmark_num_trials
if flags.tpu_parameters:
self.tpu_parameters = json.loads(flags.tpu_parameters)
else:
self.tpu_parameters = None
if not flags.benchmark_methods:
logging.warning('No benchmark method is specified by '
'--benchmark_methods')
if flags.bigquery_project_name and not flags.bigquery_dataset_table_name:
raise ValueError('--bigquery_project_name is specified but '
'--bigquery_dataset_table_name is not')
if not flags.bigquery_project_name and flags.bigquery_dataset_table_name:
raise ValueError('--bigquery_dataset_table_name is specified but '
'--bigquery_project_name is not')
def get_env_vars(self):
env_vars = {}
for key in os.environ.keys():
if key.startswith('PERFZERO_'):
env_vars[key] = os.environ[key]
return env_vars
def get_flags(self):
not_none_flags = {}
for key in vars(self.flags):
value = getattr(self.flags, key)
if value is not None:
not_none_flags[key] = value
return not_none_flags
def get_git_repos(self, site_packages_dir):
"""Parse git repository string."""
git_repos = []
if not self.git_repos_str:
return git_repos
for repo_entry in self.git_repos_str.split(','):
parts = repo_entry.split(';')
git_repo = {}
git_repo['url'] = parts[0]
# Assume the git url has format */{dir_name}.git
git_repo['dir_name'] = parts[0].rsplit('/', 1)[-1].rsplit('.', 1)[0]
git_repo['local_path'] = os.path.join(site_packages_dir,
git_repo['dir_name'])
if len(parts) >= 2:
git_repo['branch'] = parts[1]
if len(parts) >= 3:
git_repo['git_hash'] = parts[2]
git_repos.append(git_repo)
return git_repos
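# A minimal sketch (not in the original file) showing how the parser helper and
# PerfZeroConfig are typically wired together. It assumes argparse is the
# command-line parser used by the harness and that every flag keeps its default
# when not supplied on the command line.
if __name__ == '__main__':
  import argparse
  logging.basicConfig(level=logging.INFO)
  parser = argparse.ArgumentParser()
  add_benchmark_parser_arguments(parser)
  flags, _ = parser.parse_known_args()
  config = PerfZeroConfig(mode='flags', flags=flags)
  logging.info('parsed flags: %s', config.get_flags())
  logging.info('git repos: %s', config.get_git_repos('/tmp/site-packages'))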
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for perfzero_config.py."""
from __future__ import print_function
import os
import unittest
import perfzero.perfzero_config as perfzero_config
class TestPerfZeroConfig(unittest.TestCase):
def test_get_git_repos(self):
config = perfzero_config.PerfZeroConfig(mode='mock')
config.git_repos_str = 'https://github.com/tensorflow/benchmarks.git;branch_1;hash_1,https://github.com/tensorflow/models.git;branch_2'
git_repos = config.get_git_repos('/site_package_dir')
git_repo_1 = {}
git_repo_1['url'] = 'https://github.com/tensorflow/benchmarks.git'
git_repo_1['dir_name'] = 'benchmarks'
git_repo_1['local_path'] = '/site_package_dir/benchmarks'
git_repo_1['branch'] = 'branch_1'
git_repo_1['git_hash'] = 'hash_1'
git_repo_2 = {}
git_repo_2['url'] = 'https://github.com/tensorflow/models.git'
git_repo_2['dir_name'] = 'models'
git_repo_2['local_path'] = '/site_package_dir/models'
git_repo_2['branch'] = 'branch_2'
self.assertEqual(2, len(git_repos))
self.assertEqual(git_repo_1, git_repos[0])
self.assertEqual(git_repo_2, git_repos[1])
def test_get_env_vars(self):
config = perfzero_config.PerfZeroConfig(mode='mock')
self.assertEqual({}, config.get_env_vars())
os.environ['PERFZERO_VAR1'] = 'value1'
self.assertEqual({'PERFZERO_VAR1': 'value1'}, config.get_env_vars())
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Keep track of process information such as maximum memory usage with a separate thread."""
from __future__ import absolute_import
import json
import logging
import os
import sched
import threading
import time
import traceback
import psutil
class ProcessInfoTracker(object):
"""Keep track of process information such as maximum memory usage with separate thread."""
def __init__(self, output_dir):
self.process_info_log = open(os.path.join(output_dir, 'process_info.log'),
'w')
self.scheduler = sched.scheduler(time.time, time.sleep)
self.process_info = {}
self.process_info['max_rss'] = 0
self.process_info['max_vms'] = 0
self.process_info['max_cpu_percent'] = 0
self.exit_event = threading.Event()
self.last_exception = None
self.start_time = None
def start(self):
self.start_time = time.time()
# 4th positional arg added to support Python2 for the short-term.
self.scheduler.enter(1, 1, self._update_process_info, ()) # pylint: disable=no-value-for-parameter
threading.Thread(target=self.scheduler.run).start()
logging.info('Started process information tracker.')
def stop(self):
self.exit_event.set()
self.process_info_log.flush()
logging.info('Stopped process information tracker.')
if self.last_exception is not None:
raise self.last_exception # pylint: disable=raising-bad-type
return dict(self.process_info)
def _update_process_info(self):
"""Read and update process info using background thread every 1 second."""
try:
p = psutil.Process(os.getpid())
memory_info = p.memory_info()
# This is a blocking call which takes 0.1 second.
# This affects the interval at which the metrics are reported.
cpu_percent = p.cpu_percent(interval=0.1)
self.process_info['max_rss'] = max(self.process_info['max_rss'],
memory_info.rss)
self.process_info['max_vms'] = max(self.process_info['max_vms'],
memory_info.vms)
self.process_info['max_cpu_percent'] = max(
self.process_info['max_cpu_percent'], cpu_percent)
entry = {}
entry['time'] = time.time() - self.start_time
entry['rss'] = memory_info.rss
entry['vms'] = memory_info.vms
entry['cpu_percent'] = cpu_percent
self.process_info_log.write(json.dumps(entry) + '\n')
if not self.exit_event.is_set():
# Schedule the next event to be run after 1 second
# 4th positional arg added to support Python2 for the short-term.
self.scheduler.enter(1, 1, self._update_process_info, ()) # pylint: disable=no-value-for-parameter
except Exception as e: # pylint: disable=broad-except
logging.error('Process info tracker failed due to error:\n %s',
traceback.format_exc())
self.last_exception = e
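# A minimal, hypothetical usage sketch (not in the original file): track the
# current process for a few seconds and print the peak numbers collected by
# the background thread. '/tmp' is assumed to be writable.
if __name__ == '__main__':
  logging.basicConfig(level=logging.INFO)
  tracker = ProcessInfoTracker(output_dir='/tmp')
  tracker.start()
  time.sleep(3)  # Stand-in for the actual benchmark workload.
  print(json.dumps(tracker.stop(), indent=2))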
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Upload test results."""
from __future__ import print_function
import importlib
import json
import logging
import perfzero.utils as utils
import psutil
import socket
from six import u as unicode # pylint: disable=W0622
def execute_methods(method_names_str, *args, **kwargs):
"""Calls a list of method names on given function params.
Args:
method_names_str: String - Comma-separated module.foo.bar.method strings.
This function imports module.foo.bar for each such method and calls it
with *args and **kwargs.
*args: Function params common to each method.
**kwargs: Function params common to each method.
Raises:
RuntimeError: If any of the invoked methods raised an exception.
"""
if not method_names_str:
return
errors = []
module_methods_list = method_names_str.split(',')
for module_method in module_methods_list:
try:
logging.info('Trying to call %s', module_method)
module_path, method_path = module_method.rsplit('.', 1)
this_module = importlib.import_module(module_path)
logging.info('Found module %s, looking for %s', module_path, method_path)
this_method = getattr(this_module, method_path)
logging.info('Found method %s', method_path)
this_method(*args, **kwargs)
except Exception as e: # pylint: disable=broad-except
errors.append(str(e))
if errors:
raise RuntimeError('\n' + '\n'.join(errors))
def upload_execution_summary(bigquery_project_name, bigquery_dataset_table_name,
execution_summary):
"""Upload benchmark summary.
Note: Using stream=False has a limit of 1000 inserts per day per table. Using
stream=True, the documented limit is 50K+. With streaming there can be a
small, possibly unnoticeable delay before the results appear in the BigQuery
UI, but there can be a delay of roughly 90 minutes before the results become
part of exports.
Note: BigQuery maps unicode() to STRING for python2. If str is used that is
mapped to BYTE.
Args:
bigquery_project_name: Name of the gcp project.
bigquery_dataset_table_name: data_set and table name.
execution_summary: benchmark summary dictionary of results.
"""
# pylint: disable=C6204
import google.auth
from google.cloud import bigquery
if not bigquery_project_name:
logging.info(
'Skipped uploading benchmark result to bigquery because bigquery project name is not set.'
)
return
if not bigquery_dataset_table_name:
logging.info(
'Skipped uploading benchmark result to bigquery because bigquery table name is not set.'
)
return
credentials = google.auth.default()[0]
dataset_name = bigquery_dataset_table_name.split('.')[0]
table_name = bigquery_dataset_table_name.split('.')[1]
client = bigquery.Client(
project=bigquery_project_name, credentials=credentials)
benchmark_summary_input = {}
for key, value in execution_summary.items():
if isinstance(value, dict):
benchmark_summary_input[key] = unicode(json.dumps(value))
else:
benchmark_summary_input[key] = unicode(value)
logging.debug('Bigquery input for benchmark_summary table is %s',
json.dumps(benchmark_summary_input, indent=2))
errors = []
# TODO(tobyboyd): Shim to direct results to new table until all jobs
# are updated.
if 'benchmark_results' in dataset_name:
if dataset_name == 'benchmark_results_dev':
table_ref = client.dataset('perfzero_dev').table('benchmark_summary')
table_obj = client.get_table(table_ref)
elif dataset_name == 'benchmark_results':
table_ref = client.dataset('perfzero').table('benchmark_summary')
table_obj = client.get_table(table_ref)
else:
table_ref = client.dataset(dataset_name).table(table_name)
table_obj = client.get_table(table_ref)
errors.extend(client.insert_rows(table_obj, [benchmark_summary_input]))
if errors:
logging.error(
'Failed to upload benchmark result to bigquery due to errors %s',
errors)
else:
logging.info(
'Uploaded benchmark result to the table %s of the bigquery project %s.',
bigquery_dataset_table_name,
bigquery_project_name)
def build_benchmark_result(raw_benchmark_result, has_exception, trial_id):
"""Converts test_log.proto format to PerfZero format."""
benchmark_result = {}
benchmark_result['name'] = raw_benchmark_result['name']
benchmark_result['wall_time'] = raw_benchmark_result['wall_time']
succeeded = not has_exception
extras = []
for name in raw_benchmark_result.get('extras', {}):
entry = {}
entry['name'] = name
if 'double_value' in raw_benchmark_result['extras'][name]:
entry['value'] = raw_benchmark_result['extras'][name]['double_value']
else:
entry['value'] = raw_benchmark_result['extras'][name]['string_value']
extras.append(entry)
metrics = []
for metric in raw_benchmark_result.get('metrics', []):
value = metric['value']
if 'min_value' in metric and metric['min_value'] > value:
succeeded = False
if 'max_value' in metric and metric['max_value'] < value:
succeeded = False
metrics.append(metric)
benchmark_result['succeeded'] = succeeded
benchmark_result['extras'] = extras
benchmark_result['metrics'] = metrics
benchmark_result['trial_id'] = trial_id
return benchmark_result
def build_execution_summary(execution_timestamp, execution_id,
ml_framework_build_label, execution_label,
platform_name, system_name, output_gcs_url,
benchmark_result, env_vars, flags, harness_info,
site_package_info, process_info, has_exception,
is_tpu_benchmark):
"""Builds summary of the execution."""
# Avoids module not found during setup phase when tf is not installed yet.
# pylint: disable=C6204
import tensorflow as tf
benchmark_info = {}
benchmark_info['harness_name'] = 'perfzero'
benchmark_info['harness_info'] = harness_info
benchmark_info['has_exception'] = has_exception
if execution_label:
benchmark_info['execution_label'] = execution_label
if output_gcs_url:
benchmark_info['output_url'] = '{}/{}/'.format(output_gcs_url, execution_id)
if env_vars:
benchmark_info['env_vars'] = env_vars
if flags:
benchmark_info['flags'] = flags
benchmark_info['site_package_info'] = site_package_info
ml_framework_info = {}
ml_framework_info['name'] = 'tensorflow'
ml_framework_info['version'] = tf.__version__
# tf.__git_version__ in Python3 has format b'version_string'
if tf.__git_version__[0] == 'b':
ml_framework_info['build_version'] = tf.__git_version__[2:-1]
else:
ml_framework_info['build_version'] = tf.__git_version__
if ml_framework_build_label:
ml_framework_info['build_label'] = ml_framework_build_label
system_info = {}
if platform_name:
system_info['platform_name'] = platform_name
if system_name:
system_info['system_name'] = system_name
if not is_tpu_benchmark:
gpu_info = utils.get_gpu_info()
if gpu_info:
system_info['accelerator_driver_version'] = gpu_info['gpu_driver_version']
system_info['accelerator_model'] = gpu_info['gpu_model']
system_info['accelerator_count'] = gpu_info['gpu_count']
system_info['cpu_model'] = utils.get_cpu_name()
system_info['physical_cpu_count'] = psutil.cpu_count(logical=False)
system_info['logical_cpu_count'] = psutil.cpu_count(logical=True)
system_info['cpu_socket_count'] = utils.get_cpu_socket_count()
system_info['hostname'] = socket.gethostname()
execution_summary = {}
execution_summary['execution_id'] = execution_id
execution_summary['execution_timestamp'] = execution_timestamp
execution_summary['benchmark_result'] = benchmark_result
execution_summary['benchmark_info'] = benchmark_info
execution_summary['setup_info'] = {}
execution_summary['ml_framework_info'] = ml_framework_info
execution_summary['system_info'] = system_info
if process_info:
execution_summary['process_info'] = process_info
return execution_summary
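# A minimal sketch (not in the original file) of how --result_upload_methods
# strings are resolved by execute_methods(): each comma-separated
# "module.path.method" entry is imported and called with the same arguments.
# os.path.exists is used here only as a stand-in for a real upload callable.
if __name__ == '__main__':
  logging.basicConfig(level=logging.INFO)
  execute_methods('os.path.exists', '/tmp')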
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Collect profiler data for Tensorboard with a separate thread."""
from __future__ import print_function
import logging
import os
import sched
import threading
import time
import traceback
import perfzero.utils as utils
def _start_profiler(output_dir):
"""Start profiler.
Args:
output_dir: log directory to place the profiler data
"""
import tensorflow as tf # pylint: disable=g-import-not-at-top
profiler_data_dir = os.path.join(output_dir, 'profiler_data')
utils.make_dir_if_not_exist(profiler_data_dir)
logging.info('Starting TensorFlow profiler and saving data to dir %s',
profiler_data_dir)
try:
tf.profiler.experimental.start(profiler_data_dir)
logging.info('Started TensorFlow profiler')
except Exception: # pylint: disable=broad-except
logging.error('TensorFlow profiler failed to start due to error:\n %s',
traceback.format_exc())
def _stop_profiler():
"""Stop profiler."""
import tensorflow as tf # pylint: disable=g-import-not-at-top
try:
tf.profiler.experimental.stop()
logging.info('Stopped TensorFlow profiler.')
except Exception: # pylint: disable=broad-except
logging.error('TensorFlow profiler failed to stop due to error:\n %s',
traceback.format_exc())
class TensorFlowProfiler(object):
"""Collect profiler data for Tensorboard with a separate thread."""
def __init__(self, profiler_enabled_time_str, output_dir):
"""Constructor.
Args:
profiler_enabled_time_str: the value of the config --profiler_enabled_time
output_dir: log directory to place the profiler data
"""
self.profiler_enabled_time_str = profiler_enabled_time_str
self.output_dir = output_dir
self.exit_event = threading.Event()
self.scheduler = sched.scheduler(time.time, self._sleep_until_exit)
def _sleep_until_exit(self, timeout):
start_time = time.time()
cur_time = time.time()
while cur_time - start_time < timeout and not self.exit_event.is_set():
time.sleep(min(1, timeout + start_time - cur_time))
cur_time = time.time()
def start(self):
"""Schedule start/stop profiler event specified in profiler_enabled_time_str."""
if not self.profiler_enabled_time_str:
return
last_end_time = -1
for time_str in self.profiler_enabled_time_str.split(','):
begin_time = int(time_str.split(':')[0].strip())
end_time_str = time_str.split(':')[1].strip() if ':' in time_str else None
end_time = int(end_time_str) if end_time_str else 365 * 24 * 60 * 60
if begin_time <= last_end_time:
raise ValueError('begin_time {} is no larger than the last '
'end_time {}'.format(begin_time, last_end_time))
if end_time <= begin_time:
raise ValueError('end_time {} is no larger than begin_time {}'.format(
end_time, begin_time))
# 4th positional arg added to support Python2 for the short-term.
self.scheduler.enter(begin_time, 1, _start_profiler,
argument=(self.output_dir,))
self.scheduler.enter(end_time, 1, _stop_profiler, ()) # pylint: disable=no-value-for-parameter
last_end_time = end_time
threading.Thread(target=self.scheduler.run).start()
def stop(self):
"""Stop scheduler and save profiler data if any event is cancelled."""
event_canceled = False
for event in self.scheduler.queue:
try:
self.scheduler.cancel(event)
event_canceled = True
except ValueError:
# This is OK because the event may have been just canceled
pass
# Signal the scheduler thread to stop sleeping
self.exit_event.set()
# Save the profiler data if any event is canceled
if event_canceled:
_stop_profiler()
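# A hypothetical sketch (not in the original file): collect profiler data
# between second 2 and second 5 of a dummy workload, using the same begin:end
# format as the --profiler_enabled_time flag. Requires TensorFlow, which
# _start_profiler imports lazily.
if __name__ == '__main__':
  logging.basicConfig(level=logging.INFO)
  profiler = TensorFlowProfiler('2:5', '/tmp/perfzero_profiler_demo')
  profiler.start()
  time.sleep(6)  # Stand-in for the benchmark method execution.
  profiler.stop()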
Tue Jan 9 09:34:25 2018
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 384.81 Driver Version: 384.81 |
|-------------------------------+----------------------+----------------------+
| GPU Name Persistence-M| Bus-Id Disp.A | Volatile Uncorr. ECC |
| Fan Temp Perf Pwr:Usage/Cap| Memory-Usage | GPU-Util Compute M. |
|===============================+======================+======================|
| 0 Tesla P100-SXM2... On | 00000000:06:00.0 Off | 0 |
| N/A 50C P0 196W / 300W | 15643MiB / 16276MiB | 97% Default |
+-------------------------------+----------------------+----------------------+
| 1 Tesla P100-SXM2... On | 00000000:07:00.0 Off | 0 |
| N/A 41C P0 50W / 300W | 15483MiB / 16276MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 2 Tesla P100-SXM2... On | 00000000:0A:00.0 Off | 0 |
| N/A 33C P0 48W / 300W | 15483MiB / 16276MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 3 Tesla P100-SXM2... On | 00000000:0B:00.0 Off | 0 |
| N/A 34C P0 49W / 300W | 15483MiB / 16276MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 4 Tesla P100-SXM2... On | 00000000:85:00.0 Off | 0 |
| N/A 36C P0 50W / 300W | 15483MiB / 16276MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 5 Tesla P100-SXM2... On | 00000000:86:00.0 Off | 0 |
| N/A 33C P0 48W / 300W | 15483MiB / 16276MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 6 Tesla P100-SXM2... On | 00000000:89:00.0 Off | 0 |
| N/A 38C P0 48W / 300W | 15483MiB / 16276MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 7 Tesla P100-SXM2... On | 00000000:8A:00.0 Off | 0 |
| N/A 34C P0 49W / 300W | 15483MiB / 16276MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
+-----------------------------------------------------------------------------+
| Processes: GPU Memory |
| GPU PID Type Process name Usage |
|=============================================================================|
| No running processes found |
+-----------------------------------------------------------------------------+
Tue Jan 9 09:34:25 2018
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 384.81 Driver Version: 384.81 |
|-------------------------------+----------------------+----------------------+
| GPU Name Persistence-M| Bus-Id Disp.A | Volatile Uncorr. ECC |
| Fan Temp Perf Pwr:Usage/Cap| Memory-Usage | GPU-Util Compute M. |
|===============================+======================+======================|
| 0 Tesla P100-SXM2... On | 00000000:06:00.0 Off | 0 |
| N/A 50C P0 196W / 300W | 15643MiB / 16276MiB | 97% Default |
+-------------------------------+----------------------+----------------------+
| 1 Tesla P100-SXM2... On | 00000000:07:00.0 Off | 0 |
| N/A 41C P0 50W / 300W | 15483MiB / 16276MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 2 Tesla P100-SXM2... On | 00000000:0A:00.0 Off | 0 |
| N/A 33C P0 48W / 300W | 15483MiB / 16276MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 3 Tesla P100-SXM2... On | 00000000:0B:00.0 Off | 0 |
| N/A 34C P0 49W / 300W | 15483MiB / 16276MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 4 Tesla P100-SXM2... On | 00000000:85:00.0 Off | 0 |
| N/A 36C P0 50W / 300W | 15483MiB / 16276MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 5 Tesla P100-SXM2... On | 00000000:86:00.0 Off | 0 |
| N/A 33C P0 48W / 300W | 15483MiB / 16276MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 6 Tesla P100-SXM2... On | 00000000:89:00.0 Off | 0 |
| N/A 38C P0 48W / 300W | 15483MiB / 16276MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
| 7 Tesla P100-SXM2... On | 00000000:8A:00.0 Off | 0 |
| N/A 34C P0 49W / 300W | 15483MiB / 16276MiB | 0% Default |
+-------------------------------+----------------------+----------------------+
+-----------------------------------------------------------------------------+
| Processes: GPU Memory |
| GPU PID Type Process name Usage |
|=============================================================================|
| 0 44454 C /usr/bin/python 15631MiB |
| 1 44454 C /usr/bin/python 15471MiB |
| 2 44454 C /usr/bin/python 15471MiB |
| 3 44454 C /usr/bin/python 15471MiB |
+-----------------------------------------------------------------------------+
NAME MAJ:MIN RM SIZE RO TYPE MOUNTPOINT
nvme0n8 259:7 0 375G 0 disk
nvme0n6 259:5 0 375G 0 disk
sdb 8:16 0 50G 0 disk
└─sdb1 8:17 0 50G 0 part /tmpfs
nvme0n4 259:3 0 375G 0 disk
nvme0n2 259:1 0 375G 0 disk
nvme0n7 259:6 0 375G 0 disk
nvme0n5 259:4 0 375G 0 disk
sda 8:0 0 100G 0 disk
└─sda1 8:1 0 100G 0 part /
nvme0n3 259:2 0 375G 0 disk
nvme0n1 259:0 0 375G 0 disk
"""Utility to manage the tpu version before starting the benchmark."""
import json
from absl import logging
from six.moves.urllib import request
try:
from cloud_tpu_client import client # pylint: disable=g-import-not-at-top
except ImportError:
print(
'Falling back to TensorFlow client; we recommend you install the Cloud '
'TPU client directly with pip install cloud-tpu-client.')
from tensorflow.python.tpu.client import client # pylint: disable=g-import-not-at-top
def _as_text(s):
"""Converts a byte/string into string."""
if isinstance(s, bytes):
return s.decode('utf-8')
return s
def _get_content(url):
"""Opens the url and loads the response into json."""
logging.info('opening url %s', url)
req = request.Request(url)
resp = request.urlopen(req)
resp_text = _as_text(resp.read())
logging.info('response text = %s', resp_text)
return json.loads(resp_text)
def _get_version_info(url, version_label):
"""Constructs a version info from the response."""
json_data = _get_content(url)
logging.info('json_data = %s', json_data)
if 'currentVersion' in json_data:
commit_id = json_data['currentVersion']
elif 'buildLabel' in json_data:
commit_id = json_data['buildLabel']
else:
commit_id = ''
info = {
'url': '',
'hash': commit_id,
'branch': version_label,
'piper_id': json_data.get('piperOriginRevId', '')
}
return info
def _configure_tpu_version(tpu_name, version_label, new_version_id):
"""Returns the current tpu version after resetting to an optional version."""
# The tpu_name is arbitrary / user chosen unique string for this tpu.
logging.info('Trying to connect to tpu %s', tpu_name)
tpu_client = client.Client(tpu=tpu_name)
tpu_client.wait_for_healthy()
if new_version_id:
logging.info('Trying to reset tpu version to %s', new_version_id)
tpu_client.configure_tpu_version(version=new_version_id)
tpu_client.wait_for_healthy()
logging.info('TPU healthy after version reset.')
else:
logging.info('Using the default tpu version id.')
workers = tpu_client.network_endpoints()
if workers:
ip_addr = workers[0]['ipAddress']
url = 'http://{}:8475/requestversion'.format(ip_addr)
return _get_version_info(url, version_label)
else:
logging.error('No tpu endpoint info')
return {
'url': '',
'hash': '',
'branch': version_label,
'piper_id': '',
}
def configure_tpu(tpu_params):
return _configure_tpu_version(
tpu_params.get('name'),
version_label=tpu_params.get('version'),
new_version_id=tpu_params.get('version_id'))
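# A hypothetical sketch (not in the original file) mirroring the
# --tpu_parameters JSON documented in perfzero_config.py. The name, project and
# zone values are placeholders; a Cloud TPU with that name must already exist
# for configure_tpu() to succeed.
if __name__ == '__main__':
  tpu_params = {
      'name': 'my-tpu-name',
      'project': 'my-gcp-project-id',
      'zone': 'europe-west4-a',
      'size': 'v3-8',
      'version': 'nightly-2.x',
      # Optional: pin a specific nightly build.
      # 'version_id': '2.4.0-dev20200728',
  }
  logging.info('TPU version info: %s', configure_tpu(tpu_params))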
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""PerfZero utility methods."""
from __future__ import print_function
import importlib
import logging
import os
import shutil
import subprocess
import sys
import threading
import traceback
import requests
import json
import re
def create_empty_file(parent_directory, file_basename):
"""Creates an empty file with a given basename in a parent directory.
Creates parent_directory and any intermediate directories if they don't exist.
This is mostly used for creating no-op actions in the Dockerfile.
Args:
parent_directory: The path to the parent directory.
file_basename: The basename for the empty file.
"""
if not os.path.isdir(parent_directory):
os.makedirs(parent_directory)
full_file_name = os.path.join(parent_directory, file_basename)
with open(full_file_name, 'w'):
print('Creating empty file: {}'.format(full_file_name))
def checkout_git_repos(git_repos, use_cached_site_packages):
"""Clone, update, or sync a repo.
Args:
git_repos: array of dict containing attributes of the git repo to checkout.
use_cached_site_packages: If true, skip git pull if git_repo already exists.
Returns:
A dict containing attributes of the git repositories
"""
site_package_info = {}
for repo in git_repos:
logging.info('Checking out repository from %s to %s',
repo['url'], repo['local_path'])
if not os.path.isdir(repo['local_path']):
run_commands(['git clone {} {}'.format(repo['url'], repo['local_path'])])
if 'branch' in repo:
run_commands(['git -C {} checkout {}'.format(
repo['local_path'], repo['branch'])])
if not use_cached_site_packages or 'git_hash' in repo:
run_commands(['git -C {} pull --rebase'.format(repo['local_path'])])
if 'git_hash' in repo:
run_commands(['git -C {} reset --hard {}'.format(
repo['local_path'], repo['git_hash'])])
logging.info('Checked-out repository from %s to %s',
repo['url'], repo['local_path'])
site_package_info[repo['dir_name']] = get_git_repo_info(repo['local_path'])
return site_package_info
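# Illustrative input (not part of the original file): each entry consumed by
# checkout_git_repos() is a dict produced by PerfZeroConfig.get_git_repos(),
# shaped roughly like:
#   {'url': 'https://github.com/tensorflow/benchmarks.git',
#    'dir_name': 'benchmarks',
#    'local_path': '<site_packages_dir>/benchmarks',
#    'branch': 'branch_1',      # optional
#    'git_hash': 'hash_1'}      # optional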
def get_git_repo_info(local_path):
"""Get information of the git repository specified by the local_path."""
git_repo_info = {}
# Get git url
cmd = 'git -C {} config --get remote.origin.url'.format(local_path)
exit_code, result = run_command(cmd)
lines = result.splitlines()
if exit_code == 0 and lines:
git_repo_info['url'] = lines[0]
else:
logging.error('Error getting git url for repository %s due to %s',
local_path, result)
return {}
# Get git branch
cmd = 'git -C {} rev-parse --abbrev-ref HEAD'.format(local_path)
exit_code, result = run_command(cmd)
lines = result.splitlines()
if exit_code == 0 and lines:
git_repo_info['branch'] = lines[0]
else:
logging.error('Error getting git branch for repository %s due to %s',
local_path, result)
return {}
# Get git hash
cmd = 'git -C {} rev-parse HEAD'.format(local_path)
exit_code, result = run_command(cmd)
lines = result.splitlines()
if exit_code == 0 and lines:
git_repo_info['hash'] = lines[0]
else:
logging.error('Error getting git hash for repository %s due to %s',
local_path, result)
return {}
return git_repo_info
def setup_python_path(site_packages_dir, python_path_str):
if python_path_str:
python_paths = python_path_str.split(',')
for python_path in python_paths:
logging.info('Adding path %s to sys.path', python_path)
sys.path.append(os.path.join(site_packages_dir, python_path))
logging.debug('PYTHONPATH: %s', sys.path)
def active_gcloud_service(gcloud_key_file_url, workspace_dir,
download_only=False):
"""Download key file and setup gcloud service credential using the key file.
Args:
gcloud_key_file_url: gcloud key file url
workspace_dir: directory that the key file is downloaded to
download_only: skip setting up the gcloud service credential if this is true
"""
if not gcloud_key_file_url:
return
local_path = os.path.join(workspace_dir,
os.path.basename(gcloud_key_file_url))
if not os.path.exists(local_path):
download_data([{'url': gcloud_key_file_url, 'local_path': local_path}])
if not download_only:
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = local_path
run_commands(['gcloud auth activate-service-account --key-file {}'.format(
local_path)])
logging.info('Activated gcloud service account credential')
def setup_gsutil_credential():
run_commands(['gcloud config set pass_credentials_to_gsutil true'])
def download_data(download_infos):
"""Download data from url to local_path for each (url, local_path) pair in the download_infos.
Each url should start with file://, gs://, http:// or https://.
Downloaded files whose names end with .gz will be decompressed in their
current directory.
Args:
download_infos: array of dict which specifies the url and local_path for
data download
"""
for info in download_infos:
if os.path.exists(info['local_path']):
continue
original_base_name = os.path.basename(info['url'])
expected_base_name = os.path.basename(info['local_path'])
local_path_parent = os.path.dirname(info['local_path'])
logging.info('Downloading data from %s to %s',
info['url'], info['local_path'])
make_dir_if_not_exist(local_path_parent)
# Download data to the local path
if info['url'].startswith('http://') or info['url'].startswith('https://'):
request = requests.get(info['url'], allow_redirects=True)
f = open(info['local_path'], 'wb')
f.write(request.content)
f.close()
elif info['url'].startswith('gs://'):
cmd = ['gsutil', '-m', 'cp', '-r', '-n', info['url'], local_path_parent]
run_commands([cmd], shell=False)
elif info['url'].startswith('file://'):
cmd = ['cp', info['url'][7:], local_path_parent]
run_commands([cmd], shell=False)
else:
raise ValueError('Url {} with prefix {} is not supported.'.format(
info['url'], info['url'].split(':')[0]))
# Move data to the expected local path
if original_base_name != expected_base_name:
run_commands(['mv {} {}'.format(
os.path.join(local_path_parent, original_base_name),
os.path.join(local_path_parent, expected_base_name))])
logging.info('Downloaded data from %s to %s',
info['url'], info['local_path'])
# Decompress file if file name ends with .gz unless caller sets 'decompress'
# to False in info.
if info['url'].endswith('.gz') and info.get('decompress', True):
run_commands(['tar xvf {} -C {}'.format(
info['local_path'], local_path_parent)])
logging.info('Decompressed file %s', info['local_path'])
def parse_data_downloads_str(root_data_dir, data_downloads_str):
"""Parse a comma separated string into array of dicts.
Each dict specifies the url and local_path for a download.
Args:
root_data_dir: the directory which should contain all the dataset files
data_downloads_str: a comma separated string specified by the
flag --data_downloads
Returns:
An array of dict which specifies the url and local_path for data download
"""
download_infos = []
if not data_downloads_str:
return download_infos
for entry in data_downloads_str.split(','):
info = {}
if ';' in entry:
info['url'] = entry.split(';')[0]
info['local_path'] = os.path.join(root_data_dir, entry.split(';')[1])
else:
info['url'] = entry
info['local_path'] = os.path.join(root_data_dir, os.path.basename(entry))
# Canonicalize url to remove trailing '/' and '*'
if info['url'].endswith('*'):
info['url'] = info['url'][:-1]
if info['url'].endswith('/'):
info['url'] = info['url'][:-1]
download_infos.append(info)
return download_infos
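# Illustrative example (not part of the original file), using a hypothetical
# bucket name: with root_data_dir='/data', the string
# 'gs://some-bucket/imagenet;imagenet,gs://some-bucket/labels.txt' parses to
#   [{'url': 'gs://some-bucket/imagenet', 'local_path': '/data/imagenet'},
#    {'url': 'gs://some-bucket/labels.txt', 'local_path': '/data/labels.txt'}]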
def maybe_upload_to_gcs(local_dir, output_gcs_url):
if not output_gcs_url:
return
run_commands(['gsutil -m cp -r {} {}'.format(local_dir, output_gcs_url)])
logging.info('Uploaded data from local directory %s to gcs %s',
local_dir, output_gcs_url)
def make_dir_if_not_exist(local_path):
if not os.path.exists(local_path):
os.makedirs(local_path)
logging.info('Created directory %s', local_path)
def run_command(cmd, shell=True):
"""Structures for a variety of different test results.
Args:
cmd: Command to execute
shell: True to use shell, false otherwise.
Returns:
Tuple of the command's exit code and its standard output as a string.
"""
logging.debug('Executing command: %s', cmd)
p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, shell=shell)
exit_code = None
line = ''
stdout = ''
while exit_code is None or line:
exit_code = p.poll()
line = p.stdout.readline().decode('utf-8')
stdout += line
logging.debug(line)
return exit_code, stdout
def run_commands(cmds, shell=True):
"""Runs list of command and throw error if any fail."""
for cmd in cmds:
exit_code, stdout = run_command(cmd, shell=shell)
if exit_code:
raise Exception('"{}" failed with code:{} and stdout:\n{}'.format(
cmd, exit_code, stdout))
def get_cpu_name():
cmd = "cat /proc/cpuinfo | grep 'model name' | sort --unique"
exit_code, result = run_command(cmd)
lines = result.splitlines()
if exit_code == 0 and lines:
model_name_parts = lines[0].split(':')
return model_name_parts[1].strip()
else:
logging.error('Error getting cpuinfo model name: %s', result)
return ''
def get_cpu_socket_count():
cmd = 'grep -i "physical id" /proc/cpuinfo | sort -u | wc -l'
exit_code, result = run_command(cmd)
lines = result.splitlines()
if exit_code == 0 and lines:
return int(lines[0])
else:
logging.error('Error getting cpuinfo socket count: %s', result)
return -1
def _get_amd_gpu_info():
"""Returns gpu information using rocm-smi.
Note: Assumes if the system has multiple GPUs, that they are all the same
Returns:
A dict containing gpu_driver_version, gpu_model and gpu_count or None if
`rocm-smi` is not found or fails.
"""
cmd = 'rocm-smi --json --showproductname --showdriverversion'
exit_code, result = run_command(cmd)
if exit_code != 0:
logging.error('rocm-smi did not return as expected: %s', result)
return None
def get_gpu_driver_version(rocm_smi_output):
return rocm_smi_output['system']['Driver version']
def get_gpu_model(rocm_smi_output):
gpu_model = ""
for key, value in rocm_smi_output.items():
if re.match("card[0-9]+", key):
gpu_model = value['Card SKU']
break
return gpu_model
def get_gpu_count(rocm_smi_output):
gpu_count = 0
for key, value in rocm_smi_output.items():
if re.match("card[0-9]+", key):
gpu_count += 1
return gpu_count
rocm_smi_output = json.loads(result)
gpu_info = {}
gpu_info['gpu_driver_version'] = get_gpu_driver_version(rocm_smi_output)
gpu_info['gpu_model'] = get_gpu_model(rocm_smi_output)
gpu_info['gpu_count'] = get_gpu_count(rocm_smi_output)
return gpu_info
def _get_nvidia_gpu_info():
"""Returns gpu information using nvidia-smi.
Note: Assumes if the system has multiple GPUs that they are all the same with
one exception. If the first result is a Quadro, the heuristic assumes
this may be a workstation and takes the second entry.
Returns:
A dict containing gpu_driver_version, gpu_model and gpu_count or None if
`nvidia-smi` is not found or fails.
"""
cmd = 'nvidia-smi --query-gpu=driver_version,gpu_name --format=csv'
exit_code, result = run_command(cmd)
if exit_code != 0:
logging.error('nvidia-smi did not return as expected: %s', result)
return None
lines = result.splitlines()
gpu_info_line = lines[1]
if 'Quadro' in gpu_info_line and len(lines) >= 3:
gpu_info_line = lines[2]
gpu_info = {}
gpu_info['gpu_driver_version'] = gpu_info_line.split(',')[0].strip()
gpu_info['gpu_model'] = gpu_info_line.split(',')[1].strip()
gpu_info['gpu_count'] = len(lines) - 1
return gpu_info
def get_gpu_info():
"""Returns gpu information using either nvidia-smi or rocm-smi.
Returns:
A dict containing gpu_driver_version, gpu_model and gpu_count or None if
`nvidia-smi` is not found or fails.
"""
return _get_amd_gpu_info() if shutil.which("rocm-smi") \
else _get_nvidia_gpu_info()
def _install_tpu_tool():
"""Installs the ctpu tool to managing cloud TPUs.
Follows the instructions here:
https://github.com/tensorflow/tpu/tree/master/tools/ctpu
"""
if not os.path.exists('ctpu'):
logging.info('Installing TPU tool')
commands = [
'wget https://dl.google.com/cloud_tpu/ctpu/latest/linux/ctpu',
'chmod a+x ctpu',
]
run_commands(commands)
def setup_tpu(parameters):
"""Sets up a TPU with a given set of parameters.
Args:
parameters: dictionary of TPU parameters.
Returns:
True if an error occurs during setup.
"""
try:
_install_tpu_tool()
args = [
'--name={}'.format(parameters.get('name')),
'--project={}'.format(parameters.get('project')),
'--zone={}'.format(parameters.get('zone')),
'--tpu-size={}'.format(parameters.get('size')),
'--tf-version={}'.format(parameters.get('version')),
'--tpu-only',
'-noconf',
]
command = './ctpu up {}'.format(' '.join(args))
logging.info('Setting up TPU: %s', command)
exit_code, output = run_command(command)
if exit_code != 0:
logging.error('Error in setup with output: %s', output)
return exit_code != 0
except Exception:
logging.error('Unable to setup TPU')
run_command('rm -f ctpu')
sys.exit(1)
def cleanup_tpu(parameters):
"""Cleans up an existing TPU.
Args:
parameters: dictionary of TPU parameters.
Returns:
True if an error occurs during cleanup.
"""
_install_tpu_tool()
args = [
'--name={}'.format(parameters.get('name')),
'--project={}'.format(parameters.get('project')),
'--zone={}'.format(parameters.get('zone')),
'--tpu-only',
'-noconf',
]
command = './ctpu delete {}'.format(' '.join(args))
logging.info('Cleaning up TPU: %s', command)
exit_code, output = run_command(command)
if exit_code != 0:
logging.error('Error in cleanup with output: %s', output)
return exit_code != 0
def read_benchmark_result(benchmark_result_file_path):
"""Read benchmark result from the protobuf file."""
from google.protobuf import json_format # pylint: disable=g-import-not-at-top
from tensorflow.core.util import test_log_pb2 # pylint: disable=g-import-not-at-top
if not os.path.isfile(benchmark_result_file_path):
logging.error('Failed to read benchmark result because '
'file %s does not exist', benchmark_result_file_path)
return {}
with open(benchmark_result_file_path, 'rb') as f:
benchmark_entries = test_log_pb2.BenchmarkEntries()
benchmark_entries.ParseFromString(f.read())
return json_format.MessageToDict(
benchmark_entries,
preserving_proto_field_name=True,
including_default_value_fields=True)['entry'][0]
def print_thread_stacktrace():
print('Here is the stacktrace for all threads:')
thread_names = {t.ident: t.name for t in threading.enumerate()}
for thread_id, frame in sys._current_frames().items(): # pylint: disable=protected-access
print('Thread {}'.format(thread_names.get(thread_id, thread_id)))
traceback.print_stack(frame)
def instantiate_benchmark_class(
benchmark_class, output_dir, root_data_dir, tpu, constructor_args,
benchmark_class_type=None):
"""Return initialized benchmark class."""
module_import_path, class_name = benchmark_class.rsplit('.', 1)
module = importlib.import_module(module_import_path)
class_ = getattr(module, class_name)
if benchmark_class_type == 'tf_benchmark':
# for benchmarks inheriting from tf.test.Benchmark, instantiate them directly.
instance = class_(**constructor_args)
else:
# Default instantiation for perfzero_benchmark classes.
instance = class_(
output_dir=output_dir,
root_data_dir=root_data_dir,
tpu=tpu,
**constructor_args)
return instance
def copy_and_rename_dirs(dir_spec_string, dst_base_dir):
"""Copies list of <dir-path>:new_name specs into a new dest dir.
If a path /path1/path2/dir:new_dir is given, it copies /path1/path2/dir to
dst_base_dir/new_dir.
Args:
dir_spec_string: Comma separated list of /path1/path2:new_name specs.
dst_base_dir: The base dir to contain the copies.
"""
if not dir_spec_string:
return
dir_specs = dir_spec_string.split(',')
for src_dir_with_name in dir_specs:
src_dir, final_basename = src_dir_with_name.split(':')
dst_dir = os.path.join(dst_base_dir, final_basename)
if os.path.isdir(dst_dir):
logging.info('[DELETE] pre-existing %s', dst_dir)
shutil.rmtree(dst_dir)
logging.info('[COPY] %s -> %s', src_dir, dst_dir)
shutil.copytree(src_dir, dst_dir)
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests utils.py."""
import os
import unittest
from mock import call
from mock import MagicMock
from mock import patch
import perfzero.utils as utils
import tensorflow as tf # pylint: disable=g-bad-import-order
class TestUtils(unittest.TestCase, tf.test.Benchmark):
def test_protobuf_read(self):
output_dir = '/tmp/'
os.environ['TEST_REPORT_FILE_PREFIX'] = output_dir
benchmark_result_file_path = os.path.join(output_dir,
'TestUtils.testReportBenchmark')
if os.path.exists(benchmark_result_file_path):
os.remove(benchmark_result_file_path)
self.report_benchmark(
iters=2000,
wall_time=1000,
name='testReportBenchmark',
metrics=[{'name': 'metric_name_1', 'value': 0, 'min_value': 1},
{'name': 'metric_name_2', 'value': 90, 'min_value': 0,
'max_value': 95}])
actual_result = utils.read_benchmark_result(
benchmark_result_file_path)
os.remove(benchmark_result_file_path)
expected_result = {
'name': 'TestUtils.testReportBenchmark',
# google.protobuf.json_format.MessageToDict() will convert
# int64 field to string.
'iters': '2000',
'wall_time': 1000,
'cpu_time': 0,
'throughput': 0,
'extras': {},
'metrics': [
{
'name': 'metric_name_1',
'value': 0,
'min_value': 1
},
{
'name': 'metric_name_2',
'value': 90,
'min_value': 0,
'max_value': 95
}
]
}
self.assertDictEqual(expected_result, actual_result)
@patch('perfzero.utils.get_git_repo_info')
@patch('perfzero.utils.run_commands')
def test_checkout_git_repos(self, run_commands_mock, get_git_repo_info_mock):
git_repo_1 = {}
git_repo_1['url'] = 'url_1'
git_repo_1['local_path'] = 'local_path_1'
git_repo_1['dir_name'] = 'dir_name_1'
git_repo_1['branch'] = 'branch_1'
git_repo_1['git_hash'] = 'git_hash_1'
git_repo_2 = {}
git_repo_2['url'] = 'url_2'
git_repo_2['local_path'] = 'local_path_2'
git_repo_2['dir_name'] = 'dir_name_2'
git_repo_2['branch'] = 'branch_2'
git_repo_info_1 = {'url': 'url_1'}
git_repo_info_2 = {'url': 'url_2'}
get_git_repo_info_mock.side_effect = \
lambda local_path: git_repo_info_1 if local_path == 'local_path_1' else git_repo_info_2 # pylint: disable=line-too-long
site_package_info = utils.checkout_git_repos([git_repo_1, git_repo_2],
False)
self.assertEqual(2, len(site_package_info))
self.assertEqual(git_repo_info_1, site_package_info['dir_name_1'])
self.assertEqual(git_repo_info_2, site_package_info['dir_name_2'])
run_commands_mock.assert_has_calls(any_order=False, calls=[
call(['git clone url_1 local_path_1']),
call(['git -C local_path_1 checkout branch_1']),
call(['git -C local_path_1 pull --rebase']),
call(['git -C local_path_1 reset --hard git_hash_1']),
call(['git clone url_2 local_path_2']),
call(['git -C local_path_2 checkout branch_2'])
])
@patch('perfzero.utils.run_command')
def test_get_git_repo_info(self, run_command_mock):
run_command_mock.side_effect = [
[0, 'git_url'],
[0, 'branch_name'],
[0, 'git_hash']
]
git_repo_info = utils.get_git_repo_info('local_path_1')
self.assertEqual(
{'url': 'git_url', 'branch': 'branch_name', 'hash': 'git_hash'},
git_repo_info)
run_command_mock.assert_has_calls(any_order=False, calls=[
call('git -C local_path_1 config --get remote.origin.url'),
call('git -C local_path_1 rev-parse --abbrev-ref HEAD'),
call('git -C local_path_1 rev-parse HEAD')
])
@patch('builtins.open')
@patch('perfzero.utils.make_dir_if_not_exist')
@patch('requests.get')
@patch('perfzero.utils.run_commands')
def test_download_data(self, run_commands_mock, requests_get_mock,
make_dir_mock, open_mock): # pylint: disable=unused-argument
get_mock = MagicMock()
get_mock.content = 'content'
requests_get_mock.return_value = get_mock
download_info_1 = {'url': 'gs://remote_path_1/name_1',
'local_path': 'local_path_1/modified_name_1'}
download_info_2 = {'url': 'http://remote_path_2/name_2',
'local_path': 'local_path_2/modified_name_2'}
utils.download_data([download_info_1, download_info_2])
make_dir_mock.assert_has_calls(any_order=False, calls=[
call('local_path_1'),
call('local_path_2')
])
requests_get_mock.assert_called_once_with('http://remote_path_2/name_2',
allow_redirects=True)
run_commands_mock.assert_has_calls(any_order=False, calls=[
call([['gsutil', '-m', 'cp', '-r', '-n',
'gs://remote_path_1/name_1', 'local_path_1']],
shell=False),
call(['mv local_path_1/name_1 local_path_1/modified_name_1']),
call(['mv local_path_2/name_2 local_path_2/modified_name_2'])
])
def test_parse_data_downloads_str(self):
data_downloads_str = 'url_1;relative_path_1,url_2;relative_path_2'
download_infos = utils.parse_data_downloads_str('/root_data_dir',
data_downloads_str)
self.assertEqual(2, len(download_infos))
self.assertEqual(download_infos[0],
{'url': 'url_1',
'local_path': '/root_data_dir/relative_path_1'})
self.assertEqual(download_infos[1],
{'url': 'url_2',
'local_path': '/root_data_dir/relative_path_2'})
@patch('perfzero.utils.run_command')
def test_get_cpu_name(self, run_command_mock):
"""Tests extract the cpu model name."""
run_command_mock.return_value = [
0, 'model name : Intel(R) Xeon(R) CPU E5-1650 v2 @ 3.50GHz\n'
]
cpu_name = utils.get_cpu_name()
self.assertEqual('Intel(R) Xeon(R) CPU E5-1650 v2 @ 3.50GHz', cpu_name)
@patch('perfzero.utils.run_command')
def test_get_cpu_socket_count(self, run_command_mock):
"""Tests get socket count."""
run_command_mock.return_value = [0, '2\n']
cpu_socket_count = utils.get_cpu_socket_count()
self.assertEqual(2, cpu_socket_count)
@patch('perfzero.utils.run_command')
def test_get_gpu_model(self, run_command_mock):
# Tests that get_gpu_info() parses the expected output into its components.
run_command_mock.return_value = [
0, 'driver_version, name\n381.99, GTX 1080 \n'
]
gpu_model = utils.get_gpu_info()['gpu_model']
self.assertEqual('GTX 1080', gpu_model)
# Tests gpu info returns second entry if first entry is a Quadro.
run_command_mock.return_value = [
0, 'blah\n200.99, Quadro K900 \n381.99, GTX 1080\n'
]
gpu_model = utils.get_gpu_info()['gpu_model']
self.assertEqual('GTX 1080', gpu_model)
@patch('perfzero.utils.run_command')
def test_get_gpu_count(self, run_command_mock):
"""Tests gpu info returns second entry if first entry is a Quadro."""
run_command_mock.return_value = [
0, 'blah\n200.99, Quadro K900 \n381.99, GTX 1080\n'
]
gpu_count = utils.get_gpu_info()['gpu_count']
self.assertEqual(2, gpu_count)
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Checkout repository, download data and build docker image."""
from __future__ import print_function
import argparse
import json
import logging
import os
import shutil
import sys
import tempfile
import time
import perfzero.device_utils as device_utils
import perfzero.perfzero_config as perfzero_config
import perfzero.utils as utils
def _temporary_file_name(parent_dir, base_name):
"""Returns a temp name of the form <parent-dir>/<random>/<base-name>."""
if not os.path.isdir(parent_dir):
os.makedirs(parent_dir)
temp_dir = tempfile.mkdtemp(dir=parent_dir)
return os.path.join(temp_dir, base_name)
def _load_docker_image(FLAGS, workspace_dir, setup_execution_time):
"""Runs docker load --input_image <FLAGS.dockerfile_path>.
Fetches FLAGS.dockerfile_path to workspace_dir/<temp-dir>/local_docker first.
Runs docker load --input <path-to-local-docker>.
Deletes workspace_dir/<temp-dir> after the docker image is loaded.
Args:
FLAGS: parser.parse_known_args object.
workspace_dir: String - The path to use for intermediate artifacts.
setup_execution_time: Map from string->double containing wall times for
different operations. This will have insertions describing the docker
setup time.
"""
load_docker_start_time = time.time()
local_docker_image_path = _temporary_file_name(workspace_dir, 'local_docker')
utils.download_data([{'url': FLAGS.dockerfile_path,
'local_path': local_docker_image_path,
'decompress': False}])
setup_execution_time['fetch_docker'] = time.time() - load_docker_start_time
docker_load_cmd = 'docker load --input {}'.format(local_docker_image_path)
try:
utils.run_commands(
[docker_load_cmd,
'docker images' # Print loaded image list.
])
setup_execution_time['load_docker'] = time.time() - load_docker_start_time
finally:
logging.info('removing parent dir of local docker image copy %s',
local_docker_image_path)
shutil.rmtree(os.path.dirname(local_docker_image_path))
def _create_docker_image(FLAGS, project_dir, workspace_dir,
setup_execution_time):
"""Creates a docker image.
Args:
FLAGS: parser.parse_known_args object.
project_dir: String - The current project path.
workspace_dir: String - The path to use for intermediate artifacts.
setup_execution_time: Map from string->double containing wall times for
different operations. This will have insertions describing the docker
setup time.
"""
# Create docker image
docker_start_time = time.time()
docker_context = os.path.join(workspace_dir, 'resources')
# Necessary in case we don't have a local .whl file.
utils.create_empty_file(docker_context, 'EMPTY')
# Download TensorFlow pip package from Google Cloud Storage and modify package
# path accordingly, if applicable
local_tensorflow_pip_spec = None
if (FLAGS.tensorflow_pip_spec and
(FLAGS.tensorflow_pip_spec.startswith('gs://') or
FLAGS.tensorflow_pip_spec.startswith('file://'))):
local_pip_filename = os.path.basename(FLAGS.tensorflow_pip_spec)
local_pip_path = os.path.join(docker_context, local_pip_filename)
utils.download_data([{'url': FLAGS.tensorflow_pip_spec,
'local_path': local_pip_path}])
# Update path to pip wheel file for the Dockerfile. Note that this path has
# to be relative to the docker context (absolute path will not work).
FLAGS.tensorflow_pip_spec = local_pip_filename
local_tensorflow_pip_spec = local_pip_filename
else:
local_tensorflow_pip_spec = 'EMPTY'
dockerfile_path = FLAGS.dockerfile_path
if not os.path.exists(dockerfile_path):
# Fall back to the deprecated approach if the user-specified
# dockerfile_path does not exist
dockerfile_path = os.path.join(project_dir, FLAGS.dockerfile_path)
extra_pip_specs = (FLAGS.extra_pip_specs or '').replace(';', '')
docker_base_cmd = 'docker build --no-cache --pull'
# FLAGS.extra_docker_build_args will be a list of strings (e.g. ['a', 'b=c']).
# We treat the strings directly as build-args: --build-arg a --build-arg b=c
# Empty strings are ignored.
extra_docker_build_args = ' '.join([
'--build-arg %s' % arg for arg in FLAGS.extra_docker_build_args if arg])
cmd = '{docker_base_cmd} -t {docker_tag}{tf_pip}{local_tf_pip}{extra_pip}{extra_docker_build_args} {suffix}'.format(
docker_base_cmd=docker_base_cmd,
docker_tag=FLAGS.docker_tag,
tf_pip=(
' --build-arg tensorflow_pip_spec={}'.format(
FLAGS.tensorflow_pip_spec) if FLAGS.tensorflow_pip_spec else ''),
# local_tensorflow_pip_spec is either string 'EMPTY' or basename of
# local .whl file.
local_tf_pip=' --build-arg local_tensorflow_pip_spec={}'.format(
local_tensorflow_pip_spec),
extra_pip=' --build-arg extra_pip_specs=\'{}\''.format(extra_pip_specs),
extra_docker_build_args=' ' + extra_docker_build_args,
suffix=(
'-f {} {}'.format(dockerfile_path, docker_context)
if docker_context else '- < {}'.format(dockerfile_path))
)
utils.run_commands([cmd])
logging.info('Built docker image with tag %s', FLAGS.docker_tag)
setup_execution_time['build_docker'] = time.time() - docker_start_time
if __name__ == '__main__':
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
perfzero_config.add_setup_parser_arguments(parser)
FLAGS, unparsed = parser.parse_known_args()
logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s',
level=logging.DEBUG)
if unparsed:
logging.error('Arguments %s are not recognized', unparsed)
sys.exit(1)
setup_execution_time = {}
project_dir = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
workspace_dir = os.path.join(project_dir, FLAGS.workspace)
site_package_dir = os.path.join(workspace_dir, 'site-packages')
utils.copy_and_rename_dirs(FLAGS.site_package_downloads,
site_package_dir)
activate_gcloud = False
if FLAGS.dockerfile_path and FLAGS.dockerfile_path.startswith('gs://'):
# We might end up doing a gsutil fetch later, so we need to call
# active_gcloud_service().
activate_gcloud = True
if FLAGS.tensorflow_pip_spec and FLAGS.tensorflow_pip_spec.startswith('gs://'):
activate_gcloud = True
# Download gcloud auth token. Remove this operation in the future when
# docker in Kokoro can access the GCP metadata server.
start_time = time.time()
utils.active_gcloud_service(FLAGS.gcloud_key_file_url,
workspace_dir, download_only=not activate_gcloud)
setup_execution_time['download_token'] = time.time() - start_time
# Set up the raid array.
start_time = time.time()
device_utils.create_drive_from_devices(FLAGS.root_data_dir,
FLAGS.gce_nvme_raid)
setup_execution_time['create_drive'] = time.time() - start_time
if FLAGS.dockerfile_path:
if FLAGS.dockerfile_path.endswith('.tar.gz'):
logging.info('Assuming given file %s is a docker image to load',
FLAGS.dockerfile_path)
_load_docker_image(FLAGS, workspace_dir,
setup_execution_time)
else:
_create_docker_image(FLAGS, project_dir, workspace_dir,
setup_execution_time)
logging.info('Setup time in seconds by operation:\n %s',
json.dumps(setup_execution_time, indent=2))
[
{
"name": "execution_timestamp",
"type": "TIMESTAMP",
"mode": "REQUIRED"
},
{
"name": "execution_id",
"type": "STRING",
"mode": "REQUIRED"
},
{
"name": "ml_framework_info",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "benchmark_result",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "benchmark_info",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "setup_info",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "system_info",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "process_info",
"type": "STRING",
"mode": "NULLABLE"
}
]
#!/usr/bin/env bash
#
# Steps:
#
# 1. Download corresponding html file for some README.md:
# curl -s $1
#
# 2. Discard rows that do not contain the substring 'user-content-' (github's markup):
# awk '/user-content-/ { ...
#
# 3.1 Get last number in each row like ' ... </span></a>sitemap.js</h1'.
# It's a level of the current header:
# substr($0, length($0), 1)
#
# 3.2 Get level from 3.1 and insert corresponding number of spaces before '*':
# sprintf("%*s", substr($0, length($0), 1)*3, " ")
#
# 4. Find the header's text and insert it inside "* [ ... ]":
# substr($0, match($0, /a>.*<\/h/)+2, RLENGTH-5)
#
# 5. Find anchor and insert it inside "(...)":
# substr($0, match($0, "href=\"[^\"]+?\" ")+6, RLENGTH-8)
#
gh_toc_version="0.6.0"
gh_user_agent="gh-md-toc v$gh_toc_version"
#
# Downloads the html-rendered README.md by its url.
#
#
gh_toc_load() {
local gh_url=$1
if type curl &>/dev/null; then
curl --user-agent "$gh_user_agent" -s "$gh_url"
elif type wget &>/dev/null; then
wget --user-agent="$gh_user_agent" -qO- "$gh_url"
else
echo "Please, install 'curl' or 'wget' and try again."
exit 1
fi
}
#
# Converts local md file into html by GitHub
#
# ➥ curl -X POST --data '{"text": "Hello world github/linguist#1 **cool**, and #1!"}' https://api.github.com/markdown
# <p>Hello world github/linguist#1 <strong>cool</strong>, and #1!</p>'"
gh_toc_md2html() {
local gh_file_md=$1
URL=https://api.github.com/markdown/raw
if [ -z "$GH_TOC_TOKEN" ]; then
TOKEN=$GH_TOC_TOKEN
else
TOKEN="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)/token.txt"
fi
if [ -f "$TOKEN" ]; then
URL="$URL?access_token=$(cat $TOKEN)"
fi
# echo $URL 1>&2
OUTPUT="$(curl -s --user-agent "$gh_user_agent" \
--data-binary @"$gh_file_md" -H "Content-Type:text/plain" \
$URL)"
if [ "$?" != "0" ]; then
echo "XXNetworkErrorXX"
fi
if [ "$(echo "${OUTPUT}" | awk '/API rate limit exceeded/')" != "" ]; then
echo "XXRateLimitXX"
else
echo "${OUTPUT}"
fi
}
#
# Checks whether the passed string is a url
#
gh_is_url() {
case $1 in
https* | http*)
echo "yes";;
*)
echo "no";;
esac
}
#
# TOC generator
#
gh_toc(){
local gh_src=$1
local gh_src_copy=$1
local gh_ttl_docs=$2
local need_replace=$3
if [ "$gh_src" = "" ]; then
echo "Please, enter URL or local path for a README.md"
exit 1
fi
# Show "TOC" string only if working with one document
if [ "$gh_ttl_docs" = "1" ]; then
echo "Table of Contents"
echo "================="
echo ""
gh_src_copy=""
fi
if [ "$(gh_is_url "$gh_src")" == "yes" ]; then
gh_toc_load "$gh_src" | gh_toc_grab "$gh_src_copy"
if [ "${PIPESTATUS[0]}" != "0" ]; then
echo "Could not load remote document."
echo "Please check your url or network connectivity"
exit 1
fi
if [ "$need_replace" = "yes" ]; then
echo
echo "!! '$gh_src' is not a local file"
echo "!! Can't insert the TOC into it."
echo
fi
else
local rawhtml=$(gh_toc_md2html "$gh_src")
if [ "$rawhtml" == "XXNetworkErrorXX" ]; then
echo "Parsing local markdown file requires access to github API"
echo "Please make sure curl is installed and check your network connectivity"
exit 1
fi
if [ "$rawhtml" == "XXRateLimitXX" ]; then
echo "Parsing local markdown file requires access to github API"
echo "Error: You exceeded the hourly limit. See: https://developer.github.com/v3/#rate-limiting"
TOKEN="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)/token.txt"
echo "or place github auth token here: $TOKEN"
exit 1
fi
local toc=`echo "$rawhtml" | gh_toc_grab "$gh_src_copy"`
echo "$toc"
if [ "$need_replace" = "yes" ]; then
local ts="<\!--ts-->"
local te="<\!--te-->"
local dt=`date +'%F_%H%M%S'`
local ext=".orig.${dt}"
local toc_path="${gh_src}.toc.${dt}"
local toc_footer="<!-- Added by: `whoami`, at: `date --iso-8601='minutes'` -->"
# http://fahdshariff.blogspot.ru/2012/12/sed-mutli-line-replacement-between-two.html
# clear old TOC
sed -i${ext} "/${ts}/,/${te}/{//!d;}" "$gh_src"
# create toc file
echo "${toc}" > "${toc_path}"
echo -e "\n${toc_footer}\n" >> "$toc_path"
# insert toc file
if [[ "`uname`" == "Darwin" ]]; then
sed -i "" "/${ts}/r ${toc_path}" "$gh_src"
else
sed -i "/${ts}/r ${toc_path}" "$gh_src"
fi
echo
echo "!! TOC was added into: '$gh_src'"
echo "!! Origin version of the file: '${gh_src}${ext}'"
echo "!! TOC added into a separate file: '${toc_path}'"
echo
fi
fi
}
#
# Grabber of the TOC from rendered html
#
# $1 — a source url of document.
# It's needed if the TOC is generated for multiple documents.
#
gh_toc_grab() {
# if closed <h[1-6]> is on the new line, then move it on the prev line
# for example:
# was: The command <code>foo1</code>
# </h1>
# became: The command <code>foo1</code></h1>
sed -e ':a' -e 'N' -e '$!ba' -e 's/\n<\/h/<\/h/g' |
# find strings that corresponds to template
grep -E -o '<a.*id="user-content-[^"]*".*</h[1-6]' |
# remove code tags
sed 's/<code>//g' | sed 's/<\/code>//g' |
# now all rows are like:
# <a id="user-content-..." href="..."><span ...></span></a> ... </h1
# format result line
# * $0 — whole string
# * last element of each row: "</hN" where N in (1,2,3,...)
echo -e "$(awk -v "gh_url=$1" '{
level = substr($0, length($0), 1)
text = substr($0, match($0, /a>.*<\/h/)+2, RLENGTH-5)
href = substr($0, match($0, "href=\"[^\"]+?\"")+6, RLENGTH-7)
print sprintf("%*s", level*3, " ") "* [" text "](" gh_url href ")" }' |
sed 'y/+/ /; s/%/\\x/g')"
}
#
# Returns filename only from full path or url
#
gh_toc_get_filename() {
echo "${1##*/}"
}
#
# Option handlers
#
gh_toc_app() {
local app_name=$(basename $0)
local need_replace="no"
if [ "$1" = '--help' ] || [ $# -eq 0 ] ; then
echo "GitHub TOC generator ($app_name): $gh_toc_version"
echo ""
echo "Usage:"
echo " $app_name [--insert] src [src] Create TOC for a README file (url or local path)"
echo " $app_name - Create TOC for markdown from STDIN"
echo " $app_name --help Show help"
echo " $app_name --version Show version"
return
fi
if [ "$1" = '--version' ]; then
echo "$gh_toc_version"
echo
echo "os: `lsb_release -d | cut -f 2`"
echo "kernel: `cat /proc/version`"
echo "shell: `$SHELL --version`"
echo
for tool in curl wget grep awk sed; do
printf "%-5s: " $tool
echo `$tool --version | head -n 1`
done
return
fi
if [ "$1" = "-" ]; then
if [ -z "$TMPDIR" ]; then
TMPDIR="/tmp"
elif [ -n "$TMPDIR" -a ! -d "$TMPDIR" ]; then
mkdir -p "$TMPDIR"
fi
local gh_tmp_md
gh_tmp_md=$(mktemp $TMPDIR/tmp.XXXXXX)
while read input; do
echo "$input" >> "$gh_tmp_md"
done
gh_toc_md2html "$gh_tmp_md" | gh_toc_grab ""
return
fi
if [ "$1" = '--insert' ]; then
need_replace="yes"
shift
fi
for md in "$@"
do
echo ""
gh_toc "$md" "$#" "$need_replace"
done
echo ""
echo "Created by [gh-md-toc](https://github.com/ekalinin/github-markdown-toc)"
}
#
# Entry point
#
gh_toc_app "$@"
#!/usr/bin/python
#
# Copyright 2019 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Plot graph showing process metric values over time"""
from __future__ import print_function
import argparse
import sys
import json
import matplotlib.pyplot as plt
import matplotlib.backends.backend_pdf as backend_pdf
import matplotlib.ticker as tick
colors=['b', 'r', 'g', 'c', 'pink']
def visualize(file_path):
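# The input file is expected to contain one JSON object per line, each with a
# 'time' key (seconds) plus one key per process metric to plot.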
entries = []
with open(file_path) as f:
entries = [json.loads(line) for line in f.readlines() if line.strip()]
if not entries:
print('There is no data in file {}'.format(file_path))
return
pdf = backend_pdf.PdfPages("process_info.pdf")
idx = 0
names = [name for name in entries[0].keys() if name != 'time']
times = [entry['time'] for entry in entries]
for name in names:
values = [entry[name] for entry in entries]
fig = plt.figure()
ax = plt.gca()
ax.yaxis.set_major_formatter(tick.ScalarFormatter(useMathText=True))
plt.ticklabel_format(style='sci', axis='y', scilimits=(-2,3))
plt.plot(times, values, colors[idx % len(colors)], marker='x', label=name)
plt.xlabel('Time (sec)')
plt.ylabel(name)
plt.ylim(ymin=0)
plt.legend(loc = 'upper left')
pdf.savefig(fig)
idx += 1
plt.show()
pdf.close()
print('Generated process_info.pdf from {}'.format(file_path))
if __name__ == '__main__':
parser = argparse.ArgumentParser(usage='plot_process_info.py <path_to_file>' )
parser.add_argument('file_path', type=str)
flags = parser.parse_args(sys.argv[1:])
visualize(flags.file_path)
# tf_cnn_benchmarks: High performance benchmarks
**Note: tf_cnn_benchmarks is no longer maintained.**
tf_cnn_benchmarks contains TensorFlow 1 implementations of several popular
convolutional models, and is designed to be as fast as possible.
tf_cnn_benchmarks supports both running on a single machine or running in
distributed mode across multiple hosts.
tf_cnn_benchmarks is no longer maintained. Although it will run with TensorFlow
2, it was written and optimized for TensorFlow 1, and has not been maintained
since TensorFlow 2 was released. For clean and easy-to-read TensorFlow 2 models,
please see the [TensorFlow Official
Models](https://github.com/tensorflow/models/tree/master/official).
## Getting Started
To run ResNet50 with synthetic data without distortions with a single GPU, run
```
python tf_cnn_benchmarks.py --num_gpus=1 --batch_size=32 --model=resnet50 --variable_update=parameter_server
```
Note that the master branch of tf_cnn_benchmarks occasionally requires the
latest nightly version of TensorFlow. You can install the nightly version by
running `pip install tf-nightly-gpu` in a clean environment, or by installing
TensorFlow from source. We sometimes will create a branch of tf_cnn_benchmarks,
in the form of cnn_tf_vX.Y_compatible, that is compatible with TensorFlow
version X.Y. For example, branch
[cnn_tf_v1.9_compatible](https://github.com/tensorflow/benchmarks/tree/cnn_tf_v1.9_compatible/scripts/tf_cnn_benchmarks)
works with TensorFlow 1.9. However, as tf_cnn_benchmarks is no longer
maintained, we will likely no longer create new branches.
Some important flags are
* model: Model to use, e.g. resnet50, inception3, vgg16, and alexnet.
* num_gpus: Number of GPUs to use.
* data_dir: Path to data to process. If not set, synthetic data is used. To
use Imagenet data use these
[instructions](https://github.com/tensorflow/models/tree/master/research/inception#getting-started)
as a starting point.
* batch_size: Batch size for each GPU.
* variable_update: The method for managing variables: parameter_server,
replicated, distributed_replicated, independent.
* local_parameter_device: Device to use as parameter server: cpu or gpu.
To see the full list of flags, run `python tf_cnn_benchmarks.py --help`.
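For example, a run that keeps variables on a parameter server hosted on the CPU
might look like the following (model, batch size, and GPU count are illustrative
values):
```
python tf_cnn_benchmarks.py --num_gpus=4 --batch_size=64 --model=vgg16 \
    --variable_update=parameter_server --local_parameter_device=cpu
```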
To run ResNet50 with real data with 8 GPUs, run:
```
python tf_cnn_benchmarks.py --data_format=NCHW --batch_size=256 \
--model=resnet50 --optimizer=momentum --variable_update=replicated \
--nodistortions --gradient_repacking=8 --num_gpus=8 \
--num_epochs=90 --weight_decay=1e-4 --data_dir=${DATA_DIR} --use_fp16 \
--train_dir=${CKPT_DIR}
```
This will train a ResNet-50 model on ImageNet with 2048 batch size on 8
GPUs. The model should train to around 76% accuracy.
## Running the tests
To run the tests, run
```bash
pip install portpicker
python run_tests.py && python run_tests.py --run_distributed_tests
```
Note the tests require portpicker.
The command above runs a subset of tests that is both fast and fairly
comprehensive. Alternatively, all the tests can be run, but this will take a
long time:
```bash
python run_tests.py --full_tests && python run_tests.py --full_tests --run_distributed_tests
```
We will run all tests on every PR before merging them, so it is not necessary
to pass `--full_tests` when running tests yourself.
To run an individual test, such as method `testParameterServer` of test class
`TfCnnBenchmarksTest` of module `benchmark_cnn_test`, run
```bash
python -m unittest -v benchmark_cnn_test.TfCnnBenchmarksTest.testParameterServer
```
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Benchmarks the all-reduce algorithms of tf_cnn_benchmarks.
tf_cnn_benchmarks uses all-reduce to aggregate gradients. This benchmark is
useful for benchmarking the performance of just this gradient aggregation,
instead of the entire model. All the flags that tf_cnn_benchmarks accepts are
also accepted by this script, although many are silently ignored.
The number and shapes of the tensors all-reduced are those of the variables of
the model specified by the --model flag.
TODO(reedwm): Allow custom sizes to be specified.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import time
from absl import app
from absl import flags as absl_flags
import tensorflow.compat.v1 as tf
from tensorflow.python.ops import control_flow_ops
import benchmark_cnn
import cnn_util
import flags
from cnn_util import log_fn
absl_flags.DEFINE_integer('iters_per_step', 5,
'Number of iterations to run all-reduce for, per '
'step. Every step, a session will be run on a Graph '
'that contains this many copies of the all-reduce. '
'The copies are run sequentially. Setting this above '
'1 is useful to lower the overhead of starting the '
'session run, running the VariableV2 ops at the '
'start of the step, etc.')
flags.define_flags()
for name in flags.param_specs.keys():
absl_flags.declare_key_flag(name)
def get_var_shapes(model):
"""Returns the list of variable shapes for a tf_cnn_benchmarks Model."""
with tf.Graph().as_default():
# The variable shapes do not depend on the batch size.
images = tf.placeholder(tf.float32, model.get_input_shapes('train')[0])
model.build_network([images])
return [[int(d) for d in v.shape.dims] for v in tf.trainable_variables()]
def all_reduce(all_device_tensors, variable_mgr):
"""Performs a single batch all-reduce.
Args:
all_device_tensors: List of lists of tensors. all_device_tensors[t][i] is
a tensor, where t is the tower the tensor is on and i is the index of
the tensor.
variable_mgr: The VariableMgr to perform the all-reduce.
Returns:
List of lists of tensors in the same form as `all_device_tensors`, except the
tensors are aggregated across towers.
"""
tower_grads = [[(g, None) for g in device_tensors] for
device_tensors in all_device_tensors]
_, aggregated_tower_grads = variable_mgr.preprocess_device_grads(tower_grads)
return [
[g for g, _ in agg_device_tensors]
for agg_device_tensors in aggregated_tower_grads]
def build_all_reduce_iterations(all_device_tensors, tower_devices, variable_mgr,
num_iters):
"""Builds the all-reduce ops for multiple iterations to aggregate tensors.
The tensors in `all_device_tensors` are aggregated `num_iters` times. Each
iteration aggregates the results from the previous iteration. The iterations
are run sequentially, so the aggregations for an iteration do not start
running until the previous iteration has completed. Each iteration after the
first is aggregating already-aggregated values, but it does not matter because
we are only aggregating for benchmarking purposes.
Args:
all_device_tensors: List of lists of tensors. all_device_tensors[t][i] is
a tensor, where t is the tower the tensor is on and i is the index of
the tensor.
tower_devices: A list of device strings. tower_devices[t] is the device
of the tensors in all_device_tensors[t].
variable_mgr: The VariableMgr to perform the all-reduce.
num_iters: Number of iterations to aggregate tensors for.
Returns:
An op that when run, causes the all-reduce ops to run.
"""
for i in range(num_iters):
with tf.name_scope('iteration_%d' % i):
# Step 1: Do the aggregation.
with tf.name_scope('tensor_aggregation'):
all_device_tensors = all_reduce(all_device_tensors, variable_mgr)
# Step 2. Create identity ops, to bring the aggregated results back to
# each device.
new_all_device_tensors = []
for device, device_tensors in zip(tower_devices, all_device_tensors):
with tf.device(device):
new_all_device_tensors.append([
tf.identity(t, name='identity_after_allreduce')
for t in device_tensors
])
all_device_tensors = new_all_device_tensors
# Step 3. Add control dependencies to delay the next iteration until this
# iteration is complete. To avoid extra overhead, we do not have any
# cross-device control dependencies, which means it's possible for two
# iterations to slightly overlap.
new_all_device_tensors = []
for device_tensors in all_device_tensors:
new_all_device_tensors.append([
control_flow_ops.with_dependencies(
device_tensors, t, name='identity_after_dependencies')
for t in device_tensors
])
all_device_tensors = new_all_device_tensors
# To prevent the dependency optimizer from removing every op we created,
# we store the results in variables.
ops_to_run = []
for device, device_tensors in zip(tower_devices, all_device_tensors):
with tf.device(device):
for t in device_tensors:
# The placeholder initial value is never run.
var = tf.Variable(tf.placeholder(tf.float32, t.shape), collections=[])
ops_to_run.append(var.assign(t))
return tf.group(*ops_to_run)
def build_graph(tower_devices, tensor_shapes, variable_mgr, num_iters):
"""Builds the graph for the benchmark.
Args:
tower_devices: A list of device strings of the devices to run the all-reduce
benchmark on.
tensor_shapes: A list of shapes of the tensors that will be aggregated for
the all-reduce.
variable_mgr: The VariableMgr to perform the all-reduce.
num_iters: Number of iterations to aggregate tensors for.
Returns:
An op that runs the benchmark.
"""
all_device_tensors = []
for i, tower_device in enumerate(tower_devices):
with tf.device(tower_device):
device_tensors = []
for j, shape in enumerate(tensor_shapes):
tensor = tf.Variable(tf.random_normal(shape, dtype=tf.float32),
name='tensor_%d_on_device_%d' % (j, i))
device_tensors.append(tensor)
all_device_tensors.append(device_tensors)
log_fn('Building all-reduce ops')
benchmark_op = build_all_reduce_iterations(all_device_tensors, tower_devices,
variable_mgr, num_iters)
log_fn('Done building all-reduce ops')
return benchmark_op
def run_graph(benchmark_op, bench_cnn, init_ops, dummy_loss_op):
"""Runs the graph for the benchmark.
Args:
benchmark_op: An op that runs the benchmark.
bench_cnn: The BenchmarkCNN where params and other attributes are obtained.
init_ops: A list of ops that are run before `benchmark_op` for
initialization.
dummy_loss_op: Any op. We must pass a loss op to
`benchmark_cnn.benchmark_one_step`, but the result of the op is never
actually used.
"""
config = benchmark_cnn.create_config_proto(bench_cnn.params)
with tf.Session(config=config) as sess:
for op in init_ops:
sess.run(op)
step_train_times = []
fetches = {'average_loss': dummy_loss_op, 'benchmark_op': benchmark_op}
log_fn('Running warmup')
for i in range(-bench_cnn.num_warmup_batches, bench_cnn.num_batches):
if i == 0:
log_fn('Running all-reduce ops')
start = time.perf_counter()
if i > 0 and i % bench_cnn.params.display_every == 0:
log_fn('Iteration: %d. Average time per step so far: %s' %
(i, (time.perf_counter() - start) / i))
# Call benchmark_one_step instead of directly calling sess.run(...), to
# potentially get a trace file, partitioned graphs, etc.
benchmark_cnn.benchmark_one_step(
sess=sess,
fetches=fetches,
step=i,
# The batch size is only used for the images/sec calculation, which is
# not actually calculated because we pass show_images_per_sec=False.
batch_size=None,
step_train_times=step_train_times,
trace_filename=bench_cnn.trace_filename,
partitioned_graph_file_prefix=(
bench_cnn.params.partitioned_graph_file_prefix),
profiler=None,
image_producer=None,
params=bench_cnn.params,
show_images_per_sec=False)
log_fn('Average time per step: %s' %
((time.perf_counter() - start) / bench_cnn.num_batches))
def run_benchmark(bench_cnn, num_iters):
"""Runs the all-reduce benchmark.
Args:
bench_cnn: The BenchmarkCNN where params, the variable manager, and other
attributes are obtained.
num_iters: Number of iterations to run the all-reduce for.
Raises:
ValueError: Invalid params of bench_cnn.
"""
if bench_cnn.params.variable_update != 'replicated':
raise ValueError('--variable_update=replicated must be specified to use '
'the all-reduce benchmark')
if bench_cnn.params.variable_consistency == 'relaxed':
raise ValueError('--variable_consistency=relaxed is not supported')
benchmark_op = build_graph(bench_cnn.raw_devices,
get_var_shapes(bench_cnn.model),
bench_cnn.variable_mgr, num_iters)
init_ops = [
tf.global_variables_initializer(),
bench_cnn.variable_mgr.get_post_init_ops()
]
loss_op = tf.no_op()
if bench_cnn.graph_file:
path, filename = os.path.split(bench_cnn.graph_file)
as_text = filename.endswith('txt')
log_fn('Writing GraphDef as %s to %s' % (
'text' if as_text else 'binary', bench_cnn.graph_file))
tf.train.write_graph(tf.get_default_graph().as_graph_def(add_shapes=True),
path, filename, as_text)
run_graph(benchmark_op, bench_cnn, init_ops, loss_op)
# TODO(reedwm): Reduce redundancy with tf_cnn_benchmarks
def main(positional_arguments):
# Command-line arguments like '--distortions False' are equivalent to
# '--distortions=True False', where False is a positional argument. To prevent
# this from silently running with distortions, we do not allow positional
# arguments.
assert len(positional_arguments) >= 1
if len(positional_arguments) > 1:
raise ValueError('Received unknown positional arguments: %s'
% positional_arguments[1:])
params = benchmark_cnn.make_params_from_flags()
params = benchmark_cnn.setup(params)
bench = benchmark_cnn.BenchmarkCNN(params)
tfversion = cnn_util.tensorflow_version_tuple()
log_fn('TensorFlow: %i.%i' % (tfversion[0], tfversion[1]))
run_benchmark(bench, absl_flags.FLAGS.iters_per_step)
if __name__ == '__main__':
tf.disable_v2_behavior()
app.run(main) # Raises error on invalid flags, unlike tf.app.run()