"src/array/git@developer.sourcefind.cn:OpenDAS/dgl.git" did not exist on "d89f825d8cd70832f41e190837e532787ff795f4"
Unverified Commit ccd4eab0 authored by Quan (Andy) Gan's avatar Quan (Andy) Gan Committed by GitHub
Browse files

removing old pinsage implementation (#1525)

parent 96984fac
# PinSage model
NOTE: this version does not use NodeFlow yet.
This example only works with Python 3.6+.
First, download and extract the MovieLens-1M dataset from https://data.dgl.ai/dataset/ml-1m.tar.gz
One can then run the following to train PinSage on MovieLens-1M:
```bash
python3 main.py --opt Adam --lr 1e-3
```
One can also incorporate user and movie features into training:
```bash
python3 main.py --opt Adam --lr 1e-3 --use-feature
```
Currently, the best mean reciprocal rank (MRR) achieved by PinSage on MovieLens-1M is
0.032298±0.048078 on the validation set (and 0.033695±0.051963 on the test set for the same model).
For comparison, the Implicit Factorization Model from Spotlight achieves 0.034572±0.041653 on the test set.
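The MRR above is computed per user: candidate movies are ranked by the dot product of user and movie embeddings, and the reciprocal ranks of the user's held-out movies are averaged (see `runtest` in the training script below). A minimal NumPy sketch of the per-user computation, with hypothetical `scores` and `positive_idx` inputs that are not part of this example's code:

```python
import numpy as np

def mean_reciprocal_rank(scores, positive_idx):
    # scores: (n_candidates,) predicted score of every candidate item
    # positive_idx: indices (into scores) of the held-out positive items
    order = np.argsort(-scores)            # candidate indices, best score first
    rank = np.empty_like(order)
    rank[order] = np.arange(len(scores))   # rank[i] = 0-based position of candidate i
    return (1.0 / (rank[positive_idx] + 1)).mean()
```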
# main.py: the training script invoked by the commands in the README above.
import torch
import torch.nn as nn
import torch.nn.functional as F
import pandas as pd
import numpy as np
import tqdm
from rec.model.pinsage import PinSage
from rec.datasets.movielens import MovieLens
from rec.utils import cuda
from dgl import DGLGraph
import argparse
import pickle
import os
parser = argparse.ArgumentParser()
parser.add_argument('--opt', type=str, default='SGD')
parser.add_argument('--lr', type=float, default=1)
parser.add_argument('--sched', type=str, default='none')
parser.add_argument('--layers', type=int, default=2)
parser.add_argument('--use-feature', action='store_true')
parser.add_argument('--sgd-switch', type=int, default=-1)
parser.add_argument('--n-negs', type=int, default=1)
parser.add_argument('--loss', type=str, default='hinge')
parser.add_argument('--hard-neg-prob', type=float, default=0)
args = parser.parse_args()
print(args)
cache_file = 'ml.pkl'
# Load the preprocessed MovieLens object from the cache if present; otherwise
# build it from the raw dataset and cache it.
if os.path.exists(cache_file):
    with open(cache_file, 'rb') as f:
        ml = pickle.load(f)
else:
    ml = MovieLens('./ml-1m')
    with open(cache_file, 'wb') as f:
        pickle.dump(ml, f)
g = ml.g
n_hidden = 100
n_layers = args.layers
batch_size = 256
margin = 0.9
n_negs = args.n_negs
hard_neg_prob = args.hard_neg_prob

# Both losses take diff = score(negative) - score(positive), so smaller is better.
loss_func = {
    'hinge': lambda diff: (diff + margin).clamp(min=0).mean(),
    'bpr': lambda diff: (1 - torch.sigmoid(-diff)).mean(),
}
model = cuda(PinSage(
    g.number_of_nodes(),
    [n_hidden] * (n_layers + 1),
    20,      # T: number of neighbors kept for each node
    0.5,     # restart probability of the random walks
    10,      # max number of nodes visited per seed
    use_feature=args.use_feature,
    G=g,
))
opt = getattr(torch.optim, args.opt)(model.parameters(), lr=args.lr)

def forward(model, g_prior, nodeset, train=True):
    if train:
        return model(g_prior, nodeset)
    else:
        with torch.no_grad():
            return model(g_prior, nodeset)

def runtrain(g_train_bases, g_train_pairs, train):
    global opt
    if train:
        model.train()
    else:
        model.eval()

    g_prior = g.edge_subgraph(g_train_bases, preserve_nodes=True)
    g_prior.copy_from_parent()

    # generate batches of training pairs
    edge_batches = g_train_pairs[torch.randperm(g_train_pairs.shape[0])].split(batch_size)

    with tqdm.tqdm(edge_batches) as tq:
        sum_loss = 0
        sum_acc = 0
        count = 0
        for batch_id, batch in enumerate(tq):
            count += batch.shape[0]
            # Get source (user) and destination (item) nodes, as well as negative items
            src, dst = g.find_edges(batch)
            dst_neg = []
            for i in range(len(dst)):
                dst_neg.append(np.random.randint(
                    len(ml.user_ids), len(ml.user_ids) + len(ml.movie_ids), n_negs))
            dst_neg = torch.LongTensor(dst_neg)
            dst = dst.view(-1, 1).expand_as(dst_neg).flatten()
            src = src.view(-1, 1).expand_as(dst_neg).flatten()
            dst_neg = dst_neg.flatten()

            # make sure that the source/destination/negative nodes have neighbors
            # in g_prior (in-degree > 0) to aggregate from
            mask = (g_prior.in_degrees(dst_neg) > 0) & \
                   (g_prior.in_degrees(dst) > 0) & \
                   (g_prior.in_degrees(src) > 0)
            src = src[mask]
            dst = dst[mask]
            dst_neg = dst_neg[mask]
            if len(src) == 0:
                continue

            nodeset = cuda(torch.cat([src, dst, dst_neg]))
            src_size, dst_size, dst_neg_size = \
                    src.shape[0], dst.shape[0], dst_neg.shape[0]

            # get representations and compute losses
            h_src, h_dst, h_dst_neg = (
                    forward(model, g_prior, nodeset, train)
                    .split([src_size, dst_size, dst_neg_size]))
            diff = (h_src * (h_dst_neg - h_dst)).sum(1)
            loss = loss_func[args.loss](diff)
            acc = (diff < 0).sum()
            assert loss.item() == loss.item()    # NaN check

            grad_sqr_norm = 0
            if train:
                opt.zero_grad()
                loss.backward()
                for name, p in model.named_parameters():
                    assert (p.grad != p.grad).sum() == 0    # NaN check on gradients
                    grad_sqr_norm += p.grad.norm().item() ** 2
                opt.step()

            sum_loss += loss.item()
            sum_acc += acc.item() / n_negs
            avg_loss = sum_loss / (batch_id + 1)
            avg_acc = sum_acc / count
            tq.set_postfix({'loss': '%.6f' % loss.item(),
                            'avg_loss': '%.3f' % avg_loss,
                            'avg_acc': '%.3f' % avg_acc,
                            'grad_norm': '%.6f' % np.sqrt(grad_sqr_norm)})

    return avg_loss, avg_acc

def runtest(g_train_bases, ml, validation=True):
    model.eval()

    n_users = len(ml.users.index)
    n_items = len(ml.movies.index)

    g_prior = g.edge_subgraph(g_train_bases, preserve_nodes=True)
    g_prior.copy_from_parent()

    # Pre-compute the representations of users and items
    hs = []
    with torch.no_grad():
        with tqdm.trange(n_users + n_items) as tq:
            for node_id in tq:
                nodeset = cuda(torch.LongTensor([node_id]))
                h = forward(model, g_prior, nodeset, False)
                hs.append(h)
    h = torch.cat(hs, 0)

    rr = []
    with torch.no_grad():
        with tqdm.trange(n_users) as tq:
            for u_nid in tq:
                # For each user, exclude the items appearing in
                # (1) the training set, and
                # (2) either the validation set when testing, or the test set when
                #     validating.
                uid = ml.user_ids[u_nid]
                pids_exclude = ml.ratings[
                        (ml.ratings['user_id'] == uid) &
                        (ml.ratings['train'] | ml.ratings['test' if validation else 'valid'])
                        ]['movie_id'].values
                pids_candidate = ml.ratings[
                        (ml.ratings['user_id'] == uid) &
                        ml.ratings['valid' if validation else 'test']
                        ]['movie_id'].values
                pids = np.setdiff1d(ml.movie_ids, pids_exclude)
                p_nids = np.array([ml.movie_ids_invmap[pid] for pid in pids])
                p_nids_candidate = np.array([ml.movie_ids_invmap[pid] for pid in pids_candidate])

                # compute scores of items and rank them, then compute the MRR.
                dst = torch.from_numpy(p_nids) + n_users
                src = torch.zeros_like(dst).fill_(u_nid)
                h_dst = h[dst]
                h_src = h[src]

                score = (h_src * h_dst).sum(1)
                score_sort_idx = score.sort(descending=True)[1].cpu().numpy()

                rank_map = {v: i for i, v in enumerate(p_nids[score_sort_idx])}
                rank_candidates = np.array([rank_map[p_nid] for p_nid in p_nids_candidate])
                rank = 1 / (rank_candidates + 1)
                rr.append(rank.mean())
                tq.set_postfix({'rank': rank.mean()})

    return np.array(rr)

def train():
    global opt, sched
    best_mrr = 0
    for epoch in range(500):
        ml.refresh_mask()
        # In training, we perform message passing on edges marked with 'prior', and
        # do link prediction on edges marked with 'train'.
        # 'prior' and 'train' are disjoint so that the training pairs cannot pass
        # messages between each other.
        # 'prior' and 'train' are re-generated every time with ml.refresh_mask() above.
        g_train_bases = g.filter_edges(lambda edges: edges.data['prior'])
        g_train_pairs = g.filter_edges(lambda edges: edges.data['train'] & ~edges.data['inv'])
        # In testing we perform message passing on both 'prior' and 'train' edges.
        g_test_bases = g.filter_edges(
                lambda edges: edges.data['prior'] | edges.data['train'])

        print('Epoch %d validation' % epoch)
        with torch.no_grad():
            valid_mrr = runtest(g_test_bases, ml, True)
            if best_mrr < valid_mrr.mean():
                best_mrr = valid_mrr.mean()
                torch.save(model.state_dict(), 'model.pt')
        print(pd.Series(valid_mrr).describe())

        print('Epoch %d test' % epoch)
        with torch.no_grad():
            test_mrr = runtest(g_test_bases, ml, False)
        print(pd.Series(test_mrr).describe())

        print('Epoch %d train' % epoch)
        runtrain(g_train_bases, g_train_pairs, True)


if __name__ == '__main__':
    train()
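
# ---------------------------------------------------------------------------
# MovieLens dataset module (imported above in the training script as
# rec.datasets.movielens). It parses users.dat / movies.dat / ratings.dat,
# splits each user's ratings into train/valid/test, and builds a bipartite
# user-movie DGLGraph with node features and per-edge split masks.
# ---------------------------------------------------------------------------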
import pandas as pd
import dgl
import os
import torch
import numpy as np
import scipy.sparse as sp
import time
from functools import partial
from .. import randomwalk
import stanfordnlp
import re
import tqdm
import string

class MovieLens(object):
    def __init__(self, directory):
        '''
        directory: path to the MovieLens directory which should have the three
        files:
            users.dat
            movies.dat
            ratings.dat
        '''
        self.directory = directory

        users = []
        movies = []
        ratings = []

        # read users
        with open(os.path.join(directory, 'users.dat')) as f:
            for l in f:
                id_, gender, age, occupation, zip_ = l.strip().split('::')
                users.append({
                    'id': int(id_),
                    'gender': gender,
                    'age': age,
                    'occupation': occupation,
                    'zip': zip_,
                    })
        self.users = pd.DataFrame(users).set_index('id').astype('category')

        # read movies
        with open(os.path.join(directory, 'movies.dat'), encoding='latin1') as f:
            for l in f:
                id_, title, genres = l.strip().split('::')
                genres_set = set(genres.split('|'))

                # extract year
                assert re.match(r'.*\([0-9]{4}\)$', title)
                year = title[-5:-1]
                title = title[:-6].strip()

                data = {'id': int(id_), 'title': title, 'year': year}
                for g in genres_set:
                    data[g] = True
                movies.append(data)
        self.movies = (
                pd.DataFrame(movies)
                .set_index('id')
                .fillna(False)
                .astype({'year': 'category'}))
        self.genres = self.movies.columns[self.movies.dtypes == bool]

        # read ratings
        with open(os.path.join(directory, 'ratings.dat')) as f:
            for l in f:
                user_id, movie_id, rating, timestamp = [int(_) for _ in l.split('::')]
                ratings.append({
                    'user_id': user_id,
                    'movie_id': movie_id,
                    'rating': rating,
                    'timestamp': timestamp,
                    })
        ratings = pd.DataFrame(ratings)
        movie_count = ratings['movie_id'].value_counts()
        movie_count.name = 'movie_count'
        ratings = ratings.join(movie_count, on='movie_id')
        self.ratings = ratings

        # drop users and movies which do not exist in ratings
        self.users = self.users[self.users.index.isin(self.ratings['user_id'])]
        self.movies = self.movies[self.movies.index.isin(self.ratings['movie_id'])]

        self.data_split()
        self.build_graph()

    # split_user assigns each of a user's ratings a uniformly spaced, shuffled
    # 'prob' value; data_split then thresholds it into a roughly 80/10/10 split.
    def split_user(self, df, filter_counts=False):
        df_new = df.copy()
        df_new['prob'] = 0

        if filter_counts:
            df_new_sub = (df_new['movie_count'] >= 10).nonzero()[0]
        else:
            df_new_sub = df_new['train'].nonzero()[0]
        prob = np.linspace(0, 1, df_new_sub.shape[0], endpoint=False)
        np.random.shuffle(prob)
        df_new['prob'].iloc[df_new_sub] = prob
        return df_new

    def data_split(self):
        self.ratings = self.ratings.groupby('user_id', group_keys=False).apply(
                partial(self.split_user, filter_counts=True))
        self.ratings['train'] = self.ratings['prob'] <= 0.8
        self.ratings['valid'] = (self.ratings['prob'] > 0.8) & (self.ratings['prob'] <= 0.9)
        self.ratings['test'] = self.ratings['prob'] > 0.9
        self.ratings.drop(['prob'], axis=1, inplace=True)

    def build_graph(self):
        user_ids = list(self.users.index)
        movie_ids = list(self.movies.index)
        user_ids_invmap = {id_: i for i, id_ in enumerate(user_ids)}
        movie_ids_invmap = {id_: i for i, id_ in enumerate(movie_ids)}
        self.user_ids = user_ids
        self.movie_ids = movie_ids
        self.user_ids_invmap = user_ids_invmap
        self.movie_ids_invmap = movie_ids_invmap

        g = dgl.DGLGraph()
        g.add_nodes(len(user_ids) + len(movie_ids))

        # user features
        for user_column in self.users.columns:
            udata = torch.zeros(g.number_of_nodes(), dtype=torch.int64)
            # 0 for padding
            udata[:len(user_ids)] = \
                    torch.LongTensor(self.users[user_column].cat.codes.values.astype('int64') + 1)
            g.ndata[user_column] = udata

        # movie genre
        movie_genres = torch.from_numpy(self.movies[self.genres].values.astype('float32'))
        g.ndata['genre'] = torch.zeros(g.number_of_nodes(), len(self.genres))
        g.ndata['genre'][len(user_ids):len(user_ids) + len(movie_ids)] = movie_genres

        # movie year
        g.ndata['year'] = torch.zeros(g.number_of_nodes(), dtype=torch.int64)
        # 0 for padding
        g.ndata['year'][len(user_ids):len(user_ids) + len(movie_ids)] = \
                torch.LongTensor(self.movies['year'].cat.codes.values.astype('int64') + 1)

        # movie title
        nlp = stanfordnlp.Pipeline(use_gpu=False, processors='tokenize,lemma')
        vocab = set()
        title_words = []
        for t in tqdm.tqdm(self.movies['title'].values):
            doc = nlp(t)
            words = set()
            for s in doc.sentences:
                words.update(w.lemma.lower() for w in s.words
                             if not re.fullmatch(r'[' + string.punctuation + ']+', w.lemma))
            vocab.update(words)
            title_words.append(words)
        vocab = list(vocab)
        vocab_invmap = {w: i for i, w in enumerate(vocab)}
        # bag-of-words
        g.ndata['title'] = torch.zeros(g.number_of_nodes(), len(vocab))
        for i, tw in enumerate(tqdm.tqdm(title_words)):
            g.ndata['title'][len(user_ids) + i, [vocab_invmap[w] for w in tw]] = 1
        self.vocab = vocab
        self.vocab_invmap = vocab_invmap

        rating_user_vertices = [user_ids_invmap[id_] for id_ in self.ratings['user_id'].values]
        rating_movie_vertices = [movie_ids_invmap[id_] + len(user_ids)
                                 for id_ in self.ratings['movie_id'].values]
        self.rating_user_vertices = rating_user_vertices
        self.rating_movie_vertices = rating_movie_vertices

        g.add_edges(
                rating_user_vertices,
                rating_movie_vertices,
                data={'inv': torch.zeros(self.ratings.shape[0], dtype=torch.uint8)})
        g.add_edges(
                rating_movie_vertices,
                rating_user_vertices,
                data={'inv': torch.ones(self.ratings.shape[0], dtype=torch.uint8)})
        self.g = g

    def generate_mask(self):
        while True:
            ratings = self.ratings.groupby('user_id', group_keys=False).apply(self.split_user)
            prior_prob = ratings['prob'].values
            for i in range(5):
                train_mask = (prior_prob >= 0.2 * i) & (prior_prob < 0.2 * (i + 1))
                prior_mask = ~train_mask
                train_mask &= ratings['train'].values
                prior_mask &= ratings['train'].values
                yield prior_mask, train_mask

    def refresh_mask(self):
        if not hasattr(self, 'masks'):
            self.masks = self.generate_mask()
        prior_mask, train_mask = next(self.masks)

        valid_tensor = torch.from_numpy(self.ratings['valid'].values.astype('uint8'))
        test_tensor = torch.from_numpy(self.ratings['test'].values.astype('uint8'))
        train_tensor = torch.from_numpy(train_mask.astype('uint8'))
        prior_tensor = torch.from_numpy(prior_mask.astype('uint8'))
        edge_data = {
                'prior': prior_tensor,
                'valid': valid_tensor,
                'test': test_tensor,
                'train': train_tensor,
                }

        self.g.edges[self.rating_user_vertices, self.rating_movie_vertices].data.update(edge_data)
        self.g.edges[self.rating_movie_vertices, self.rating_user_vertices].data.update(edge_data)
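
# ---------------------------------------------------------------------------
# PinSage model module (imported above as rec.model.pinsage). Defines the
# single-layer PinSageConv and the multi-layer PinSage model, which gathers
# neighborhoods with the random-walk sampler below.
# ---------------------------------------------------------------------------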
import torch
import torch.nn as nn
import torch.nn.functional as F
import dgl
from .. import randomwalk
from ..utils import cuda

def create_embeddings(n_nodes, n_features):
    return nn.Parameter(torch.randn(n_nodes, n_features))


def mix_embeddings(h, ndata, emb, proj):
    '''Combine node-specific trainable embedding ``h`` with categorical inputs
    (projected by ``emb``) and numeric inputs (projected by ``proj``).
    '''
    e = []
    for key, value in ndata.items():
        if value.dtype == torch.int64:
            e.append(emb[key](value))
        elif value.dtype == torch.float32:
            e.append(proj[key](value))
    return h + torch.stack(e, 0).sum(0)

def get_embeddings(h, nodeset):
    return h[nodeset]


def put_embeddings(h, nodeset, new_embeddings):
    n_nodes = nodeset.shape[0]
    n_features = h.shape[1]
    return h.scatter(0, nodeset[:, None].expand(n_nodes, n_features), new_embeddings)


def safediv(a, b):
    b = torch.where(b == 0, torch.ones_like(b), b)
    return a / b


def init_weight(w, func_name, nonlinearity):
    getattr(nn.init, func_name)(w, gain=nn.init.calculate_gain(nonlinearity))


def init_bias(w):
    nn.init.constant_(w, 0)

class PinSageConv(nn.Module):
    def __init__(self, in_features, out_features, hidden_features):
        super(PinSageConv, self).__init__()

        self.in_features = in_features
        self.out_features = out_features
        self.hidden_features = hidden_features

        self.Q = nn.Linear(in_features, hidden_features)
        self.W = nn.Linear(in_features + hidden_features, out_features)

        init_weight(self.Q.weight, 'xavier_uniform_', 'leaky_relu')
        init_weight(self.W.weight, 'xavier_uniform_', 'leaky_relu')
        init_bias(self.Q.bias)
        init_bias(self.W.bias)

    def forward(self, h, nodeset, nb_nodes, nb_weights):
        '''
        h: node embeddings (num_total_nodes, in_features), or a container
           of the node embeddings (for distributed computing)
        nodeset: node IDs in this minibatch (num_nodes,)
        nb_nodes: neighbor node IDs of each node in nodeset (num_nodes, num_neighbors)
        nb_weights: weight of each neighbor node (num_nodes, num_neighbors)
        return: new node embeddings (num_nodes, out_features)
        '''
        n_nodes, T = nb_nodes.shape

        h_nodeset = get_embeddings(h, nodeset)    # (n_nodes, in_features)
        h_neighbors = get_embeddings(h, nb_nodes.view(-1)).view(n_nodes, T, self.in_features)

        h_neighbors = F.leaky_relu(self.Q(h_neighbors))
        h_agg = safediv(
                (nb_weights[:, :, None] * h_neighbors).sum(1),
                nb_weights.sum(1, keepdim=True))

        h_concat = torch.cat([h_nodeset, h_agg], 1)
        h_new = F.leaky_relu(self.W(h_concat))
        h_new = safediv(h_new, h_new.norm(dim=1, keepdim=True))

        return h_new
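
# Shape walk-through for PinSageConv.forward, using illustrative sizes taken
# from the defaults in main.py above (n_hidden = 100, T = 20) and a minibatch
# of 256 seed nodes; these numbers are only for orientation:
#   h_nodeset   : (256, 100)       embeddings of the seed nodes
#   h_neighbors : (256, 20, 100)   -> Q + leaky_relu -> (256, 20, 100)
#   h_agg       : (256, 100)       importance-weighted average over the 20 neighbors
#   h_concat    : (256, 200)       -> W + leaky_relu -> (256, 100)
#   h_new       : (256, 100)       rows L2-normalized via safediv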

class PinSage(nn.Module):
    '''
    Completes a multi-layer PinSage convolution.
    G: DGLGraph
    feature_sizes: the dimensionality of input/hidden/output features
    T: number of neighbors we pick for each node
    restart_prob: restart probability
    max_nodes: max number of nodes visited for each seed
    '''
    def __init__(self, num_nodes, feature_sizes, T, restart_prob, max_nodes,
                 use_feature=False, G=None):
        super(PinSage, self).__init__()

        self.T = T
        self.restart_prob = restart_prob
        self.max_nodes = max_nodes

        self.in_features = feature_sizes[0]
        self.out_features = feature_sizes[-1]
        self.n_layers = len(feature_sizes) - 1

        self.convs = nn.ModuleList()
        for i in range(self.n_layers):
            self.convs.append(PinSageConv(
                feature_sizes[i], feature_sizes[i+1], feature_sizes[i+1]))

        self.h = create_embeddings(num_nodes, self.in_features)

        self.use_feature = use_feature
        if use_feature:
            self.emb = nn.ModuleDict()
            self.proj = nn.ModuleDict()
            for key, scheme in G.node_attr_schemes().items():
                if scheme.dtype == torch.int64:
                    self.emb[key] = nn.Embedding(
                            G.ndata[key].max().item() + 1,
                            self.in_features,
                            padding_idx=0)
                elif scheme.dtype == torch.float32:
                    self.proj[key] = nn.Sequential(
                            nn.Linear(scheme.shape[0], self.in_features),
                            nn.LeakyReLU(),
                            )

    def forward(self, G, nodeset):
        '''
        Given a complete embedding matrix h and a list of node IDs, return
        the output embeddings of these node IDs.
        nodeset: node IDs in this minibatch (num_nodes,)
        return: new node embeddings (num_nodes, out_features)
        '''
        if self.use_feature:
            h = mix_embeddings(self.h, G.ndata, self.emb, self.proj)
        else:
            h = self.h

        nodeflow = randomwalk.random_walk_nodeflow(
                G, nodeset, self.n_layers, self.restart_prob, self.max_nodes, self.T)

        for i, (nodeset, nb_weights, nb_nodes) in enumerate(nodeflow):
            new_embeddings = self.convs[i](h, nodeset, nb_nodes, nb_weights)
            h = put_embeddings(h, nodeset, new_embeddings)

        h_new = get_embeddings(h, nodeset)
        return h_new
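
# ---------------------------------------------------------------------------
# Random-walk neighborhood sampler (the rec.randomwalk module imported by the
# model above). It estimates neighbor importance with random walks with
# restart and keeps the top-T visited neighbors per seed node.
# ---------------------------------------------------------------------------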
import torch
import dgl
from ..utils import cuda
from collections import Counter

def random_walk_sampler(G, nodeset, restart_prob, max_nodes):
    '''
    G: DGLGraph
    nodeset: 1D CPU Tensor of node IDs
    restart_prob: float
    max_nodes: int
    return: list[list[Tensor]]
    '''
    traces = dgl.contrib.sampling.bipartite_single_sided_random_walk_with_restart(
            G, nodeset, restart_prob, max_nodes)
    return traces


# Note: this function is not friendly to giant graphs since we use a matrix
# with size (num_nodes_in_nodeset, num_nodes_in_graph).
def random_walk_distribution(G, nodeset, restart_prob, max_nodes):
    n_nodes = nodeset.shape[0]
    n_available_nodes = G.number_of_nodes()
    traces = random_walk_sampler(G, nodeset, restart_prob, max_nodes)
    visited_counts = torch.zeros(n_nodes, n_available_nodes)
    for i in range(n_nodes):
        visited_nodes = torch.cat(traces[i])
        visited_counts[i].scatter_add_(0, visited_nodes, torch.ones_like(visited_nodes, dtype=torch.float32))
    return visited_counts

def random_walk_distribution_topt(G, nodeset, restart_prob, max_nodes, top_T):
    '''
    Returns the top-T important neighbors of each node in nodeset, as well as
    the weights of the neighbors.
    '''
    visited_prob = random_walk_distribution(G, nodeset, restart_prob, max_nodes)
    weights, nodes = visited_prob.topk(top_T, 1)
    weights = weights / weights.sum(1, keepdim=True)
    return weights, nodes


def random_walk_nodeflow(G, nodeset, n_layers, restart_prob, max_nodes, top_T):
    '''
    Returns a list of triplets (
        "active" node IDs whose embeddings are computed at the i-th layer (num_nodes,),
        weight of each neighboring node of each "active" node on the i-th layer (num_nodes, top_T),
        neighboring node IDs for each "active" node on the i-th layer (num_nodes, top_T),
    )
    '''
    dev = nodeset.device
    nodeset = nodeset.cpu()
    nodeflow = []
    cur_nodeset = nodeset
    for i in reversed(range(n_layers)):
        nb_weights, nb_nodes = random_walk_distribution_topt(G, cur_nodeset, restart_prob, max_nodes, top_T)
        nodeflow.insert(0, (cur_nodeset.to(dev), nb_weights.to(dev), nb_nodes.to(dev)))
        cur_nodeset = torch.cat([nb_nodes.view(-1), cur_nodeset]).unique()
    return nodeflow
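
# ---------------------------------------------------------------------------
# Small utilities (the rec.utils module imported above).
# ---------------------------------------------------------------------------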
import torch

def cuda(x):
    '''Move a tensor or module to the GPU if one is available; otherwise return it unchanged.'''
    if torch.cuda.is_available():
        return x.cuda()
    else:
        return x