Unverified Commit f8b3ebce authored by xiang song(charlie.song), committed by GitHub

[Regression Test] Add sage neighbor sample test for reddit (#2449)



* Add sage neighbor sample test for reddit

* Add ogbn-products dataset

* upd

* upd

* Use a symlink instead of copying data for ogb datasets

* upd

* upd

* Add graphsage unsupervised neighbor sampler
Co-authored-by: Ubuntu <ubuntu@ip-172-31-59-204.ec2.internal>
Co-authored-by: Minjie Wang <wmjlyjemaine@gmail.com>
parent 3d1f2e87
@@ -105,7 +105,7 @@ def track_time(l, u):
Tips
----
* Feed the flags `-e --verbose` to `asv run` to print out stderr and more information. Use the `--bench` flag
  to run specific benchmarks, e.g., `--bench bench_gat`.
* When running benchmarks locally (e.g., with `--python=same`), ASV will not write results to disk
so `asv publish` will not generate plots.
* When running benchmarks in Docker, ASV will pull the code from the remote and build it in conda
@@ -115,3 +115,4 @@ Tips
- Commit your local changes and push them to the remote `origin`.
- Add the corresponding branch to `asv.conf.json`.
* Try to make your benchmarks compatible with all the versions being tested.
* For OGB datasets (e.g., `ogbn-products`), put the dataset under `/tmp/dataset/`; the loader symlinks it into the working directory, as sketched below.
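
As a reference, a minimal sketch of how the loader consumes that directory (the existence check is an assumption added here for repeated local runs):

```python
import os

# Assumed layout: OGB data pre-downloaded under /tmp/dataset/.
# Link it into the working directory so ogb's DglNodePropPredDataset
# finds it without copying the data for every run.
link = os.path.join(os.getcwd(), 'dataset')
if not os.path.exists(link):
    os.symlink('/tmp/dataset/', link)
```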
import dgl
import torch as th
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import dgl.nn.pytorch as dglnn
import time

from .. import utils


class SAGE(nn.Module):
def __init__(self,
in_feats,
n_hidden,
n_classes,
n_layers,
activation,
dropout):
super().__init__()
self.n_layers = n_layers
self.n_hidden = n_hidden
self.n_classes = n_classes
self.layers = nn.ModuleList()
self.layers.append(dglnn.SAGEConv(in_feats, n_hidden, 'mean'))
for i in range(1, n_layers - 1):
self.layers.append(dglnn.SAGEConv(n_hidden, n_hidden, 'mean'))
self.layers.append(dglnn.SAGEConv(n_hidden, n_classes, 'mean'))
self.dropout = nn.Dropout(dropout)
self.activation = activation

    def forward(self, blocks, x):
h = x
for l, (layer, block) in enumerate(zip(self.layers, blocks)):
h = layer(block, h)
if l != len(self.layers) - 1:
h = self.activation(h)
h = self.dropout(h)
return h


def load_subtensor(g, seeds, input_nodes, device):
    """
    Copies features and labels of a set of nodes onto the GPU.
    """
batch_inputs = g.ndata['features'][input_nodes].to(device)
batch_labels = g.ndata['labels'][seeds].to(device)
return batch_inputs, batch_labels
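
# Note: the timed loops below take features and labels directly from the
# sampled blocks (blocks[0].srcdata / blocks[-1].dstdata) rather than
# calling load_subtensor on the full graph.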


@utils.benchmark('time', 3600)
@utils.parametrize('data', ['reddit', 'ogbn-products'])
def track_time(data):
data = utils.process_data(data)
device = utils.get_bench_device()
g = data[0]
g.ndata['features'] = g.ndata['feat']
g.ndata['labels'] = g.ndata['label']
in_feats = g.ndata['features'].shape[1]
n_classes = data.num_labels
# Create csr/coo/csc formats before launching training processes with multi-gpu.
    # This avoids creating certain formats in each sub-process, which saves memory and CPU.
g.create_formats_()
num_epochs = 20
num_hidden = 16
num_layers = 2
fan_out = '10,25'
batch_size = 1024
lr = 0.003
dropout = 0.5
num_workers = 4
train_nid = th.nonzero(g.ndata['train_mask'], as_tuple=True)[0]
# Create PyTorch DataLoader for constructing blocks
sampler = dgl.dataloading.MultiLayerNeighborSampler(
[int(fanout) for fanout in fan_out.split(',')])
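    # The two fanouts correspond to the two SAGE layers, so each seed node
    # expands to at most 10 * 25 = 250 sampled neighbors in its two-hop
    # computation graph.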
dataloader = dgl.dataloading.NodeDataLoader(
g,
train_nid,
sampler,
batch_size=batch_size,
shuffle=True,
drop_last=False,
num_workers=num_workers)
# Define model and optimizer
model = SAGE(in_feats, num_hidden, n_classes, num_layers, F.relu, dropout)
model = model.to(device)
loss_fcn = nn.CrossEntropyLoss()
loss_fcn = loss_fcn.to(device)
optimizer = optim.Adam(model.parameters(), lr=lr)
    # Dry run one epoch to warm up the dataloader workers and device caches
    # before timing.
    for step, (input_nodes, seeds, blocks) in enumerate(dataloader):
        # Take input features and output labels directly from the blocks.
        blocks = [block.int().to(device) for block in blocks]
batch_inputs = blocks[0].srcdata['features']
batch_labels = blocks[-1].dstdata['labels']
# Compute loss and prediction
batch_pred = model(blocks, batch_inputs)
loss = loss_fcn(batch_pred, batch_labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
    # Timed training loop.
    t0 = time.time()
for epoch in range(num_epochs):
# Loop over the dataloader to sample the computation dependency graph as a list of
# blocks.
for step, (input_nodes, seeds, blocks) in enumerate(dataloader):
            # Take input features and output labels directly from the blocks.
            blocks = [block.int().to(device) for block in blocks]
batch_inputs = blocks[0].srcdata['features']
batch_labels = blocks[-1].dstdata['labels']
# Compute loss and prediction
batch_pred = model(blocks, batch_inputs)
loss = loss_fcn(batch_pred, batch_labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
t1 = time.time()
return t1 - t0
import dgl
import numpy as np
import torch as th
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import dgl.nn.pytorch as dglnn
import dgl.function as fn
import time

from .. import utils


class NegativeSampler(object):
    def __init__(self, g, k, neg_share=False):
        # Negatives are drawn proportionally to in-degree ** 0.75, the usual
        # word2vec-style negative-sampling distribution.
        self.weights = g.in_degrees().float() ** 0.75
        self.k = k
        self.neg_share = neg_share

    def __call__(self, g, eids):
        src, _ = g.find_edges(eids)
        n = len(src)
        if self.neg_share and n % self.k == 0:
            # Share one batch of negatives across each group of k sources.
            dst = self.weights.multinomial(n, replacement=True)
            dst = dst.view(-1, 1, self.k).expand(-1, self.k, -1).flatten()
        else:
            # Draw k independent negative destinations per source node.
            dst = self.weights.multinomial(n * self.k, replacement=True)
            src = src.repeat_interleave(self.k)
        return src, dst


def load_subtensor(g, input_nodes, device):
    """
    Copies features of a set of nodes onto the GPU.
    """
    batch_inputs = g.ndata['features'][input_nodes].to(device)
    return batch_inputs


class SAGE(nn.Module):
def __init__(self,
in_feats,
n_hidden,
n_classes,
n_layers,
activation,
dropout):
super().__init__()
self.n_layers = n_layers
self.n_hidden = n_hidden
self.n_classes = n_classes
self.layers = nn.ModuleList()
self.layers.append(dglnn.SAGEConv(in_feats, n_hidden, 'mean'))
for i in range(1, n_layers - 1):
self.layers.append(dglnn.SAGEConv(n_hidden, n_hidden, 'mean'))
self.layers.append(dglnn.SAGEConv(n_hidden, n_classes, 'mean'))
self.dropout = nn.Dropout(dropout)
self.activation = activation

    def forward(self, blocks, x):
h = x
for l, (layer, block) in enumerate(zip(self.layers, blocks)):
h = layer(block, h)
if l != len(self.layers) - 1:
h = self.activation(h)
h = self.dropout(h)
return h


class CrossEntropyLoss(nn.Module):
    """Unsupervised GraphSAGE loss: binary cross-entropy over dot-product
    scores of positive edges versus sampled negative edges."""
def forward(self, block_outputs, pos_graph, neg_graph):
with pos_graph.local_scope():
pos_graph.ndata['h'] = block_outputs
pos_graph.apply_edges(fn.u_dot_v('h', 'h', 'score'))
pos_score = pos_graph.edata['score']
with neg_graph.local_scope():
neg_graph.ndata['h'] = block_outputs
neg_graph.apply_edges(fn.u_dot_v('h', 'h', 'score'))
neg_score = neg_graph.edata['score']
score = th.cat([pos_score, neg_score])
label = th.cat([th.ones_like(pos_score), th.zeros_like(neg_score)]).long()
loss = F.binary_cross_entropy_with_logits(score, label.float())
return loss
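
# Shape sketch for the loss above: with [N, d] node embeddings, u_dot_v
# produces one score per edge of the positive/negative graphs; positive
# edges are labeled 1 and negatives 0, i.e. logistic regression on scores.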


@utils.benchmark('time', 3600)
@utils.parametrize('data', ['reddit', 'ogbn-products'])
def track_time(data):
data = utils.process_data(data)
device = utils.get_bench_device()
g = data[0]
g.ndata['features'] = g.ndata['feat']
g.ndata['labels'] = g.ndata['label']
in_feats = g.ndata['features'].shape[1]
n_classes = data.num_labels
# Create csr/coo/csc formats before launching training processes with multi-gpu.
    # This avoids creating certain formats in each sub-process, which saves memory and CPU.
g.create_formats_()
num_epochs = 5
num_hidden = 16
num_layers = 2
fan_out = '10,25'
batch_size = 1024
lr = 0.003
dropout = 0.5
num_workers = 4
num_negs = 4
train_nid = th.nonzero(g.ndata['train_mask'], as_tuple=True)[0]
n_edges = g.number_of_edges()
train_seeds = np.arange(n_edges)
# Create PyTorch DataLoader for constructing blocks
sampler = dgl.dataloading.MultiLayerNeighborSampler(
[int(fanout) for fanout in fan_out.split(',')])
dataloader = dgl.dataloading.EdgeDataLoader(
g, train_seeds, sampler, exclude='reverse_id',
# For each edge with ID e in Reddit dataset, the reverse edge is e ± |E|/2.
reverse_eids=th.cat([
th.arange(n_edges // 2, n_edges),
th.arange(0, n_edges // 2)]),
negative_sampler=NegativeSampler(g, num_negs),
batch_size=batch_size,
shuffle=True,
drop_last=False,
pin_memory=True,
num_workers=num_workers)
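    # exclude='reverse_id' (with reverse_eids above) drops the sampled seed
    # edges and their reverse counterparts from the message-passing
    # neighborhoods, so the edges being predicted are not leaked.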
# Define model and optimizer
model = SAGE(in_feats, num_hidden, n_classes, num_layers, F.relu, dropout)
model = model.to(device)
loss_fcn = CrossEntropyLoss()
loss_fcn = loss_fcn.to(device)
optimizer = optim.Adam(model.parameters(), lr=lr)
    # Dry run one epoch to warm up the dataloader workers and device caches
    # before timing.
    for step, (input_nodes, pos_graph, neg_graph, blocks) in enumerate(dataloader):
        # Load the input features for the sampled input nodes.
        batch_inputs = load_subtensor(g, input_nodes, device)
pos_graph = pos_graph.to(device)
neg_graph = neg_graph.to(device)
blocks = [block.int().to(device) for block in blocks]
# Compute loss and prediction
batch_pred = model(blocks, batch_inputs)
loss = loss_fcn(batch_pred, pos_graph, neg_graph)
optimizer.zero_grad()
loss.backward()
optimizer.step()
    # Timed training loop.
    t0 = time.time()
for epoch in range(num_epochs):
for step, (input_nodes, pos_graph, neg_graph, blocks) in enumerate(dataloader):
            # Load the input features for the sampled input nodes.
            batch_inputs = load_subtensor(g, input_nodes, device)
pos_graph = pos_graph.to(device)
neg_graph = neg_graph.to(device)
blocks = [block.int().to(device) for block in blocks]
# Compute loss and prediction
batch_pred = model(blocks, batch_inputs)
loss = loss_fcn(batch_pred, pos_graph, neg_graph)
optimizer.zero_grad()
loss.backward()
optimizer.step()
t1 = time.time()
return t1 - t0
@@ -37,11 +37,61 @@ def get_graph(name):
print(name + " doesn't exist")
return None


class ogb_data(object):
    """Minimal wrapper that gives an OGB graph the same access pattern as
    DGL's built-in datasets (indexing plus num_labels/num_classes)."""

    def __init__(self, g, num_labels):
        self._g = g
        self._num_labels = num_labels

    @property
    def num_labels(self):
        return self._num_labels

    @property
    def num_classes(self):
        return self._num_labels

    def __getitem__(self, idx):
        return self._g


def load_ogb_product(name):
    from ogb.nodeproppred import DglNodePropPredDataset

    # Symlink the shared dataset directory into the working directory instead
    # of copying it; skip if a previous run already created the link.
    link = os.path.join(os.getcwd(), 'dataset')
    if not os.path.exists(link):
        os.symlink('/tmp/dataset/', link)
    print('load', name)
data = DglNodePropPredDataset(name=name)
print('finish loading', name)
splitted_idx = data.get_idx_split()
graph, labels = data[0]
labels = labels[:, 0]
graph.ndata['label'] = labels
in_feats = graph.ndata['feat'].shape[1]
num_labels = len(torch.unique(labels[torch.logical_not(torch.isnan(labels))]))
# Find the node IDs in the training, validation, and test set.
train_nid, val_nid, test_nid = splitted_idx['train'], splitted_idx['valid'], splitted_idx['test']
train_mask = torch.zeros((graph.number_of_nodes(),), dtype=torch.bool)
train_mask[train_nid] = True
val_mask = torch.zeros((graph.number_of_nodes(),), dtype=torch.bool)
val_mask[val_nid] = True
test_mask = torch.zeros((graph.number_of_nodes(),), dtype=torch.bool)
test_mask[test_nid] = True
graph.ndata['train_mask'] = train_mask
graph.ndata['val_mask'] = val_mask
graph.ndata['test_mask'] = test_mask
return ogb_data(graph, num_labels)


def process_data(name):
if name == 'cora':
return dgl.data.CoraGraphDataset()
elif name == 'pubmed':
return dgl.data.PubmedGraphDataset()
elif name == 'reddit':
return dgl.data.RedditDataset(self_loop=True)
elif name == 'ogbn-products':
return load_ogb_product('ogbn-products')
else:
        raise ValueError('Invalid dataset name: {}'.format(name))
@@ -79,10 +79,11 @@ def parametrize(param_name, params):
return func
return _wrapper


def benchmark(track_type, timeout=60):
assert track_type in ['time', 'acc']
def _wrapper(func):
func.unit = TRACK_UNITS[track_type]
func.setup = TRACK_SETUP[track_type]
        func.timeout = timeout  # per-benchmark timeout (seconds) honored by ASV
return func
return _wrapper
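
# Example (matching the benchmarks above): pass a larger timeout for
# long-running benchmarks instead of relying on the 60-second default.
#
#     @utils.benchmark('time', 3600)
#     @utils.parametrize('data', ['reddit', 'ogbn-products'])
#     def track_time(data):
#         ...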