Commit 441c8f40 authored by qianyj's avatar qianyj
Browse files

update TF code

parent ec90ad8e
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Preprocess dataset and construct any necessary artifacts."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import pickle
import time
import timeit
import typing
# pylint: disable=wrong-import-order
import numpy as np
import pandas as pd
import tensorflow as tf
# pylint: enable=wrong-import-order
from official.datasets import movielens
from official.recommendation import constants as rconst
from official.recommendation import data_pipeline
from official.utils.logs import mlperf_helper
DATASET_TO_NUM_USERS_AND_ITEMS = {
"ml-1m": (6040, 3706),
"ml-20m": (138493, 26744)
}
_EXPECTED_CACHE_KEYS = (
rconst.TRAIN_USER_KEY, rconst.TRAIN_ITEM_KEY, rconst.EVAL_USER_KEY,
rconst.EVAL_ITEM_KEY, rconst.USER_MAP, rconst.ITEM_MAP)
def _filter_index_sort(raw_rating_path, cache_path):
# type: (str, str, bool) -> (dict, bool)
"""Read in data CSV, and output structured data.
This function reads in the raw CSV of positive items, and performs three
preprocessing transformations:
1) Filter out all users who have not rated at least a certain number
of items. (Typically 20 items)
2) Zero index the users and items such that the largest user_id is
`num_users - 1` and the largest item_id is `num_items - 1`
3) Sort the dataframe by user_id, with timestamp as a secondary sort key.
This allows the dataframe to be sliced by user in-place, and for the last
item to be selected simply by calling the `-1` index of a user's slice.
While all of these transformations are performed by Pandas (and are therefore
single-threaded), they only take ~2 minutes, and the overhead to apply a
MapReduce pattern to parallel process the dataset adds significant complexity
for no computational gain. For a larger dataset parallelizing this
preprocessing could yield speedups. (Also, this preprocessing step is only
performed once for an entire run.
Args:
raw_rating_path: The path to the CSV which contains the raw dataset.
cache_path: The path to the file where results of this function are saved.
Returns:
A filtered, zero-index remapped, sorted dataframe, a dict mapping raw user
IDs to regularized user IDs, and a dict mapping raw item IDs to regularized
item IDs.
"""
valid_cache = tf.gfile.Exists(cache_path)
if valid_cache:
with tf.gfile.Open(cache_path, "rb") as f:
cached_data = pickle.load(f)
cache_age = time.time() - cached_data.get("create_time", 0)
if cache_age > rconst.CACHE_INVALIDATION_SEC:
valid_cache = False
for key in _EXPECTED_CACHE_KEYS:
if key not in cached_data:
valid_cache = False
if not valid_cache:
tf.logging.info("Removing stale raw data cache file.")
tf.gfile.Remove(cache_path)
if valid_cache:
data = cached_data
else:
with tf.gfile.Open(raw_rating_path) as f:
df = pd.read_csv(f)
# Get the info of users who have more than 20 ratings on items
grouped = df.groupby(movielens.USER_COLUMN)
df = grouped.filter(
lambda x: len(x) >= rconst.MIN_NUM_RATINGS) # type: pd.DataFrame
original_users = df[movielens.USER_COLUMN].unique()
original_items = df[movielens.ITEM_COLUMN].unique()
# Map the ids of user and item to 0 based index for following processing
tf.logging.info("Generating user_map and item_map...")
user_map = {user: index for index, user in enumerate(original_users)}
item_map = {item: index for index, item in enumerate(original_items)}
df[movielens.USER_COLUMN] = df[movielens.USER_COLUMN].apply(
lambda user: user_map[user])
df[movielens.ITEM_COLUMN] = df[movielens.ITEM_COLUMN].apply(
lambda item: item_map[item])
num_users = len(original_users)
num_items = len(original_items)
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.PREPROC_HP_NUM_EVAL,
value=rconst.NUM_EVAL_NEGATIVES)
assert num_users <= np.iinfo(rconst.USER_DTYPE).max
assert num_items <= np.iinfo(rconst.ITEM_DTYPE).max
assert df[movielens.USER_COLUMN].max() == num_users - 1
assert df[movielens.ITEM_COLUMN].max() == num_items - 1
# This sort is used to shard the dataframe by user, and later to select
# the last item for a user to be used in validation.
tf.logging.info("Sorting by user, timestamp...")
# This sort is equivalent to
# df.sort_values([movielens.USER_COLUMN, movielens.TIMESTAMP_COLUMN],
# inplace=True)
# except that the order of items with the same user and timestamp are
# sometimes different. For some reason, this sort results in a better
# hit-rate during evaluation, matching the performance of the MLPerf
# reference implementation.
df.sort_values(by=movielens.TIMESTAMP_COLUMN, inplace=True)
df.sort_values([movielens.USER_COLUMN, movielens.TIMESTAMP_COLUMN],
inplace=True, kind="mergesort")
df = df.reset_index() # The dataframe does not reconstruct indices in the
# sort or filter steps.
grouped = df.groupby(movielens.USER_COLUMN, group_keys=False)
eval_df, train_df = grouped.tail(1), grouped.apply(lambda x: x.iloc[:-1])
data = {
rconst.TRAIN_USER_KEY: train_df[movielens.USER_COLUMN]
.values.astype(rconst.USER_DTYPE),
rconst.TRAIN_ITEM_KEY: train_df[movielens.ITEM_COLUMN]
.values.astype(rconst.ITEM_DTYPE),
rconst.EVAL_USER_KEY: eval_df[movielens.USER_COLUMN]
.values.astype(rconst.USER_DTYPE),
rconst.EVAL_ITEM_KEY: eval_df[movielens.ITEM_COLUMN]
.values.astype(rconst.ITEM_DTYPE),
rconst.USER_MAP: user_map,
rconst.ITEM_MAP: item_map,
"create_time": time.time(),
}
tf.logging.info("Writing raw data cache.")
with tf.gfile.Open(cache_path, "wb") as f:
pickle.dump(data, f, protocol=pickle.HIGHEST_PROTOCOL)
# TODO(robieta): MLPerf cache clear.
return data, valid_cache
def instantiate_pipeline(dataset, data_dir, params, constructor_type=None,
deterministic=False):
# type: (str, str, dict, typing.Optional[str], bool) -> (NCFDataset, typing.Callable)
"""Load and digest data CSV into a usable form.
Args:
dataset: The name of the dataset to be used.
data_dir: The root directory of the dataset.
params: dict of parameters for the run.
constructor_type: The name of the constructor subclass that should be used
for the input pipeline.
deterministic: Tell the data constructor to produce deterministically.
"""
tf.logging.info("Beginning data preprocessing.")
st = timeit.default_timer()
raw_rating_path = os.path.join(data_dir, dataset, movielens.RATINGS_FILE)
cache_path = os.path.join(data_dir, dataset, rconst.RAW_CACHE_FILE)
raw_data, _ = _filter_index_sort(raw_rating_path, cache_path)
user_map, item_map = raw_data["user_map"], raw_data["item_map"]
num_users, num_items = DATASET_TO_NUM_USERS_AND_ITEMS[dataset]
if num_users != len(user_map):
raise ValueError("Expected to find {} users, but found {}".format(
num_users, len(user_map)))
if num_items != len(item_map):
raise ValueError("Expected to find {} items, but found {}".format(
num_items, len(item_map)))
producer = data_pipeline.get_constructor(constructor_type or "materialized")(
maximum_number_epochs=params["train_epochs"],
num_users=num_users,
num_items=num_items,
user_map=user_map,
item_map=item_map,
train_pos_users=raw_data[rconst.TRAIN_USER_KEY],
train_pos_items=raw_data[rconst.TRAIN_ITEM_KEY],
train_batch_size=params["batch_size"],
batches_per_train_step=params["batches_per_step"],
num_train_negatives=params["num_neg"],
eval_pos_users=raw_data[rconst.EVAL_USER_KEY],
eval_pos_items=raw_data[rconst.EVAL_ITEM_KEY],
eval_batch_size=params["eval_batch_size"],
batches_per_eval_step=params["batches_per_step"],
stream_files=params["use_tpu"],
deterministic=deterministic
)
run_time = timeit.default_timer() - st
tf.logging.info("Data preprocessing complete. Time: {:.1f} sec."
.format(run_time))
print(producer)
return num_users, num_items, producer
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Helper file for running the async data generation process in OSS."""
import contextlib
import multiprocessing
import multiprocessing.pool
def get_forkpool(num_workers, init_worker=None, closing=True):
pool = multiprocessing.Pool(processes=num_workers, initializer=init_worker)
return contextlib.closing(pool) if closing else pool
def get_threadpool(num_workers, init_worker=None, closing=True):
pool = multiprocessing.pool.ThreadPool(processes=num_workers,
initializer=init_worker)
return contextlib.closing(pool) if closing else pool
class FauxPool(object):
"""Mimic a pool using for loops.
This class is used in place of proper pools when true determinism is desired
for testing or debugging.
"""
def __init__(self, *args, **kwargs):
pass
def map(self, func, iterable, chunksize=None):
return [func(i) for i in iterable]
def imap(self, func, iterable, chunksize=1):
for i in iterable:
yield func(i)
def close(self):
pass
def terminate(self):
pass
def join(self):
pass
def get_fauxpool(num_workers, init_worker=None, closing=True):
pool = FauxPool(processes=num_workers, initializer=init_worker)
return contextlib.closing(pool) if closing else pool
#!/bin/bash
set -e
if [ `id -u` != 0 ]; then
echo "Calling sudo to gain root for this shell. (Needed to clear caches.)"
sudo echo "Success"
fi
SCRIPT_DIR=`dirname "$BASH_SOURCE"`
export PYTHONPATH="${SCRIPT_DIR}/../../"
DATASET="ml-20m"
BUCKET=${BUCKET:-""}
ROOT_DIR="${BUCKET:-/tmp}/MLPerf_NCF"
echo "Root directory: ${ROOT_DIR}"
if [[ -z ${BUCKET} ]]; then
LOCAL_ROOT=${ROOT_DIR}
else
LOCAL_ROOT="/tmp/MLPerf_NCF"
mkdir -p ${LOCAL_ROOT}
echo "Local root (for files which cannot use GCS): ${LOCAL_ROOT}"
fi
DATE=$(date '+%Y-%m-%d_%H:%M:%S')
TEST_DIR="${ROOT_DIR}/${DATE}"
LOCAL_TEST_DIR="${LOCAL_ROOT}/${DATE}"
mkdir -p ${LOCAL_TEST_DIR}
TPU=${TPU:-""}
if [[ -z ${TPU} ]]; then
DEVICE_FLAG="--num_gpus -1" # --use_xla_for_gpu"
else
DEVICE_FLAG="--tpu ${TPU} --num_gpus 0"
fi
DATA_DIR="${ROOT_DIR}/movielens_data"
python "${SCRIPT_DIR}/../datasets/movielens.py" --data_dir ${DATA_DIR} --dataset ${DATASET}
{
for i in `seq 0 4`;
do
START_TIME=$(date +%s)
MODEL_DIR="${TEST_DIR}/model_dir_${i}"
RUN_LOG="${LOCAL_TEST_DIR}/run_${i}.log"
export COMPLIANCE_FILE="${LOCAL_TEST_DIR}/run_${i}_compliance_raw.log"
export STITCHED_COMPLIANCE_FILE="${LOCAL_TEST_DIR}/run_${i}_compliance_submission.log"
echo ""
echo "Beginning run ${i}"
echo " Complete output logs are in ${RUN_LOG}"
echo " Compliance logs: (submission log is created after run.)"
echo " ${COMPLIANCE_FILE}"
echo " ${STITCHED_COMPLIANCE_FILE}"
# To reduce variation set the seed flag:
# --seed ${i}
python -u "${SCRIPT_DIR}/ncf_main.py" \
--model_dir ${MODEL_DIR} \
--data_dir ${DATA_DIR} \
--dataset ${DATASET} --hooks "" \
${DEVICE_FLAG} \
--clean \
--train_epochs 14 \
--batch_size 98304 \
--eval_batch_size 160000 \
--learning_rate 0.00382059 \
--beta1 0.783529 \
--beta2 0.909003 \
--epsilon 1.45439e-07 \
--layers 256,256,128,64 --num_factors 64 \
--hr_threshold 0.635 \
--ml_perf \
|& tee ${RUN_LOG} \
| grep --line-buffered -E --regexp="(Iteration [0-9]+: HR = [0-9\.]+, NDCG = [0-9\.]+, Loss = [0-9\.]+)|(pipeline_hash)|(MLPerf time:)"
END_TIME=$(date +%s)
echo "Run ${i} complete: $(( $END_TIME - $START_TIME )) seconds."
# Don't fill up the local hard drive.
if [[ -z ${BUCKET} ]]; then
echo "Removing model directory to save space."
rm -r ${MODEL_DIR}
fi
done
} |& tee "${LOCAL_TEST_DIR}/summary.log"
google-api-python-client>=1.6.7
google-cloud-bigquery>=0.31.0
kaggle>=1.3.9
mlperf_compliance==0.0.10
numpy>=1.15.4
oauth2client>=4.1.2
pandas>=0.22.0
psutil>=5.4.3
py-cpuinfo>=3.3.0
scipy>=0.19.1
typing
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment