Unverified Commit dc0432e7 authored by Quan (Andy) Gan, committed by GitHub

[Model] Unsupervised learning with GraphSAGE (#1440)



* unsupervised graphsage first commit

* fix

* disable remove_edges and still got 0.90 performance

* optimize edgeids with multimap

* change hyperparams

* update README
Co-authored-by: Minjie Wang <wmjlyjemaine@gmail.com>
parent 1506560e
@@ -45,3 +45,21 @@ Accuracy:
| Full Graph | 0.9504 |
| Neighbor Sampling | 0.9495 |
| Control Variate | 0.9490 |
### Unsupervised training
Train with mini-batch sampling in an unsupervised fashion (on the Reddit dataset):
```bash
python3 train_sampling_unsupervised.py
```
Notably,
* The loss function is defined by predicting whether an edge exists between two nodes. This matches the official
implementation and is equivalent to the loss defined in the paper with 1-hop random walks (a minimal sketch of the objective follows this list).
* When computing the score of `(u, v)`, the edges between nodes `u` and `v` are removed from neighbor sampling.
This trick increases the micro F1 score on the test set by 0.02.
* The performance of the learned embeddings is measured by training a softmax regression with scikit-learn, as described
in the paper. The micro F1 score reaches 0.9212 on the test set.
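For reference, here is a minimal sketch of the unsupervised objective (a simplification of the `CrossEntropyLoss` module defined in the training script; the function and tensor names are illustrative only):
```python
# Sketch only: score each positive pair (u, v) and each sampled negative pair
# with a dot product, then apply binary cross-entropy (positives = 1, negatives = 0).
import torch as th
import torch.nn.functional as F

def edge_prediction_loss(h_heads, h_tails, h_neg_tails):
    # h_heads, h_tails: (E, D) embeddings of the endpoints of the sampled edges.
    # h_neg_tails: (E * k, D) embeddings of k negative destinations per edge.
    k = h_neg_tails.shape[0] // h_heads.shape[0]
    pos_score = (h_heads * h_tails).sum(1)
    neg_score = (h_heads.repeat_interleave(k, 0) * h_neg_tails).sum(1)
    score = th.cat([pos_score, neg_score])
    label = th.cat([th.ones_like(pos_score), th.zeros_like(neg_score)])
    return F.binary_cross_entropy_with_logits(score, label)
```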
import dgl
import numpy as np
import torch as th
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.multiprocessing as mp
from torch.utils.data import DataLoader
import dgl.function as fn
import dgl.nn.pytorch as dglnn
import time
import argparse
from _thread import start_new_thread
from functools import wraps
from dgl.data import RedditDataset
import tqdm
import traceback
import sklearn.linear_model as lm
import sklearn.metrics as skm
#### Negative sampler
class NegativeSampler(object):
def __init__(self, g):
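# Negative destinations are drawn with probability proportional to in-degree^0.75,
# as in word2vec-style negative sampling.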
self.weights = g.in_degrees().float() ** 0.75
def __call__(self, num_samples):
return self.weights.multinomial(num_samples, replacement=True)
#### Neighbor sampler
class NeighborSampler(object):
def __init__(self, g, fanouts, num_negs):
self.g = g
self.fanouts = fanouts
self.neg_sampler = NegativeSampler(g)
self.num_negs = num_negs
def sample_blocks(self, seed_edges):
n_edges = len(seed_edges)
seed_edges = th.LongTensor(np.asarray(seed_edges))
heads, tails = self.g.find_edges(seed_edges)
neg_tails = self.neg_sampler(self.num_negs * n_edges)
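# Repeat each head num_negs times so every positive source is paired with num_negs negative tails.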
neg_heads = heads.view(-1, 1).expand(n_edges, self.num_negs).flatten()
# Maintain the correspondence between heads, tails and negative tails as two
# graphs.
# pos_graph contains the correspondence between each head and its positive tail.
# neg_graph contains the correspondence between each head and its negative tails.
# Both pos_graph and neg_graph are first constructed with the same node space as
# the original graph. Then they are compacted together with dgl.compact_graphs.
pos_graph = dgl.graph((heads, tails), num_nodes=self.g.number_of_nodes())
neg_graph = dgl.graph((neg_heads, neg_tails), num_nodes=self.g.number_of_nodes())
pos_graph, neg_graph = dgl.compact_graphs([pos_graph, neg_graph])
# Obtain the node IDs being used in either pos_graph or neg_graph. Since they
# are compacted together, pos_graph and neg_graph share the same compacted node
# space.
seeds = pos_graph.ndata[dgl.NID]
blocks = []
for fanout in self.fanouts:
# For each seed node, sample ``fanout`` neighbors.
frontier = dgl.sampling.sample_neighbors(self.g, seeds, fanout, replace=True)
# Remove all edges between heads and tails, as well as heads and neg_tails.
_, _, edge_ids = frontier.edge_ids(
th.cat([heads, tails, neg_heads, neg_tails]),
th.cat([tails, heads, neg_tails, neg_heads]),
return_uv=True)
frontier = dgl.remove_edges(frontier, edge_ids)
# Then we compact the frontier into a bipartite graph for message passing.
block = dgl.to_block(frontier, seeds)
# Obtain the seed nodes for next layer.
seeds = block.srcdata[dgl.NID]
blocks.insert(0, block)
return pos_graph, neg_graph, blocks
class SAGE(nn.Module):
def __init__(self,
in_feats,
n_hidden,
n_classes,
n_layers,
activation,
dropout):
super().__init__()
self.n_layers = n_layers
self.n_hidden = n_hidden
self.n_classes = n_classes
self.layers = nn.ModuleList()
self.layers.append(dglnn.SAGEConv(in_feats, n_hidden, 'mean'))
for i in range(1, n_layers - 1):
self.layers.append(dglnn.SAGEConv(n_hidden, n_hidden, 'mean'))
self.layers.append(dglnn.SAGEConv(n_hidden, n_classes, 'mean'))
self.dropout = nn.Dropout(dropout)
self.activation = activation
def forward(self, blocks, x):
h = x
for l, (layer, block) in enumerate(zip(self.layers, blocks)):
# We need to first copy the representation of nodes on the RHS from the
# appropriate nodes on the LHS.
# Note that the shape of h is (num_nodes_LHS, D) and the shape of h_dst
# would be (num_nodes_RHS, D)
h_dst = h[:block.number_of_dst_nodes()]
# Then we compute the updated representation on the RHS.
# The shape of h now becomes (num_nodes_RHS, D)
h = layer(block, (h, h_dst))
if l != len(self.layers) - 1:
h = self.activation(h)
h = self.dropout(h)
return h
def inference(self, g, x, batch_size, device):
"""
Inference with the GraphSAGE model on full neighbors (i.e. without neighbor sampling).
g : the entire graph.
x : the input features of the entire node set.
The inference code is written so that it can handle any number of nodes and
layers.
"""
# During inference with sampling, multi-layer blocks are very inefficient because
# lots of computations in the first few layers are repeated.
# Therefore, we compute the representation of all nodes layer by layer. The nodes
# on each layer are of course split into batches.
# TODO: can we standardize this?
nodes = th.arange(g.number_of_nodes())
for l, layer in enumerate(self.layers):
y = th.zeros(g.number_of_nodes(), self.n_hidden if l != len(self.layers) - 1 else self.n_classes)
for start in tqdm.trange(0, len(nodes), batch_size):
end = start + batch_size
batch_nodes = nodes[start:end]
block = dgl.to_block(dgl.in_subgraph(g, batch_nodes), batch_nodes)
input_nodes = block.srcdata[dgl.NID]
h = x[input_nodes].to(device)
h_dst = h[:block.number_of_dst_nodes()]
h = layer(block, (h, h_dst))
if l != len(self.layers) - 1:
h = self.activation(h)
h = self.dropout(h)
y[start:end] = h.cpu()
x = y
return y
class CrossEntropyLoss(nn.Module):
def forward(self, block_outputs, pos_graph, neg_graph):
with pos_graph.local_scope():
pos_graph.ndata['h'] = block_outputs
pos_graph.apply_edges(fn.u_dot_v('h', 'h', 'score'))
pos_score = pos_graph.edata['score']
with neg_graph.local_scope():
neg_graph.ndata['h'] = block_outputs
neg_graph.apply_edges(fn.u_dot_v('h', 'h', 'score'))
neg_score = neg_graph.edata['score']
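# Positive pairs are labeled 1 and negative pairs 0; the model is trained with
# binary cross-entropy on the dot-product scores.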
score = th.cat([pos_score, neg_score])
label = th.cat([th.ones_like(pos_score), th.zeros_like(neg_score)]).long()
loss = F.binary_cross_entropy_with_logits(score, label.float())
return loss
def prepare_mp(g):
"""
Explicitly materialize the CSR, CSC and COO representation of the given graph
so that they could be shared via copy-on-write to sampler workers and GPU
trainers.
This is a workaround before full shared memory support on heterogeneous graphs.
"""
g.in_degree(0)
g.out_degree(0)
g.find_edges([0])
def compute_acc(emb, labels, train_nids, val_nids, test_nids):
"""
Measure the quality of the embeddings by fitting a logistic regression on the training set
and reporting micro F1 on the validation and test sets.
"""
emb = emb.cpu().numpy()
train_nids = train_nids.cpu().numpy()
train_labels = labels[train_nids].cpu().numpy()
val_nids = val_nids.cpu().numpy()
val_labels = labels[val_nids].cpu().numpy()
test_nids = test_nids.cpu().numpy()
test_labels = labels[test_nids].cpu().numpy()
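# Standardize each embedding dimension before fitting the linear classifier.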
emb = (emb - emb.mean(0, keepdims=True)) / emb.std(0, keepdims=True)
lr = lm.LogisticRegression(multi_class='multinomial', max_iter=10000)
lr.fit(emb[train_nids], train_labels)
pred = lr.predict(emb)
f1_micro_eval = skm.f1_score(val_labels, pred[val_nids], average='micro')
f1_micro_test = skm.f1_score(test_labels, pred[test_nids], average='micro')
f1_macro_eval = skm.f1_score(val_labels, pred[val_nids], average='macro')
f1_macro_test = skm.f1_score(test_labels, pred[test_nids], average='macro')
return f1_micro_eval, f1_micro_test
def evaluate(model, g, inputs, labels, train_nids, val_nids, test_nids, batch_size, device):
"""
Evaluate the learned embeddings on the validation and test sets.
g : The entire graph.
inputs : The features of all the nodes.
labels : The labels of all the nodes.
train_nids, val_nids, test_nids : Node IDs of the training, validation and test sets.
batch_size : Number of nodes to compute at the same time.
device : The GPU device to evaluate on.
"""
model.eval()
with th.no_grad():
pred = model.inference(g, inputs, batch_size, device)
model.train()
return compute_acc(pred, labels, train_nids, val_nids, test_nids)
def load_subtensor(g, seeds, input_nodes, device):
"""
Copies the features of a set of nodes onto the GPU.
"""
batch_inputs = g.ndata['features'][input_nodes].to(device)
return batch_inputs
#### Entry point
def run(args, device, data):
# Unpack data
train_mask, val_mask, test_mask, in_feats, labels, n_classes, g = data
train_nid = th.LongTensor(np.nonzero(train_mask)[0])
val_nid = th.LongTensor(np.nonzero(val_mask)[0])
test_nid = th.LongTensor(np.nonzero(test_mask)[0])
# Create sampler
sampler = NeighborSampler(g, [int(fanout) for fanout in args.fan_out.split(',')], args.num_negs)
# Create PyTorch DataLoader for constructing blocks
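# Iterating over this DataLoader yields mini-batches of edge IDs; sample_blocks turns each
# batch into (pos_graph, neg_graph, blocks).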
dataloader = DataLoader(
dataset=np.arange(g.number_of_edges()),
batch_size=args.batch_size,
collate_fn=sampler.sample_blocks,
shuffle=True,
drop_last=False,
num_workers=args.num_workers)
# Define model and optimizer
model = SAGE(in_feats, args.num_hidden, args.num_hidden, args.num_layers, F.relu, args.dropout)
model = model.to(device)
loss_fcn = CrossEntropyLoss()
loss_fcn = loss_fcn.to(device)
optimizer = optim.Adam(model.parameters(), lr=args.lr)
# Training loop
avg = 0
iter_tput = []
best_eval_acc = 0
best_test_acc = 0
for epoch in range(args.num_epochs):
tic = time.time()
# Loop over the dataloader to sample the computation dependency graph as a list of
# blocks.
for step, (pos_graph, neg_graph, blocks) in enumerate(dataloader):
tic_step = time.time()
# The input nodes lie on the LHS side of the first block.
# The output nodes lie on the RHS side of the last block.
input_nodes = blocks[0].srcdata[dgl.NID]
seeds = blocks[-1].dstdata[dgl.NID]
# Load the input features as well as output labels
batch_inputs = load_subtensor(g, seeds, input_nodes, device)
# Compute loss and prediction
batch_pred = model(blocks, batch_inputs)
loss = loss_fcn(batch_pred, pos_graph, neg_graph)
optimizer.zero_grad()
loss.backward()
optimizer.step()
iter_tput.append(len(seeds) / (time.time() - tic_step))
if step % args.log_every == 0:
gpu_mem_alloc = th.cuda.max_memory_allocated() / 1000000 if th.cuda.is_available() else 0
print('Epoch {:05d} | Step {:05d} | Loss {:.4f} | Speed (samples/sec) {:.4f} | GPU {:.1f} MB'.format(
epoch, step, loss.item(), np.mean(iter_tput[3:]), gpu_mem_alloc))
if step % args.eval_every == 0:
eval_acc, test_acc = evaluate(model, g, g.ndata['features'], labels, train_nid, val_nid, test_nid, args.batch_size, device)
print('Eval Acc {:.4f} Test Acc {:.4f}'.format(eval_acc, test_acc))
if eval_acc > best_eval_acc:
best_eval_acc = eval_acc
best_test_acc = test_acc
print('Best Eval Acc {:.4f} Test Acc {:.4f}'.format(best_eval_acc, best_test_acc))
toc = time.time()
if epoch >= 5:
avg += toc - tic
print('Avg epoch time: {}'.format(avg / (epoch - 4)))
if __name__ == '__main__':
argparser = argparse.ArgumentParser("multi-gpu training")
argparser.add_argument('--gpu', type=int, default=0,
help="GPU device ID. Use -1 for CPU training")
argparser.add_argument('--num-epochs', type=int, default=20)
argparser.add_argument('--num-hidden', type=int, default=16)
argparser.add_argument('--num-layers', type=int, default=2)
argparser.add_argument('--num-negs', type=int, default=1)
argparser.add_argument('--fan-out', type=str, default='10,25')
argparser.add_argument('--batch-size', type=int, default=10000)
argparser.add_argument('--log-every', type=int, default=20)
argparser.add_argument('--eval-every', type=int, default=1000)
argparser.add_argument('--lr', type=float, default=0.003)
argparser.add_argument('--dropout', type=float, default=0.5)
argparser.add_argument('--num-workers', type=int, default=0,
help="Number of sampling processes. Use 0 for no extra process.")
args = argparser.parse_args()
if args.gpu >= 0:
device = th.device('cuda:%d' % args.gpu)
else:
device = th.device('cpu')
# load reddit data
data = RedditDataset(self_loop=True)
train_mask = data.train_mask
val_mask = data.val_mask
test_mask = data.test_mask
features = th.Tensor(data.features)
in_feats = features.shape[1]
labels = th.LongTensor(data.labels)
n_classes = data.num_labels
# Construct graph
g = dgl.graph(data.graph.all_edges())
g.ndata['features'] = features
prepare_mp(g)
# Pack data
data = train_mask, val_mask, test_mask, in_feats, labels, n_classes, g
run(args, device, data)
@@ -175,6 +175,7 @@ std::vector<NDArray> COOGetDataAndIndices(
COOMatrix coo, NDArray rows, NDArray cols) {
const int64_t rowlen = rows->shape[0];
const int64_t collen = cols->shape[0];
const int64_t len = std::max(rowlen, collen);
CHECK((rowlen == collen) || (rowlen == 1) || (collen == 1))
<< "Invalid row and col id array.";
@@ -190,7 +191,34 @@ std::vector<NDArray> COOGetDataAndIndices(
std::vector<IdType> ret_rows, ret_cols;
std::vector<IdType> ret_data;
ret_rows.reserve(len);
ret_cols.reserve(len);
ret_data.reserve(len);
// NOTE(BarclayII): With a small number of lookups, linear scan is faster.
// The threshold 200 comes from benchmarking both algorithms on a P3.8x instance.
// I also tried sorting plus binary search. The speed gain is only significant for
// medium-sized graphs and lookups, so I didn't include it.
if (len >= 200) {
// TODO(BarclayII) Ideally we would want to cache this object. However I'm not sure
// what is the best way to do so since this object is valid for CPU only.
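// Build a multimap from (row, col) pairs to edge IDs so that each queried pair
// can be answered in expected constant time.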
std::unordered_multimap<std::pair<IdType, IdType>, IdType, PairHash> pair_map;
pair_map.reserve(coo.row->shape[0]);
for (int64_t k = 0; k < coo.row->shape[0]; ++k)
pair_map.emplace(std::make_pair(coo_row_data[k], coo_col_data[k]), data ? data[k]: k);
for (int64_t i = 0, j = 0; i < rowlen && j < collen; i += row_stride, j += col_stride) {
const IdType row_id = row_data[i], col_id = col_data[j];
CHECK(row_id >= 0 && row_id < coo.num_rows) << "Invalid row index: " << row_id;
CHECK(col_id >= 0 && col_id < coo.num_cols) << "Invalid col index: " << col_id;
auto range = pair_map.equal_range({row_id, col_id});
for (auto it = range.first; it != range.second; ++it) {
ret_rows.push_back(row_id);
ret_cols.push_back(col_id);
ret_data.push_back(it->second);
}
}
} else {
for (int64_t i = 0, j = 0; i < rowlen && j < collen; i += row_stride, j += col_stride) {
const IdType row_id = row_data[i], col_id = col_data[j];
CHECK(row_id >= 0 && row_id < coo.num_rows) << "Invalid row index: " << row_id;
@@ -203,6 +231,7 @@ std::vector<NDArray> COOGetDataAndIndices(
}
}
}
}
return {NDArray::FromVector(ret_rows),
NDArray::FromVector(ret_cols),
...
@@ -829,7 +829,7 @@ IdArray UnitGraph::Successors(dgl_type_t etype, dgl_id_t src) const {
}
IdArray UnitGraph::EdgeId(dgl_type_t etype, dgl_id_t src, dgl_id_t dst) const {
- const SparseFormat fmt = SelectFormat(SparseFormat::kAny);
+ const SparseFormat fmt = SelectFormat(SparseFormat::kCSR);
const auto ptr = GetFormat(fmt);
if (fmt == SparseFormat::kCSC)
return ptr->EdgeId(etype, dst, src);
@@ -838,7 +838,7 @@ IdArray UnitGraph::EdgeId(dgl_type_t etype, dgl_id_t src, dgl_id_t dst) const {
}
EdgeArray UnitGraph::EdgeIds(dgl_type_t etype, IdArray src, IdArray dst) const {
- const SparseFormat fmt = SelectFormat(SparseFormat::kAny);
+ const SparseFormat fmt = SelectFormat(SparseFormat::kCSR);
const auto ptr = GetFormat(fmt);
if (fmt == SparseFormat::kCSC) {
EdgeArray edges = ptr->EdgeIds(etype, dst, src);
...