Unverified commit 03b4a0af, authored by Hongjun Choi, committed by GitHub

Merged commit includes the following changes: (#7430)

262988559  by A. Unique TensorFlower<gardener@tensorflow.org>:

    Enable NCF TF 2.0 model to run on TPUStrategy.

--
262971756  by A. Unique TensorFlower<gardener@tensorflow.org>:

    Internal change

262967691  by hongkuny<hongkuny@google.com>:

    Internal

--

PiperOrigin-RevId: 262988559
parent 3a14837d
@@ -143,37 +143,32 @@ class DatasetManager(object):
       if is_training:
         return {
             movielens.USER_COLUMN:
-                tf.io.FixedLenFeature([batch_size], dtype=tf.int64),
+                tf.io.FixedLenFeature([batch_size, 1], dtype=tf.int64),
             movielens.ITEM_COLUMN:
-                tf.io.FixedLenFeature([batch_size], dtype=tf.int64),
+                tf.io.FixedLenFeature([batch_size, 1], dtype=tf.int64),
             rconst.VALID_POINT_MASK:
-                tf.io.FixedLenFeature([batch_size], dtype=tf.int64),
+                tf.io.FixedLenFeature([batch_size, 1], dtype=tf.int64),
             "labels":
-                tf.io.FixedLenFeature([batch_size], dtype=tf.int64)
+                tf.io.FixedLenFeature([batch_size, 1], dtype=tf.int64)
         }
       else:
         return {
             movielens.USER_COLUMN:
-                tf.io.FixedLenFeature([batch_size], dtype=tf.int64),
+                tf.io.FixedLenFeature([batch_size, 1], dtype=tf.int64),
             movielens.ITEM_COLUMN:
-                tf.io.FixedLenFeature([batch_size], dtype=tf.int64),
+                tf.io.FixedLenFeature([batch_size, 1], dtype=tf.int64),
             rconst.DUPLICATE_MASK:
-                tf.io.FixedLenFeature([batch_size], dtype=tf.int64)
+                tf.io.FixedLenFeature([batch_size, 1], dtype=tf.int64)
         }

     features = tf.io.parse_single_example(
         serialized_data, _get_feature_map(batch_size, is_training=is_training))
-    users = tf.reshape(
-        tf.cast(features[movielens.USER_COLUMN], rconst.USER_DTYPE),
-        (batch_size,))
-    items = tf.reshape(
-        tf.cast(features[movielens.ITEM_COLUMN], rconst.ITEM_DTYPE),
-        (batch_size,))
+    users = tf.cast(features[movielens.USER_COLUMN], rconst.USER_DTYPE)
+    items = tf.cast(features[movielens.ITEM_COLUMN], rconst.ITEM_DTYPE)

     if is_training:
-      valid_point_mask = tf.reshape(
-          tf.cast(features[movielens.ITEM_COLUMN], tf.bool), (batch_size,))
-      fake_dup_mask = tf.zeros_like(features[movielens.USER_COLUMN])
+      valid_point_mask = tf.cast(features[rconst.VALID_POINT_MASK], tf.bool)
+      fake_dup_mask = tf.zeros_like(users)
       return {
           movielens.USER_COLUMN: users,
           movielens.ITEM_COLUMN: items,
@@ -184,20 +179,15 @@ class DatasetManager(object):
           rconst.DUPLICATE_MASK: fake_dup_mask
       }
     else:
-      labels = tf.reshape(
-          tf.cast(tf.zeros_like(features[movielens.USER_COLUMN]), tf.bool),
-          (batch_size, 1))
-      fake_valid_pt_mask = tf.cast(
-          tf.zeros_like(features[movielens.USER_COLUMN]), tf.bool)
+      labels = tf.cast(tf.zeros_like(users), tf.bool)
+      fake_valid_pt_mask = tf.cast(tf.zeros_like(users), tf.bool)
       return {
           movielens.USER_COLUMN:
               users,
           movielens.ITEM_COLUMN:
               items,
           rconst.DUPLICATE_MASK:
-              tf.reshape(
-                  tf.cast(features[rconst.DUPLICATE_MASK], tf.bool),
-                  (batch_size,)),
+              tf.cast(features[rconst.DUPLICATE_MASK], tf.bool),
           rconst.VALID_POINT_MASK:
               fake_valid_pt_mask,
           rconst.TRAIN_LABEL_KEY:
@@ -221,8 +211,8 @@ class DatasetManager(object):
     if self._is_training:
       mask_start_index = data.pop(rconst.MASK_START_INDEX)
       batch_size = data[movielens.ITEM_COLUMN].shape[0]
-      data[rconst.VALID_POINT_MASK] = np.less(
-          np.arange(batch_size), mask_start_index)
+      data[rconst.VALID_POINT_MASK] = np.expand_dims(
+          np.less(np.arange(batch_size), mask_start_index), -1)

     if self._stream_files:
       example_bytes = self.serialize(data)
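The dropped tf.reshape calls in the deserialize hunks above follow from the new feature map: tf.io.FixedLenFeature with shape [batch_size, 1] already yields parsed tensors carrying the trailing unit dimension. A minimal standalone sketch (the feature name and values are illustrative, not the real column constants):

    import tensorflow as tf

    batch_size = 4
    # Build one serialized tf.train.Example with a pre-batched int64 feature.
    example = tf.train.Example(features=tf.train.Features(feature={
        "user_id": tf.train.Feature(
            int64_list=tf.train.Int64List(value=[1, 2, 3, 4])),
    }))
    parsed = tf.io.parse_single_example(
        example.SerializeToString(),
        {"user_id": tf.io.FixedLenFeature([batch_size, 1], dtype=tf.int64)})
    print(parsed["user_id"].shape)  # (4, 1) -- no tf.reshape needed afterwards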
@@ -313,19 +303,21 @@ class DatasetManager(object):
     else:
       types = {movielens.USER_COLUMN: rconst.USER_DTYPE,
               movielens.ITEM_COLUMN: rconst.ITEM_DTYPE}
-      shapes = {movielens.USER_COLUMN: tf.TensorShape([batch_size]),
-                movielens.ITEM_COLUMN: tf.TensorShape([batch_size])}
+      shapes = {
+          movielens.USER_COLUMN: tf.TensorShape([batch_size, 1]),
+          movielens.ITEM_COLUMN: tf.TensorShape([batch_size, 1])
+      }

       if self._is_training:
         types[rconst.VALID_POINT_MASK] = np.bool
-        shapes[rconst.VALID_POINT_MASK] = tf.TensorShape([batch_size])
+        shapes[rconst.VALID_POINT_MASK] = tf.TensorShape([batch_size, 1])

         types = (types, np.bool)
-        shapes = (shapes, tf.TensorShape([batch_size]))
+        shapes = (shapes, tf.TensorShape([batch_size, 1]))

       else:
         types[rconst.DUPLICATE_MASK] = np.bool
-        shapes[rconst.DUPLICATE_MASK] = tf.TensorShape([batch_size])
+        shapes[rconst.DUPLICATE_MASK] = tf.TensorShape([batch_size, 1])

     data_generator = functools.partial(
         self.data_generator, epochs_between_evals=epochs_between_evals)
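For the generator-backed path, the types/shapes pair above is the contract handed to tf.data.Dataset.from_generator, so the generator must now emit [batch_size, 1] arrays as well. A simplified, self-contained sketch under that assumption (a stand-in feature name and an all-zeros generator):

    import numpy as np
    import tensorflow as tf

    batch_size = 4

    def gen():
      # Stand-in for DatasetManager.data_generator: yields pre-batched features.
      while True:
        yield {"user_id": np.zeros((batch_size, 1), dtype=np.int32)}

    dataset = tf.data.Dataset.from_generator(
        gen,
        output_types={"user_id": tf.int32},
        output_shapes={"user_id": tf.TensorShape([batch_size, 1])})
    print(next(iter(dataset))["user_id"].shape)  # (4, 1)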
@@ -554,12 +546,17 @@ class BaseDataConstructor(threading.Thread):
     items = np.concatenate([items, item_pad])
     labels = np.concatenate([labels, label_pad])

-    self._train_dataset.put(i, {
-        movielens.USER_COLUMN: users,
-        movielens.ITEM_COLUMN: items,
-        rconst.MASK_START_INDEX: np.array(mask_start_index, dtype=np.int32),
-        "labels": labels,
-    })
+    self._train_dataset.put(
+        i, {
+            movielens.USER_COLUMN:
+                np.reshape(users, (self.train_batch_size, 1)),
+            movielens.ITEM_COLUMN:
+                np.reshape(items, (self.train_batch_size, 1)),
+            rconst.MASK_START_INDEX:
+                np.array(mask_start_index, dtype=np.int32),
+            "labels":
+                np.reshape(labels, (self.train_batch_size, 1)),
+        })

   def _wait_to_construct_train_epoch(self):
     count = 0
@@ -649,11 +646,15 @@ class BaseDataConstructor(threading.Thread):
     users, items, duplicate_mask = self._assemble_eval_batch(
         users, positive_items, negative_items, self._eval_users_per_batch)

-    self._eval_dataset.put(i, {
-        movielens.USER_COLUMN: users.flatten(),
-        movielens.ITEM_COLUMN: items.flatten(),
-        rconst.DUPLICATE_MASK: duplicate_mask.flatten(),
-    })
+    self._eval_dataset.put(
+        i, {
+            movielens.USER_COLUMN:
+                np.reshape(users.flatten(), (self.eval_batch_size, 1)),
+            movielens.ITEM_COLUMN:
+                np.reshape(items.flatten(), (self.eval_batch_size, 1)),
+            rconst.DUPLICATE_MASK:
+                np.reshape(duplicate_mask.flatten(), (self.eval_batch_size, 1)),
+        })

   def _construct_eval_epoch(self):
     """Loop to construct data for evaluation."""
@@ -720,24 +721,37 @@ class DummyConstructor(threading.Thread):
       num_users = params["num_users"]
       num_items = params["num_items"]

-      users = tf.random.uniform([batch_size], dtype=tf.int32, minval=0,
-                                maxval=num_users)
-      items = tf.random.uniform([batch_size], dtype=tf.int32, minval=0,
-                                maxval=num_items)
+      users = tf.random.uniform([batch_size, 1],
+                                dtype=tf.int32,
+                                minval=0,
+                                maxval=num_users)
+      items = tf.random.uniform([batch_size, 1],
+                                dtype=tf.int32,
+                                minval=0,
+                                maxval=num_items)

       if is_training:
-        valid_point_mask = tf.cast(tf.random.uniform(
-            [batch_size], dtype=tf.int32, minval=0, maxval=2), tf.bool)
-        labels = tf.cast(tf.random.uniform(
-            [batch_size], dtype=tf.int32, minval=0, maxval=2), tf.bool)
+        valid_point_mask = tf.cast(
+            tf.random.uniform([batch_size, 1],
+                              dtype=tf.int32,
+                              minval=0,
+                              maxval=2), tf.bool)
+        labels = tf.cast(
+            tf.random.uniform([batch_size, 1],
+                              dtype=tf.int32,
+                              minval=0,
+                              maxval=2), tf.bool)
         data = {
             movielens.USER_COLUMN: users,
             movielens.ITEM_COLUMN: items,
             rconst.VALID_POINT_MASK: valid_point_mask,
         }, labels
       else:
-        dupe_mask = tf.cast(tf.random.uniform([batch_size], dtype=tf.int32,
-                                              minval=0, maxval=2), tf.bool)
+        dupe_mask = tf.cast(
+            tf.random.uniform([batch_size, 1],
+                              dtype=tf.int32,
+                              minval=0,
+                              maxval=2), tf.bool)
         data = {
             movielens.USER_COLUMN: users,
             movielens.ITEM_COLUMN: items,
......
@@ -168,8 +168,11 @@ class BaseTest(tf.test.TestCase):
     md5 = hashlib.md5()
     for features, labels in first_epoch:
       data_list = [
-          features[movielens.USER_COLUMN], features[movielens.ITEM_COLUMN],
-          features[rconst.VALID_POINT_MASK], labels]
+          features[movielens.USER_COLUMN].flatten(),
+          features[movielens.ITEM_COLUMN].flatten(),
+          features[rconst.VALID_POINT_MASK].flatten(),
+          labels.flatten()
+      ]
       for i in data_list:
         md5.update(i.tobytes())
@@ -216,8 +219,10 @@ class BaseTest(tf.test.TestCase):
     md5 = hashlib.md5()
     for features in eval_data:
       data_list = [
-          features[movielens.USER_COLUMN], features[movielens.ITEM_COLUMN],
-          features[rconst.DUPLICATE_MASK]]
+          features[movielens.USER_COLUMN].flatten(),
+          features[movielens.ITEM_COLUMN].flatten(),
+          features[rconst.DUPLICATE_MASK].flatten()
+      ]
       for i in data_list:
         md5.update(i.tobytes())
@@ -276,8 +281,11 @@ class BaseTest(tf.test.TestCase):
     md5 = hashlib.md5()
     for features, labels in results:
       data_list = [
-          features[movielens.USER_COLUMN], features[movielens.ITEM_COLUMN],
-          features[rconst.VALID_POINT_MASK], labels]
+          features[movielens.USER_COLUMN].flatten(),
+          features[movielens.ITEM_COLUMN].flatten(),
+          features[rconst.VALID_POINT_MASK].flatten(),
+          labels.flatten()
+      ]
       for i in data_list:
         md5.update(i.tobytes())
......
@@ -37,7 +37,6 @@ from official.utils.flags import core as flags_core
 from official.utils.misc import distribution_utils
 from official.utils.misc import keras_utils

 FLAGS = flags.FLAGS
@@ -60,13 +59,8 @@ def get_inputs(params):
       dataset=FLAGS.dataset, data_dir=FLAGS.data_dir, params=params,
       constructor_type=FLAGS.constructor_type,
       deterministic=FLAGS.seed is not None)

-  num_train_steps = (producer.train_batches_per_epoch //
-                     params["batches_per_step"])
-  num_eval_steps = (producer.eval_batches_per_epoch //
-                    params["batches_per_step"])
-  assert not producer.train_batches_per_epoch % params["batches_per_step"]
-  assert not producer.eval_batches_per_epoch % params["batches_per_step"]
+  num_train_steps = producer.train_batches_per_epoch
+  num_eval_steps = producer.eval_batches_per_epoch

   return num_users, num_items, num_train_steps, num_eval_steps, producer
@@ -74,18 +68,13 @@ def get_inputs(params):
 def parse_flags(flags_obj):
   """Convenience function to turn flags into params."""
   num_gpus = flags_core.get_num_gpus(flags_obj)
-  num_devices = FLAGS.num_tpu_shards if FLAGS.tpu else num_gpus or 1

-  batch_size = (flags_obj.batch_size + num_devices - 1) // num_devices
-  eval_divisor = (rconst.NUM_EVAL_NEGATIVES + 1) * num_devices
+  batch_size = flags_obj.batch_size
   eval_batch_size = flags_obj.eval_batch_size or flags_obj.batch_size
-  eval_batch_size = ((eval_batch_size + eval_divisor - 1) //
-                     eval_divisor * eval_divisor // num_devices)

   return {
       "train_epochs": flags_obj.train_epochs,
-      "batches_per_step": num_devices,
+      "batches_per_step": 1,
       "use_seed": flags_obj.seed is not None,
       "batch_size": batch_size,
       "eval_batch_size": eval_batch_size,
@@ -95,6 +84,7 @@ def parse_flags(flags_obj):
       "mf_regularization": flags_obj.mf_regularization,
       "mlp_reg_layers": [float(reg) for reg in flags_obj.mlp_regularization],
       "num_neg": flags_obj.num_neg,
+      "distribution_strategy": flags_obj.distribution_strategy,
       "num_gpus": num_gpus,
       "use_tpu": flags_obj.tpu is not None,
       "tpu": flags_obj.tpu,
@@ -115,7 +105,7 @@ def parse_flags(flags_obj):
   }


-def get_distribution_strategy(params):
+def get_v1_distribution_strategy(params):
   """Returns the distribution strategy to use."""
   if params["use_tpu"]:
     # Some of the networking libraries are quite chatty.
......
@@ -66,7 +66,7 @@ def construct_estimator(model_dir, params):
   Returns:
     An Estimator or TPUEstimator.
   """
-  distribution = ncf_common.get_distribution_strategy(params)
+  distribution = ncf_common.get_v1_distribution_strategy(params)
   run_config = tf.estimator.RunConfig(train_distribute=distribution,
                                       eval_distribute=distribution)
......
@@ -82,7 +82,6 @@ def create_dataset_from_data_producer(producer, params):
     Returns:
       Processed training features.
     """
-    labels = tf.expand_dims(labels, -1)
     fake_dup_mask = tf.zeros_like(features[movielens.USER_COLUMN])
     features[rconst.DUPLICATE_MASK] = fake_dup_mask
     features[rconst.TRAIN_LABEL_KEY] = labels
@@ -106,7 +105,6 @@ def create_dataset_from_data_producer(producer, params):
       Processed evaluation features.
     """
     labels = tf.cast(tf.zeros_like(features[movielens.USER_COLUMN]), tf.bool)
-    labels = tf.expand_dims(labels, -1)
     fake_valid_pt_mask = tf.cast(
         tf.zeros_like(features[movielens.USER_COLUMN]), tf.bool)
     features[rconst.VALID_POINT_MASK] = fake_valid_pt_mask
@@ -134,9 +132,13 @@ def create_ncf_input_data(params, producer=None, input_meta_data=None):
   Returns:
     (training dataset, evaluation dataset, train steps per epoch,
     eval steps per epoch)
-  """
+
+  Raises:
+    ValueError: If data is being generated online for when using TPU's.
+  """
   if params["train_dataset_path"]:
     assert params["eval_dataset_path"]

     train_dataset = create_dataset_from_tf_record_files(
         params["train_dataset_path"],
         input_meta_data["train_prebatch_size"],
@@ -148,34 +150,18 @@ def create_ncf_input_data(params, producer=None, input_meta_data=None):
         params["eval_batch_size"],
         is_training=False)
-    # TODO(b/259377621): Remove number of devices (i.e.
-    # params["batches_per_step"]) in input pipeline logic and only use
-    # global batch size instead.
-    num_train_steps = int(
-        np.ceil(input_meta_data["num_train_steps"] /
-                params["batches_per_step"]))
-    num_eval_steps = (
-        input_meta_data["num_eval_steps"] // params["batches_per_step"])
+    num_train_steps = int(input_meta_data["num_train_steps"])
+    num_eval_steps = int(input_meta_data["num_eval_steps"])
   else:
-    assert producer
+    if params["use_tpu"]:
+      raise ValueError("TPU training does not support data producer yet. "
+                       "Use pre-processed data.")
+
+    assert producer
     # Start retrieving data from producer.
     train_dataset, eval_dataset = create_dataset_from_data_producer(
         producer, params)
-    num_train_steps = (
-        producer.train_batches_per_epoch // params["batches_per_step"])
-    num_eval_steps = (
-        producer.eval_batches_per_epoch // params["batches_per_step"])
-    assert not producer.train_batches_per_epoch % params["batches_per_step"]
-    assert not producer.eval_batches_per_epoch % params["batches_per_step"]
-
-  # It is required that for distributed training, the dataset must call
-  # batch(). The parameter of batch() here is the number of replicas involed,
-  # such that each replica evenly gets a slice of data.
-  # drop_remainder = True, as we would like batch call to return a fixed shape
-  # vs None, this prevents a expensive broadcast during weighted_loss
-  batches_per_step = params["batches_per_step"]
-  train_dataset = train_dataset.batch(batches_per_step, drop_remainder=True)
-  eval_dataset = eval_dataset.batch(batches_per_step, drop_remainder=True)
+    num_train_steps = producer.train_batches_per_epoch
+    num_eval_steps = producer.eval_batches_per_epoch

   return train_dataset, eval_dataset, num_train_steps, num_eval_steps
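With batches_per_step pinned to 1 and the outer .batch(num_devices) wrapper removed, the pipeline hands the strategy a dataset that is already batched to the global batch size and relies on experimental_distribute_dataset to split each batch across replicas. A self-contained sketch of that contract (MirroredStrategy stands in for TPUStrategy so the snippet runs anywhere; the sizes are illustrative):

    import tensorflow as tf

    strategy = tf.distribute.MirroredStrategy()  # stands in for TPUStrategy here
    global_batch_size = 8

    # Batch once, to the global batch size; no extra .batch(num_devices) wrapper.
    dataset = tf.data.Dataset.range(64).batch(global_batch_size, drop_remainder=True)
    dist_dataset = strategy.experimental_distribute_dataset(dataset)

    for batch in dist_dataset:
      # With 1 replica: a [8] tensor; with N replicas: a PerReplica value of
      # [8 // N] slices, so steps per epoch stay 64 // 8 = 8 either way.
      print(batch)
      break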
@@ -189,7 +189,7 @@ class NcfTest(tf.test.TestCase):
     self.assertAlmostEqual(ndcg, (1 + math.log(2) / math.log(3) +
                                   2 * math.log(2) / math.log(4)) / 4)

-  _BASE_END_TO_END_FLAGS = ['-batch_size', '1024', '-train_epochs', '1']
+  _BASE_END_TO_END_FLAGS = ['-batch_size', '1044', '-train_epochs', '1']

   @unittest.skipIf(keras_utils.is_v2_0(), "TODO(b/136018594)")
   @mock.patch.object(rconst, "SYNTHETIC_BATCHES_PER_EPOCH", 100)
......
@@ -109,7 +109,6 @@ def neumf_model_fn(features, labels, mode, params):
   mlperf_helper.ncf_print(key=mlperf_helper.TAGS.OPT_HP_ADAM_EPSILON,
                           value=params["epsilon"])
   optimizer = tf.compat.v1.train.AdamOptimizer(
       learning_rate=params["learning_rate"],
       beta1=params["beta1"],
@@ -151,7 +150,7 @@ def _strip_first_and_last_dimension(x, batch_size):
   return tf.reshape(x[0, :], (batch_size,))


-def construct_model(user_input, item_input, params, need_strip=False):
+def construct_model(user_input, item_input, params):
   # type: (tf.Tensor, tf.Tensor, dict) -> tf.keras.Model
   """Initialize NeuMF model.
@@ -184,34 +183,33 @@ def construct_model(user_input, item_input, params, need_strip=False):
   # Initializer for embedding layers
   embedding_initializer = "glorot_uniform"

-  if need_strip:
-    batch_size = params["batch_size"]
-    user_input_reshaped = tf.keras.layers.Lambda(
-        lambda x: _strip_first_and_last_dimension(
-            x, batch_size))(user_input)
+  def mf_slice_fn(x):
+    x = tf.squeeze(x, [1])
+    return x[:, :mf_dim]
-    item_input_reshaped = tf.keras.layers.Lambda(
-        lambda x: _strip_first_and_last_dimension(
-            x, batch_size))(item_input)
+  def mlp_slice_fn(x):
+    x = tf.squeeze(x, [1])
+    return x[:, mf_dim:]

   # It turns out to be significantly more effecient to store the MF and MLP
   # embedding portions in the same table, and then slice as needed.
-  mf_slice_fn = lambda x: x[:, :mf_dim]
-  mlp_slice_fn = lambda x: x[:, mf_dim:]

   embedding_user = tf.keras.layers.Embedding(
-      num_users, mf_dim + model_layers[0] // 2,
+      num_users,
+      mf_dim + model_layers[0] // 2,
       embeddings_initializer=embedding_initializer,
       embeddings_regularizer=tf.keras.regularizers.l2(mf_regularization),
-      input_length=1, name="embedding_user")(
-          user_input_reshaped if need_strip else user_input)
+      input_length=1,
+      name="embedding_user")(
+          user_input)

   embedding_item = tf.keras.layers.Embedding(
-      num_items, mf_dim + model_layers[0] // 2,
+      num_items,
+      mf_dim + model_layers[0] // 2,
       embeddings_initializer=embedding_initializer,
       embeddings_regularizer=tf.keras.regularizers.l2(mf_regularization),
-      input_length=1, name="embedding_item")(
-          item_input_reshaped if need_strip else item_input)
+      input_length=1,
+      name="embedding_item")(
+          item_input)

   # GMF part
   mf_user_latent = tf.keras.layers.Lambda(
......
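The new mf_slice_fn/mlp_slice_fn above first squeeze away the length-1 axis introduced by the [batch_size, 1] inputs, then slice the shared embedding table into its GMF and MLP portions. A standalone sketch of that pattern with made-up dimensions (the real model takes mf_dim and model_layers from params):

    import tensorflow as tf

    mf_dim, mlp_half = 4, 6
    user_input = tf.keras.layers.Input(shape=(1,), dtype=tf.int32)
    # One shared table holds both the MF and MLP embedding portions.
    embedding = tf.keras.layers.Embedding(
        100, mf_dim + mlp_half, input_length=1)(user_input)  # [batch, 1, 10]

    def mf_slice_fn(x):
      x = tf.squeeze(x, [1])   # [batch, 1, dim] -> [batch, dim]
      return x[:, :mf_dim]     # GMF portion

    def mlp_slice_fn(x):
      x = tf.squeeze(x, [1])
      return x[:, mf_dim:]     # MLP portion

    mf_latent = tf.keras.layers.Lambda(mf_slice_fn)(embedding)
    mlp_latent = tf.keras.layers.Lambda(mlp_slice_fn)(embedding)
    model = tf.keras.Model(user_input, [mf_latent, mlp_latent])
    print(model.output_shape)  # [(None, 4), (None, 6)]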
@@ -24,6 +24,8 @@ import random
 import string

 import tensorflow as tf

+from official.utils.misc import tpu_lib
+

 def _collective_communication(all_reduce_alg):
   """Return a CollectiveCommunication based on all_reduce_alg.
@@ -83,16 +85,18 @@ def get_distribution_strategy(distribution_strategy="default",
                               num_gpus=0,
                               num_workers=1,
                               all_reduce_alg=None,
-                              num_packs=1):
+                              num_packs=1,
+                              tpu_address=None):
   """Return a DistributionStrategy for running the model.

   Args:
     distribution_strategy: a string specifying which distribution strategy to
       use. Accepted values are 'off', 'default', 'one_device', 'mirrored',
-      'parameter_server', 'multi_worker_mirrored', case insensitive. 'off' means
-      not to use Distribution Strategy; 'default' means to choose from
+      'parameter_server', 'multi_worker_mirrored', and 'tpu' -- case insensitive.
+      'off' means not to use Distribution Strategy; 'default' means to choose from
       `MirroredStrategy`, `MultiWorkerMirroredStrategy`, or `OneDeviceStrategy`
-      according to the number of GPUs and number of workers.
+      according to the number of GPUs and number of workers. 'tpu' means to use
+      TPUStrategy using `tpu_address`.
     num_gpus: Number of GPUs to run this model.
     num_workers: Number of workers to run this model.
     all_reduce_alg: Optional. Specifies which algorithm to use when performing
@@ -102,12 +106,14 @@ def get_distribution_strategy(distribution_strategy="default",
       device topology.
     num_packs: Optional. Sets the `num_packs` in `tf.distribute.NcclAllReduce`
       or `tf.distribute.HierarchicalCopyAllReduce` for `MirroredStrategy`.
+    tpu_address: Optional. String that represents TPU to connect to. Must not
+      be None if `distribution_strategy` is set to `tpu`.

   Returns:
     tf.distribute.DistibutionStrategy object.

   Raises:
     ValueError: if `distribution_strategy` is 'off' or 'one_device' and
-      `num_gpus` is larger than 1; or `num_gpus` is negative.
+      `num_gpus` is larger than 1; or `num_gpus` is negative or if
+      `distribution_strategy` is `tpu` but `tpu_address` is not specified.
   """
   if num_gpus < 0:
     raise ValueError("`num_gpus` can not be negative.")
@@ -120,6 +126,15 @@ def get_distribution_strategy(distribution_strategy="default",
                        "flag cannot be set to 'off'.".format(num_gpus, num_workers))
     return None

+  if distribution_strategy == "tpu":
+    if not tpu_address:
+      raise ValueError("`tpu_address` must be specified when using "
+                       "TPUStrategy.")
+    # Initialize TPU System.
+    cluster_resolver = tpu_lib.tpu_initialize(tpu_address)
+    return tf.distribute.experimental.TPUStrategy(cluster_resolver)
+
   if distribution_strategy == "multi_worker_mirrored":
     return tf.distribute.experimental.MultiWorkerMirroredStrategy(
         communication=_collective_communication(all_reduce_alg))
......
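A hedged usage sketch of the new 'tpu' branch; the TPU name below is a placeholder (any Cloud TPU name or grpc address accepted by the cluster resolver would do), and model construction is left as a stub since only the strategy plumbing comes from this change:

    from official.utils.misc import distribution_utils

    # "my-tpu" is a placeholder; resolving it requires a reachable TPU worker.
    strategy = distribution_utils.get_distribution_strategy(
        distribution_strategy="tpu", tpu_address="my-tpu")

    with strategy.scope():
      pass  # build and compile the Keras NCF model here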
@@ -31,3 +31,8 @@ def tpu_initialize(tpu_address):
   tf.config.experimental_connect_to_host(cluster_resolver.master())
   tf.tpu.experimental.initialize_tpu_system(cluster_resolver)
   return cluster_resolver
+
+
+def get_primary_cpu_task(use_remote_tpu=False):
+  """Returns remote TPU worker address. No-op for GPU/CPU training."""
+  return "/job:worker" if use_remote_tpu else ""