Unverified commit 8ff9eb54, authored by Shining Sun and committed by GitHub

Remove contrib imports, or move them inline (#6591)

* Remove contrib imports, or move them inline

* Use exposed API for FixedLenFeature

* Replace tf.logging with absl logging

* Change GFile to v2 APIs

* replace tf.logging with absl logging in movielens

* Fixing an import bug

* Change gfile to v2 APIs in code

* Swap to keras optimizer v2

* Bug fix for optimizer

* Change tf.log to tf.keras.backend.log

* Change the loss function to keras loss

* convert another loss to keras loss

* Resolve comments and fix lint

* Add a doc string

* Fix existing tests and add new tests for DS

* Added tests for multi-replica

* Fix lint

* resolve comments

* make estimator run in tf2.0

* use compat v1 loss

* fix lint issue
parent 139dd8e9
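The bullets above boil down to a handful of mechanical API swaps: tf.logging becomes absl logging, tf.gfile becomes tf.io.gfile, and tf.contrib imports are removed or deferred to the call site. A minimal sketch of the logging half of that migration, using an illustrative absl entry point (the module, flag, and messages below are not from this commit):

# Minimal sketch of the tf.logging -> absl logging swap described above.
# The flag and messages here are illustrative, not from this commit.
from absl import app
from absl import flags
from absl import logging

flags.DEFINE_string("data_dir", "/tmp/movielens-data", "Download directory.")
FLAGS = flags.FLAGS


def main(_):
  # logging.info / logging.warning replace tf.logging.* one-for-one.
  logging.info("Writing data to %s", FLAGS.data_dir)


if __name__ == "__main__":
  # Verbosity is set on absl's logger instead of tf.logging.set_verbosity.
  logging.set_verbosity(logging.INFO)
  app.run(main)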
@@ -33,6 +33,7 @@ import six
from six.moves import urllib  # pylint: disable=redefined-builtin
from absl import app as absl_app
from absl import flags
+from absl import logging
import tensorflow as tf
# pylint: enable=g-bad-import-order
@@ -100,10 +101,10 @@ def _download_and_clean(dataset, data_dir):
  expected_files = ["{}.zip".format(dataset), RATINGS_FILE, MOVIES_FILE]
-  tf.gfile.MakeDirs(data_subdir)
+  tf.io.gfile.makedirs(data_subdir)
  if set(expected_files).intersection(
-      tf.gfile.ListDirectory(data_subdir)) == set(expected_files):
-    tf.logging.info("Dataset {} has already been downloaded".format(dataset))
+      tf.io.gfile.listdir(data_subdir)) == set(expected_files):
+    logging.info("Dataset {} has already been downloaded".format(dataset))
    return
  url = "{}{}.zip".format(_DATA_URL, dataset)
@@ -114,9 +115,9 @@ def _download_and_clean(dataset, data_dir):
    zip_path, _ = urllib.request.urlretrieve(url, zip_path)
    statinfo = os.stat(zip_path)
    # A new line to clear the carriage return from download progress
-    # tf.logging.info is not applicable here
+    # logging.info is not applicable here
    print()
-    tf.logging.info(
+    logging.info(
        "Successfully downloaded {} {} bytes".format(
            zip_path, statinfo.st_size))
@@ -127,16 +128,16 @@ def _download_and_clean(dataset, data_dir):
    else:
      _regularize_20m_dataset(temp_dir)
-    for fname in tf.gfile.ListDirectory(temp_dir):
-      if not tf.gfile.Exists(os.path.join(data_subdir, fname)):
-        tf.gfile.Copy(os.path.join(temp_dir, fname),
+    for fname in tf.io.gfile.listdir(temp_dir):
+      if not tf.io.gfile.exists(os.path.join(data_subdir, fname)):
+        tf.io.gfile.copy(os.path.join(temp_dir, fname),
                         os.path.join(data_subdir, fname))
      else:
-        tf.logging.info("Skipping copy of {}, as it already exists in the "
+        logging.info("Skipping copy of {}, as it already exists in the "
                     "destination folder.".format(fname))
  finally:
-    tf.gfile.DeleteRecursively(temp_dir)
+    tf.io.gfile.rmtree(temp_dir)

def _transform_csv(input_path, output_path, names, skip_first, separator=","):
@@ -152,8 +153,8 @@ def _transform_csv(input_path, output_path, names, skip_first, separator=","):
  if six.PY2:
    names = [n.decode("utf-8") for n in names]
-  with tf.gfile.Open(output_path, "wb") as f_out, \
-      tf.gfile.Open(input_path, "rb") as f_in:
+  with tf.io.gfile.GFile(output_path, "wb") as f_out, \
+      tf.io.gfile.GFile(input_path, "rb") as f_in:
    # Write column names to the csv.
    f_out.write(",".join(names).encode("utf-8"))
@@ -199,7 +200,7 @@ def _regularize_1m_dataset(temp_dir):
      output_path=os.path.join(temp_dir, MOVIES_FILE),
      names=MOVIE_COLUMNS, skip_first=False, separator="::")
-  tf.gfile.DeleteRecursively(working_dir)
+  tf.io.gfile.rmtree(working_dir)

def _regularize_20m_dataset(temp_dir):
@@ -233,7 +234,7 @@ def _regularize_20m_dataset(temp_dir):
      output_path=os.path.join(temp_dir, MOVIES_FILE),
      names=MOVIE_COLUMNS, skip_first=True, separator=",")
-  tf.gfile.DeleteRecursively(working_dir)
+  tf.io.gfile.rmtree(working_dir)

def download(dataset, data_dir):
@@ -244,14 +245,14 @@ def download(dataset, data_dir):
def ratings_csv_to_dataframe(data_dir, dataset):
-  with tf.gfile.Open(os.path.join(data_dir, dataset, RATINGS_FILE)) as f:
+  with tf.io.gfile.GFile(os.path.join(data_dir, dataset, RATINGS_FILE)) as f:
    return pd.read_csv(f, encoding="utf-8")

def csv_to_joint_dataframe(data_dir, dataset):
  ratings = ratings_csv_to_dataframe(data_dir, dataset)
-  with tf.gfile.Open(os.path.join(data_dir, dataset, MOVIES_FILE)) as f:
+  with tf.io.gfile.GFile(os.path.join(data_dir, dataset, MOVIES_FILE)) as f:
    movies = pd.read_csv(f, encoding="utf-8")
  df = ratings.merge(movies, on=ITEM_COLUMN)
@@ -302,7 +303,6 @@ def main(_):
if __name__ == "__main__":
-  tf.logging.set_verbosity(tf.logging.INFO)
  define_data_download_flags()
  FLAGS = flags.FLAGS
  absl_app.run(main)
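The hunks above swap each legacy tf.gfile call for its tf.io.gfile equivalent: MakeDirs becomes makedirs, ListDirectory becomes listdir, Exists becomes exists, Copy becomes copy, Open becomes GFile, Remove becomes remove, and DeleteRecursively becomes rmtree. A small sketch of that mapping, with an illustrative scratch directory that is not a path used by this code:

# Sketch of the tf.gfile -> tf.io.gfile renames applied above.
# The scratch directory and file names are illustrative only.
import os
import tensorflow as tf

scratch_dir = "/tmp/gfile-demo"              # hypothetical path
tf.io.gfile.makedirs(scratch_dir)            # was tf.gfile.MakeDirs
csv_path = os.path.join(scratch_dir, "ratings.csv")

with tf.io.gfile.GFile(csv_path, "w") as f:  # was tf.gfile.Open
  f.write("user_id,item_id,rating\n")

print(tf.io.gfile.listdir(scratch_dir))      # was tf.gfile.ListDirectory
if tf.io.gfile.exists(csv_path):             # was tf.gfile.Exists
  tf.io.gfile.copy(csv_path, csv_path + ".bak")  # was tf.gfile.Copy
tf.io.gfile.remove(csv_path + ".bak")        # was tf.gfile.Remove
tf.io.gfile.rmtree(scratch_dir)              # was tf.gfile.DeleteRecursively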
@@ -33,7 +33,7 @@ import numpy as np
import six
from six.moves import queue
import tensorflow as tf
-from tensorflow.contrib.tpu.python.tpu.datasets import StreamingFilesDataset
+from absl import logging
from official.datasets import movielens
from official.recommendation import constants as rconst
@@ -57,17 +57,17 @@ Eval:
_TRAIN_FEATURE_MAP = {
-    movielens.USER_COLUMN: tf.FixedLenFeature([], dtype=tf.string),
-    movielens.ITEM_COLUMN: tf.FixedLenFeature([], dtype=tf.string),
-    rconst.MASK_START_INDEX: tf.FixedLenFeature([1], dtype=tf.string),
-    "labels": tf.FixedLenFeature([], dtype=tf.string),
+    movielens.USER_COLUMN: tf.io.FixedLenFeature([], dtype=tf.string),
+    movielens.ITEM_COLUMN: tf.io.FixedLenFeature([], dtype=tf.string),
+    rconst.MASK_START_INDEX: tf.io.FixedLenFeature([1], dtype=tf.string),
+    "labels": tf.io.FixedLenFeature([], dtype=tf.string),
}
_EVAL_FEATURE_MAP = {
-    movielens.USER_COLUMN: tf.FixedLenFeature([], dtype=tf.string),
-    movielens.ITEM_COLUMN: tf.FixedLenFeature([], dtype=tf.string),
-    rconst.DUPLICATE_MASK: tf.FixedLenFeature([], dtype=tf.string)
+    movielens.USER_COLUMN: tf.io.FixedLenFeature([], dtype=tf.string),
+    movielens.ITEM_COLUMN: tf.io.FixedLenFeature([], dtype=tf.string),
+    rconst.DUPLICATE_MASK: tf.io.FixedLenFeature([], dtype=tf.string)
}
@@ -200,7 +200,7 @@ class DatasetManager(object):
  def start_construction(self):
    if self._stream_files:
-      tf.gfile.MakeDirs(self.current_data_root)
+      tf.io.gfile.makedirs(self.current_data_root)
      template = os.path.join(self.current_data_root, rconst.SHARD_TEMPLATE)
      self._writers = [tf.io.TFRecordWriter(template.format(i))
                       for i in range(rconst.NUM_FILE_SHARDS)]
@@ -261,6 +261,10 @@ class DatasetManager(object):
      file_pattern = os.path.join(
          epoch_data_dir, rconst.SHARD_TEMPLATE.format("*"))
+      # TODO: remove this contrib import
+      # pylint: disable=line-too-long
+      from tensorflow.contrib.tpu.python.tpu.datasets import StreamingFilesDataset
+      # pylint: enable=line-too-long
      dataset = StreamingFilesDataset(
          files=file_pattern, worker_job=popen_helper.worker_job(),
          num_parallel_reads=rconst.NUM_FILE_SHARDS, num_epochs=1,
@@ -388,7 +392,7 @@ class BaseDataConstructor(threading.Thread):
    self._shuffle_with_forkpool = not stream_files
    if stream_files:
      self._shard_root = epoch_dir or tempfile.mkdtemp(prefix="ncf_")
-      atexit.register(tf.gfile.DeleteRecursively, dirname=self._shard_root)
+      atexit.register(tf.io.gfile.rmtree, dirname=self._shard_root)
    else:
      self._shard_root = None
@@ -517,7 +521,7 @@ class BaseDataConstructor(threading.Thread):
      time.sleep(0.01)
      count += 1
      if count >= 100 and np.log10(count) == np.round(np.log10(count)):
-        tf.logging.info(
+        logging.info(
            "Waited {} times for training data to be consumed".format(count))

  def _construct_training_epoch(self):
@@ -537,7 +541,7 @@ class BaseDataConstructor(threading.Thread):
      pool.map(self._get_training_batch, map_args)
    self._train_dataset.end_construction()
-    tf.logging.info("Epoch construction complete. Time: {:.1f} seconds".format(
+    logging.info("Epoch construction complete. Time: {:.1f} seconds".format(
        timeit.default_timer() - start_time))

  @staticmethod
@@ -619,7 +623,7 @@ class BaseDataConstructor(threading.Thread):
      pool.map(self._get_eval_batch, map_args)
    self._eval_dataset.end_construction()
-    tf.logging.info("Eval construction complete. Time: {:.1f} seconds".format(
+    logging.info("Eval construction complete. Time: {:.1f} seconds".format(
        timeit.default_timer() - start_time))

  def make_input_fn(self, is_training):
@@ -760,7 +764,7 @@ class MaterializedDataConstructor(BaseDataConstructor):
      self._per_user_neg_count[i] = self._num_items - positives.shape[0]
      self._negative_table[i, :self._per_user_neg_count[i]] = negatives
-    tf.logging.info("Negative sample table built. Time: {:.1f} seconds".format(
+    logging.info("Negative sample table built. Time: {:.1f} seconds".format(
        timeit.default_timer() - start_time))

  def lookup_negative_items(self, negative_users, **kwargs):
@@ -813,7 +817,7 @@ class BisectionDataConstructor(BaseDataConstructor):
    self._total_negatives = np.concatenate([
        self._index_segment(i) for i in range(self._num_users)])
-    tf.logging.info("Negative total vector built. Time: {:.1f} seconds".format(
+    logging.info("Negative total vector built. Time: {:.1f} seconds".format(
        timeit.default_timer() - start_time))

  def lookup_negative_items(self, negative_users, **kwargs):
...
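Rather than deleting StreamingFilesDataset outright, the data-pipeline hunks above move its contrib import from module scope into the only place that needs it, so importing the module no longer requires tf.contrib. A generic sketch of that deferred-import pattern; the wrapper function and its error handling are illustrative, only the import path comes from the diff:

# Generic sketch of the "move the contrib import inline" pattern used above.
def make_streaming_dataset(file_pattern, **kwargs):
  """Builds a StreamingFilesDataset without importing contrib at module load."""
  try:
    # Deferred so that importing this module never touches tf.contrib.
    # pylint: disable=g-import-not-at-top
    from tensorflow.contrib.tpu.python.tpu.datasets import StreamingFilesDataset
  except ImportError as e:
    raise RuntimeError("This code path needs a TF build that still ships "
                       "tf.contrib (TF 1.x): {}".format(e))
  return StreamingFilesDataset(files=file_pattern, **kwargs)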
@@ -28,6 +28,7 @@ import typing
import numpy as np
import pandas as pd
import tensorflow as tf
+from absl import logging
# pylint: enable=wrong-import-order
from official.datasets import movielens
@@ -80,9 +81,9 @@ def _filter_index_sort(raw_rating_path, cache_path):
    IDs to regularized user IDs, and a dict mapping raw item IDs to regularized
    item IDs.
  """
-  valid_cache = tf.gfile.Exists(cache_path)
+  valid_cache = tf.io.gfile.exists(cache_path)
  if valid_cache:
-    with tf.gfile.Open(cache_path, "rb") as f:
+    with tf.io.gfile.GFile(cache_path, "rb") as f:
      cached_data = pickle.load(f)
    cache_age = time.time() - cached_data.get("create_time", 0)
@@ -94,13 +95,13 @@ def _filter_index_sort(raw_rating_path, cache_path):
      valid_cache = False
    if not valid_cache:
-      tf.logging.info("Removing stale raw data cache file.")
-      tf.gfile.Remove(cache_path)
+      logging.info("Removing stale raw data cache file.")
+      tf.io.gfile.remove(cache_path)
  if valid_cache:
    data = cached_data
  else:
-    with tf.gfile.Open(raw_rating_path) as f:
+    with tf.io.gfile.GFile(raw_rating_path) as f:
      df = pd.read_csv(f)
    # Get the info of users who have more than 20 ratings on items
@@ -112,7 +113,7 @@ def _filter_index_sort(raw_rating_path, cache_path):
    original_items = df[movielens.ITEM_COLUMN].unique()
    # Map the ids of user and item to 0 based index for following processing
-    tf.logging.info("Generating user_map and item_map...")
+    logging.info("Generating user_map and item_map...")
    user_map = {user: index for index, user in enumerate(original_users)}
    item_map = {item: index for index, item in enumerate(original_items)}
@@ -134,7 +135,7 @@ def _filter_index_sort(raw_rating_path, cache_path):
    # This sort is used to shard the dataframe by user, and later to select
    # the last item for a user to be used in validation.
-    tf.logging.info("Sorting by user, timestamp...")
+    logging.info("Sorting by user, timestamp...")
    # This sort is equivalent to
    #   df.sort_values([movielens.USER_COLUMN, movielens.TIMESTAMP_COLUMN],
@@ -167,8 +168,8 @@ def _filter_index_sort(raw_rating_path, cache_path):
        "create_time": time.time(),
    }
-    tf.logging.info("Writing raw data cache.")
-    with tf.gfile.Open(cache_path, "wb") as f:
+    logging.info("Writing raw data cache.")
+    with tf.io.gfile.GFile(cache_path, "wb") as f:
      pickle.dump(data, f, protocol=pickle.HIGHEST_PROTOCOL)
  # TODO(robieta): MLPerf cache clear.
@@ -189,7 +190,7 @@ def instantiate_pipeline(dataset, data_dir, params, constructor_type=None,
    deterministic: Tell the data constructor to produce deterministically.
    epoch_dir: Directory in which to store the training epochs.
  """
-  tf.logging.info("Beginning data preprocessing.")
+  logging.info("Beginning data preprocessing.")
  st = timeit.default_timer()
  raw_rating_path = os.path.join(data_dir, dataset, movielens.RATINGS_FILE)
@@ -227,8 +228,8 @@ def instantiate_pipeline(dataset, data_dir, params, constructor_type=None,
  )
  run_time = timeit.default_timer() - st
-  tf.logging.info("Data preprocessing complete. Time: {:.1f} sec."
+  logging.info("Data preprocessing complete. Time: {:.1f} sec."
               .format(run_time))
  print(producer)
  return num_users, num_items, producer
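The preprocessing hunks above keep the existing pickle cache but route every read and write through tf.io.gfile.GFile, so the cache behaves the same on local disk and on remote filesystems. A condensed sketch of that caching pattern; the helper name and the one-day staleness window are illustrative:

# Condensed sketch of the GFile-backed pickle cache shown above.
import pickle
import time

import tensorflow as tf


def load_or_build_cache(cache_path, build_fn, max_age_sec=24 * 60 * 60):
  # Reuse the cache if it exists and is fresh enough; otherwise rebuild it.
  if tf.io.gfile.exists(cache_path):
    with tf.io.gfile.GFile(cache_path, "rb") as f:
      cached = pickle.load(f)
    if time.time() - cached.get("create_time", 0) < max_age_sec:
      return cached
    tf.io.gfile.remove(cache_path)  # stale cache: rebuild below
  data = build_fn()
  data["create_time"] = time.time()
  with tf.io.gfile.GFile(cache_path, "wb") as f:
    pickle.dump(data, f, protocol=pickle.HIGHEST_PROTOCOL)
  return data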
@@ -58,7 +58,7 @@ class BaseTest(tf.test.TestCase):
  def setUp(self):
    self.temp_data_dir = self.get_temp_dir()
    ratings_folder = os.path.join(self.temp_data_dir, DATASET)
-    tf.gfile.MakeDirs(ratings_folder)
+    tf.io.gfile.makedirs(ratings_folder)
    np.random.seed(0)
    raw_user_ids = np.arange(NUM_USERS * 3)
    np.random.shuffle(raw_user_ids)
@@ -76,7 +76,7 @@ class BaseTest(tf.test.TestCase):
    self.rating_file = os.path.join(ratings_folder, movielens.RATINGS_FILE)
    self.seen_pairs = set()
    self.holdout = {}
-    with tf.gfile.Open(self.rating_file, "w") as f:
+    with tf.io.gfile.GFile(self.rating_file, "w") as f:
      f.write("user_id,item_id,rating,timestamp\n")
      for usr, itm, scr, ts in zip(users, items, scores, times):
        pair = (usr, itm)
@@ -341,5 +341,4 @@ class BaseTest(tf.test.TestCase):
if __name__ == "__main__":
-  tf.logging.set_verbosity(tf.logging.INFO)
  tf.test.main()
@@ -20,12 +20,12 @@ from __future__ import division
from __future__ import print_function
import json
-import logging
import os
# pylint: disable=g-bad-import-order
import numpy as np
from absl import flags
+from absl import logging
import tensorflow as tf
# pylint: enable=g-bad-import-order
@@ -109,18 +109,6 @@ def parse_flags(flags_obj):
  }

-def get_optimizer(params):
-  optimizer = tf.train.AdamOptimizer(
-      learning_rate=params["learning_rate"],
-      beta1=params["beta1"],
-      beta2=params["beta2"],
-      epsilon=params["epsilon"])
-  if params["use_tpu"]:
-    optimizer = tf.contrib.tpu.CrossShardOptimizer(optimizer)
-  return optimizer

def get_distribution_strategy(params):
  """Returns the distribution strategy to use."""
  if params["turn_off_distribution_strategy"]:
@@ -132,14 +120,14 @@ def get_distribution_strategy(params):
                 "oauth2client.transport"]:
      logging.getLogger(name).setLevel(logging.ERROR)
-    tpu_cluster_resolver = tf.contrib.cluster_resolver.TPUClusterResolver(
+    tpu_cluster_resolver = tf.distribute.cluster_resolver.TPUClusterResolver(
        tpu=params["tpu"],
        zone=params["tpu_zone"],
        project=params["tpu_gcp_project"],
        coordinator_name="coordinator"
    )
-    tf.logging.info("Issuing reset command to TPU to ensure a clean state.")
+    logging.info("Issuing reset command to TPU to ensure a clean state.")
    tf.Session.reset(tpu_cluster_resolver.get_master())
    # Estimator looks at the master it connects to for MonitoredTrainingSession
@@ -153,7 +141,7 @@ def get_distribution_strategy(params):
    }
    os.environ['TF_CONFIG'] = json.dumps(tf_config_env)
-    distribution = tf.contrib.distribute.TPUStrategy(
+    distribution = tf.distribute.experimental.TPUStrategy(
        tpu_cluster_resolver, steps_per_run=100)
  else:
...
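With get_optimizer deleted from this module, each caller now constructs its optimizer directly. On the Keras path (see the ncf_keras_main hunk later in this diff) the replacement is roughly the following; the helper name is illustrative, the params keys mirror the ones used above, and the v1-only CrossShardOptimizer TPU wrapper is intentionally not applied here:

# Rough stand-in for the deleted get_optimizer, built on the Keras v2 optimizer.
# Note the Keras spelling of the hyperparameters: beta_1 / beta_2.
import tensorflow as tf


def build_keras_optimizer(params):
  return tf.keras.optimizers.Adam(
      learning_rate=params["learning_rate"],
      beta_1=params["beta1"],
      beta_2=params["beta2"],
      epsilon=params["epsilon"])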
@@ -25,7 +25,6 @@ from __future__ import print_function
import contextlib
import heapq
import json
-import logging
import math
import multiprocessing
import os
@@ -36,10 +35,10 @@ import typing
import numpy as np
from absl import app as absl_app
from absl import flags
+from absl import logging
import tensorflow as tf
# pylint: enable=g-bad-import-order
-from tensorflow.contrib.compiler import xla
from official.datasets import movielens
from official.recommendation import constants as rconst
from official.recommendation import data_pipeline
@@ -73,7 +72,9 @@ def construct_estimator(model_dir, params):
  model_fn = neumf_model.neumf_model_fn
  if params["use_xla_for_gpu"]:
-    tf.logging.info("Using XLA for GPU for training and evaluation.")
+    # TODO: remove the contrib import
+    from tensorflow.contrib.compiler import xla
+    logging.info("Using XLA for GPU for training and evaluation.")
    model_fn = xla.estimator_model_fn(model_fn)
  estimator = tf.estimator.Estimator(model_fn=model_fn, model_dir=model_dir,
                                     config=run_config, params=params)
@@ -133,7 +134,7 @@ def run_ncf(_):
  mlperf_helper.ncf_print(key=mlperf_helper.TAGS.TRAIN_LOOP)
  for cycle_index in range(total_training_cycle):
    assert FLAGS.epochs_between_evals == 1 or not mlperf_helper.LOGGER.enabled
-    tf.logging.info("Starting a training cycle: {}/{}".format(
+    logging.info("Starting a training cycle: {}/{}".format(
        cycle_index + 1, total_training_cycle))
    mlperf_helper.ncf_print(key=mlperf_helper.TAGS.TRAIN_EPOCH,
@@ -143,13 +144,13 @@ def run_ncf(_):
    estimator.train(input_fn=train_input_fn, hooks=train_hooks,
                    steps=num_train_steps)
-    tf.logging.info("Beginning evaluation.")
+    logging.info("Beginning evaluation.")
    eval_input_fn = producer.make_input_fn(is_training=False)
    mlperf_helper.ncf_print(key=mlperf_helper.TAGS.EVAL_START,
                            value=cycle_index)
    eval_results = estimator.evaluate(eval_input_fn, steps=num_eval_steps)
-    tf.logging.info("Evaluation complete.")
+    logging.info("Evaluation complete.")
    hr = float(eval_results[rconst.HR_KEY])
    ndcg = float(eval_results[rconst.NDCG_KEY])
@@ -169,7 +170,7 @@ def run_ncf(_):
    # Benchmark the evaluation results
    benchmark_logger.log_evaluation_result(eval_results)
    # Log the HR and NDCG results.
-    tf.logging.info(
+    logging.info(
        "Iteration {}: HR = {:.4f}, NDCG = {:.4f}, Loss = {:.4f}".format(
            cycle_index + 1, hr, ndcg, loss))
@@ -189,6 +190,6 @@ def run_ncf(_):
if __name__ == "__main__":
-  tf.logging.set_verbosity(tf.logging.INFO)
+  logging.set_verbosity(logging.INFO)
  ncf_common.define_ncf_flags()
  absl_app.run(main)
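The estimator main above keeps its existing str.format messages and only swaps the logger. absl logging also accepts printf-style arguments, which defers formatting until a record is actually emitted; a small illustrative sketch (the loop values below are made up):

# absl logging with printf-style arguments; formatting happens only when the
# record passes the current verbosity. Values here are illustrative.
from absl import logging

logging.set_verbosity(logging.INFO)
cycle_index, total_training_cycle = 0, 10   # hypothetical loop state
logging.info("Starting a training cycle: %d/%d",
             cycle_index + 1, total_training_cycle)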
@@ -27,6 +27,7 @@ import os
# pylint: disable=g-bad-import-order
from absl import app as absl_app
from absl import flags
+from absl import logging
import tensorflow as tf
# pylint: enable=g-bad-import-order
@@ -46,9 +47,10 @@ FLAGS = flags.FLAGS
def _keras_loss(y_true, y_pred):
  # Here we are using the exact same loss used by the estimator
-  loss = tf.losses.sparse_softmax_cross_entropy(
-      labels=tf.cast(y_true, tf.int32),
-      logits=y_pred)
+  loss = tf.keras.losses.sparse_categorical_crossentropy(
+      y_pred=y_pred,
+      y_true=tf.cast(y_true, tf.int32),
+      from_logits=True)
  return loss
@@ -66,7 +68,7 @@ def _get_metric_fn(params):
    # repetition correction
    dup_mask = tf.zeros([batch_size, 1])
-    cross_entropy, metric_fn, in_top_k, ndcg, metric_weights = (
+    _, _, in_top_k, _, _ = (
        neumf_model.compute_eval_loss_and_metrics_helper(
            logits,
            softmax_logits,
@@ -155,13 +157,13 @@ def _get_keras_model(params):
  # is designed to be of batch_size 1 for each replica.
  user_input = tf.keras.layers.Input(
      shape=(batch_size,),
-      batch_size=1,
+      batch_size=params["batches_per_step"],
      name=movielens.USER_COLUMN,
      dtype=tf.int32)
  item_input = tf.keras.layers.Input(
      shape=(batch_size,),
-      batch_size=1,
+      batch_size=params["batches_per_step"],
      name=movielens.ITEM_COLUMN,
      dtype=tf.int32)
@@ -193,7 +195,7 @@ def run_ncf(_):
  """Run NCF training and eval with Keras."""
  # TODO(seemuch): Support different train and eval batch sizes
  if FLAGS.eval_batch_size != FLAGS.batch_size:
-    tf.logging.warning(
+    logging.warning(
        "The Keras implementation of NCF currently does not support batch_size "
        "!= eval_batch_size ({} vs. {}). Overriding eval_batch_size to match "
        "batch_size".format(FLAGS.eval_batch_size, FLAGS.batch_size)
@@ -226,7 +228,11 @@ def run_ncf(_):
  strategy = ncf_common.get_distribution_strategy(params)
  with distribution_utils.get_strategy_scope(strategy):
    keras_model = _get_keras_model(params)
-    optimizer = ncf_common.get_optimizer(params)
+    optimizer = tf.keras.optimizers.Adam(
+        learning_rate=params["learning_rate"],
+        beta_1=params["beta1"],
+        beta_2=params["beta2"],
+        epsilon=params["epsilon"])
    time_callback = keras_utils.TimeHistory(batch_size, FLAGS.log_steps)
    keras_model.compile(
@@ -241,14 +247,14 @@ def run_ncf(_):
                   time_callback],
        verbose=2)
-    tf.logging.info("Training done. Start evaluating")
+    logging.info("Training done. Start evaluating")
    eval_results = keras_model.evaluate(
        eval_input_dataset,
        steps=num_eval_steps,
        verbose=2)
-    tf.logging.info("Keras evaluation is done.")
+    logging.info("Keras evaluation is done.")
    stats = build_stats(history, eval_results, time_callback)
    return stats
@@ -298,6 +304,5 @@ def main(_):
if __name__ == "__main__":
-  tf.logging.set_verbosity(tf.logging.INFO)
  ncf_common.define_ncf_flags()
  absl_app.run(main)
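The _keras_loss change earlier in this file maps the estimator's tf.losses.sparse_softmax_cross_entropy onto tf.keras.losses.sparse_categorical_crossentropy with from_logits=True. The Keras call returns per-example losses and leaves the mean reduction to compile/fit; a toy check of that correspondence (the logits, labels, and shapes are illustrative):

# Toy check of the loss swap above: with from_logits=True the Keras call is the
# per-example form of v1 sparse softmax cross entropy; Keras applies the mean
# reduction itself during compile/fit.
import tensorflow as tf

logits = tf.constant([[2.0, 0.5], [0.1, 1.5]])   # hypothetical [batch, classes]
labels = tf.constant([0, 1])

per_example = tf.keras.losses.sparse_categorical_crossentropy(
    y_true=labels, y_pred=logits, from_logits=True)
mean_loss = tf.reduce_mean(per_example)          # scalar, like the v1 loss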
@@ -32,6 +32,7 @@ from official.recommendation import ncf_common
from official.recommendation import ncf_estimator_main
from official.recommendation import ncf_keras_main
from official.utils.testing import integration
+from tensorflow.python.eager import context  # pylint: disable=ungrouped-imports

NUM_TRAIN_NEG = 4
@@ -204,7 +205,7 @@ class NcfTest(tf.test.TestCase):
    integration.run_synthetic(
        ncf_keras_main.main, tmp_root=self.get_temp_dir(), max_train=None,
        extra_flags=self._BASE_END_TO_END_FLAGS +
-        ['-distribution_strategy', 'off', '-log_steps', '100'])
+        ['-distribution_strategy', 'off'])

  @mock.patch.object(rconst, "SYNTHETIC_BATCHES_PER_EPOCH", 100)
  def test_end_to_end_keras_mlperf(self):
@@ -212,10 +213,29 @@ class NcfTest(tf.test.TestCase):
        ncf_keras_main.main, tmp_root=self.get_temp_dir(), max_train=None,
        extra_flags=self._BASE_END_TO_END_FLAGS +
        ['-ml_perf', 'True',
-         '-distribution_strategy', 'off',
-         '-log_steps', '100'])
+         '-distribution_strategy', 'off'])

+  @mock.patch.object(rconst, "SYNTHETIC_BATCHES_PER_EPOCH", 100)
+  def test_end_to_end_keras_1_gpu(self):
+    if context.num_gpus() < 1:
+      self.skipTest(
+          "{} GPUs are not available for this test. {} GPUs are available".
+          format(1, context.num_gpus()))
+    integration.run_synthetic(
+        ncf_keras_main.main, tmp_root=self.get_temp_dir(), max_train=None,
+        extra_flags=self._BASE_END_TO_END_FLAGS + ['-num_gpus', '1'])

+  @mock.patch.object(rconst, "SYNTHETIC_BATCHES_PER_EPOCH", 100)
+  def test_end_to_end_keras_2_gpu(self):
+    if context.num_gpus() < 2:
+      self.skipTest(
+          "{} GPUs are not available for this test. {} GPUs are available".
+          format(2, context.num_gpus()))
+    integration.run_synthetic(
+        ncf_keras_main.main, tmp_root=self.get_temp_dir(), max_train=None,
+        extra_flags=self._BASE_END_TO_END_FLAGS + ['-num_gpus', '2'])

if __name__ == "__main__":
-  tf.logging.set_verbosity(tf.logging.INFO)
  tf.test.main()
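The new Keras GPU tests above guard themselves with context.num_gpus() and skip when the machine has fewer devices than the test requires. A generic form of that guard; the test class, method name, and required count are illustrative:

# Generic form of the GPU-availability guard used by the new tests above.
import tensorflow as tf
from tensorflow.python.eager import context  # pylint: disable=g-direct-tensorflow-import


class MultiGpuSmokeTest(tf.test.TestCase):

  def test_requires_two_gpus(self):
    required_gpus = 2
    if context.num_gpus() < required_gpus:
      self.skipTest("Need {} GPUs, found {}.".format(
          required_gpus, context.num_gpus()))
    # ... the multi-GPU test body would go here ...


if __name__ == "__main__":
  tf.test.main()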
@@ -109,12 +109,20 @@ def neumf_model_fn(features, labels, mode, params):
    mlperf_helper.ncf_print(key=mlperf_helper.TAGS.OPT_HP_ADAM_EPSILON,
                            value=params["epsilon"])
-    optimizer = ncf_common.get_optimizer(params)
+    optimizer = tf.compat.v1.train.AdamOptimizer(
+        learning_rate=params["learning_rate"],
+        beta1=params["beta1"],
+        beta2=params["beta2"],
+        epsilon=params["epsilon"])
+    if params["use_tpu"]:
+      # TODO: remove this contrib import
+      optimizer = tf.contrib.tpu.CrossShardOptimizer(optimizer)

    mlperf_helper.ncf_print(key=mlperf_helper.TAGS.MODEL_HP_LOSS_FN,
                            value=mlperf_helper.TAGS.BCE)
-    loss = tf.losses.sparse_softmax_cross_entropy(
+    loss = tf.compat.v1.losses.sparse_softmax_cross_entropy(
        labels=labels,
        logits=softmax_logits,
        weights=tf.cast(valid_pt_mask, tf.float32)
@@ -123,14 +131,14 @@ def neumf_model_fn(features, labels, mode, params):
    # This tensor is used by logging hooks.
    tf.identity(loss, name="cross_entropy")
-    global_step = tf.train.get_global_step()
-    tvars = tf.trainable_variables()
+    global_step = tf.compat.v1.train.get_global_step()
+    tvars = tf.compat.v1.trainable_variables()
    gradients = optimizer.compute_gradients(
        loss, tvars, colocate_gradients_with_ops=True)
    gradients = _sparse_to_dense_grads(gradients)
    minimize_op = optimizer.apply_gradients(
        gradients, global_step=global_step, name="train")
-    update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
+    update_ops = tf.compat.v1.get_collection(tf.compat.v1.GraphKeys.UPDATE_OPS)
    train_op = tf.group(minimize_op, update_ops)
    return tf.estimator.EstimatorSpec(mode=mode, loss=loss, train_op=train_op)
@@ -381,15 +389,17 @@ def compute_eval_loss_and_metrics_helper(logits,  # type: tf.Tensor
  # ignore padded examples
  example_weights *= tf.cast(expanded_metric_weights, tf.float32)
-  cross_entropy = tf.losses.sparse_softmax_cross_entropy(
+  cross_entropy = tf.compat.v1.losses.sparse_softmax_cross_entropy(
      logits=softmax_logits, labels=eval_labels, weights=example_weights)

  def metric_fn(top_k_tensor, ndcg_tensor, weight_tensor):
    return {
-        rconst.HR_KEY: tf.metrics.mean(top_k_tensor, weights=weight_tensor,
-                                       name=rconst.HR_METRIC_NAME),
-        rconst.NDCG_KEY: tf.metrics.mean(ndcg_tensor, weights=weight_tensor,
-                                         name=rconst.NDCG_METRIC_NAME),
+        rconst.HR_KEY: tf.compat.v1.metrics.mean(top_k_tensor,
+                                                 weights=weight_tensor,
+                                                 name=rconst.HR_METRIC_NAME),
+        rconst.NDCG_KEY: tf.compat.v1.metrics.mean(ndcg_tensor,
+                                                   weights=weight_tensor,
+                                                   name=rconst.NDCG_METRIC_NAME)
    }

  return cross_entropy, metric_fn, in_top_k, ndcg, metric_weights
@@ -428,7 +438,7 @@ def compute_top_k_and_ndcg(logits,  # type: tf.Tensor
  # Determine the location of the first element in each row after the elements
  # are sorted.
-  sort_indices = tf.contrib.framework.argsort(
+  sort_indices = tf.argsort(
      logits_by_user, axis=1, direction="DESCENDING")

  # Use matrix multiplication to extract the position of the true item from the
@@ -443,7 +453,8 @@ def compute_top_k_and_ndcg(logits,  # type: tf.Tensor
  position_vector = tf.reduce_sum(sparse_positions, axis=1)
  in_top_k = tf.cast(tf.less(position_vector, rconst.TOP_K), tf.float32)
-  ndcg = tf.log(2.) / tf.log(tf.cast(position_vector, tf.float32) + 2)
+  ndcg = tf.math.log(2.) / tf.math.log(
+      tf.cast(position_vector, tf.float32) + 2)
  ndcg *= in_top_k
  # If a row is a padded row, all but the first element will be a duplicate.
...
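The final hunk only renames tf.log to tf.math.log, but the expression it touches is the per-example NDCG for a single relevant item at 0-based rank p: log(2) / log(p + 2), which equals 1 / log2(p + 2). A tiny NumPy check of that identity (the ranks below are illustrative):

# Numeric check: log(2) / log(p + 2) == 1 / log2(p + 2) for 0-based rank p.
import numpy as np

position_vector = np.array([0.0, 1.0, 4.0])        # hypothetical ranks
ndcg = np.log(2.) / np.log(position_vector + 2)    # mirrors the tf.math.log form
assert np.allclose(ndcg, 1.0 / np.log2(position_vector + 2))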