Unverified commit 6367318f, authored by Quan (Andy) Gan and committed by GitHub

[Model] use dask to parallelize preprocessing (#2040)

parent 47c96dd1
# PinSAGE example
## Requirements
- dask
- pandas
- torchtext
## Prepare datasets
### MovieLens 1M
...
The first change replaces the sequential per-item Python loop in `train_test_split_by_time` with a dask `groupby(...).apply(...)` that runs across worker processes:

```diff
@@ -2,26 +2,28 @@ import torch
 import dgl
 import numpy as np
 import scipy.sparse as ssp
+import tqdm
+import dask.dataframe as dd
 
 # This is the train-test split method most of the recommender system papers running on MovieLens
 # take.  It essentially follows the intuition of "training on the past and predicting the future".
 # One can also change the threshold to make the validation and test sets take larger proportions.
 def train_test_split_by_time(df, timestamp, item):
-    df = df.copy()
     df['train_mask'] = np.ones((len(df),), dtype=np.bool)
     df['val_mask'] = np.zeros((len(df),), dtype=np.bool)
     df['test_mask'] = np.zeros((len(df),), dtype=np.bool)
-    df = df.sort_values([item, timestamp])
-    for track_id in df[item].unique():
-        idx = (df[item] == track_id).to_numpy().nonzero()[0]
-        idx = df.index[idx]
-        if len(idx) > 1:
-            df.loc[idx[-1], 'train_mask'] = False
-            df.loc[idx[-1], 'test_mask'] = True
-        if len(idx) > 2:
-            df.loc[idx[-2], 'train_mask'] = False
-            df.loc[idx[-2], 'val_mask'] = True
-    df = df.sort_index()
+    df = dd.from_pandas(df, npartitions=10)
+    def train_test_split(df):
+        df = df.sort_values([timestamp])
+        if df.shape[0] > 1:
+            df.iloc[-1, -3] = False
+            df.iloc[-1, -1] = True
+        if df.shape[0] > 2:
+            df.iloc[-2, -3] = False
+            df.iloc[-2, -2] = True
+        return df
+    df = df.groupby(item).apply(train_test_split).compute(scheduler='processes').sort_index()
+    print(df[df[item] == df[item].unique()[0]].sort_values(timestamp))
     return df['train_mask'].to_numpy().nonzero()[0], \
            df['val_mask'].to_numpy().nonzero()[0], \
            df['test_mask'].to_numpy().nonzero()[0]
```
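Two details worth noting in the new implementation: the three mask columns are appended last, so `df.iloc[r, -3]`, `df.iloc[r, -2]` and `df.iloc[r, -1]` inside the inner function address `train_mask`, `val_mask` and `test_mask` respectively; and the per-item work now runs in parallel worker processes rather than a Python-level loop. Below is a minimal, self-contained sketch of that dask idiom with made-up data and names (not part of the commit): partition a pandas DataFrame, apply a function per group in parallel processes, and reassemble the result.

```python
import pandas as pd
import dask.dataframe as dd

def mark_last(group):
    # Flag the chronologically last row within one group.
    group = group.sort_values('ts')
    group['is_last'] = False
    group.iloc[-1, group.columns.get_loc('is_last')] = True
    return group

if __name__ == '__main__':  # required by the process-based scheduler
    df = pd.DataFrame({'item': [1, 1, 1, 2, 2], 'ts': [3, 1, 2, 5, 4]})
    # dask warns that it inferred the output schema; pass meta= to silence it.
    out = (dd.from_pandas(df, npartitions=2)
           .groupby('item')
           .apply(mark_last)
           .compute(scheduler='processes'))
    print(out.sort_index())
```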
...
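The remaining hunks make one systematic change to both preprocessing scripts: the module-level body is wrapped in an `if __name__ == '__main__':` guard and indented accordingly. This is presumably what makes `compute(scheduler='processes')` safe to call from these scripts: when Python's multiprocessing spawns workers (always on Windows, and the default on recent macOS), each worker re-imports the main module, and unguarded module-level code would re-run the entire preprocessing pipeline in every worker. Each hunk below is shown in its post-commit form.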
@@ -25,18 +25,19 @@ import torchtext

```python
from builder import PandasGraphBuilder
from data_utils import *

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('directory', type=str)
    parser.add_argument('output_path', type=str)
    args = parser.parse_args()
    directory = args.directory
    output_path = args.output_path

    ## Build heterogeneous graph

    # Load data
    users = []
    with open(os.path.join(directory, 'users.dat'), encoding='latin1') as f:
        for l in f:
            id_, gender, age, occupation, zip_ = l.strip().split('::')
            users.append({
```
@@ -46,10 +47,10 @@ with open(os.path.join(directory, 'users.dat'), encoding='latin1') as f:

```python
                'occupation': occupation,
                'zip': zip_,
            })
    users = pd.DataFrame(users).astype('category')

    movies = []
    with open(os.path.join(directory, 'movies.dat'), encoding='latin1') as f:
        for l in f:
            id_, title, genres = l.strip().split('::')
            genres_set = set(genres.split('|'))
```
@@ -63,10 +64,10 @@ with open(os.path.join(directory, 'movies.dat'), encoding='latin1') as f:

```python
            for g in genres_set:
                data[g] = True
            movies.append(data)
    movies = pd.DataFrame(movies).astype({'year': 'category'})

    ratings = []
    with open(os.path.join(directory, 'ratings.dat'), encoding='latin1') as f:
        for l in f:
            user_id, movie_id, rating, timestamp = [int(_) for _ in l.split('::')]
            ratings.append({
```
@@ -75,69 +76,69 @@ with open(os.path.join(directory, 'ratings.dat'), encoding='latin1') as f:

```python
                'rating': rating,
                'timestamp': timestamp,
            })
    ratings = pd.DataFrame(ratings)

    # Filter the users and items that never appear in the rating table.
    distinct_users_in_ratings = ratings['user_id'].unique()
    distinct_movies_in_ratings = ratings['movie_id'].unique()
    users = users[users['user_id'].isin(distinct_users_in_ratings)]
    movies = movies[movies['movie_id'].isin(distinct_movies_in_ratings)]

    # Group the movie features into genres (a vector), year (a category) and title (a string).
    genre_columns = movies.columns.drop(['movie_id', 'title', 'year'])
    movies[genre_columns] = movies[genre_columns].fillna(False).astype('bool')
    movies_categorical = movies.drop('title', axis=1)

    # Build graph
    graph_builder = PandasGraphBuilder()
    graph_builder.add_entities(users, 'user_id', 'user')
    graph_builder.add_entities(movies_categorical, 'movie_id', 'movie')
    graph_builder.add_binary_relations(ratings, 'user_id', 'movie_id', 'watched')
    graph_builder.add_binary_relations(ratings, 'movie_id', 'user_id', 'watched-by')

    g = graph_builder.build()

    # Assign features.
    # Note that variable-sized features such as texts or images are handled elsewhere.
    g.nodes['user'].data['gender'] = torch.LongTensor(users['gender'].cat.codes.values)
    g.nodes['user'].data['age'] = torch.LongTensor(users['age'].cat.codes.values)
    g.nodes['user'].data['occupation'] = torch.LongTensor(users['occupation'].cat.codes.values)
    g.nodes['user'].data['zip'] = torch.LongTensor(users['zip'].cat.codes.values)
    g.nodes['movie'].data['year'] = torch.LongTensor(movies['year'].cat.codes.values)
    g.nodes['movie'].data['genre'] = torch.FloatTensor(movies[genre_columns].values)
    g.edges['watched'].data['rating'] = torch.LongTensor(ratings['rating'].values)
    g.edges['watched'].data['timestamp'] = torch.LongTensor(ratings['timestamp'].values)
    g.edges['watched-by'].data['rating'] = torch.LongTensor(ratings['rating'].values)
    g.edges['watched-by'].data['timestamp'] = torch.LongTensor(ratings['timestamp'].values)

    # Train-validation-test split
    # This is a little bit tricky as we want to select the last interaction for test and the
    # second-to-last interaction for validation.
    train_indices, val_indices, test_indices = train_test_split_by_time(ratings, 'timestamp', 'movie_id')

    # Build the graph with training interactions only.
    train_g = build_train_graph(g, train_indices, 'user', 'movie', 'watched', 'watched-by')

    # Build the user-item sparse matrix for validation and test set.
    val_matrix, test_matrix = build_val_test_matrix(g, val_indices, test_indices, 'user', 'movie', 'watched')

    ## Build title set
    movie_textual_dataset = {'title': movies['title'].values}

    # The model should build its own vocabulary and process the texts.  Here is one example
    # of using torchtext to pad and numericalize a batch of strings.
    #     field = torchtext.data.Field(include_lengths=True, lower=True, batch_first=True)
    #     examples = [torchtext.data.Example.fromlist([t], [('title', field)]) for t in texts]
    #     titleset = torchtext.data.Dataset(examples, [('title', field)])
    #     field.build_vocab(titleset.title, vectors='fasttext.simple.300d')
    #     token_ids, lengths = field.process([examples[0].title, examples[1].title])

    ## Dump the graph and the datasets
    dataset = {
        'train-graph': train_g,
        'val-matrix': val_matrix,
        'test-matrix': test_matrix,
```
@@ -149,5 +150,5 @@ dataset = {

```python
        'item-to-user-type': 'watched-by',
        'timestamp-edge-column': 'timestamp'}

    with open(output_path, 'wb') as f:
        pickle.dump(dataset, f)
```
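The commented torchtext recipe above can be run as follows under the legacy torchtext API (the `torchtext.data.Field` family, torchtext ≤ 0.8; later releases moved it to `torchtext.legacy`). This is a hedged sketch with made-up titles; the original comment also passes `vectors='fasttext.simple.300d'` to `build_vocab`, which downloads pretrained embeddings and is omitted here to keep the example offline.

```python
import torchtext  # legacy API, torchtext <= 0.8

texts = ['The Shawshank Redemption', 'Toy Story']
field = torchtext.data.Field(include_lengths=True, lower=True, batch_first=True)
examples = [torchtext.data.Example.fromlist([t], [('title', field)]) for t in texts]
titleset = torchtext.data.Dataset(examples, [('title', field)])
field.build_vocab(titleset.title)  # add vectors='fasttext.simple.300d' to attach embeddings
# Pad the batch to equal length and map tokens to vocabulary ids.
token_ids, lengths = field.process([examples[0].title, examples[1].title])
print(token_ids.shape, lengths)  # torch.Size([2, 3]), tensor([3, 2])
```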
...

@@ -11,34 +11,35 @@ import pickle

```python
from data_utils import *
from builder import PandasGraphBuilder

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('directory', type=str)
    parser.add_argument('output_path', type=str)
    args = parser.parse_args()
    directory = args.directory
    output_path = args.output_path

    data = pd.read_csv(os.path.join(directory, 'context_content_features.csv'))
    track_feature_cols = list(data.columns[1:13])
    data = data[['user_id', 'track_id', 'created_at'] + track_feature_cols].dropna()

    users = data[['user_id']].drop_duplicates()
    tracks = data[['track_id'] + track_feature_cols].drop_duplicates()
    assert tracks['track_id'].value_counts().max() == 1
    tracks = tracks.astype({'mode': 'int64', 'key': 'int64', 'artist_id': 'category'})
    events = data[['user_id', 'track_id', 'created_at']]
    events['created_at'] = events['created_at'].values.astype('datetime64[s]').astype('int64')

    graph_builder = PandasGraphBuilder()
    graph_builder.add_entities(users, 'user_id', 'user')
    graph_builder.add_entities(tracks, 'track_id', 'track')
    graph_builder.add_binary_relations(events, 'user_id', 'track_id', 'listened')
    graph_builder.add_binary_relations(events, 'track_id', 'user_id', 'listened-by')

    g = graph_builder.build()

    float_cols = []
    for col in tracks.columns:
        if col == 'track_id':
            continue
        elif col == 'artist_id':
```
@@ -47,17 +48,17 @@ for col in tracks.columns:

```python
            float_cols.append(col)
        else:
            g.nodes['track'].data[col] = torch.LongTensor(tracks[col].values)
    g.nodes['track'].data['song_features'] = torch.FloatTensor(linear_normalize(tracks[float_cols].values))
    g.edges['listened'].data['created_at'] = torch.LongTensor(events['created_at'].values)
    g.edges['listened-by'].data['created_at'] = torch.LongTensor(events['created_at'].values)

    n_edges = g.number_of_edges('listened')
    train_indices, val_indices, test_indices = train_test_split_by_time(events, 'created_at', 'track_id')
    train_g = build_train_graph(g, train_indices, 'user', 'track', 'listened', 'listened-by')
    val_matrix, test_matrix = build_val_test_matrix(
        g, val_indices, test_indices, 'user', 'track', 'listened')

    dataset = {
        'train-graph': train_g,
        'val-matrix': val_matrix,
        'test-matrix': test_matrix,
```
@@ -69,5 +70,5 @@ dataset = {

```python
        'item-to-user-type': 'listened-by',
        'timestamp-edge-column': 'created_at'}

    with open(output_path, 'wb') as f:
        pickle.dump(dataset, f)
```
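One small idiom from the script above worth spelling out: the `created_at` strings are converted to integer Unix timestamps by a round-trip through numpy's `datetime64[s]`. A quick sketch with made-up values:

```python
import pandas as pd

# Two timestamps 100 seconds apart.
s = pd.Series(['2014-01-01T00:00:00', '2014-01-01T00:01:40'])
secs = s.values.astype('datetime64[s]').astype('int64')
print(secs)  # [1388534400 1388534500]
```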