dataset.py 8.65 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Prepare dataset for NCF.

Load the training dataset and evaluation dataset from csv file into memory.
Prepare input for model training and evaluation.
"""
import time

import numpy as np
from six.moves import xrange  # pylint: disable=redefined-builtin
import tensorflow as tf

from official.recommendation import constants  # pylint: disable=g-bad-import-order

# Column names of the generated csv files, in on-disk order. The train csv has
# all three columns; the test csv only has the first two (no rating column).
_CSV_COLUMN_NAMES = [constants.USER, constants.ITEM, constants.RATING]
# Per-column record defaults handed to tf.decode_csv. An integer default makes
# the corresponding field decode as an integer.
_CSV_TYPES = [[0], [0], [0]]

# The buffer size (in elements) used when shuffling the train dataset.
_SHUFFLE_BUFFER_SIZE = 1024


class NCFDataSet(object):
  """Plain container bundling everything needed to train and evaluate NCF."""

  def __init__(self, train_data, num_users, num_items, num_negatives,
               true_items, all_items):
    """Store the training instances and the evaluation item lists.

    Args:
      train_data: A list of positive training instances.
      num_users: An integer, how many users the training dataset contains.
      num_items: An integer, how many items the dataset contains.
      num_negatives: An integer, how many negative instances are generated
        for each positive training instance.
      true_items: A list of ground truth (positive) evaluation items, one
        entry per user. Exposed as `eval_true_items`.
      all_items: A nested list of evaluation items, one inner list per user.
        Exposed as `eval_all_items`.
    """
    # Training-side information.
    self.train_data, self.num_users, self.num_items = (
        train_data, num_users, num_items)
    self.num_negatives = num_negatives
    # Evaluation-side information.
    self.eval_true_items, self.eval_all_items = true_items, all_items


def load_data(file_name):
  """Load integer records from a tab-separated file.

  Args:
    file_name: A string, the path of the file to read. It is opened through
      tf.gfile.

  Returns:
    A list with one entry per line of the file; each entry is the list of
    integers obtained by splitting that line on tab characters.

  Raises:
    ValueError: If a field of any line cannot be converted to an integer.
  """
  # Read inside a context manager so the file handle is always closed, even
  # when reading fails (the handle used to be leaked).
  with tf.gfile.Open(file_name, "r") as reader:
    lines = reader.readlines()

  # int() tolerates surrounding whitespace, so the trailing newline of each
  # line does not need to be stripped explicitly.
  return [[int(col) for col in line.split("\t")] for line in lines]


def data_preprocessing(train_fname, test_fname, test_neg_fname, num_negatives):
  """Load the train/test files and build the evaluation dataset.

  The positive training instances are kept in memory so that fresh negative
  instances can be sampled for every training epoch. The test dataset is
  assembled from the test positive and test negative instances and written to
  `constants.TEST_DATA` as a comma separated file of (user, item) pairs.

  Args:
    train_fname: A string, the file name of training positive dataset.
    test_fname: A string, the file name of test positive dataset. Each user has
      one positive instance.
    test_neg_fname: A string, the file name of test negative dataset. Each user
      has 100 negative instances by default.
    num_negatives: An integer, the number of negative instances generated for
      each positive instance in train dataset.

  Returns:
    ncf_dataset: A NCFDataSet object containing information about training and
      evaluation/test dataset.
  """
  # Positive training instances, kept around for per-epoch negative sampling.
  train_data = load_data(train_fname)
  train_array = np.array(train_data)
  # Users are counted from the training data only.
  num_users = len(np.unique(train_array[:, 0]))

  test_ratings = load_data(test_fname)
  test_negatives = load_data(test_neg_fname)
  # Items are counted over the union of train and test positive items.
  train_item_ids = set(train_array[:, 1])
  test_item_ids = set(np.array(test_ratings)[:, 1])
  num_items = len(train_item_ids | test_item_ids)

  # Build the evaluation candidates; row `row` of both test files is used for
  # the same user.
  true_items, all_items, all_test_data = [], [], []
  for row in range(num_users):
    candidates = test_negatives[row]
    positive = test_ratings[row]
    user, true_item = positive[0], positive[1]

    # Negatives come first, the ground truth item is placed last.
    candidates.append(true_item)
    user_column = np.full(len(candidates), user, dtype=np.int32)

    true_items.append(true_item)
    all_items.append(candidates)
    all_test_data.extend(zip(user_column, candidates))

  # Persist the (user, item) evaluation pairs as a csv file.
  np.savetxt(constants.TEST_DATA, np.asarray(all_test_data).astype(int),
             fmt="%i", delimiter=",")

  return NCFDataSet(
      train_data, num_users, num_items, num_negatives, true_items, all_items)


def generate_train_dataset(train_data, num_items, num_negatives):
  """Write one epoch worth of training data to `constants.TRAIN_DATA`.

  Every positive training instance is emitted with label 1, followed by
  `num_negatives` randomly drawn (user, item) pairs with label 0. A drawn item
  is rejected and redrawn while the pair is one of the positive instances.

  Args:
    train_data: A list of positive training instances, each one a
      (user, item, rating) triple.
    num_items: An integer, the exclusive upper bound of the item ids that
      negatives are sampled from.
    num_negatives: An integer, the number of negative training instances
      following each positive training instance. It is 4 by default.
  """
  # Known positive (user, item) pairs, used to reject sampled negatives.
  positives = set((user, item) for user, item, _ in train_data)

  rows = []
  for user, item, _ in train_data:
    rows.append([user, item, 1])
    for _ in xrange(num_negatives):
      # Rejection sampling: keep drawing until the pair is not a positive.
      while True:
        candidate = np.random.randint(num_items)
        if (user, candidate) not in positives:
          break
      rows.append([user, candidate, 0])

  # Persist the user,item,label rows as a csv file.
  np.savetxt(constants.TRAIN_DATA, np.asarray(rows).astype(int),
             fmt="%i", delimiter=",")


def input_fn(training, batch_size, repeat=1, ncf_dataset=None,
             num_parallel_calls=1):
  """Build the tf.data input pipeline for training or evaluation.

  In training mode the train csv file is regenerated on every call: each
  positive instance (user and item have interactions) is followed by
  "num_negatives" randomly chosen negative instances (4 by default), so every
  epoch sees a new train dataset. In evaluation mode the previously written
  test csv file is read as is.

  Args:
    training: A boolean flag for training mode.
    batch_size: An integer, batch size for training and evaluation.
    repeat: An integer, how many times to repeat the dataset.
    ncf_dataset: An NCFDataSet object, which contains the information to
      generate negative training instances. Only read in training mode.
    num_parallel_calls: An integer, number of cpu cores for parallel input
      processing.

  Returns:
    dataset: A tf.data.Dataset object containing examples loaded from the files.
      Elements are (features, labels) in training mode and features only
      otherwise.
  """
  if training:
    # Re-sample the negative instances before reading the train file.
    start = time.time()
    generate_train_dataset(
        ncf_dataset.train_data, ncf_dataset.num_items,
        ncf_dataset.num_negatives)
    file_name = constants.TRAIN_DATA
    tf.logging.info(
        "Generating training instances: {:.1f}s".format(time.time() - start))
  else:
    file_name = constants.TEST_DATA

  # One dataset element per text line; only training data is shuffled.
  dataset = tf.data.TextLineDataset(file_name)
  if training:
    dataset = dataset.shuffle(buffer_size=_SHUFFLE_BUFFER_SIZE)

  # The train file carries all columns, the test file lacks the last one.
  if training:
    num_cols = len(_CSV_COLUMN_NAMES)
  else:
    num_cols = len(_CSV_COLUMN_NAMES) - 1
  column_names = _CSV_COLUMN_NAMES[:num_cols]
  column_defaults = _CSV_TYPES[:num_cols]

  def _parse_csv(line):
    """Turn one csv line into features (and labels when training)."""
    columns = tf.decode_csv(line, record_defaults=column_defaults)
    # Give every field a leading axis of size one, keyed by column name.
    features = {
        name: tf.expand_dims(column, axis=0)
        for name, column in zip(column_names, columns)
    }
    if not training:
      # Test/prediction input has no label column.
      return features
    labels = features.pop(constants.RATING)
    return features, labels

  dataset = dataset.map(_parse_csv, num_parallel_calls=num_parallel_calls)
  dataset = dataset.repeat(repeat).batch(batch_size)

  # Prefetch one batch to overlap input processing with model execution.
  return dataset.prefetch(1)