Unverified Commit 932364b6 authored by Qianli Scott Zhu, committed by GitHub

Add benchmark upload util to BigQuery. (#3776)

* Add benchmark upload util to BigQuery.

Also update the benchmark logger and BigQuery schema for
errors found during the integration test.

* Fix lint error.

* Update test to clear all the env vars during the test.

This was causing errors since the Kokoro test has TF_PKG=tf-nightly
injected during the test.

* Update lintrc to ignore google-related packages.

* Another attempt to fix lint import error.

* Address the review comment.

* Fix lint error.

* Another fix for lint.

* Update test comment for env var cleanup.
parent 03781c74
......@@ -15,7 +15,7 @@
"description": "The date when the test of the model is started",
"mode": "REQUIRED",
"name": "run_date",
"type": "DATETIME"
"type": "TIMESTAMP"
},
{
"description": "The tensorflow version information.",
......@@ -58,7 +58,7 @@
"type": "RECORD"
},
{
"description": "Enviornment variables when the benchmark run is executed.",
"description": "Environment variables when the benchmark run is executed.",
"fields": [
{
"description": "The name of the variable.",
......@@ -74,7 +74,27 @@
}
],
"mode": "REPEATED",
"name": "enviornment_variable",
"name": "environment_variable",
"type": "RECORD"
},
{
"description": "TF Environment variables when the benchmark run is executed.",
"fields": [
{
"description": "The name of the variable.",
"mode": "REQUIRED",
"name": "name",
"type": "STRING"
},
{
"description": "The value of the variable.",
"mode": "NULLABLE",
"name": "value",
"type": "STRING"
}
],
"mode": "REPEATED",
"name": "tensorflow_environment_variables",
"type": "RECORD"
},
{
......
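For reference, a row matching the updated schema could look like the following Python dict (a minimal sketch; the values are illustrative and fields untouched by this change are elided):

# Minimal sketch of a benchmark_run row matching the updated schema.
# Values are illustrative; fields not touched by this change are elided.
example_run_row = {
    "run_date": "2018-03-28T12:00:00.000000Z",  # now a TIMESTAMP instead of DATETIME
    "environment_variable": [
        {"name": "PATH", "value": "/usr/local/bin"},
    ],
    "tensorflow_environment_variables": [
        {"name": "TF_ENABLE_WINOGRAD_NONFUSED", "value": "1"},
    ],
}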
psutil>=5.4.3
py-cpuinfo>=3.3.0
\ No newline at end of file
py-cpuinfo>=3.3.0
google-cloud-bigquery>=0.31.0
\ No newline at end of file
......@@ -348,6 +348,12 @@ def resnet_main(flags, model_function, input_function):
'version': flags.version,
})
if flags.benchmark_log_dir is not None:
benchmark_logger = logger.BenchmarkLogger(flags.benchmark_log_dir)
benchmark_logger.log_run_info("resnet")
else:
benchmark_logger = None
for _ in range(flags.train_epochs // flags.epochs_between_evals):
train_hooks = hooks_helper.get_train_hooks(
flags.hooks,
......@@ -380,8 +386,7 @@ def resnet_main(flags, model_function, input_function):
steps=flags.max_train_steps)
print(eval_results)
if flags.benchmark_log_dir is not None:
benchmark_logger = logger.BenchmarkLogger(flags.benchmark_log_dir)
if benchmark_logger:
benchmark_logger.log_estimator_evaluation_result(eval_results)
......
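Taken together, the two hunks above construct the BenchmarkLogger once before the training loop, write the run info immediately, and reuse the same instance after each evaluation. A condensed sketch of the resulting flow (training details elided, assuming the flags object used above):

# Condensed sketch of the logging flow after this change; training code elided.
if flags.benchmark_log_dir is not None:
    benchmark_logger = logger.BenchmarkLogger(flags.benchmark_log_dir)
    benchmark_logger.log_run_info("resnet")  # run info is written once, up front
else:
    benchmark_logger = None

for _ in range(flags.train_epochs // flags.epochs_between_evals):
    eval_results = ...  # train and evaluate as in the surrounding code
    if benchmark_logger:
        benchmark_logger.log_estimator_evaluation_result(eval_results)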
......@@ -234,7 +234,8 @@ class BenchmarkParser(argparse.ArgumentParser):
benchmark_log_dir: Create a flag to specify location for benchmark logging.
"""
def __init__(self, add_help=False, benchmark_log_dir=True):
def __init__(self, add_help=False, benchmark_log_dir=True,
bigquery_uploader=True):
super(BenchmarkParser, self).__init__(add_help=add_help)
if benchmark_log_dir:
self.add_argument(
......@@ -242,3 +243,28 @@ class BenchmarkParser(argparse.ArgumentParser):
help="[default: %(default)s] The location of the benchmark logging.",
metavar="<BLD>"
)
if bigquery_uploader:
self.add_argument(
"--gcp_project", "-gp", default=None,
help="[default: %(default)s] The GCP project name where the benchmark"
" will be uploaded.",
metavar="<GP>"
)
self.add_argument(
"--bigquery_data_set", "-bds", default="test_benchmark",
help="[default: %(default)s] The Bigquery dataset name where the"
" benchmark will be uploaded.",
metavar="<BDS>"
)
self.add_argument(
"--bigquery_run_table", "-brt", default="benchmark_run",
help="[default: %(default)s] The Bigquery table name where the"
" benchmark run information will be uploaded.",
metavar="<BRT>"
)
self.add_argument(
"--bigquery_metric_table", "-bmt", default="benchmark_metric",
help="[default: %(default)s] The Bigquery table name where the"
" benchmark metric information will be uploaded.",
metavar="<BMT>"
)
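As a quick illustration of the new flags, the parser can be exercised as follows (a hedged sketch; the project, dataset, and table names are placeholders):

# Hypothetical invocation of the new BigQuery flags; all names are placeholders.
from official.utils.arg_parsers import parsers

parser = parsers.BenchmarkParser()
flags = parser.parse_args([
    "--benchmark_log_dir", "/tmp/benchmark",
    "--gcp_project", "my-gcp-project",
    "--bigquery_data_set", "test_benchmark",
    "--bigquery_run_table", "benchmark_run",
    "--bigquery_metric_table", "benchmark_metric",
])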
......@@ -29,7 +29,7 @@ class TestParser(argparse.ArgumentParser):
parsers.PerformanceParser(num_parallel_calls=True, inter_op=True,
intra_op=True, use_synthetic_data=True),
parsers.ImageModelParser(data_format=True),
parsers.BenchmarkParser(benchmark_log_dir=True)
parsers.BenchmarkParser(benchmark_log_dir=True, bigquery_uploader=True)
])
......@@ -62,7 +62,8 @@ class BaseTester(unittest.TestCase):
def test_benchmark_setting(self):
defaults = dict(
hooks=["LoggingMetricHook"],
benchmark_log_dir="/tmp/12345"
benchmark_log_dir="/tmp/12345",
gcp_project="project_abc",
)
parser = TestParser()
......
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Library to upload benchmark generated by BenchmarkLogger to remote repo.
This library require google cloud bigquery lib as dependency, which can be
installed with:
> pip install --upgrade google-cloud-bigquery
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import json
import os
import sys
import uuid
from google.cloud import bigquery
import tensorflow as tf # pylint: disable=g-bad-import-order
from official.utils.arg_parsers import parsers
from official.utils.logging import logger
class BigQueryUploader(object):
"""Upload the benchmark and metric info to BigQuery."""
def __init__(self, logging_dir, gcp_project=None, credentials=None):
"""Initialized BigQueryUploader with proper setting.
Args:
logging_dir: string, logging directory that contains the benchmark log.
gcp_project: string, the name of the GCP project that the log will be
uploaded to. The default project name will be detected from local
environment if no value is provided.
credentials: google.auth.credentials. The credential to access the
BigQuery service. The default service account credential will be
detected from local environment if no value is provided. Please use
google.oauth2.service_account.Credentials to load credential from local
file for the case that the test is run out side of GCP.
"""
self._logging_dir = logging_dir
self._bq_client = bigquery.Client(
project=gcp_project, credentials=credentials)
def upload_benchmark_run(self, dataset_name, table_name, run_id):
"""Upload benchmark run information to Bigquery.
Args:
dataset_name: string, the name of bigquery dataset where the data will be
uploaded.
table_name: string, the name of bigquery table under the dataset where
the data will be uploaded.
run_id: string, a unique ID that will be attached to the data, usually
this is a UUID4 format.
"""
expected_file = os.path.join(
self._logging_dir, logger.BENCHMARK_RUN_LOG_FILE_NAME)
with tf.gfile.GFile(expected_file) as f:
benchmark_json = json.load(f)
benchmark_json["model_id"] = run_id
table_ref = self._bq_client.dataset(dataset_name).table(table_name)
errors = self._bq_client.insert_rows_json(table_ref, [benchmark_json])
if errors:
tf.logging.error(
"Failed to upload benchmark info to bigquery: {}".format(errors))
def upload_metric(self, dataset_name, table_name, run_id):
"""Upload metric information to Bigquery.
Args:
dataset_name: string, the name of bigquery dataset where the data will be
uploaded.
table_name: string, the name of bigquery table under the dataset where
the metric data will be uploaded. This is different from the
benchmark_run table.
run_id: string, a unique ID that will be attached to the data, usually
this is a UUID4 format. This should be the same as the benchmark run_id.
"""
expected_file = os.path.join(
self._logging_dir, logger.METRIC_LOG_FILE_NAME)
with tf.gfile.GFile(expected_file) as f:
lines = f.readlines()
metrics = []
for line in filter(lambda l: l.strip(), lines):
metric = json.loads(line)
metric["run_id"] = run_id
metrics.append(metric)
table_ref = self._bq_client.dataset(dataset_name).table(table_name)
errors = self._bq_client.insert_rows_json(table_ref, metrics)
if errors:
tf.logging.error(
"Failed to upload benchmark info to bigquery: {}".format(errors))
def main(argv):
parser = parsers.BenchmarkParser()
flags = parser.parse_args(args=argv[1:])
if not flags.benchmark_log_dir:
print("Usage: benchmark_uploader.py --benchmark_log_dir=/some/dir")
sys.exit(1)
uploader = BigQueryUploader(
flags.benchmark_log_dir,
gcp_project=flags.gcp_project)
run_id = str(uuid.uuid4())
uploader.upload_benchmark_run(
flags.bigquery_data_set, flags.bigquery_run_table, run_id)
uploader.upload_metric(
flags.bigquery_data_set, flags.bigquery_metric_table, run_id)
if __name__ == "__main__":
main(argv=sys.argv)
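For runs outside of GCP, the uploader can also be driven programmatically with an explicit service-account credential, roughly as follows (a sketch using the BigQueryUploader class above; the key path, project, dataset, and table names are placeholders):

# Sketch of programmatic use with an explicit service-account credential.
# The key path, project, dataset, and table names are placeholders.
import uuid
from google.oauth2 import service_account

credentials = service_account.Credentials.from_service_account_file(
    "/path/to/service_account_key.json")
uploader = BigQueryUploader(
    "/tmp/benchmark", gcp_project="my-gcp-project", credentials=credentials)
run_id = str(uuid.uuid4())
uploader.upload_benchmark_run("test_benchmark", "benchmark_run", run_id)
uploader.upload_metric("test_benchmark", "benchmark_metric", run_id)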
......@@ -31,8 +31,8 @@ import os
import tensorflow as tf
from tensorflow.python.client import device_lib
_METRIC_LOG_FILE_NAME = "metric.log"
_BENCHMARK_RUN_LOG_FILE_NAME = "benchmark_run.log"
METRIC_LOG_FILE_NAME = "metric.log"
BENCHMARK_RUN_LOG_FILE_NAME = "benchmark_run.log"
_DATE_TIME_FORMAT_PATTERN = "%Y-%m-%dT%H:%M:%S.%fZ"
......@@ -81,9 +81,12 @@ class BenchmarkLogger(object):
tf.logging.warning(
"Metric value to log should be a number. Got %s", type(value))
return
if extras:
extras = [{"name": k, "value": v} for k, v in sorted(extras.items())]
else:
extras = []
with tf.gfile.GFile(
os.path.join(self._logging_dir, _METRIC_LOG_FILE_NAME), "a") as f:
os.path.join(self._logging_dir, METRIC_LOG_FILE_NAME), "a") as f:
metric = {
"name": name,
"value": float(value),
......@@ -107,7 +110,10 @@ class BenchmarkLogger(object):
Args:
model_name: string, the name of the model.
"""
run_info = {"model_name": model_name}
run_info = {
"model_name": model_name,
"machine_config": {},
"run_date": datetime.datetime.now().strftime(_DATE_TIME_FORMAT_PATTERN)}
_collect_tensorflow_info(run_info)
_collect_tensorflow_environment_variables(run_info)
_collect_cpu_info(run_info)
......@@ -115,7 +121,7 @@ class BenchmarkLogger(object):
_collect_memory_info(run_info)
with tf.gfile.GFile(os.path.join(
self._logging_dir, _BENCHMARK_RUN_LOG_FILE_NAME), "w") as f:
self._logging_dir, BENCHMARK_RUN_LOG_FILE_NAME), "w") as f:
try:
json.dump(run_info, f)
f.write("\n")
......@@ -130,8 +136,9 @@ def _collect_tensorflow_info(run_info):
def _collect_tensorflow_environment_variables(run_info):
run_info["tensorflow_environment_variables"] = {
k: v for k, v in os.environ.items() if k.startswith("TF_")}
run_info["tensorflow_environment_variables"] = [
{"name": k, "value": v}
for k, v in sorted(os.environ.items()) if k.startswith("TF_")]
# The following code is mirrored from tensorflow/tools/test/system_info_lib
......@@ -150,7 +157,7 @@ def _collect_cpu_info(run_info):
cpu_info["cpu_info"] = info["brand"]
cpu_info["mhz_per_cpu"] = info["hz_advertised_raw"][0] / 1.0e6
run_info["cpu_info"] = cpu_info
run_info["machine_config"]["cpu_info"] = cpu_info
def _collect_gpu_info(run_info):
......@@ -168,7 +175,7 @@ def _collect_gpu_info(run_info):
gpu_info["model"] = _parse_gpu_model(d.physical_device_desc)
# Assume all the GPUs connected are the same model
break
run_info["gpu_info"] = gpu_info
run_info["machine_config"]["gpu_info"] = gpu_info
def _collect_memory_info(run_info):
......@@ -176,8 +183,8 @@ def _collect_memory_info(run_info):
# It is installable via pip.
import psutil # pylint: disable=g-import-not-at-top
vmem = psutil.virtual_memory()
run_info["memory_total"] = vmem.total
run_info["memory_available"] = vmem.available
run_info["machine_config"]["memory_total"] = vmem.total
run_info["machine_config"]["memory_available"] = vmem.available
def _parse_gpu_model(physical_device_desc):
......
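Putting the logger changes together, the record written to benchmark_run.log now nests the machine details under machine_config and stores TF_* variables as repeated name/value records. A rough sketch of the resulting shape (values are illustrative; fields collected elsewhere are elided):

# Rough sketch of the benchmark_run.log record after this change.
# Values are illustrative; fields from _collect_tensorflow_info() are elided.
example_run_info = {
    "model_name": "resnet",
    "run_date": "2018-03-28T12:00:00.000000Z",
    "tensorflow_environment_variables": [
        {"name": "TF_ENABLE_WINOGRAD_NONFUSED", "value": "1"},
    ],
    "machine_config": {
        "cpu_info": {"cpu_info": "Intel(R) Xeon(R) CPU", "mhz_per_cpu": 2300.0},
        "gpu_info": {"model": "Tesla K80"},
        "memory_total": 64 * 1024 ** 3,       # bytes, from psutil.virtual_memory()
        "memory_available": 48 * 1024 ** 3,
    },
}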
......@@ -25,16 +25,25 @@ import tempfile
import unittest
import tensorflow as tf # pylint: disable=g-bad-import-order
from tensorflow.python.client import device_lib
from official.utils.logging import logger
class BenchmarkLoggerTest(tf.test.TestCase):
def setUp(self):
super(BenchmarkLoggerTest, self).setUp()
# Avoid pulling extra env vars from the test environment, which affects the
# test result; e.g. the Kokoro test has a TF_PKG env var which affects the
# test case test_collect_tensorflow_environment_variables().
self.original_environ = dict(os.environ)
os.environ.clear()
def tearDown(self):
super(BenchmarkLoggerTest, self).tearDown()
tf.gfile.DeleteRecursively(self.get_temp_dir())
os.environ.clear()
os.environ.update(self.original_environ)
def test_create_logging_dir(self):
non_exist_temp_dir = os.path.join(self.get_temp_dir(), "unknown_dir")
......@@ -56,7 +65,7 @@ class BenchmarkLoggerTest(tf.test.TestCase):
self.assertEqual(metric["value"], 0.999)
self.assertEqual(metric["unit"], None)
self.assertEqual(metric["global_step"], 1e4)
self.assertEqual(metric["extras"], {"name": "value"})
self.assertEqual(metric["extras"], [{"name": "name", "value": "value"}])
def test_log_multiple_metrics(self):
log_dir = tempfile.mkdtemp(dir=self.get_temp_dir())
......@@ -72,13 +81,14 @@ class BenchmarkLoggerTest(tf.test.TestCase):
self.assertEqual(accuracy["value"], 0.999)
self.assertEqual(accuracy["unit"], None)
self.assertEqual(accuracy["global_step"], 1e4)
self.assertEqual(accuracy["extras"], {"name": "value"})
self.assertEqual(accuracy["extras"], [{"name": "name", "value": "value"}])
loss = json.loads(f.readline())
self.assertEqual(loss["name"], "loss")
self.assertEqual(loss["value"], 0.02)
self.assertEqual(loss["unit"], None)
self.assertEqual(loss["global_step"], 1e4)
self.assertEqual(loss["extras"], [])
def test_log_non_nubmer_value(self):
log_dir = tempfile.mkdtemp(dir=self.get_temp_dir())
......@@ -130,24 +140,30 @@ class BenchmarkLoggerTest(tf.test.TestCase):
def test_collect_tensorflow_environment_variables(self):
os.environ["TF_ENABLE_WINOGRAD_NONFUSED"] = "1"
os.environ["TF_OTHER"] = "2"
os.environ["OTHER"] = "3"
run_info = {}
logger._collect_tensorflow_environment_variables(run_info)
self.assertIsNotNone(run_info["tensorflow_environment_variables"])
self.assertEqual(run_info["tensorflow_environment_variables"]
["TF_ENABLE_WINOGRAD_NONFUSED"], "1")
expected_tf_envs = [
{"name": "TF_ENABLE_WINOGRAD_NONFUSED", "value": "1"},
{"name": "TF_OTHER", "value": "2"},
]
self.assertEqual(run_info["tensorflow_environment_variables"],
expected_tf_envs)
@unittest.skipUnless(tf.test.is_built_with_cuda(), "requires GPU")
def test_collect_gpu_info(self):
run_info = {}
run_info = {"machine_config": {}}
logger._collect_gpu_info(run_info)
self.assertNotEqual(run_info["gpu_info"], {})
self.assertNotEqual(run_info["machine_config"]["gpu_info"], {})
def test_collect_memory_info(self):
run_info = {}
run_info = {"machine_config": {}}
logger._collect_memory_info(run_info)
self.assertIsNotNone(run_info["memory_total"])
self.assertIsNotNone(run_info["memory_available"])
self.assertIsNotNone(run_info["machine_config"]["memory_total"])
self.assertIsNotNone(run_info["machine_config"]["memory_available"])
if __name__ == "__main__":
tf.test.main()
......@@ -61,7 +61,7 @@ variable-rgx=^[a-z][a-z0-9_]*$
# (useful for modules/projects where namespaces are manipulated during runtime
# and thus existing member attributes cannot be deduced by static analysis. It
# supports qualified module names, as well as Unix pattern matching.
ignored-modules=official, official.*, tensorflow, tensorflow.*, LazyLoader
ignored-modules=official, official.*, tensorflow, tensorflow.*, LazyLoader, google, google.cloud.*
[CLASSES]
......