data_preprocessing.py
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Preprocess dataset and construct any necessary artifacts."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
import pickle
import time
import timeit
import typing

# pylint: disable=wrong-import-order
import numpy as np
import pandas as pd
import tensorflow as tf
from absl import logging
# pylint: enable=wrong-import-order

from official.datasets import movielens
from official.recommendation import constants as rconst
from official.recommendation import data_pipeline
from official.utils.logs import mlperf_helper


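# Expected (user, item) counts after filtering, keyed by dataset name. These
# are compared against the generated user/item maps in instantiate_pipeline()
# as a sanity check on the preprocessing.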
DATASET_TO_NUM_USERS_AND_ITEMS = {
    "ml-1m": (6040, 3706),
    "ml-20m": (138493, 26744)
}


_EXPECTED_CACHE_KEYS = (
    rconst.TRAIN_USER_KEY, rconst.TRAIN_ITEM_KEY, rconst.EVAL_USER_KEY,
    rconst.EVAL_ITEM_KEY, rconst.USER_MAP, rconst.ITEM_MAP)


def _filter_index_sort(raw_rating_path, cache_path):
  # type: (str, str) -> (dict, bool)
  """Read in data CSV, and output structured data.

  This function reads in the raw CSV of positive items, and performs three
  preprocessing transformations:

  1)  Filter out all users who have rated fewer than a minimum number of
      items (typically 20).

  2)  Zero-index the users and items such that the largest user_id is
      `num_users - 1` and the largest item_id is `num_items - 1`.

  3)  Sort the dataframe by user_id, with timestamp as a secondary sort key.
      This allows the dataframe to be sliced by user in-place, and for the last
      item to be selected simply by calling the `-1` index of a user's slice.

  While all of these transformations are performed by Pandas (and are therefore
  single-threaded), they only take ~2 minutes, and the overhead of applying a
  MapReduce pattern to process the dataset in parallel adds significant
  complexity for no computational gain. For a larger dataset, parallelizing
  this preprocessing could yield speedups. (Also, this preprocessing step is
  only performed once per run.)

  Args:
    raw_rating_path: The path to the CSV which contains the raw dataset.
    cache_path: The path to the file where results of this function are saved.

  Returns:
    A filtered, zero-index remapped, sorted dataframe, a dict mapping raw user
    IDs to regularized user IDs, and a dict mapping raw item IDs to regularized
    item IDs.
  """
  valid_cache = tf.io.gfile.exists(cache_path)
  if valid_cache:
    with tf.io.gfile.GFile(cache_path, "rb") as f:
      cached_data = pickle.load(f)

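    # Invalidate the cache if it is older than rconst.CACHE_INVALIDATION_SEC
    # or if it is missing any of the expected keys.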
    cache_age = time.time() - cached_data.get("create_time", 0)
    if cache_age > rconst.CACHE_INVALIDATION_SEC:
      valid_cache = False

    for key in _EXPECTED_CACHE_KEYS:
      if key not in cached_data:
        valid_cache = False

    if not valid_cache:
      logging.info("Removing stale raw data cache file.")
      tf.io.gfile.remove(cache_path)

  if valid_cache:
    data = cached_data
  else:
    with tf.io.gfile.GFile(raw_rating_path) as f:
      df = pd.read_csv(f)

    # Keep only users with at least rconst.MIN_NUM_RATINGS ratings.
    grouped = df.groupby(movielens.USER_COLUMN)
    df = grouped.filter(
        lambda x: len(x) >= rconst.MIN_NUM_RATINGS)  # type: pd.DataFrame

    original_users = df[movielens.USER_COLUMN].unique()
    original_items = df[movielens.ITEM_COLUMN].unique()

    # Map user and item ids to zero-based indices for downstream processing.
    logging.info("Generating user_map and item_map...")
    user_map = {user: index for index, user in enumerate(original_users)}
    item_map = {item: index for index, item in enumerate(original_items)}

    df[movielens.USER_COLUMN] = df[movielens.USER_COLUMN].apply(
        lambda user: user_map[user])
    df[movielens.ITEM_COLUMN] = df[movielens.ITEM_COLUMN].apply(
        lambda item: item_map[item])

    num_users = len(original_users)
    num_items = len(original_items)

    mlperf_helper.ncf_print(key=mlperf_helper.TAGS.PREPROC_HP_NUM_EVAL,
                            value=rconst.NUM_EVAL_NEGATIVES)

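    # The zero-based remapping must produce contiguous ids that fit within the
    # integer dtypes used to store the processed data.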
    assert num_users <= np.iinfo(rconst.USER_DTYPE).max
    assert num_items <= np.iinfo(rconst.ITEM_DTYPE).max
    assert df[movielens.USER_COLUMN].max() == num_users - 1
    assert df[movielens.ITEM_COLUMN].max() == num_items - 1

    # This sort is used to shard the dataframe by user, and later to select
    # the last item for a user to be used in validation.
    logging.info("Sorting by user, timestamp...")

    # This sort is equivalent to
    #   df.sort_values([movielens.USER_COLUMN, movielens.TIMESTAMP_COLUMN],
    #   inplace=True)
    # except that the order of items with the same user and timestamp is
    # sometimes different. For some reason, this sort results in a better
    # hit-rate during evaluation, matching the performance of the MLPerf
    # reference implementation.
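    # (The second sort uses the stable "mergesort" kind, so rows that tie on
    # (user, timestamp) keep the relative order produced by the first,
    # timestamp-only sort.)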
    df.sort_values(by=movielens.TIMESTAMP_COLUMN, inplace=True)
    df.sort_values([movielens.USER_COLUMN, movielens.TIMESTAMP_COLUMN],
                   inplace=True, kind="mergesort")

    df = df.reset_index()  # The dataframe does not reconstruct indices in the
                           # sort or filter steps.

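    # Leave-one-out split: after the (user, timestamp) sort, the last row of
    # each user's group is that user's most recent rating. It is held out as
    # the evaluation positive, and the remaining rows become training
    # positives.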
    grouped = df.groupby(movielens.USER_COLUMN, group_keys=False)
    eval_df, train_df = grouped.tail(1), grouped.apply(lambda x: x.iloc[:-1])

    data = {
        rconst.TRAIN_USER_KEY: train_df[movielens.USER_COLUMN]
                               .values.astype(rconst.USER_DTYPE),
        rconst.TRAIN_ITEM_KEY: train_df[movielens.ITEM_COLUMN]
                               .values.astype(rconst.ITEM_DTYPE),
        rconst.EVAL_USER_KEY: eval_df[movielens.USER_COLUMN]
                              .values.astype(rconst.USER_DTYPE),
        rconst.EVAL_ITEM_KEY: eval_df[movielens.ITEM_COLUMN]
                              .values.astype(rconst.ITEM_DTYPE),
        rconst.USER_MAP: user_map,
        rconst.ITEM_MAP: item_map,
        "create_time": time.time(),
    }

    logging.info("Writing raw data cache.")
    with tf.io.gfile.GFile(cache_path, "wb") as f:
      pickle.dump(data, f, protocol=pickle.HIGHEST_PROTOCOL)

  # TODO(robieta): MLPerf cache clear.
  return data, valid_cache


def instantiate_pipeline(dataset, data_dir, params, constructor_type=None,
                         deterministic=False, epoch_dir=None):
  # type: (str, str, dict, typing.Optional[str], bool, typing.Optional[str]) -> (int, int, data_pipeline.BaseDataConstructor)
  """Load and digest data CSV into a usable form.

  Args:
    dataset: The name of the dataset to be used.
    data_dir: The root directory of the dataset.
    params: dict of parameters for the run.
    constructor_type: The name of the constructor subclass that should be used
      for the input pipeline.
    deterministic: Tell the data constructor to produce deterministically.
    epoch_dir: Directory in which to store the training epochs.

  Returns:
    num_users: The number of users in the specified dataset.
    num_items: The number of items in the specified dataset.
    producer: The data constructor that will serve training and evaluation
      batches.
  """
  logging.info("Beginning data preprocessing.")

  st = timeit.default_timer()
  raw_rating_path = os.path.join(data_dir, dataset, movielens.RATINGS_FILE)
  cache_path = os.path.join(data_dir, dataset, rconst.RAW_CACHE_FILE)

  raw_data, _ = _filter_index_sort(raw_rating_path, cache_path)
  user_map, item_map = raw_data["user_map"], raw_data["item_map"]
  num_users, num_items = DATASET_TO_NUM_USERS_AND_ITEMS[dataset]

  if num_users != len(user_map):
    raise ValueError("Expected to find {} users, but found {}".format(
        num_users, len(user_map)))
  if num_items != len(item_map):
    raise ValueError("Expected to find {} items, but found {}".format(
        num_items, len(item_map)))

  producer = data_pipeline.get_constructor(constructor_type or "materialized")(
      maximum_number_epochs=params["train_epochs"],
      num_users=num_users,
      num_items=num_items,
      user_map=user_map,
      item_map=item_map,
      train_pos_users=raw_data[rconst.TRAIN_USER_KEY],
      train_pos_items=raw_data[rconst.TRAIN_ITEM_KEY],
      train_batch_size=params["batch_size"],
      batches_per_train_step=params["batches_per_step"],
      num_train_negatives=params["num_neg"],
      eval_pos_users=raw_data[rconst.EVAL_USER_KEY],
      eval_pos_items=raw_data[rconst.EVAL_ITEM_KEY],
      eval_batch_size=params["eval_batch_size"],
      batches_per_eval_step=params["batches_per_step"],
      stream_files=params["use_tpu"],
      deterministic=deterministic,
      epoch_dir=epoch_dir
  )

  run_time = timeit.default_timer() - st
  logging.info("Data preprocessing complete. Time: {:.1f} sec."
               .format(run_time))

  print(producer)
  return num_users, num_items, producer
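

if __name__ == "__main__":
  # Illustrative usage sketch, not part of the original pipeline. The parameter
  # names below are the ones read by instantiate_pipeline() above; the dataset
  # path and values are assumptions for demonstration only, and the MovieLens
  # ratings CSV must already have been downloaded to <data_dir>/<dataset>/.
  example_params = {
      "train_epochs": 1,
      "batch_size": 256,
      "eval_batch_size": 256,
      "batches_per_step": 1,
      "num_neg": 4,
      "use_tpu": False,
  }
  users, items, example_producer = instantiate_pipeline(
      dataset="ml-1m", data_dir="/tmp/movielens-data", params=example_params)
  logging.info("Pipeline ready: %d users, %d items.", users, items)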