Commit e00e0e13 authored by dreamdragon's avatar dreamdragon
Browse files

Merge remote-tracking branch 'upstream/master'

parents b915db4e 402b561b
......@@ -30,7 +30,7 @@
/research/lfads/ @jazcollins @susillo
/research/lm_1b/ @oriolvinyals @panyx0718
/research/lm_commonsense/ @thtrieu
/research/lstm_object_detection/ @dreamdragon
/research/lstm_object_detection/ @dreamdragon @masonliuw @yinxiaoli
/research/marco/ @vincentvanhoucke
/research/maskgan/ @a-dai
/research/morph_net/ @gariel-google
......@@ -49,6 +49,7 @@
/research/slim/ @sguada @nathansilberman
/research/steve/ @buckman-google
/research/street/ @theraysmith
/research/struct2depth/ @aneliaangelova
/research/swivel/ @waterson
/research/syntaxnet/ @calberti @andorardo @bogatyy @markomernick
/research/tcn/ @coreylynch @sermanet
......
......@@ -111,12 +111,7 @@ def _download_and_clean(dataset, data_dir):
temp_dir = tempfile.mkdtemp()
try:
zip_path = os.path.join(temp_dir, "{}.zip".format(dataset))
def _progress(count, block_size, total_size):
sys.stdout.write("\r>> Downloading {} {:.1f}%".format(
zip_path, 100.0 * count * block_size / total_size))
sys.stdout.flush()
zip_path, _ = urllib.request.urlretrieve(url, zip_path, _progress)
zip_path, _ = urllib.request.urlretrieve(url, zip_path)
statinfo = os.stat(zip_path)
# A new line to clear the carriage return from download progress
# tf.logging.info is not applicable here
......
......@@ -58,6 +58,10 @@ HR_KEY = "HR"
NDCG_KEY = "NDCG"
DUPLICATE_MASK = "duplicate_mask"
# Metric names
HR_METRIC_NAME = "HR_METRIC"
NDCG_METRIC_NAME = "NDCG_METRIC"
# ==============================================================================
# == Subprocess Data Generation ================================================
# ==============================================================================
......
......@@ -410,6 +410,7 @@ def _generation_loop(num_workers, # type: int
num_items, # type: int
num_users, # type: int
epochs_per_cycle, # type: int
num_cycles, # type: int
train_batch_size, # type: int
eval_batch_size, # type: int
deterministic, # type: bool
......@@ -449,7 +450,7 @@ def _generation_loop(num_workers, # type: int
wait_count = 0
start_time = time.time()
while True:
while train_cycle < num_cycles:
ready_epochs = tf.gfile.ListDirectory(cache_paths.train_epoch_dir)
if len(ready_epochs) >= rconst.CYCLES_TO_BUFFER:
wait_count += 1
......@@ -479,18 +480,21 @@ def _generation_loop(num_workers, # type: int
gc.collect()
def _parse_flagfile(flagfile):
"""Fill flags with flagfile written by the main process."""
tf.logging.info("Waiting for flagfile to appear at {}..."
.format(flagfile))
def wait_for_path(fpath):
start_time = time.time()
while not tf.gfile.Exists(flagfile):
while not tf.gfile.Exists(fpath):
if time.time() - start_time > rconst.TIMEOUT_SECONDS:
log_msg("Waited more than {} seconds. Concluding that this "
"process is orphaned and exiting gracefully."
.format(rconst.TIMEOUT_SECONDS))
sys.exit()
time.sleep(1)
def _parse_flagfile(flagfile):
"""Fill flags with flagfile written by the main process."""
tf.logging.info("Waiting for flagfile to appear at {}..."
.format(flagfile))
wait_for_path(flagfile)
tf.logging.info("flagfile found.")
# `flags` module opens `flagfile` with `open`, which does not work on
......@@ -504,6 +508,8 @@ def _parse_flagfile(flagfile):
def write_alive_file(cache_paths):
"""Write file to signal that generation process started correctly."""
wait_for_path(cache_paths.cache_root)
log_msg("Signaling that I am alive.")
with tf.gfile.Open(cache_paths.subproc_alive, "w") as f:
f.write("Generation subproc has started.")
......@@ -550,7 +556,8 @@ def main(_):
if flags.FLAGS.seed is not None:
np.random.seed(flags.FLAGS.seed)
with mlperf_helper.LOGGER(enable=flags.FLAGS.ml_perf):
with mlperf_helper.LOGGER(
enable=flags.FLAGS.output_ml_perf_compliance_logging):
mlperf_helper.set_ncf_root(os.path.split(os.path.abspath(__file__))[0])
_generation_loop(
num_workers=flags.FLAGS.num_workers,
......@@ -561,6 +568,7 @@ def main(_):
num_items=flags.FLAGS.num_items,
num_users=flags.FLAGS.num_users,
epochs_per_cycle=flags.FLAGS.epochs_per_cycle,
num_cycles=flags.FLAGS.num_cycles,
train_batch_size=flags.FLAGS.train_batch_size,
eval_batch_size=flags.FLAGS.eval_batch_size,
deterministic=flags.FLAGS.seed is not None,
......@@ -602,6 +610,9 @@ def define_flags():
flags.DEFINE_integer(name="epochs_per_cycle", default=1,
help="The number of epochs of training data to produce"
"at a time.")
flags.DEFINE_integer(name="num_cycles", default=None,
help="The number of cycles to produce training data "
"for.")
flags.DEFINE_integer(name="train_batch_size", default=None,
help="The batch size with which training TFRecords will "
"be chunked.")
......@@ -618,6 +629,9 @@ def define_flags():
"specified, a seed will not be set.")
flags.DEFINE_boolean(name="ml_perf", default=None,
help="Match MLPerf. See ncf_main.py for details.")
flags.DEFINE_bool(name="output_ml_perf_compliance_logging", default=None,
help="Output the MLPerf compliance logging. See "
"ncf_main.py for details.")
flags.mark_flags_as_required(["data_dir", "cache_id"])
......
......@@ -152,7 +152,7 @@ def _filter_index_sort(raw_rating_path, match_mlperf):
num_items = len(original_items)
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.PREPROC_HP_NUM_EVAL,
value=num_users * (1 + rconst.NUM_EVAL_NEGATIVES))
value=rconst.NUM_EVAL_NEGATIVES)
mlperf_helper.ncf_print(
key=mlperf_helper.TAGS.PREPROC_HP_SAMPLE_EVAL_REPLACEMENT,
value=match_mlperf)
......@@ -394,7 +394,8 @@ def _shutdown(proc):
try:
proc.send_signal(signal.SIGINT)
time.sleep(5)
if proc.returncode is not None:
if proc.poll() is not None:
tf.logging.info("Train data creation subprocess ended")
return # SIGINT was handled successfully within 5 seconds
except socket.error:
......@@ -403,6 +404,7 @@ def _shutdown(proc):
# Otherwise another second of grace period and then force kill the process.
time.sleep(1)
proc.terminate()
tf.logging.info("Train data creation subprocess killed")
except: # pylint: disable=broad-except
tf.logging.error("Data generation subprocess could not be killed.")
......@@ -429,9 +431,10 @@ def write_flagfile(flags_, ncf_dataset):
"Wrote flagfile for async data generation in {}.".format(flagfile))
def instantiate_pipeline(dataset, data_dir, batch_size, eval_batch_size,
num_data_readers=None, num_neg=4, epochs_per_cycle=1,
match_mlperf=False, deterministic=False,
use_subprocess=True, cache_id=None):
num_cycles, num_data_readers=None, num_neg=4,
epochs_per_cycle=1, match_mlperf=False,
deterministic=False, use_subprocess=True,
cache_id=None):
# type: (...) -> (NCFDataset, typing.Callable)
"""Preprocess data and start negative generation subprocess."""
......@@ -455,12 +458,14 @@ def instantiate_pipeline(dataset, data_dir, batch_size, eval_batch_size,
"num_users": ncf_dataset.num_users,
"num_readers": ncf_dataset.num_data_readers,
"epochs_per_cycle": epochs_per_cycle,
"num_cycles": num_cycles,
"train_batch_size": batch_size,
"eval_batch_size": eval_batch_size,
"num_workers": num_workers,
"redirect_logs": use_subprocess,
"use_tf_logging": not use_subprocess,
"ml_perf": match_mlperf,
"output_ml_perf_compliance_logging": mlperf_helper.LOGGER.enabled,
}
if use_subprocess:
......@@ -506,6 +511,7 @@ def instantiate_pipeline(dataset, data_dir, batch_size, eval_batch_size,
# will then enter a loop waiting for the flagfile to be written. Once we see
# that the async process has signaled that it is alive, we clear the system
# caches and begin the run.
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.RUN_CLEAR_CACHES)
mlperf_helper.clear_system_caches()
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.RUN_START)
write_flagfile(flags_, ncf_dataset)
......@@ -592,49 +598,29 @@ def hash_pipeline(dataset, deterministic):
tf.logging.info(" [pipeline_hash] All batches hash: {}".format(overall_hash))
def make_input_fn(ncf_dataset, is_training):
# type: (typing.Optional[NCFDataset], bool) -> (typing.Callable, str, int)
def make_input_fn(
ncf_dataset, # type: typing.Optional[NCFDataset]
is_training, # type: bool
record_files=None # type: typing.Optional[tf.Tensor]
):
# type: (...) -> (typing.Callable, str, int)
"""Construct training input_fn for the current epoch."""
if ncf_dataset is None:
return make_synthetic_input_fn(is_training)
if not tf.gfile.Exists(ncf_dataset.cache_paths.subproc_alive):
# The generation subprocess must have been alive at some point, because we
# earlier checked that the subproc_alive file existed.
raise ValueError("Generation subprocess unexpectedly died. Data will not "
"be available; exiting to avoid waiting forever.")
if is_training:
train_epoch_dir = ncf_dataset.cache_paths.train_epoch_dir
while not tf.gfile.Exists(train_epoch_dir):
tf.logging.info("Waiting for {} to exist.".format(train_epoch_dir))
time.sleep(1)
train_data_dirs = tf.gfile.ListDirectory(train_epoch_dir)
while not train_data_dirs:
tf.logging.info("Waiting for data folder to be created.")
time.sleep(1)
train_data_dirs = tf.gfile.ListDirectory(train_epoch_dir)
train_data_dirs.sort() # names are zfilled so that
# lexicographic sort == numeric sort
record_dir = os.path.join(train_epoch_dir, train_data_dirs[0])
template = rconst.TRAIN_RECORD_TEMPLATE
if record_files is not None:
epoch_metadata = None
batch_count = None
record_dir = None
else:
record_dir = ncf_dataset.cache_paths.eval_data_subdir
template = rconst.EVAL_RECORD_TEMPLATE
ready_file = os.path.join(record_dir, rconst.READY_FILE)
while not tf.gfile.Exists(ready_file):
tf.logging.info("Waiting for records in {} to be ready".format(record_dir))
time.sleep(1)
epoch_metadata, record_dir, template = get_epoch_info(is_training,
ncf_dataset)
record_files = os.path.join(record_dir, template.format("*"))
# This value is used to check that the batch count from the subprocess
# matches the batch count expected by the main thread.
batch_count = epoch_metadata["batch_count"]
with tf.gfile.Open(ready_file, "r") as f:
epoch_metadata = json.load(f)
# This value is used to check that the batch count from the subprocess matches
# the batch count expected by the main thread.
batch_count = epoch_metadata["batch_count"]
def input_fn(params):
"""Generated input_fn for the given epoch."""
......@@ -645,15 +631,13 @@ def make_input_fn(ncf_dataset, is_training):
# populates "batch_size" to the appropriate value.
batch_size = params.get("eval_batch_size") or params["batch_size"]
if epoch_metadata["batch_size"] != batch_size:
if epoch_metadata and epoch_metadata["batch_size"] != batch_size:
raise ValueError(
"Records were constructed with batch size {}, but input_fn was given "
"a batch size of {}. This will result in a deserialization error in "
"tf.parse_single_example."
.format(epoch_metadata["batch_size"], batch_size))
record_files = tf.data.Dataset.list_files(
os.path.join(record_dir, template.format("*")), shuffle=False)
record_files_ds = tf.data.Dataset.list_files(record_files, shuffle=False)
interleave = tf.contrib.data.parallel_interleave(
tf.data.TFRecordDataset,
......@@ -664,7 +648,7 @@ def make_input_fn(ncf_dataset, is_training):
)
deserialize = make_deserialize(params, batch_size, is_training)
dataset = record_files.apply(interleave)
dataset = record_files_ds.apply(interleave)
dataset = dataset.map(deserialize, num_parallel_calls=4)
dataset = dataset.prefetch(32)
......@@ -676,6 +660,61 @@ def make_input_fn(ncf_dataset, is_training):
return input_fn, record_dir, batch_count
def _check_subprocess_alive(ncf_dataset, directory):
if (not tf.gfile.Exists(ncf_dataset.cache_paths.subproc_alive) and
not tf.gfile.Exists(directory)):
# The generation subprocess must have been alive at some point, because we
# earlier checked that the subproc_alive file existed.
raise ValueError("Generation subprocess unexpectedly died. Data will not "
"be available; exiting to avoid waiting forever.")
def get_epoch_info(is_training, ncf_dataset):
"""Wait for the epoch input data to be ready and return various info about it.
Args:
is_training: If we should return info for a training or eval epoch.
ncf_dataset: An NCFDataset.
Returns:
epoch_metadata: A dict with epoch metadata.
record_dir: The directory with the TFRecord files storing the input data.
template: A string template of the files in `record_dir`.
`template.format('*')` is a glob that matches all the record files.
"""
if is_training:
train_epoch_dir = ncf_dataset.cache_paths.train_epoch_dir
_check_subprocess_alive(ncf_dataset, train_epoch_dir)
while not tf.gfile.Exists(train_epoch_dir):
tf.logging.info("Waiting for {} to exist.".format(train_epoch_dir))
time.sleep(1)
train_data_dirs = tf.gfile.ListDirectory(train_epoch_dir)
while not train_data_dirs:
tf.logging.info("Waiting for data folder to be created.")
time.sleep(1)
train_data_dirs = tf.gfile.ListDirectory(train_epoch_dir)
train_data_dirs.sort() # names are zfilled so that
# lexicographic sort == numeric sort
record_dir = os.path.join(train_epoch_dir, train_data_dirs[0])
template = rconst.TRAIN_RECORD_TEMPLATE
else:
record_dir = ncf_dataset.cache_paths.eval_data_subdir
_check_subprocess_alive(ncf_dataset, record_dir)
template = rconst.EVAL_RECORD_TEMPLATE
ready_file = os.path.join(record_dir, rconst.READY_FILE)
while not tf.gfile.Exists(ready_file):
tf.logging.info("Waiting for records in {} to be ready".format(record_dir))
time.sleep(1)
with tf.gfile.Open(ready_file, "r") as f:
epoch_metadata = json.load(f)
return epoch_metadata, record_dir, template
def make_synthetic_input_fn(is_training):
"""Construct training input_fn that uses synthetic data."""
def input_fn(params):
......
......@@ -118,7 +118,7 @@ class BaseTest(tf.test.TestCase):
ncf_dataset, _ = data_preprocessing.instantiate_pipeline(
dataset=DATASET, data_dir=self.temp_data_dir,
batch_size=BATCH_SIZE, eval_batch_size=EVAL_BATCH_SIZE,
num_data_readers=2, num_neg=NUM_NEG)
num_cycles=1, num_data_readers=2, num_neg=NUM_NEG)
g = tf.Graph()
with g.as_default():
......
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Contains NcfModelRunner, which can train and evaluate an NCF model."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from collections import namedtuple
import os
import time
import tensorflow as tf
from tensorflow.contrib.compiler import xla
from official.recommendation import constants as rconst
from official.recommendation import data_preprocessing
from official.recommendation import neumf_model
class NcfModelRunner(object):
"""Creates a graph to train/evaluate an NCF model, and runs it.
This class builds both a training model and evaluation model in the graph.
The two models share variables, so that during evaluation, the trained
variables are used.
"""
# _TrainModelProperties and _EvalModelProperties store useful properties of
# the training and evaluation models, respectively.
# _SHARED_MODEL_PROPERTY_FIELDS is their shared fields.
_SHARED_MODEL_PROPERTY_FIELDS = (
# A scalar tf.string placeholder tensor, that will be fed the path to the
# directory storing the TFRecord files for the input data.
"record_files_placeholder",
# The tf.data.Iterator to iterate over the input data.
"iterator",
# A scalar float tensor representing the model loss.
"loss",
# The batch size, as a Python int.
"batch_size",
# The op to run the model. For the training model, this trains the model
# for one step. For the evaluation model, this computes the metrics and
# updates the metric variables.
"run_model_op")
_TrainModelProperties = namedtuple("_TrainModelProperties", # pylint: disable=invalid-name
_SHARED_MODEL_PROPERTY_FIELDS)
_EvalModelProperties = namedtuple( # pylint: disable=invalid-name
"_EvalModelProperties", _SHARED_MODEL_PROPERTY_FIELDS + (
# A dict from metric name to metric tensor.
"metrics",
# Initializes the metric variables.
"metric_initializer",))
def __init__(self, ncf_dataset, params, num_train_steps, num_eval_steps,
use_while_loop):
self._num_train_steps = num_train_steps
self._num_eval_steps = num_eval_steps
self._use_while_loop = use_while_loop
with tf.Graph().as_default() as self._graph:
if params["use_xla_for_gpu"]:
# The XLA functions we use require resource variables.
tf.enable_resource_variables()
self._ncf_dataset = ncf_dataset
self._global_step = tf.train.create_global_step()
self._train_model_properties = self._build_model(params, num_train_steps,
is_training=True)
self._eval_model_properties = self._build_model(params, num_eval_steps,
is_training=False)
initializer = tf.global_variables_initializer()
self._graph.finalize()
self._session = tf.Session(graph=self._graph)
self._session.run(initializer)
def _compute_metric_mean(self, metric_name):
"""Computes the mean from a call tf tf.metrics.mean().
tf.metrics.mean() already returns the mean, so normally this call is
unnecessary. But, if tf.metrics.mean() is called inside a tf.while_loop, the
mean cannot be accessed outside the while loop. Calling this function
recomputes the mean from the variables created by tf.metrics.mean(),
allowing the mean to be accessed outside the while loop.
Args:
metric_name: The string passed to the 'name' argument of tf.metrics.mean()
Returns:
The mean of the metric.
"""
metric_vars = tf.get_collection(tf.GraphKeys.METRIC_VARIABLES)
total_suffix = metric_name + "/total:0"
total_vars = [v for v in metric_vars if v.name.endswith(total_suffix)]
assert len(total_vars) == 1., (
"Found {} metric variables ending with '{}' but expected to find "
"exactly 1. All metric variables: {}".format(
len(total_vars), total_suffix, metric_vars))
total_var = total_vars[0]
count_suffix = metric_name + "/count:0"
count_vars = [v for v in metric_vars if v.name.endswith(count_suffix)]
assert len(count_vars) == 1., (
"Found {} metric variables ending with '{}' but expected to find "
"exactly 1. All metric variables: {}".format(
len(count_vars), count_suffix, metric_vars))
count_var = count_vars[0]
return total_var / count_var
def _build_model(self, params, num_steps, is_training):
"""Builds the NCF model.
Args:
params: A dict of hyperparameters.
is_training: If True, build the training model. If False, build the
evaluation model.
Returns:
A _TrainModelProperties if is_training is True, or an _EvalModelProperties
otherwise.
"""
record_files_placeholder = tf.placeholder(tf.string, ())
input_fn, _, _ = \
data_preprocessing.make_input_fn(
ncf_dataset=self._ncf_dataset, is_training=is_training,
record_files=record_files_placeholder)
dataset = input_fn(params)
iterator = dataset.make_initializable_iterator()
model_fn = neumf_model.neumf_model_fn
if params["use_xla_for_gpu"]:
model_fn = xla.estimator_model_fn(model_fn)
if is_training:
return self._build_train_specific_graph(
iterator, model_fn, params, record_files_placeholder, num_steps)
else:
return self._build_eval_specific_graph(
iterator, model_fn, params, record_files_placeholder, num_steps)
def _build_train_specific_graph(self, iterator, model_fn, params,
record_files_placeholder, num_train_steps):
"""Builds the part of the model that is specific to training."""
def build():
features, labels = iterator.get_next()
estimator_spec = model_fn(
features, labels, tf.estimator.ModeKeys.TRAIN, params)
with tf.control_dependencies([estimator_spec.train_op]):
run_model_op = self._global_step.assign_add(1)
return run_model_op, estimator_spec.loss
if self._use_while_loop:
def body(i):
run_model_op_single_step, _ = build()
with tf.control_dependencies([run_model_op_single_step]):
return i + 1
run_model_op = tf.while_loop(lambda i: i < num_train_steps, body, [0],
parallel_iterations=1)
loss = None
else:
run_model_op, loss = build()
return self._TrainModelProperties(
record_files_placeholder, iterator, loss, params["batch_size"],
run_model_op)
def _build_eval_specific_graph(self, iterator, model_fn, params,
record_files_placeholder, num_eval_steps):
"""Builds the part of the model that is specific to evaluation."""
def build():
features = iterator.get_next()
estimator_spec = model_fn(
features, None, tf.estimator.ModeKeys.EVAL, params)
run_model_op = tf.group(*(update_op for _, update_op in
estimator_spec.eval_metric_ops.values()))
eval_metric_tensors = {k: tensor for (k, (tensor, _))
in estimator_spec.eval_metric_ops.items()}
return run_model_op, estimator_spec.loss, eval_metric_tensors
if self._use_while_loop:
def body(i):
run_model_op_single_step, _, _ = build()
with tf.control_dependencies([run_model_op_single_step]):
return i + 1
run_model_op = tf.while_loop(lambda i: i < num_eval_steps, body, [0],
parallel_iterations=1)
loss = None
eval_metric_tensors = {
"HR": self._compute_metric_mean(rconst.HR_METRIC_NAME),
"NDCG": self._compute_metric_mean(rconst.NDCG_METRIC_NAME),
}
else:
run_model_op, loss, eval_metric_tensors = build()
metric_initializer = tf.variables_initializer(
tf.get_collection(tf.GraphKeys.METRIC_VARIABLES))
return self._EvalModelProperties(
record_files_placeholder, iterator, loss, params["eval_batch_size"],
run_model_op, eval_metric_tensors, metric_initializer)
def _train_or_eval(self, model_properties, num_steps, is_training):
"""Either trains or evaluates, depending on whether `is_training` is True.
Args:
model_properties: _TrainModelProperties or an _EvalModelProperties
containing the properties of the training or evaluation graph.
num_steps: The number of steps to train or evaluate for.
is_training: If True, run the training model. If False, run the evaluation
model.
Returns:
record_dir: The directory of TFRecords where the training/evaluation input
data was read from.
"""
if self._ncf_dataset is not None:
epoch_metadata, record_dir, template = data_preprocessing.get_epoch_info(
is_training=is_training, ncf_dataset=self._ncf_dataset)
batch_count = epoch_metadata["batch_count"]
if batch_count != num_steps:
raise ValueError(
"Step counts do not match. ({} vs. {}) The async process is "
"producing incorrect shards.".format(batch_count, num_steps))
record_files = os.path.join(record_dir, template.format("*"))
initializer_feed_dict = {
model_properties.record_files_placeholder: record_files}
del batch_count
else:
initializer_feed_dict = None
record_dir = None
self._session.run(model_properties.iterator.initializer,
initializer_feed_dict)
fetches = (model_properties.run_model_op,)
if model_properties.loss is not None:
fetches += (model_properties.loss,)
mode = "Train" if is_training else "Eval"
start = None
times_to_run = 1 if self._use_while_loop else num_steps
for i in range(times_to_run):
fetches_ = self._session.run(fetches)
if i % 100 == 0:
if start is None:
# Only start the timer after 100 steps so there is a warmup.
start = time.time()
start_step = i
if model_properties.loss is not None:
_, loss = fetches_
tf.logging.info("{} Loss = {}".format(mode, loss))
end = time.time()
if start is not None:
print("{} peformance: {} examples/sec".format(
mode, (i - start_step) * model_properties.batch_size / (end - start)))
return record_dir
def train(self):
"""Trains the graph for a single cycle."""
record_dir = self._train_or_eval(self._train_model_properties,
self._num_train_steps, is_training=True)
if record_dir:
# We delete the record_dir because each cycle, new TFRecords is generated
# by the async process.
tf.gfile.DeleteRecursively(record_dir)
def eval(self):
"""Evaluates the graph on the eval data.
Returns:
A dict of evaluation results.
"""
self._session.run(self._eval_model_properties.metric_initializer)
self._train_or_eval(self._eval_model_properties, self._num_eval_steps,
is_training=False)
eval_results = {
'global_step': self._session.run(self._global_step)}
for key, val in self._eval_model_properties.metrics.items():
val_ = self._session.run(val)
tf.logging.info("{} = {}".format(key, self._session.run(val)))
eval_results[key] = val_
return eval_results
......@@ -41,6 +41,7 @@ from tensorflow.contrib.compiler import xla
from official.datasets import movielens
from official.recommendation import constants as rconst
from official.recommendation import data_preprocessing
from official.recommendation import model_runner
from official.recommendation import neumf_model
from official.utils.flags import core as flags_core
from official.utils.logs import hooks_helper
......@@ -50,13 +51,17 @@ from official.utils.misc import distribution_utils
from official.utils.misc import model_helpers
def construct_estimator(num_gpus, model_dir, params, batch_size,
FLAGS = flags.FLAGS
def construct_estimator(num_gpus, model_dir, iterations, params, batch_size,
eval_batch_size):
"""Construct either an Estimator or TPUEstimator for NCF.
Args:
num_gpus: The number of gpus (Used to select distribution strategy)
model_dir: The model directory for the estimator
iterations: Estimator iterations
params: The params dict for the estimator
batch_size: The mini-batch size for training.
eval_batch_size: The batch size used during evaluation.
......@@ -75,12 +80,13 @@ def construct_estimator(num_gpus, model_dir, params, batch_size,
tf.Session.reset(tpu_cluster_resolver.get_master())
tpu_config = tf.contrib.tpu.TPUConfig(
iterations_per_loop=100,
iterations_per_loop=iterations,
num_shards=8)
run_config = tf.contrib.tpu.RunConfig(
cluster=tpu_cluster_resolver,
model_dir=model_dir,
save_checkpoints_secs=600,
session_config=tf.ConfigProto(
allow_soft_placement=True, log_device_placement=False),
tpu_config=tpu_config)
......@@ -91,12 +97,13 @@ def construct_estimator(num_gpus, model_dir, params, batch_size,
model_fn=neumf_model.neumf_model_fn,
use_tpu=True,
train_batch_size=batch_size,
eval_batch_size=eval_batch_size,
params=tpu_params,
config=run_config)
eval_estimator = tf.contrib.tpu.TPUEstimator(
model_fn=neumf_model.neumf_model_fn,
use_tpu=False,
use_tpu=True,
train_batch_size=1,
eval_batch_size=eval_batch_size,
params=tpu_params,
......@@ -118,7 +125,8 @@ def construct_estimator(num_gpus, model_dir, params, batch_size,
def main(_):
with logger.benchmark_context(FLAGS), mlperf_helper.LOGGER(FLAGS.ml_perf):
with logger.benchmark_context(FLAGS), \
mlperf_helper.LOGGER(FLAGS.output_ml_perf_compliance_logging):
mlperf_helper.set_ncf_root(os.path.split(os.path.abspath(__file__))[0])
run_ncf(FLAGS)
mlperf_helper.stitch_ncf()
......@@ -135,6 +143,7 @@ def run_ncf(_):
num_gpus = flags_core.get_num_gpus(FLAGS)
batch_size = distribution_utils.per_device_batch_size(
int(FLAGS.batch_size), num_gpus)
total_training_cycle = FLAGS.train_epochs // FLAGS.epochs_between_evals
eval_per_user = rconst.NUM_EVAL_NEGATIVES + 1
eval_batch_size = int(FLAGS.eval_batch_size or
......@@ -159,6 +168,7 @@ def run_ncf(_):
eval_batch_size=eval_batch_size,
num_neg=FLAGS.num_neg,
epochs_per_cycle=FLAGS.epochs_between_evals,
num_cycles=total_training_cycle,
match_mlperf=FLAGS.ml_perf,
deterministic=FLAGS.seed is not None,
use_subprocess=FLAGS.use_subprocess,
......@@ -173,30 +183,38 @@ def run_ncf(_):
model_helpers.apply_clean(flags.FLAGS)
train_estimator, eval_estimator = construct_estimator(
num_gpus=num_gpus, model_dir=FLAGS.model_dir, params={
"use_seed": FLAGS.seed is not None,
"hash_pipeline": FLAGS.hash_pipeline,
"batch_size": batch_size,
"eval_batch_size": eval_batch_size,
"learning_rate": FLAGS.learning_rate,
"num_users": num_users,
"num_items": num_items,
"mf_dim": FLAGS.num_factors,
"model_layers": [int(layer) for layer in FLAGS.layers],
"mf_regularization": FLAGS.mf_regularization,
"mlp_reg_layers": [float(reg) for reg in FLAGS.mlp_regularization],
"num_neg": FLAGS.num_neg,
"use_tpu": FLAGS.tpu is not None,
"tpu": FLAGS.tpu,
"tpu_zone": FLAGS.tpu_zone,
"tpu_gcp_project": FLAGS.tpu_gcp_project,
"beta1": FLAGS.beta1,
"beta2": FLAGS.beta2,
"epsilon": FLAGS.epsilon,
"match_mlperf": FLAGS.ml_perf,
"use_xla_for_gpu": FLAGS.use_xla_for_gpu,
}, batch_size=flags.FLAGS.batch_size, eval_batch_size=eval_batch_size)
params = {
"use_seed": FLAGS.seed is not None,
"hash_pipeline": FLAGS.hash_pipeline,
"batch_size": batch_size,
"eval_batch_size": eval_batch_size,
"learning_rate": FLAGS.learning_rate,
"num_users": num_users,
"num_items": num_items,
"mf_dim": FLAGS.num_factors,
"model_layers": [int(layer) for layer in FLAGS.layers],
"mf_regularization": FLAGS.mf_regularization,
"mlp_reg_layers": [float(reg) for reg in FLAGS.mlp_regularization],
"num_neg": FLAGS.num_neg,
"use_tpu": FLAGS.tpu is not None,
"tpu": FLAGS.tpu,
"tpu_zone": FLAGS.tpu_zone,
"tpu_gcp_project": FLAGS.tpu_gcp_project,
"beta1": FLAGS.beta1,
"beta2": FLAGS.beta2,
"epsilon": FLAGS.epsilon,
"match_mlperf": FLAGS.ml_perf,
"use_xla_for_gpu": FLAGS.use_xla_for_gpu,
"use_estimator": FLAGS.use_estimator,
}
if FLAGS.use_estimator:
train_estimator, eval_estimator = construct_estimator(
num_gpus=num_gpus, model_dir=FLAGS.model_dir,
iterations=num_train_steps, params=params,
batch_size=flags.FLAGS.batch_size, eval_batch_size=eval_batch_size)
else:
runner = model_runner.NcfModelRunner(ncf_dataset, params, num_train_steps,
num_eval_steps, FLAGS.use_while_loop)
# Create hooks that log information about the training and metric values
train_hooks = hooks_helper.get_train_hooks(
......@@ -220,8 +238,7 @@ def run_ncf(_):
test_id=FLAGS.benchmark_test_id)
pred_input_fn = None
total_training_cycle = FLAGS.train_epochs // FLAGS.epochs_between_evals
eval_input_fn = None
target_reached = False
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.TRAIN_LOOP)
for cycle_index in range(total_training_cycle):
......@@ -233,37 +250,46 @@ def run_ncf(_):
value=cycle_index)
# Train the model
train_input_fn, train_record_dir, batch_count = \
data_preprocessing.make_input_fn(
ncf_dataset=ncf_dataset, is_training=True)
if batch_count != num_train_steps:
raise ValueError(
"Step counts do not match. ({} vs. {}) The async process is "
"producing incorrect shards.".format(batch_count, num_train_steps))
train_estimator.train(input_fn=train_input_fn, hooks=train_hooks,
steps=num_train_steps)
if train_record_dir:
tf.gfile.DeleteRecursively(train_record_dir)
tf.logging.info("Beginning evaluation.")
if pred_input_fn is None:
pred_input_fn, _, eval_batch_count = data_preprocessing.make_input_fn(
ncf_dataset=ncf_dataset, is_training=False)
if eval_batch_count != num_eval_steps:
if FLAGS.use_estimator:
train_input_fn, train_record_dir, batch_count = \
data_preprocessing.make_input_fn(
ncf_dataset=ncf_dataset, is_training=True)
if batch_count != num_train_steps:
raise ValueError(
"Step counts do not match. ({} vs. {}) The async process is "
"producing incorrect shards.".format(
eval_batch_count, num_eval_steps))
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.EVAL_START,
value=cycle_index)
eval_results = eval_estimator.evaluate(pred_input_fn, steps=num_eval_steps)
"producing incorrect shards.".format(batch_count, num_train_steps))
train_estimator.train(input_fn=train_input_fn, hooks=train_hooks,
steps=num_train_steps)
if train_record_dir:
tf.gfile.DeleteRecursively(train_record_dir)
tf.logging.info("Beginning evaluation.")
if eval_input_fn is None:
eval_input_fn, _, eval_batch_count = data_preprocessing.make_input_fn(
ncf_dataset=ncf_dataset, is_training=False)
if eval_batch_count != num_eval_steps:
raise ValueError(
"Step counts do not match. ({} vs. {}) The async process is "
"producing incorrect shards.".format(
eval_batch_count, num_eval_steps))
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.EVAL_START,
value=cycle_index)
eval_results = eval_estimator.evaluate(eval_input_fn,
steps=num_eval_steps)
tf.logging.info("Evaluation complete.")
else:
runner.train()
tf.logging.info("Beginning evaluation.")
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.EVAL_START,
value=cycle_index)
eval_results = runner.eval()
tf.logging.info("Evaluation complete.")
hr = float(eval_results[rconst.HR_KEY])
ndcg = float(eval_results[rconst.NDCG_KEY])
tf.logging.info("Evaluation complete.")
mlperf_helper.ncf_print(
key=mlperf_helper.TAGS.EVAL_TARGET,
......@@ -403,7 +429,7 @@ def define_ncf_flags():
"achieved by MLPerf implementation."))
flags.DEFINE_bool(
name="ml_perf", default=None,
name="ml_perf", default=False,
help=flags_core.help_wrap(
"If set, changes the behavior of the model slightly to match the "
"MLPerf reference implementations here: \n"
......@@ -417,6 +443,18 @@ def define_ncf_flags():
"which performs better due to the fact the sorting algorithms are "
"not stable."))
flags.DEFINE_bool(
name="output_ml_perf_compliance_logging", default=False,
help=flags_core.help_wrap(
"If set, output the MLPerf compliance logging. This is only useful "
"if one is running the model for MLPerf. See "
"https://github.com/mlperf/policies/blob/master/training_rules.adoc"
"#submission-compliance-logs for details. This uses sudo and so may "
"ask for your password, as root access is needed to clear the system "
"caches, which is required for MLPerf compliance."
)
)
flags.DEFINE_integer(
name="seed", default=None, help=flags_core.help_wrap(
"This value will be used to seed both NumPy and TensorFlow."))
......@@ -456,9 +494,32 @@ def define_ncf_flags():
def xla_validator(flag_dict):
return not flag_dict["use_xla_for_gpu"] or not flag_dict["tpu"]
flags.DEFINE_bool(
name="use_estimator", default=True, help=flags_core.help_wrap(
"If True, use Estimator to train. Setting to False is slightly "
"faster, but when False, the following are currently unsupported:\n"
" * Using TPUs\n"
" * Using more than 1 GPU\n"
" * Reloading from checkpoints\n"
" * Any hooks specified with --hooks\n"))
flags.DEFINE_bool(
name="use_while_loop", default=None, help=flags_core.help_wrap(
"If set, run an entire epoch in a session.run() call using a "
"TensorFlow while loop. This can improve performance, but will not "
"print out losses throughout the epoch. Requires "
"--use_estimator=false"
))
xla_message = "--use_while_loop requires --use_estimator=false"
@flags.multi_flags_validator(["use_while_loop", "use_estimator"],
message=xla_message)
def while_loop_validator(flag_dict):
return (not flag_dict["use_while_loop"] or
not flag_dict["use_estimator"])
if __name__ == "__main__":
tf.logging.set_verbosity(tf.logging.INFO)
define_ncf_flags()
FLAGS = flags.FLAGS
absl_app.run(main)
......@@ -19,11 +19,15 @@ from __future__ import division
from __future__ import print_function
import math
import mock
import numpy as np
import tensorflow as tf
from absl import flags
from absl.testing import flagsaver
from official.recommendation import constants as rconst
from official.recommendation import data_preprocessing
from official.recommendation import neumf_model
from official.recommendation import ncf_main
from official.recommendation import stat_utils
......@@ -33,6 +37,12 @@ NUM_TRAIN_NEG = 4
class NcfTest(tf.test.TestCase):
@classmethod
def setUpClass(cls): # pylint: disable=invalid-name
super(NcfTest, cls).setUpClass()
ncf_main.define_ncf_flags()
def setUp(self):
self.top_k_old = rconst.TOP_K
self.num_eval_negatives_old = rconst.NUM_EVAL_NEGATIVES
......@@ -224,6 +234,40 @@ class NcfTest(tf.test.TestCase):
self.assertAlmostEqual(ndcg, (1 + 2 * math.log(2) / math.log(3) +
math.log(2) / math.log(4)) / 4)
_BASE_END_TO_END_FLAGS = {
"batch_size": 1024,
"train_epochs": 1,
"use_synthetic_data": True
}
@flagsaver.flagsaver(**_BASE_END_TO_END_FLAGS)
@mock.patch.object(data_preprocessing, "SYNTHETIC_BATCHES_PER_EPOCH", 100)
def test_end_to_end(self):
ncf_main.main(None)
@flagsaver.flagsaver(ml_perf=True, **_BASE_END_TO_END_FLAGS)
@mock.patch.object(data_preprocessing, "SYNTHETIC_BATCHES_PER_EPOCH", 100)
def test_end_to_end_mlperf(self):
ncf_main.main(None)
@flagsaver.flagsaver(use_estimator=False, **_BASE_END_TO_END_FLAGS)
@mock.patch.object(data_preprocessing, "SYNTHETIC_BATCHES_PER_EPOCH", 100)
def test_end_to_end_no_estimator(self):
ncf_main.main(None)
flags.FLAGS.ml_perf = True
ncf_main.main(None)
@flagsaver.flagsaver(use_estimator=False, **_BASE_END_TO_END_FLAGS)
@mock.patch.object(data_preprocessing, "SYNTHETIC_BATCHES_PER_EPOCH", 100)
def test_end_to_end_while_loop(self):
# We cannot set use_while_loop = True in the flagsaver constructor, because
# if the flagsaver sets it to True before setting use_estimator to False,
# the flag validator will throw an error.
flags.FLAGS.use_while_loop = True
ncf_main.main(None)
flags.FLAGS.ml_perf = True
ncf_main.main(None)
if __name__ == "__main__":
tf.logging.set_verbosity(tf.logging.INFO)
......
......@@ -78,7 +78,18 @@ def neumf_model_fn(features, labels, mode, params):
users = features[movielens.USER_COLUMN]
items = tf.cast(features[movielens.ITEM_COLUMN], tf.int32)
logits = construct_model(users=users, items=items, params=params)
keras_model = params.get("keras_model")
if keras_model:
logits = keras_model([users, items],
training=mode == tf.estimator.ModeKeys.TRAIN)
else:
keras_model = construct_model(users=users, items=items, params=params)
logits = keras_model.output
if not params["use_estimator"] and "keras_model" not in params:
# When we are not using estimator, we need to reuse the Keras model when
# this model_fn is called again, so that the variables are shared between
# training and eval. So we mutate params to add the Keras model.
params["keras_model"] = keras_model
# Softmax with the first column of zeros is equivalent to sigmoid.
softmax_logits = tf.concat([tf.zeros(logits.shape, dtype=logits.dtype),
......@@ -160,6 +171,9 @@ def construct_model(users, items, params):
Raises:
ValueError: if the first model layer is not even.
Returns:
logits: network logits
"""
num_users = params["num_users"]
......@@ -182,46 +196,84 @@ def construct_model(users, items, params):
# Input variables
user_input = tf.keras.layers.Input(tensor=users)
item_input = tf.keras.layers.Input(tensor=items)
batch_size = user_input.get_shape()[0]
if params["use_tpu"]:
with tf.variable_scope("embed_weights", reuse=tf.AUTO_REUSE):
cmb_embedding_user = tf.get_variable(
name="embeddings_mf_user",
shape=[num_users, mf_dim + model_layers[0] // 2],
initializer=tf.glorot_uniform_initializer())
cmb_embedding_item = tf.get_variable(
name="embeddings_mf_item",
shape=[num_items, mf_dim + model_layers[0] // 2],
initializer=tf.glorot_uniform_initializer())
cmb_user_latent = tf.keras.layers.Lambda(lambda ids: tf.gather(
cmb_embedding_user, ids))(user_input)
cmb_item_latent = tf.keras.layers.Lambda(lambda ids: tf.gather(
cmb_embedding_item, ids))(item_input)
mlp_user_latent = tf.keras.layers.Lambda(
lambda x: tf.slice(x, [0, 0], [batch_size, model_layers[0] // 2])
)(cmb_user_latent)
mlp_item_latent = tf.keras.layers.Lambda(
lambda x: tf.slice(x, [0, 0], [batch_size, model_layers[0] // 2])
)(cmb_item_latent)
mf_user_latent = tf.keras.layers.Lambda(
lambda x: tf.slice(x, [0, model_layers[0] // 2], [batch_size, mf_dim])
)(cmb_user_latent)
mf_item_latent = tf.keras.layers.Lambda(
lambda x: tf.slice(x, [0, model_layers[0] // 2], [batch_size, mf_dim])
)(cmb_item_latent)
else:
# Initializer for embedding layers
embedding_initializer = "glorot_uniform"
# Embedding layers of GMF and MLP
mf_embedding_user = tf.keras.layers.Embedding(
num_users,
mf_dim,
embeddings_initializer=embedding_initializer,
embeddings_regularizer=tf.keras.regularizers.l2(mf_regularization),
input_length=1)
mf_embedding_item = tf.keras.layers.Embedding(
num_items,
mf_dim,
embeddings_initializer=embedding_initializer,
embeddings_regularizer=tf.keras.regularizers.l2(mf_regularization),
input_length=1)
mlp_embedding_user = tf.keras.layers.Embedding(
num_users,
model_layers[0]//2,
embeddings_initializer=embedding_initializer,
embeddings_regularizer=tf.keras.regularizers.l2(mlp_reg_layers[0]),
input_length=1)
mlp_embedding_item = tf.keras.layers.Embedding(
num_items,
model_layers[0]//2,
embeddings_initializer=embedding_initializer,
embeddings_regularizer=tf.keras.regularizers.l2(mlp_reg_layers[0]),
input_length=1)
# GMF part
mf_user_latent = mf_embedding_user(user_input)
mf_item_latent = mf_embedding_item(item_input)
# MLP part
mlp_user_latent = mlp_embedding_user(user_input)
mlp_item_latent = mlp_embedding_item(item_input)
# Initializer for embedding layers
embedding_initializer = "glorot_uniform"
# Embedding layers of GMF and MLP
mf_embedding_user = tf.keras.layers.Embedding(
num_users,
mf_dim,
embeddings_initializer=embedding_initializer,
embeddings_regularizer=tf.keras.regularizers.l2(mf_regularization),
input_length=1)
mf_embedding_item = tf.keras.layers.Embedding(
num_items,
mf_dim,
embeddings_initializer=embedding_initializer,
embeddings_regularizer=tf.keras.regularizers.l2(mf_regularization),
input_length=1)
mlp_embedding_user = tf.keras.layers.Embedding(
num_users,
model_layers[0]//2,
embeddings_initializer=embedding_initializer,
embeddings_regularizer=tf.keras.regularizers.l2(mlp_reg_layers[0]),
input_length=1)
mlp_embedding_item = tf.keras.layers.Embedding(
num_items,
model_layers[0]//2,
embeddings_initializer=embedding_initializer,
embeddings_regularizer=tf.keras.regularizers.l2(mlp_reg_layers[0]),
input_length=1)
# GMF part
mf_user_latent = mf_embedding_user(user_input)
mf_item_latent = mf_embedding_item(item_input)
# Element-wise multiply
mf_vector = tf.keras.layers.multiply([mf_user_latent, mf_item_latent])
# MLP part
mlp_user_latent = mlp_embedding_user(user_input)
mlp_item_latent = mlp_embedding_item(item_input)
# Concatenation of two latent features
mlp_vector = tf.keras.layers.concatenate([mlp_user_latent, mlp_item_latent])
......@@ -242,10 +294,11 @@ def construct_model(users, items, params):
name=movielens.RATING_COLUMN)(predict_vector)
# Print model topology.
tf.keras.models.Model([user_input, item_input], logits).summary()
model = tf.keras.models.Model([user_input, item_input], logits)
model.summary()
sys.stdout.flush()
return logits
return model
def compute_eval_loss_and_metrics(logits, # type: tf.Tensor
......@@ -351,8 +404,10 @@ def compute_eval_loss_and_metrics(logits, # type: tf.Tensor
def metric_fn(top_k_tensor, ndcg_tensor, weight_tensor):
return {
rconst.HR_KEY: tf.metrics.mean(top_k_tensor, weights=weight_tensor),
rconst.NDCG_KEY: tf.metrics.mean(ndcg_tensor, weights=weight_tensor),
rconst.HR_KEY: tf.metrics.mean(top_k_tensor, weights=weight_tensor,
name=rconst.HR_METRIC_NAME),
rconst.NDCG_KEY: tf.metrics.mean(ndcg_tensor, weights=weight_tensor,
name=rconst.NDCG_METRIC_NAME),
}
if use_tpu_spec:
......
google-api-python-client>=1.6.7
google-cloud-bigquery>=0.31.0
kaggle>=1.3.9
mlperf_compliance==0.0.8
mlperf_compliance==0.0.10
numpy
oauth2client>=4.1.2
pandas
......
......@@ -70,7 +70,7 @@ validation data roughly once per epoch.
Note that there are a number of other options you can specify, including
`--model_dir` to choose where to store the model and `--resnet_size` to choose
the model size (options include ResNet-18 through ResNet-200). See
[`resnet.py`](resnet.py) for the full list of options.
[`resnet_run_loop.py`](resnet_run_loop.py) for the full list of options.
## Compute Devices
......
......@@ -86,19 +86,22 @@ def unparse_line(parsed_line): # type: (ParsedLine) -> str
def get_mlperf_log():
"""Shielded import of mlperf_log module."""
try:
import pkg_resources
import mlperf_compliance
version = pkg_resources.get_distribution("mlperf_compliance")
version = tuple(int(i) for i in version.version.split("."))
if version < _MIN_VERSION:
tf.logging.warning(
"mlperf_compliance is version {}, must be at least version {}".format(
".".join([str(i) for i in version]),
".".join([str(i) for i in _MIN_VERSION])))
raise ImportError
mlperf_log = mlperf_compliance.mlperf_log
def test_mlperf_log_pip_version():
"""Check that mlperf_compliance is up to date."""
import pkg_resources
version = pkg_resources.get_distribution("mlperf_compliance")
version = tuple(int(i) for i in version.version.split("."))
if version < _MIN_VERSION:
tf.logging.warning(
"mlperf_compliance is version {}, must be >= {}".format(
".".join([str(i) for i in version]),
".".join([str(i) for i in _MIN_VERSION])))
raise ImportError
return mlperf_compliance.mlperf_log
mlperf_log = test_mlperf_log_pip_version()
except ImportError:
mlperf_log = None
......@@ -189,7 +192,8 @@ def stitch_ncf():
return
if LOGGER.log_file is None or not tf.gfile.Exists(LOGGER.log_file):
tf.logging.error("Could not find log file to stitch.")
tf.logging.warning("Could not find log file to stitch.")
return
log_lines = []
num_eval_users = None
......
......@@ -27,8 +27,9 @@ def get_distribution_strategy(num_gpus, all_reduce_alg=None):
Args:
num_gpus: Number of GPUs to run this model.
all_reduce_alg: Specify which algorithm to use when performing all-reduce.
See tf.contrib.distribute.AllReduceCrossTowerOps for available algorithms.
If None, DistributionStrategy will choose based on device topology.
See tf.contrib.distribute.AllReduceCrossDeviceOps for available
algorithms. If None, DistributionStrategy will choose based on device
topology.
Returns:
tf.contrib.distribute.DistibutionStrategy object.
......@@ -41,7 +42,7 @@ def get_distribution_strategy(num_gpus, all_reduce_alg=None):
if all_reduce_alg:
return tf.contrib.distribute.MirroredStrategy(
num_gpus=num_gpus,
cross_tower_ops=tf.contrib.distribute.AllReduceCrossTowerOps(
cross_device_ops=tf.contrib.distribute.AllReduceCrossDeviceOps(
all_reduce_alg, num_packs=2))
else:
return tf.contrib.distribute.MirroredStrategy(num_gpus=num_gpus)
......
......@@ -27,21 +27,21 @@ class GetDistributionStrategyTest(tf.test.TestCase):
"""Tests for get_distribution_strategy."""
def test_one_device_strategy_cpu(self):
ds = distribution_utils.get_distribution_strategy(0)
self.assertEquals(ds.num_replicas, 1)
self.assertEquals(len(ds.worker_devices), 1)
self.assertIn('CPU', ds.worker_devices[0])
self.assertEquals(ds.num_replicas_in_sync, 1)
self.assertEquals(len(ds.extended.worker_devices), 1)
self.assertIn('CPU', ds.extended.worker_devices[0])
def test_one_device_strategy_gpu(self):
ds = distribution_utils.get_distribution_strategy(1)
self.assertEquals(ds.num_replicas, 1)
self.assertEquals(len(ds.worker_devices), 1)
self.assertIn('GPU', ds.worker_devices[0])
self.assertEquals(ds.num_replicas_in_sync, 1)
self.assertEquals(len(ds.extended.worker_devices), 1)
self.assertIn('GPU', ds.extended.worker_devices[0])
def test_mirrored_strategy(self):
ds = distribution_utils.get_distribution_strategy(5)
self.assertEquals(ds.num_replicas, 5)
self.assertEquals(len(ds.worker_devices), 5)
for device in ds.worker_devices:
self.assertEquals(ds.num_replicas_in_sync, 5)
self.assertEquals(len(ds.extended.worker_devices), 5)
for device in ds.extended.worker_devices:
self.assertIn('GPU', device)
......
......@@ -74,6 +74,7 @@ request.
- [slim](slim): image classification models in TF-Slim.
- [street](street): identify the name of a street (in France) from an image
using a Deep RNN.
- [struct2depth](struct2depth): unsupervised learning of depth and ego-motion.
- [swivel](swivel): the Swivel algorithm for generating word embeddings.
- [syntaxnet](syntaxnet): neural models of natural language syntax.
- [tcn](tcn): Self-supervised representation learning from multi-view video.
......
# AstroNet: A Neural Network for Identifying Exoplanets in Light Curves
# Exoplanet ML
![Transit Animation](docs/transit.gif)
Machine learning models and utilities for exoplanet science.
## Contact
## Code Author
Chris Shallue: [@cshallue](https://github.com/cshallue)
## Background
## Quick Start
This directory contains TensorFlow models and data processing code for
identifying exoplanets in astrophysical light curves. For complete background,
see [our paper](http://adsabs.harvard.edu/abs/2018AJ....155...94S) in
*The Astronomical Journal*.
For shorter summaries, see:
* ["Earth to Exoplanet"](https://www.blog.google/topics/machine-learning/hunting-planets-machine-learning/) on the Google blog
* [This blog post](https://www.cfa.harvard.edu/~avanderb/page1.html#kepler90) by Andrew Vanderburg
* [This great article](https://milesobrien.com/artificial-intelligence-gains-intuition-hunting-exoplanets/) by Fedor Kossakovski
* [NASA's press release](https://www.nasa.gov/press-release/artificial-intelligence-nasa-data-used-to-discover-eighth-planet-circling-distant-star) article
Jump to the [AstroNet walkthrough](astronet/README.md#walkthrough).
## Citation
......@@ -30,12 +20,12 @@ around Kepler-90. *The Astronomical Journal*, 155(2), 94.
Full text available at [*The Astronomical Journal*](http://iopscience.iop.org/article/10.3847/1538-3881/aa9e09/meta).
## Code Directories
## Directories
[astronet/](astronet/)
* [TensorFlow](https://www.tensorflow.org/) code for:
* Downloading and preprocessing Kepler data.
* A neural network for identifying exoplanets in light curves. Contains code for:
* Downloading and preprocessing Kepler light curves.
* Building different types of neural network classification models.
* Training and evaluating a new model.
* Using a trained model to generate new predictions.
......@@ -44,25 +34,27 @@ Full text available at [*The Astronomical Journal*](http://iopscience.iop.org/ar
* A generative model for light curves.
[light_curve_util/](light_curve_util)
[light_curve/](light_curve)
* Utilities for operating on light curves. These include:
* Reading Kepler data from `.fits` files.
* Applying a median filter to smooth and normalize a light curve.
* Phase folding, splitting, removing periodic events, etc.
* In addition, some C++ implementations of light curve utilities are located in
[light_curve_util/cc/](light_curve_util/cc).
* [light_curve/fast_ops/](light_curve/fast_ops) contains optimized C++ light
curve operations.
[tf_util/](tf_util)
* Shared TensorFlow utilities.
[third_party/](third_party/)
* Utilities derived from third party code.
## Walkthrough
### Install Required Packages
# Setup
First, ensure that you have installed the following required packages:
## Required Packages
* **TensorFlow** ([instructions](https://www.tensorflow.org/install/))
* **Pandas** ([instructions](http://pandas.pydata.org/pandas-docs/stable/install.html))
......@@ -73,352 +65,11 @@ First, ensure that you have installed the following required packages:
* **Bazel** ([instructions](https://docs.bazel.build/versions/master/install.html))
* **Abseil Python Common Libraries** ([instructions](https://github.com/abseil/abseil-py))
### Optional: Run Unit Tests
## Run Unit Tests
Verify that all dependencies are satisfied by running the unit tests:
```bash
bazel test astronet/... light_curve_util/... third_party/...
```
### Download Kepler Data
A *light curve* is a plot of the brightness of a star over time. We will be
focusing on light curves produced by the Kepler space telescope, which monitored
the brightness of 200,000 stars in our milky way galaxy for 4 years. An example
light curve produced by Kepler is shown below.
![Kepler-934](docs/kepler-943.png)
To train a model to identify planets in Kepler light curves, you will need a
training set of labeled *Threshold Crossing Events* (TCEs). A TCE is a periodic
signal that has been detected in a Kepler light curve, and is associated with a
*period* (the number of days between each occurrence of the detected signal),
a *duration* (the time taken by each occurrence of the signal), an *epoch* (the
time of the first observed occurrence of the signal), and possibly additional
metadata like the signal-to-noise ratio. An example TCE is shown below. The
labels are ground truth classifications (decided by humans) that indicate which
TCEs in the training set are actual planets signals and which are caused by
other phenomena.
![Kepler-934 Transits](docs/kepler-943-transits.png)
You can download the DR24 TCE Table in CSV format from the [NASA Exoplanet
Archive](https://exoplanetarchive.ipac.caltech.edu/cgi-bin/TblView/nph-tblView?app=ExoTbls&config=q1_q17_dr24_tce). Ensure the following columns are selected:
* `rowid`: Integer ID of the row in the TCE table.
* `kepid`: Kepler ID of the target star.
* `tce_plnt_num`: TCE number within the target star.
* `tce_period`: Period of the detected event, in days.
* `tce_time0bk`: The time corresponding to the center of the first detected
event in Barycentric Julian Day (BJD) minus a constant offset of
2,454,833.0 days.
* `tce_duration`: Duration of the detected event, in hours.
* `av_training_set`: Autovetter training set label; one of PC (planet candidate),
AFP (astrophysical false positive), NTP (non-transiting phenomenon),
UNK (unknown).
Next, you will need to download the light curves of the stars corresponding to
the TCEs in the training set. These are available at the
[Mikulski Archive for Space Telescopes](https://archive.stsci.edu/). However,
you almost certainly don't want all of the Kepler data, which consists of almost
3 million files, takes up over a terabyte of space, and may take several weeks
to download! To train our model, we only need to download the subset of light
curves that are associated with TCEs in the DR24 file. To download just those
light curves, follow these steps:
**NOTE:** Even though we are only downloading a subset of the entire Kepler
dataset, the files downloaded by the following script take up about **90 GB**.
```bash
# Filename containing the CSV file of TCEs in the training set.
TCE_CSV_FILE="${HOME}/astronet/dr24_tce.csv"
# Directory to download Kepler light curves into.
KEPLER_DATA_DIR="${HOME}/astronet/kepler/"
# Generate a bash script that downloads the Kepler light curves in the training set.
python astronet/data/generate_download_script.py \
--kepler_csv_file=${TCE_CSV_FILE} \
--download_dir=${KEPLER_DATA_DIR}
# Run the download script to download Kepler light curves.
./get_kepler.sh
```
The final line should read: `Finished downloading 12669 Kepler targets to
${KEPLER_DATA_DIR}`
Let's explore the downloaded light curve of the Kepler-90 star! Note that Kepler
light curves are divided into
[four quarters each year](https://keplerscience.arc.nasa.gov/data-products.html#kepler-data-release-notes), which are separated by the quarterly rolls that the spacecraft
made to reorient its solar panels. In the downloaded light curves, each `.fits`
file corresponds to a specific Kepler quarter, but some quarters are divided
into multiple `.fits` files.
```python
# Launch iPython (or Python) from the tensorflow_models/astronet/ directory.
ipython
In[1]:
from light_curve_util import kepler_io
import matplotlib.pyplot as plt
import numpy as np
In[2]:
KEPLER_DATA_DIR = "/path/to/kepler/"
KEPLER_ID = 11442793 # Kepler-90.
In[3]:
# Read the light curve.
file_names = kepler_io.kepler_filenames(KEPLER_DATA_DIR, KEPLER_ID)
assert file_names, "Failed to find .fits files in {}".format(KEPLER_DATA_DIR)
all_time, all_flux = kepler_io.read_kepler_light_curve(file_names)
print("Read light curve with {} segments".format(len(all_time)))
In[4]:
# Plot the fourth segment.
plt.plot(all_time[3], all_flux[3], ".")
plt.show()
In[5]:
# Plot all light curve segments. We first divide by the median flux in each
# segment, because the segments are on different scales.
for f in all_flux:
f /= np.median(f)
plt.plot(np.concatenate(all_time), np.concatenate(all_flux), ".")
plt.show()
```
The output plots should look something like this:
![Kepler 90 Q4](docs/kep90-q4-raw.png)
![Kepler 90 All](docs/kep90-all.png)
The first plot is a single segment of approximately 20 days. You can see a
planet transit --- that's Kepler-90 g! Also, notice that the brightness of the
star is not flat over time --- there is natural variation in the brightness,
even away from the planet transit.
The second plot is the full light curve over the entire Kepler mission
(aproximately 4 years). You can easily see two transiting planets by eye ---
they are Kepler-90 h (the biggest known planet in the system with the deepest
transits) and Kepler-90 g (the second biggest known planet in the system with
the second deepest transits).
### Process Kepler Data
To train a model to identify exoplanets, you will need to provide TensorFlow
with training data in
[TFRecord](https://www.tensorflow.org/programmers_guide/datasets) format. The
TFRecord format consists of a set of sharded files containing serialized
`tf.Example` [protocol buffers](https://developers.google.com/protocol-buffers/).
The command below will generate a set of sharded TFRecord files for the TCEs in
the training set. Each `tf.Example` proto will contain the following light curve
representations:
* `global_view`: Vector of length 2001: a "global view" of the TCE.
* `local_view`: Vector of length 201: a "local view" of the TCE.
In addition, each `tf.Example` will contain the value of each column in the
input TCE CSV file. The columns include:
* `rowid`: Integer ID of the row in the TCE table.
* `kepid`: Kepler ID of the target star.
* `tce_plnt_num`: TCE number within the target star.
* `av_training_set`: Autovetter training set label.
* `tce_period`: Period of the detected event, in days.
```bash
# Use Bazel to create executable Python scripts.
#
# Alternatively, since all code is pure Python and does not need to be compiled,
# we could invoke the source scripts with the following addition to PYTHONPATH:
# export PYTHONPATH="/path/to/source/dir/:${PYTHONPATH}"
bazel build astronet/...
# Directory to save output TFRecord files into.
TFRECORD_DIR="${HOME}/astronet/tfrecord"
# Preprocess light curves into sharded TFRecord files using 5 worker processes.
bazel-bin/astronet/data/generate_input_records \
--input_tce_csv_file=${TCE_CSV_FILE} \
--kepler_data_dir=${KEPLER_DATA_DIR} \
--output_dir=${TFRECORD_DIR} \
--num_worker_processes=5
```
When the script finishes you will find 8 training files, 1 validation file and
1 test file in `TFRECORD_DIR`. The files will match the patterns
`train-0000?-of-00008`, `val-00000-of-00001` and `test-00000-of-00001`
respectively.
Here's a quick description of what the script does. For a full description, see
Section 3 of [our paper](http://iopscience.iop.org/article/10.3847/1538-3881/aa9e09/meta).
For each light curve, we first fit a normalization spline to remove any
low-frequency variability (that is, the natural variability in light from star)
without removing any deviations caused by planets or other objects. For example,
the following image shows the normalization spline for the segment of Kepler-90
that we considered above:
![Kepler 90 Q4 Spline](docs/kep90-q4-spline.png)
Next, we divide by the spline to make the star's baseline brightness
approximately flat. Notice that after normalization the transit of Kepler-90 g
is still preserved:
![Kepler 90 Q4 Normalized](docs/kep90-q4-normalized.png)
Finally, for each TCE in the input CSV table, we generate two representations of
the light curve of that star. Both representations are *phase-folded*, which
means that we combine all periods of the detected TCE into a single curve, with
the detected event centered.
Let's explore the generated representations of Kepler-90 g in the output.
```python
# Launch iPython (or Python) from the tensorflow_models/astronet/ directory.
ipython
In[1]:
import matplotlib.pyplot as plt
import numpy as np
import os.path
import tensorflow as tf
In[2]:
KEPLER_ID = 11442793 # Kepler-90
TFRECORD_DIR = "/path/to/tfrecords/dir"
In[3]:
# Helper function to find the tf.Example corresponding to a particular TCE.
def find_tce(kepid, tce_plnt_num, filenames):
for filename in filenames:
for record in tf.python_io.tf_record_iterator(filename):
ex = tf.train.Example.FromString(record)
if (ex.features.feature["kepid"].int64_list.value[0] == kepid and
ex.features.feature["tce_plnt_num"].int64_list.value[0] == tce_plnt_num):
print("Found {}_{} in file {}".format(kepid, tce_plnt_num, filename))
return ex
raise ValueError("{}_{} not found in files: {}".format(kepid, tce_plnt_num, filenames))
In[4]:
# Find Kepler-90 g.
filenames = tf.gfile.Glob(os.path.join(TFRECORD_DIR, "*"))
assert filenames, "No files found in {}".format(TFRECORD_DIR)
ex = find_tce(KEPLER_ID, 1, filenames)
In[5]:
# Plot the global and local views.
global_view = np.array(ex.features.feature["global_view"].float_list.value)
local_view = np.array(ex.features.feature["local_view"].float_list.value)
fig, axes = plt.subplots(1, 2, figsize=(20, 6))
axes[0].plot(global_view, ".")
axes[1].plot(local_view, ".")
plt.show()
```
The output should look something like this:
![Kepler 90 g Processed](docs/kep90h-localglobal.png)
### Train an AstroNet Model
The [astronet](astronet/) directory contains several types of neural
network architecture and various configuration options. To train a convolutional
neural network to classify Kepler TCEs as either "planet" or "not planet",
using the best configuration from
[our paper](http://iopscience.iop.org/article/10.3847/1538-3881/aa9e09/meta),
run the following training script:
```bash
# Directory to save model checkpoints into.
MODEL_DIR="${HOME}/astronet/model/"
# Run the training script.
bazel-bin/astronet/train \
--model=AstroCNNModel \
--config_name=local_global \
--train_files=${TFRECORD_DIR}/train* \
--eval_files=${TFRECORD_DIR}/val* \
--model_dir=${MODEL_DIR}
```
Optionally, you can also run a [TensorBoard](https://www.tensorflow.org/programmers_guide/summaries_and_tensorboard)
server in a separate process for real-time
monitoring of training progress and evaluation metrics.
```bash
# Launch TensorBoard server.
tensorboard --logdir ${MODEL_DIR}
```
The TensorBoard server will show a page like this:
![TensorBoard](docs/tensorboard.png)
### Evaluate an AstroNet Model
Run the following command to evaluate a model on the test set. The result will
be printed on the screen, and a summary file will also be written to the model
directory, which will be visible in TensorBoard.
```bash
# Run the evaluation script.
bazel-bin/astronet/evaluate \
--model=AstroCNNModel \
--config_name=local_global \
--eval_files=${TFRECORD_DIR}/test* \
--model_dir=${MODEL_DIR}
bazel test astronet/... astrowavenet/... light_curve/... tf_util/... third_party/...
```
The output should look something like this:
```bash
INFO:tensorflow:Saving dict for global step 10000: accuracy/accuracy = 0.9625159, accuracy/num_correct = 1515.0, auc = 0.988882, confusion_matrix/false_negatives = 10.0, confusion_matrix/false_positives = 49.0, confusion_matrix/true_negatives = 1165.0, confusion_matrix/true_positives = 350.0, global_step = 10000, loss = 0.112445444, losses/weighted_cross_entropy = 0.11295206, num_examples = 1574.
```
### Make Predictions
Suppose you detect a weak TCE in the light curve of the Kepler-90 star, with
period 14.44912 days, duration 2.70408 hours (0.11267 days) beginning 2.2 days
after 12:00 on 1/1/2009 (the year the Kepler telescope launched). To run this
TCE though your trained model, execute the following command:
```bash
# Generate a prediction for a new TCE.
bazel-bin/astronet/predict \
--model=AstroCNNModel \
--config_name=local_global \
--model_dir=${MODEL_DIR} \
--kepler_data_dir=${KEPLER_DATA_DIR} \
--kepler_id=11442793 \
--period=14.44912 \
--t0=2.2 \
--duration=0.11267 \
--output_image_file="${HOME}/astronet/kepler-90i.png"
```
The output should look like this:
```Prediction: 0.9480018```
This means the model is about 95% confident that the input TCE is a planet.
Of course, this is only a small step in the overall process of discovering and
validating an exoplanet: the model’s prediction is not proof one way or the
other. The process of validating this signal as a real exoplanet requires
significant follow-up work by an expert astronomer --- see Sections 6.3 and 6.4
of [our paper](http://iopscience.iop.org/article/10.3847/1538-3881/aa9e09/meta)
for the full details. In this particular case, our follow-up analysis validated
this signal as a bona fide exoplanet: it’s now called
[Kepler-90 i](https://www.nasa.gov/press-release/artificial-intelligence-nasa-data-used-to-discover-eighth-planet-circling-distant-star),
and is the record-breaking eighth planet discovered around the Kepler-90 star!
In addition to the output prediction, the script will also produce a plot of the
input representations. For Kepler-90 i, the plot should look something like
this:
![Kepler 90 h Processed](docs/kep90i-localglobal.png)
......@@ -13,7 +13,7 @@ py_library(
"//astronet/astro_fc_model:configurations",
"//astronet/astro_model",
"//astronet/astro_model:configurations",
"//astronet/util:configdict",
"//tf_util:configdict",
],
)
......@@ -23,10 +23,10 @@ py_binary(
srcs_version = "PY2AND3",
deps = [
":models",
"//astronet/util:config_util",
"//astronet/util:configdict",
"//astronet/util:estimator_runner",
"//astronet/util:estimator_util",
"//tf_util:config_util",
"//tf_util:configdict",
"//tf_util:estimator_runner",
],
)
......@@ -36,10 +36,10 @@ py_binary(
srcs_version = "PY2AND3",
deps = [
":models",
"//astronet/util:config_util",
"//astronet/util:configdict",
"//astronet/util:estimator_runner",
"//astronet/util:estimator_util",
"//tf_util:config_util",
"//tf_util:configdict",
"//tf_util:estimator_runner",
],
)
......@@ -50,8 +50,8 @@ py_binary(
deps = [
":models",
"//astronet/data:preprocess",
"//astronet/util:config_util",
"//astronet/util:configdict",
"//astronet/util:estimator_util",
"//tf_util:config_util",
"//tf_util:configdict",
],
)
# AstroNet: A Neural Network for Identifying Exoplanets in Light Curves
![Transit Animation](docs/transit.gif)
## Code Author
Chris Shallue: [@cshallue](https://github.com/cshallue)
## Background
This directory contains TensorFlow models and data processing code for
identifying exoplanets in astrophysical light curves. For complete background,
see [our paper](http://adsabs.harvard.edu/abs/2018AJ....155...94S) in
*The Astronomical Journal*.
For shorter summaries, see:
* ["Earth to Exoplanet"](https://www.blog.google/topics/machine-learning/hunting-planets-machine-learning/) on the Google blog
* [This blog post](https://www.cfa.harvard.edu/~avanderb/page1.html#kepler90) by Andrew Vanderburg
* [This great article](https://milesobrien.com/artificial-intelligence-gains-intuition-hunting-exoplanets/) by Fedor Kossakovski
* [NASA's press release](https://www.nasa.gov/press-release/artificial-intelligence-nasa-data-used-to-discover-eighth-planet-circling-distant-star) article
## Citation
If you find this code useful, please cite our paper:
Shallue, C. J., & Vanderburg, A. (2018). Identifying Exoplanets with Deep
Learning: A Five-planet Resonant Chain around Kepler-80 and an Eighth Planet
around Kepler-90. *The Astronomical Journal*, 155(2), 94.
Full text available at [*The Astronomical Journal*](http://iopscience.iop.org/article/10.3847/1538-3881/aa9e09/meta).
## Walkthrough
### Required Packages
First, ensure that you have installed the
[required packages](../README.md#required-packages) and that the
[unit tests](../README.md#run-unit-tests) pass.
### Download Kepler Data
A *light curve* is a plot of the brightness of a star over time. We will be
focusing on light curves produced by the Kepler space telescope, which monitored
the brightness of 200,000 stars in our milky way galaxy for 4 years. An example
light curve produced by Kepler is shown below.
![Kepler-934](docs/kepler-943.png)
To train a model to identify planets in Kepler light curves, you will need a
training set of labeled *Threshold Crossing Events* (TCEs). A TCE is a periodic
signal that has been detected in a Kepler light curve, and is associated with a
*period* (the number of days between each occurrence of the detected signal),
a *duration* (the time taken by each occurrence of the signal), an *epoch* (the
time of the first observed occurrence of the signal), and possibly additional
metadata like the signal-to-noise ratio. An example TCE is shown below. The
labels are ground truth classifications (decided by humans) that indicate which
TCEs in the training set are actual planets signals and which are caused by
other phenomena.
![Kepler-934 Transits](docs/kepler-943-transits.png)
You can download the DR24 TCE Table in CSV format from the [NASA Exoplanet
Archive](https://exoplanetarchive.ipac.caltech.edu/cgi-bin/TblView/nph-tblView?app=ExoTbls&config=q1_q17_dr24_tce). Ensure the following columns are selected:
* `rowid`: Integer ID of the row in the TCE table.
* `kepid`: Kepler ID of the target star.
* `tce_plnt_num`: TCE number within the target star.
* `tce_period`: Period of the detected event, in days.
* `tce_time0bk`: The time corresponding to the center of the first detected
event in Barycentric Julian Day (BJD) minus a constant offset of
2,454,833.0 days.
* `tce_duration`: Duration of the detected event, in hours.
* `av_training_set`: Autovetter training set label; one of PC (planet candidate),
AFP (astrophysical false positive), NTP (non-transiting phenomenon),
UNK (unknown).
Next, you will need to download the light curves of the stars corresponding to
the TCEs in the training set. These are available at the
[Mikulski Archive for Space Telescopes](https://archive.stsci.edu/). However,
you almost certainly don't want all of the Kepler data, which consists of almost
3 million files, takes up over a terabyte of space, and may take several weeks
to download! To train our model, we only need to download the subset of light
curves that are associated with TCEs in the DR24 file. To download just those
light curves, follow these steps:
**NOTE:** Even though we are only downloading a subset of the entire Kepler
dataset, the files downloaded by the following script take up about **90 GB**.
```bash
# Filename containing the CSV file of TCEs in the training set.
TCE_CSV_FILE="${HOME}/astronet/dr24_tce.csv"
# Directory to download Kepler light curves into.
KEPLER_DATA_DIR="${HOME}/astronet/kepler/"
# Generate a bash script that downloads the Kepler light curves in the training set.
python astronet/data/generate_download_script.py \
--kepler_csv_file=${TCE_CSV_FILE} \
--download_dir=${KEPLER_DATA_DIR}
# Run the download script to download Kepler light curves.
./get_kepler.sh
```
The final line should read: `Finished downloading 12669 Kepler targets to
${KEPLER_DATA_DIR}`
Let's explore the downloaded light curve of the Kepler-90 star! Note that Kepler
light curves are divided into
[four quarters each year](https://keplerscience.arc.nasa.gov/data-products.html#kepler-data-release-notes), which are separated by the quarterly rolls that the spacecraft
made to reorient its solar panels. In the downloaded light curves, each `.fits`
file corresponds to a specific Kepler quarter, but some quarters are divided
into multiple `.fits` files.
```python
# Launch iPython (or Python) from the tensorflow_models/astronet/ directory.
ipython
In[1]:
from light_curve import kepler_io
import matplotlib.pyplot as plt
import numpy as np
In[2]:
KEPLER_DATA_DIR = "/path/to/kepler/"
KEPLER_ID = 11442793 # Kepler-90.
In[3]:
# Read the light curve.
file_names = kepler_io.kepler_filenames(KEPLER_DATA_DIR, KEPLER_ID)
assert file_names, "Failed to find .fits files in {}".format(KEPLER_DATA_DIR)
all_time, all_flux = kepler_io.read_kepler_light_curve(file_names)
print("Read light curve with {} segments".format(len(all_time)))
In[4]:
# Plot the fourth segment.
plt.plot(all_time[3], all_flux[3], ".")
plt.show()
In[5]:
# Plot all light curve segments. We first divide by the median flux in each
# segment, because the segments are on different scales.
for f in all_flux:
f /= np.median(f)
plt.plot(np.concatenate(all_time), np.concatenate(all_flux), ".")
plt.show()
```
The output plots should look something like this:
![Kepler 90 Q4](docs/kep90-q4-raw.png)
![Kepler 90 All](docs/kep90-all.png)
The first plot is a single segment of approximately 20 days. You can see a
planet transit --- that's Kepler-90 g! Also, notice that the brightness of the
star is not flat over time --- there is natural variation in the brightness,
even away from the planet transit.
The second plot is the full light curve over the entire Kepler mission
(aproximately 4 years). You can easily see two transiting planets by eye ---
they are Kepler-90 h (the biggest known planet in the system with the deepest
transits) and Kepler-90 g (the second biggest known planet in the system with
the second deepest transits).
### Process Kepler Data
To train a model to identify exoplanets, you will need to provide TensorFlow
with training data in
[TFRecord](https://www.tensorflow.org/programmers_guide/datasets) format. The
TFRecord format consists of a set of sharded files containing serialized
`tf.Example` [protocol buffers](https://developers.google.com/protocol-buffers/).
The command below will generate a set of sharded TFRecord files for the TCEs in
the training set. Each `tf.Example` proto will contain the following light curve
representations:
* `global_view`: Vector of length 2001: a "global view" of the TCE.
* `local_view`: Vector of length 201: a "local view" of the TCE.
In addition, each `tf.Example` will contain the value of each column in the
input TCE CSV file. The columns include:
* `rowid`: Integer ID of the row in the TCE table.
* `kepid`: Kepler ID of the target star.
* `tce_plnt_num`: TCE number within the target star.
* `av_training_set`: Autovetter training set label.
* `tce_period`: Period of the detected event, in days.
```bash
# Use Bazel to create executable Python scripts.
#
# Alternatively, since all code is pure Python and does not need to be compiled,
# we could invoke the source scripts with the following addition to PYTHONPATH:
# export PYTHONPATH="/path/to/source/dir/:${PYTHONPATH}"
bazel build astronet/...
# Directory to save output TFRecord files into.
TFRECORD_DIR="${HOME}/astronet/tfrecord"
# Preprocess light curves into sharded TFRecord files using 5 worker processes.
bazel-bin/astronet/data/generate_input_records \
--input_tce_csv_file=${TCE_CSV_FILE} \
--kepler_data_dir=${KEPLER_DATA_DIR} \
--output_dir=${TFRECORD_DIR} \
--num_worker_processes=5
```
When the script finishes you will find 8 training files, 1 validation file and
1 test file in `TFRECORD_DIR`. The files will match the patterns
`train-0000?-of-00008`, `val-00000-of-00001` and `test-00000-of-00001`
respectively.
Here's a quick description of what the script does. For a full description, see
Section 3 of [our paper](http://iopscience.iop.org/article/10.3847/1538-3881/aa9e09/meta).
For each light curve, we first fit a normalization spline to remove any
low-frequency variability (that is, the natural variability in light from star)
without removing any deviations caused by planets or other objects. For example,
the following image shows the normalization spline for the segment of Kepler-90
that we considered above:
![Kepler 90 Q4 Spline](docs/kep90-q4-spline.png)
Next, we divide by the spline to make the star's baseline brightness
approximately flat. Notice that after normalization the transit of Kepler-90 g
is still preserved:
![Kepler 90 Q4 Normalized](docs/kep90-q4-normalized.png)
Finally, for each TCE in the input CSV table, we generate two representations of
the light curve of that star. Both representations are *phase-folded*, which
means that we combine all periods of the detected TCE into a single curve, with
the detected event centered.
Let's explore the generated representations of Kepler-90 g in the output.
```python
# Launch iPython (or Python) from the tensorflow_models/astronet/ directory.
ipython
In[1]:
import matplotlib.pyplot as plt
import numpy as np
import os.path
import tensorflow as tf
In[2]:
KEPLER_ID = 11442793 # Kepler-90
TFRECORD_DIR = "/path/to/tfrecords/dir"
In[3]:
# Helper function to find the tf.Example corresponding to a particular TCE.
def find_tce(kepid, tce_plnt_num, filenames):
for filename in filenames:
for record in tf.python_io.tf_record_iterator(filename):
ex = tf.train.Example.FromString(record)
if (ex.features.feature["kepid"].int64_list.value[0] == kepid and
ex.features.feature["tce_plnt_num"].int64_list.value[0] == tce_plnt_num):
print("Found {}_{} in file {}".format(kepid, tce_plnt_num, filename))
return ex
raise ValueError("{}_{} not found in files: {}".format(kepid, tce_plnt_num, filenames))
In[4]:
# Find Kepler-90 g.
filenames = tf.gfile.Glob(os.path.join(TFRECORD_DIR, "*"))
assert filenames, "No files found in {}".format(TFRECORD_DIR)
ex = find_tce(KEPLER_ID, 1, filenames)
In[5]:
# Plot the global and local views.
global_view = np.array(ex.features.feature["global_view"].float_list.value)
local_view = np.array(ex.features.feature["local_view"].float_list.value)
fig, axes = plt.subplots(1, 2, figsize=(20, 6))
axes[0].plot(global_view, ".")
axes[1].plot(local_view, ".")
plt.show()
```
The output should look something like this:
![Kepler 90 g Processed](docs/kep90h-localglobal.png)
### Train an AstroNet Model
This directory contains several types of neural network architecture and various
configuration options. To train a convolutional neural network to classify
Kepler TCEs as either "planet" or "not planet", using the best configuration
from
[our paper](http://iopscience.iop.org/article/10.3847/1538-3881/aa9e09/meta),
run the following training script:
```bash
# Directory to save model checkpoints into.
MODEL_DIR="${HOME}/astronet/model/"
# Run the training script.
bazel-bin/astronet/train \
--model=AstroCNNModel \
--config_name=local_global \
--train_files=${TFRECORD_DIR}/train* \
--eval_files=${TFRECORD_DIR}/val* \
--model_dir=${MODEL_DIR}
```
Optionally, you can also run a [TensorBoard](https://www.tensorflow.org/programmers_guide/summaries_and_tensorboard)
server in a separate process for real-time
monitoring of training progress and evaluation metrics.
```bash
# Launch TensorBoard server.
tensorboard --logdir ${MODEL_DIR}
```
The TensorBoard server will show a page like this:
![TensorBoard](docs/tensorboard.png)
### Evaluate an AstroNet Model
Run the following command to evaluate a model on the test set. The result will
be printed on the screen, and a summary file will also be written to the model
directory, which will be visible in TensorBoard.
```bash
# Run the evaluation script.
bazel-bin/astronet/evaluate \
--model=AstroCNNModel \
--config_name=local_global \
--eval_files=${TFRECORD_DIR}/test* \
--model_dir=${MODEL_DIR}
```
The output should look something like this:
```bash
INFO:tensorflow:Saving dict for global step 10000: accuracy/accuracy = 0.9625159, accuracy/num_correct = 1515.0, auc = 0.988882, confusion_matrix/false_negatives = 10.0, confusion_matrix/false_positives = 49.0, confusion_matrix/true_negatives = 1165.0, confusion_matrix/true_positives = 350.0, global_step = 10000, loss = 0.112445444, losses/weighted_cross_entropy = 0.11295206, num_examples = 1574.
```
### Make Predictions
Suppose you detect a weak TCE in the light curve of the Kepler-90 star, with
period 14.44912 days, duration 2.70408 hours (0.11267 days) beginning 2.2 days
after 12:00 on 1/1/2009 (the year the Kepler telescope launched). To run this
TCE though your trained model, execute the following command:
```bash
# Generate a prediction for a new TCE.
bazel-bin/astronet/predict \
--model=AstroCNNModel \
--config_name=local_global \
--model_dir=${MODEL_DIR} \
--kepler_data_dir=${KEPLER_DATA_DIR} \
--kepler_id=11442793 \
--period=14.44912 \
--t0=2.2 \
--duration=0.11267 \
--output_image_file="${HOME}/astronet/kepler-90i.png"
```
The output should look like this:
```Prediction: 0.9480018```
This means the model is about 95% confident that the input TCE is a planet.
Of course, this is only a small step in the overall process of discovering and
validating an exoplanet: the model’s prediction is not proof one way or the
other. The process of validating this signal as a real exoplanet requires
significant follow-up work by an expert astronomer --- see Sections 6.3 and 6.4
of [our paper](http://iopscience.iop.org/article/10.3847/1538-3881/aa9e09/meta)
for the full details. In this particular case, our follow-up analysis validated
this signal as a bona fide exoplanet: it’s now called
[Kepler-90 i](https://www.nasa.gov/press-release/artificial-intelligence-nasa-data-used-to-discover-eighth-planet-circling-distant-star),
and is the record-breaking eighth planet discovered around the Kepler-90 star!
In addition to the output prediction, the script will also produce a plot of the
input representations. For Kepler-90 i, the plot should look something like
this:
![Kepler 90 h Processed](docs/kep90i-localglobal.png)
......@@ -32,6 +32,6 @@ py_test(
":configurations",
"//astronet/ops:input_ops",
"//astronet/ops:testing",
"//astronet/util:configdict",
"//tf_util:configdict",
],
)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment