Commit c88fcb2b authored by Shawn Wang

Use flagfile to pass flags to data async generation process.

parent d4ac494f
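For context: this change replaces a hand-rolled JSON "command file" with absl's native flagfile mechanism, in which a text file containing one `--name=value` argument per line is expanded during flag parsing. A minimal sketch of that round trip, assuming an illustrative `demo_threads` flag that is not part of this diff:

```python
# Sketch of absl's --flagfile round trip; "demo_threads" and the temp
# path are illustrative assumptions, not names from this commit.
import os
import tempfile

from absl import flags

flags.DEFINE_integer(name="demo_threads", default=1, help="Example flag.")

flagfile = os.path.join(tempfile.mkdtemp(), "flagfile")
with open(flagfile, "w") as f:
  f.write("--demo_threads=8\n")  # one argument per line

# FlagValues is callable: this parses argv, and --flagfile expands the
# file's lines in place as if they were command-line arguments.
flags.FLAGS([__file__, "--flagfile", flagfile])
assert flags.FLAGS.demo_threads == 8
```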
@@ -51,7 +51,7 @@ _log_file = None
 def log_msg(msg):
   """Include timestamp info when logging messages to a file."""
-  if flags.FLAGS.use_command_file:
+  if flags.FLAGS.use_tf_logging:
     tf.logging.info(msg)
     return
@@ -440,44 +440,26 @@ def _generation_loop(num_workers,  # type: int
   gc.collect()


-def _set_flags_with_command_file():
-  """Use arguments from COMMAND_FILE when use_command_file is True."""
-  command_file = os.path.join(flags.FLAGS.data_dir,
-                              rconst.COMMAND_FILE)
-  tf.logging.info("Waiting for command file to appear at {}..."
-                  .format(command_file))
-  while not tf.gfile.Exists(command_file):
+def _parse_flagfile():
+  """Fill flags with flagfile."""
+  flagfile = os.path.join(flags.FLAGS.data_dir,
+                          rconst.FLAGFILE)
+  tf.logging.info("Waiting for flagfile to appear at {}..."
+                  .format(flagfile))
+  while not tf.gfile.Exists(flagfile):
     time.sleep(1)
-  tf.logging.info("Command file found.")
-  with tf.gfile.Open(command_file, "r") as f:
-    command = json.load(f)
-    flags.FLAGS.num_workers = command["num_workers"]
-    assert flags.FLAGS.data_dir == command["data_dir"]
-    flags.FLAGS.cache_id = command["cache_id"]
-    flags.FLAGS.num_readers = command["num_readers"]
-    flags.FLAGS.num_neg = command["num_neg"]
-    flags.FLAGS.num_train_positives = command["num_train_positives"]
-    flags.FLAGS.num_items = command["num_items"]
-    flags.FLAGS.epochs_per_cycle = command["epochs_per_cycle"]
-    flags.FLAGS.train_batch_size = command["train_batch_size"]
-    flags.FLAGS.eval_batch_size = command["eval_batch_size"]
-    flags.FLAGS.spillover = command["spillover"]
-    flags.FLAGS.redirect_logs = command["redirect_logs"]
-    assert flags.FLAGS.redirect_logs is False
-    if "seed" in command:
-      flags.FLAGS.seed = command["seed"]
+  tf.logging.info("flagfile found.")
+  flags.FLAGS([__file__, "--flagfile", flagfile])


 def main(_):
   global _log_file
-  if flags.FLAGS.use_command_file is not None:
-    _set_flags_with_command_file()
+  _parse_flagfile()

   redirect_logs = flags.FLAGS.redirect_logs
   cache_paths = rconst.Paths(
       data_dir=flags.FLAGS.data_dir, cache_id=flags.FLAGS.cache_id)
   log_file_name = "data_gen_proc_{}.log".format(cache_paths.cache_id)
   log_path = os.path.join(cache_paths.data_dir, log_file_name)
   if log_path.startswith("gs://") and redirect_logs:
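The hunk above replaces the per-flag JSON assignments with a single parse call. One property this relies on: `FlagValues.__call__` only assigns flags that actually appear in the argv it receives, so any flag missing from the flagfile keeps its current value. A standalone sketch (the flag names are made up for the demonstration):

```python
# Hypothetical flags used only to demonstrate partial overrides.
from absl import flags

flags.DEFINE_string(name="alpha", default="default_a", help="Demo flag.")
flags.DEFINE_string(name="beta", default="default_b", help="Demo flag.")

flags.FLAGS([__file__, "--alpha=overridden"])
assert flags.FLAGS.alpha == "overridden"
assert flags.FLAGS.beta == "default_b"  # flags absent from argv are untouched
```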
@@ -559,12 +541,11 @@ def define_flags():
   flags.DEFINE_boolean(name="redirect_logs", default=False,
                        help="Catch logs and write them to a file. "
                             "(Useful if this is run as a subprocess)")
+  flags.DEFINE_boolean(name="use_tf_logging", default=False,
+                       help="Use tf.logging instead of log file.")
   flags.DEFINE_integer(name="seed", default=None,
                        help="NumPy random seed to set at startup. If not "
                             "specified, a seed will not be set.")
-  flags.DEFINE_boolean(name="use_command_file", default=False,
-                       help="Use command arguments from json at command_path. "
-                            "All arguments other than data_dir will be ignored.")


 if __name__ == "__main__":
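A subtlety the producer side below relies on: boolean values are written with `str()`, producing flagfile lines like `--spillover=True`, and absl boolean flags parse such strings case-insensitively. A quick sketch reusing the flag definition from this hunk (the script scaffolding is illustrative):

```python
# Sketch: str(True) yields "True", which absl boolean flags accept.
from absl import flags

flags.DEFINE_boolean(name="use_tf_logging", default=False,
                     help="Use tf.logging instead of log file.")

flags.FLAGS([__file__, "--use_tf_logging={}".format(True)])
assert flags.FLAGS.use_tf_logging is True
```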
@@ -430,50 +430,7 @@ def instantiate_pipeline(dataset, data_dir, batch_size, eval_batch_size,
   # pool underlying the training generation doesn't starve other processes.
   num_workers = int(multiprocessing.cpu_count() * 0.75) or 1

-  if use_subprocess:
-    tf.logging.info("Creating training file subprocess.")
-    subproc_env = os.environ.copy()
-    # The subprocess uses TensorFlow for tf.gfile, but it does not need GPU
-    # resources and by default will try to allocate GPU memory. This would cause
-    # contention with the main training process.
-    subproc_env["CUDA_VISIBLE_DEVICES"] = ""
-    subproc_args = popen_helper.INVOCATION + [
-        "--data_dir", data_dir,
-        "--cache_id", str(ncf_dataset.cache_paths.cache_id),
-        "--num_neg", str(num_neg),
-        "--num_train_positives", str(ncf_dataset.num_train_positives),
-        "--num_items", str(ncf_dataset.num_items),
-        "--num_readers", str(ncf_dataset.num_data_readers),
-        "--epochs_per_cycle", str(epochs_per_cycle),
-        "--train_batch_size", str(batch_size),
-        "--eval_batch_size", str(eval_batch_size),
-        "--num_workers", str(num_workers),
-        # This allows the training input function to guarantee batch size and
-        # significantly improves performance. (~5% increase in examples/sec on
-        # GPU, and needed for TPU XLA.)
-        "--spillover", "True",
-        "--redirect_logs", "True"
-    ]
-    if ncf_dataset.deterministic:
-      subproc_args.extend(["--seed", str(int(stat_utils.random_int32()))])
-    tf.logging.info(
-        "Generation subprocess command: {}".format(" ".join(subproc_args)))
-    proc = subprocess.Popen(args=subproc_args, shell=False, env=subproc_env)
-  else:
-    # We write to a temp file then atomically rename it to the final file,
-    # because writing directly to the final file can cause the data generation
-    # async process to read a partially written JSON file.
-    command_file_temp = os.path.join(data_dir, rconst.COMMAND_FILE_TEMP)
-    tf.logging.info("Generation subprocess command at {} ..."
-                    .format(command_file_temp))
-    with tf.gfile.Open(command_file_temp, "w") as f:
-      command = {
-          "data_dir": data_dir,
-          "cache_id": ncf_dataset.cache_paths.cache_id,
-          "num_neg": num_neg,
+  flags_ = {
+      "data_dir": data_dir,
+      "cache_id": ncf_dataset.cache_paths.cache_id,
+      "num_neg": num_neg,
@@ -488,18 +445,38 @@ def instantiate_pipeline(dataset, data_dir, batch_size, eval_batch_size,
-          # significantly improves performance. (~5% increase in examples/sec on
-          # GPU, and needed for TPU XLA.)
-          "spillover": True,
-          "redirect_logs": False
-      }
-      if ncf_dataset.deterministic:
-        command["seed"] = stat_utils.random_int32()
-
-      json.dump(command, f)
-    command_file = os.path.join(data_dir, rconst.COMMAND_FILE)
-    tf.gfile.Rename(command_file_temp, command_file)
-    tf.logging.info(
-        "Generation subprocess command saved to: {}"
-        .format(command_file))
+      # significantly improves performance. (~5% increase in examples/sec on
+      # GPU, and needed for TPU XLA.)
+      "spillover": True,
+      "redirect_logs": use_subprocess,
+      "use_tf_logging": not use_subprocess,
+  }
+  if ncf_dataset.deterministic:
+    flags_["seed"] = stat_utils.random_int32()
+
+  # We write to a temp file then atomically rename it to the final file,
+  # because writing directly to the final file can cause the data generation
+  # async process to read a partially written JSON file.
+  flagfile_temp = os.path.join(flags.FLAGS.data_dir, rconst.FLAGFILE_TEMP)
+  tf.logging.info("Preparing flagfile for async data generation in {} ..."
+                  .format(flagfile_temp))
+  with tf.gfile.Open(flagfile_temp, "w") as f:
+    for k, v in six.iteritems(flags_):
+      f.write("--{}={}\n".format(k, v))
+  flagfile = os.path.join(data_dir, rconst.FLAGFILE)
+  tf.gfile.Rename(flagfile_temp, flagfile)
+  tf.logging.info(
+      "Wrote flagfile for async data generation in {}."
+      .format(flagfile))
+
+  if use_subprocess:
+    tf.logging.info("Creating training file subprocess.")
+    subproc_env = os.environ.copy()
+    # The subprocess uses TensorFlow for tf.gfile, but it does not need GPU
+    # resources and by default will try to allocate GPU memory. This would cause
+    # contention with the main training process.
+    subproc_env["CUDA_VISIBLE_DEVICES"] = ""
+    subproc_args = popen_helper.INVOCATION + [
+        "--data_dir", data_dir]
+    tf.logging.info(
+        "Generation subprocess command: {}".format(" ".join(subproc_args)))
+    proc = subprocess.Popen(args=subproc_args, shell=False, env=subproc_env)

   cleanup_called = {"finished": False}
   @atexit.register
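The write-to-temp-then-rename step above is what lets the consumer treat the flagfile's existence as a completeness signal: the final path only ever becomes visible with full contents. A self-contained sketch of the same publish pattern using only the standard library (`tf.gfile.Rename` plays the equivalent role for `gs://` paths in the real code; the names here are illustrative):

```python
# Publish-by-rename sketch; file names are illustrative assumptions.
import os
import tempfile

def publish_flagfile(directory, flags_dict):
  """Writes --name=value lines to a temp file, then atomically publishes it."""
  tmp_path = os.path.join(directory, "flagfile.temp")
  final_path = os.path.join(directory, "flagfile")
  with open(tmp_path, "w") as f:
    for name, value in sorted(flags_dict.items()):
      f.write("--{}={}\n".format(name, value))
  # Atomic on a single POSIX filesystem: a poller checking for final_path
  # either sees nothing or sees the complete file, never a partial write.
  os.rename(tmp_path, final_path)
  return final_path

print(publish_flagfile(tempfile.mkdtemp(),
                       {"num_workers": 4, "spillover": True}))
```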