Unverified Commit 61b03fcf authored by shizhiw, committed by GitHub

Merge pull request #5462 from tensorflow/shizhiw

Allow data async generation to be run as a standalone job rather than a subprocess.
parents 86c0ad3a acba9b0b
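This change lets the async data generation job be launched by hand as its own job instead of always being forked by ncf_main.py. A minimal launcher sketch under that assumption (the data directory and repo-relative path are illustrative): only --data_dir has to be passed on the command line, since the remaining flags are picked up from the flagfile that the main training process writes into that directory.

import os
import subprocess
import sys

DATA_DIR = "/tmp/movielens-data"  # hypothetical; must match ncf_main.py's --data_dir
ASYNC_GEN = "official/recommendation/data_async_generation.py"

env = os.environ.copy()
env["CUDA_VISIBLE_DEVICES"] = ""  # the generation job has no need for GPU memory

proc = subprocess.Popen(
    [sys.executable, ASYNC_GEN, "--data_dir", DATA_DIR],
    shell=False, env=env)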
@@ -64,6 +64,8 @@ DUPLICATE_MASK = "duplicate_mask"
CYCLES_TO_BUFFER = 3 # The number of train cycles worth of data to "run ahead"
# of the main training loop.
FLAGFILE_TEMP = "flagfile.temp"
FLAGFILE = "flagfile"
READY_FILE_TEMP = "ready.json.temp"
READY_FILE = "ready.json"
TRAIN_RECORD_TEMPLATE = "train_{}.tfrecords"
@@ -43,6 +43,7 @@ from absl import flags
from official.datasets import movielens
from official.recommendation import constants as rconst
from official.recommendation import stat_utils
from official.recommendation import popen_helper
_log_file = None
@@ -50,6 +51,10 @@ _log_file = None
def log_msg(msg):
"""Include timestamp info when logging messages to a file."""
if flags.FLAGS.use_tf_logging:
tf.logging.info(msg)
return
if flags.FLAGS.redirect_logs:
timestamp = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
print("[{}] {}".format(timestamp, msg), file=_log_file)
@@ -207,8 +212,7 @@ def _construct_training_records(
map_args = [(shard, num_items, num_neg, process_seeds[i])
for i, shard in enumerate(training_shards * epochs_per_cycle)]
with contextlib.closing(multiprocessing.Pool(
processes=num_workers, initializer=init_worker)) as pool:
with popen_helper.get_pool(num_workers, init_worker) as pool:
map_fn = pool.imap if deterministic else pool.imap_unordered # pylint: disable=no-member
data_generator = map_fn(_process_shard, map_args)
data = [
@@ -436,13 +440,33 @@ def _generation_loop(num_workers, # type: int
gc.collect()
def _parse_flagfile():
"""Fill flags with flagfile written by the main process."""
flagfile = os.path.join(flags.FLAGS.data_dir,
rconst.FLAGFILE)
tf.logging.info("Waiting for flagfile to appear at {}..."
.format(flagfile))
start_time = time.time()
while not tf.gfile.Exists(flagfile):
if time.time() - start_time > rconst.TIMEOUT_SECONDS:
log_msg("Waited more than {} seconds. Concluding that this "
"process is orphaned and exiting gracefully."
.format(rconst.TIMEOUT_SECONDS))
sys.exit()
time.sleep(1)
tf.logging.info("flagfile found.")
# This overrides FLAGS with flags from flagfile.
flags.FLAGS([__file__, "--flagfile", flagfile])
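For reference, a minimal standalone sketch of the absl --flagfile mechanism that the line above relies on (the flag name and file contents are illustrative): parsing an argument list that contains --flagfile re-parses FLAGS with the values stored in that file, one "--name=value" pair per line.

import tempfile
from absl import flags

flags.DEFINE_integer("num_neg", 4, "Negatives per positive example.")

with tempfile.NamedTemporaryFile("w", suffix=".flagfile", delete=False) as f:
  f.write("--num_neg=8\n")
  path = f.name

flags.FLAGS(["prog", "--flagfile", path])
assert flags.FLAGS.num_neg == 8  # overridden by the flagfile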
def main(_):
global _log_file
_parse_flagfile()
redirect_logs = flags.FLAGS.redirect_logs
cache_paths = rconst.Paths(
data_dir=flags.FLAGS.data_dir, cache_id=flags.FLAGS.cache_id)
log_file_name = "data_gen_proc_{}.log".format(cache_paths.cache_id)
log_path = os.path.join(cache_paths.data_dir, log_file_name)
if log_path.startswith("gs://") and redirect_logs:
@@ -489,16 +513,12 @@ def main(_):
def define_flags():
"""Construct flags for the server.
This function does not use official.utils.flags, as these flags are not meant
to be used by humans. Rather, they should be passed as part of a subprocess
call.
"""
"""Construct flags for the server."""
flags.DEFINE_integer(name="num_workers", default=multiprocessing.cpu_count(),
help="Size of the negative generation worker pool.")
flags.DEFINE_string(name="data_dir", default=None,
help="The data root. (used to construct cache paths.)")
flags.mark_flags_as_required(["data_dir"])
flags.DEFINE_string(name="cache_id", default=None,
help="The cache_id generated in the main process.")
flags.DEFINE_integer(name="num_readers", default=4,
@@ -528,15 +548,12 @@ def define_flags():
flags.DEFINE_boolean(name="redirect_logs", default=False,
help="Catch logs and write them to a file. "
"(Useful if this is run as a subprocess)")
flags.DEFINE_boolean(name="use_tf_logging", default=False,
help="Use tf.logging instead of log file.")
flags.DEFINE_integer(name="seed", default=None,
help="NumPy random seed to set at startup. If not "
"specified, a seed will not be set.")
flags.mark_flags_as_required(
["data_dir", "cache_id", "num_neg", "num_train_positives", "num_items",
"train_batch_size", "eval_batch_size"])
if __name__ == "__main__":
define_flags()
@@ -436,7 +436,8 @@ def _shutdown(proc):
def instantiate_pipeline(dataset, data_dir, batch_size, eval_batch_size,
num_data_readers=None, num_neg=4, epochs_per_cycle=1,
match_mlperf=False, deterministic=False):
match_mlperf=False, deterministic=False,
use_subprocess=True):
# type: (...) -> (NCFDataset, typing.Callable)
"""Preprocess data and start negative generation subprocess."""
@@ -445,43 +446,57 @@ def instantiate_pipeline(dataset, data_dir, batch_size, eval_batch_size,
num_data_readers=num_data_readers,
match_mlperf=match_mlperf,
deterministic=deterministic)
# By limiting the number of workers we guarantee that the worker
# pool underlying the training generation doesn't starve other processes.
num_workers = int(multiprocessing.cpu_count() * 0.75) or 1
tf.logging.info("Creating training file subprocess.")
flags_ = {
"data_dir": data_dir,
"cache_id": ncf_dataset.cache_paths.cache_id,
"num_neg": num_neg,
"num_train_positives": ncf_dataset.num_train_positives,
"num_items": ncf_dataset.num_items,
"num_readers": ncf_dataset.num_data_readers,
"epochs_per_cycle": epochs_per_cycle,
"train_batch_size": batch_size,
"eval_batch_size": eval_batch_size,
"num_workers": num_workers,
# This allows the training input function to guarantee batch size and
# significantly improves performance. (~5% increase in examples/sec on
# GPU, and needed for TPU XLA.)
"spillover": True,
"redirect_logs": use_subprocess,
"use_tf_logging": not use_subprocess,
}
if ncf_dataset.deterministic:
flags_["seed"] = stat_utils.random_int32()
tf.gfile.MakeDirs(flags.FLAGS.data_dir)
# We write to a temp file then atomically rename it to the final file,
# because writing directly to the final file can cause the data generation
async process to read a partially written flagfile.
flagfile_temp = os.path.join(flags.FLAGS.data_dir, rconst.FLAGFILE_TEMP)
tf.logging.info("Preparing flagfile for async data generation in {} ..."
.format(flagfile_temp))
with tf.gfile.Open(flagfile_temp, "w") as f:
for k, v in six.iteritems(flags_):
f.write("--{}={}\n".format(k, v))
flagfile = os.path.join(data_dir, rconst.FLAGFILE)
tf.gfile.Rename(flagfile_temp, flagfile)
tf.logging.info(
"Wrote flagfile for async data generation in {}."
.format(flagfile))
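For illustration, the flagfile produced by the loop above looks roughly like this (all values are hypothetical; redirect_logs/use_tf_logging are shown for the standalone case where use_subprocess is False):

--data_dir=/tmp/movielens-data
--cache_id=1538958417
--num_neg=4
--num_readers=4
--epochs_per_cycle=1
--train_batch_size=98304
--eval_batch_size=160000
--num_workers=12
--spillover=True
--redirect_logs=False
--use_tf_logging=True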
if use_subprocess:
tf.logging.info("Creating training file subprocess.")
subproc_env = os.environ.copy()
# The subprocess uses TensorFlow for tf.gfile, but it does not need GPU
# resources and by default will try to allocate GPU memory. This would cause
# contention with the main training process.
subproc_env["CUDA_VISIBLE_DEVICES"] = ""
# By limiting the number of workers we guarantee that the worker
# pool underlying the training generation doesn't starve other processes.
num_workers = int(multiprocessing.cpu_count() * 0.75) or 1
subproc_args = popen_helper.INVOCATION + [
"--data_dir", data_dir,
"--cache_id", str(ncf_dataset.cache_paths.cache_id),
"--num_neg", str(num_neg),
"--num_train_positives", str(ncf_dataset.num_train_positives),
"--num_items", str(ncf_dataset.num_items),
"--num_readers", str(ncf_dataset.num_data_readers),
"--epochs_per_cycle", str(epochs_per_cycle),
"--train_batch_size", str(batch_size),
"--eval_batch_size", str(eval_batch_size),
"--num_workers", str(num_workers),
"--spillover", "True", # This allows the training input function to
# guarantee batch size and significantly improves
# performance. (~5% increase in examples/sec on
# GPU, and needed for TPU XLA.)
"--redirect_logs", "True"
]
if ncf_dataset.deterministic:
subproc_args.extend(["--seed", str(int(stat_utils.random_int32()))])
"--data_dir", data_dir]
tf.logging.info(
"Generation subprocess command: {}".format(" ".join(subproc_args)))
proc = subprocess.Popen(args=subproc_args, shell=False, env=subproc_env)
cleanup_called = {"finished": False}
@@ -491,7 +506,9 @@ def instantiate_pipeline(dataset, data_dir, batch_size, eval_batch_size,
if cleanup_called["finished"]:
return
if use_subprocess:
_shutdown(proc)
try:
tf.gfile.DeleteRecursively(ncf_dataset.cache_paths.cache_root)
except tf.errors.NotFoundError:
@@ -151,7 +151,8 @@ def run_ncf(_):
num_neg=FLAGS.num_neg,
epochs_per_cycle=FLAGS.epochs_between_evals,
match_mlperf=FLAGS.ml_perf,
deterministic=FLAGS.seed is not None)
deterministic=FLAGS.seed is not None,
use_subprocess=FLAGS.use_subprocess)
num_users = ncf_dataset.num_users
num_items = ncf_dataset.num_items
approx_train_steps = int(ncf_dataset.num_train_positives
@@ -380,6 +381,12 @@ def define_ncf_flags():
return (eval_batch_size is None or
int(eval_batch_size) > rconst.NUM_EVAL_NEGATIVES)
flags.DEFINE_bool(
name="use_subprocess", default=True, help=flags_core.help_wrap(
"By default, ncf_main.py starts async data generation process as a "
"subprocess. If set to False, ncf_main.py will assume the async data "
"generation process has already been started by the user."))
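In practice, with --use_subprocess=False the user starts data_async_generation.py themselves against the same data directory, along the lines of the following hypothetical invocation (paths are illustrative, other ncf_main.py flags omitted):

# terminal 1: standalone async data generation job
python official/recommendation/data_async_generation.py --data_dir=/tmp/movielens-data

# terminal 2: training, without forking the generation subprocess
python official/recommendation/ncf_main.py --data_dir=/tmp/movielens-data --use_subprocess=False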
if __name__ == "__main__":
tf.logging.set_verbosity(tf.logging.INFO)
@@ -14,6 +14,8 @@
# ==============================================================================
"""Helper file for running the async data generation process in OSS."""
import contextlib
import multiprocessing
import os
import sys
@@ -27,3 +29,8 @@ _ASYNC_GEN_PATH = os.path.join(os.path.dirname(__file__),
"data_async_generation.py")
INVOCATION = [_PYTHON, _ASYNC_GEN_PATH]
def get_pool(num_workers, init_worker=None):
return contextlib.closing(multiprocessing.Pool(
processes=num_workers, initializer=init_worker))
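A usage sketch for get_pool, mirroring the _construct_training_records change above (the worker function and inputs are illustrative). The contextlib.closing wrapper is what makes the pool usable in a with statement on Python 2 as well, since multiprocessing.Pool only became a context manager in Python 3.3.

from official.recommendation import popen_helper

def _square(x):  # illustrative worker function
  return x * x

if __name__ == "__main__":
  with popen_helper.get_pool(num_workers=4) as pool:
    results = list(pool.imap_unordered(_square, range(8)))
  print(results)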