Commit 8b64037e authored by xiang song (charlie.song), committed by GitHub

[Example] cluster-gat for ogb (#1875)



* cluster-gat for ogb

* Fix new graph

* Inference using minibatch style

* update readme

* Update params
Co-authored-by: Ubuntu <ubuntu@ip-172-31-34-186.ec2.internal>
Co-authored-by: Zihao Ye <expye@outlook.com>
# ClusterGAT

Params: 1540848

## OGB Products

Run `main.py` and you should directly see the result.

Validation accuracy over 10 runs: 0.8985 ± 0.00224
Test accuracy over 10 runs: 0.79232 ± 0.007786
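
A typical invocation (assuming a single GPU; pass `--gpu -1` to train on CPU):

```bash
python3 main.py --gpu 0
```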
# main.py
import argparse
import time
from functools import partial

import numpy as np
import torch as th
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import tqdm
from torch.utils.data import DataLoader

import dgl
import dgl.nn.pytorch as dglnn
from ogb.nodeproppred import DglNodePropPredDataset

from sampler import ClusterIter, subgraph_collate_fn
#### GAT model
class GAT(nn.Module):
def __init__(self,
in_feats,
num_heads,
n_hidden,
n_classes,
n_layers,
activation,
dropout=0.):
super().__init__()
self.n_layers = n_layers
self.n_hidden = n_hidden
self.n_classes = n_classes
self.layers = nn.ModuleList()
self.num_heads = num_heads
self.layers.append(dglnn.GATConv(in_feats,
n_hidden,
num_heads=num_heads,
feat_drop=dropout,
attn_drop=dropout,
activation=activation,
negative_slope=0.2))
for i in range(1, n_layers - 1):
self.layers.append(dglnn.GATConv(n_hidden * num_heads,
n_hidden,
num_heads=num_heads,
feat_drop=dropout,
attn_drop=dropout,
activation=activation,
negative_slope=0.2))
self.layers.append(dglnn.GATConv(n_hidden * num_heads,
n_classes,
num_heads=num_heads,
feat_drop=dropout,
attn_drop=dropout,
activation=None,
negative_slope=0.2))
    def forward(self, g, x):
        h = x
        for l, conv in enumerate(self.layers):
            h = conv(g, h)
            if l < len(self.layers) - 1:
                # Concatenate the attention heads of hidden layers.
                h = h.flatten(1)
        # Average the heads of the output layer.
        h = h.mean(1)
        return h.log_softmax(dim=-1)
    def inference(self, g, x, batch_size, device):
        """
        Offline inference with the GAT model on full neighbors (i.e. without
        neighbor sampling): node representations are computed layer by layer,
        so the code can handle any number of nodes and layers.

        g : the entire graph.
        x : the input features of the entire node set.
        """
        num_heads = self.num_heads
for l, layer in enumerate(self.layers):
if l < self.n_layers - 1:
                # Hidden layers keep all heads concatenated: (N, n_hidden * num_heads).
                y = th.zeros(g.number_of_nodes(), self.n_hidden * num_heads)
            else:
                # The output layer produces class logits; heads are averaged below.
                y = th.zeros(g.number_of_nodes(), self.n_classes)
            # A fanout of None takes all neighbors, i.e. full-neighbor inference.
            sampler = dgl.dataloading.MultiLayerNeighborSampler([None])
            dataloader = dgl.dataloading.NodeDataLoader(
                g,
                th.arange(g.number_of_nodes()),
                sampler,
                batch_size=batch_size,
                shuffle=False,
                drop_last=False,
                num_workers=args.num_workers)  # `args` is parsed at script entry
            # GATConv builds a single `fc` for homogeneous input; alias it so the
            # bipartite (h, h_dst) call below works.
            layer.fc_src = layer.fc
            layer.fc_dst = layer.fc
            for input_nodes, output_nodes, blocks in tqdm.tqdm(dataloader):
                block = blocks[0].to(device)
                h = x[input_nodes].to(device)
                # Destination nodes come first among a block's source nodes,
                # so the leading rows of h are the destination features.
                h_dst = h[:block.number_of_dst_nodes()]
if l < self.n_layers - 1:
h = layer(block, (h, h_dst)).flatten(1)
else:
h = layer(block, (h, h_dst))
h = h.mean(1)
h = h.log_softmax(dim=-1)
y[output_nodes] = h.cpu()
x = y
return y
def compute_acc(pred, labels):
"""
Compute the accuracy of prediction given the labels.
"""
return (th.argmax(pred, dim=1) == labels).float().sum() / len(pred)
def evaluate(model, g, labels, val_nid, test_nid, batch_size, device):
    """
    Evaluate the model on the validation and test sets.

    g : The entire graph.
    labels : The labels of all the nodes.
    val_nid : The node IDs of the validation set.
    test_nid : The node IDs of the test set.
    batch_size : Number of nodes to compute at the same time.
    device : The GPU device to evaluate on.
    """
model.eval()
with th.no_grad():
inputs = g.ndata['feat']
pred = model.inference(g, inputs, batch_size, device)
model.train()
return compute_acc(pred[val_nid], labels[val_nid]), compute_acc(pred[test_nid], labels[test_nid]), pred
def model_param_summary(model):
""" Count the model parameters """
cnt = sum(p.numel() for p in model.parameters() if p.requires_grad)
print("Total Params {}".format(cnt))
#### Entry point
def run(args, device, data):
# Unpack data
train_nid, val_nid, test_nid, in_feats, labels, n_classes, g, cluster_iterator = data
# Define model and optimizer
model = GAT(in_feats, args.num_heads, args.num_hidden, n_classes, args.num_layers, F.relu, args.dropout)
model_param_summary(model)
model = model.to(device)
optimizer = optim.Adam(model.parameters(), lr=args.lr, weight_decay=args.wd)
# Training loop
avg = 0
best_eval_acc = 0
best_test_acc = 0
for epoch in range(args.num_epochs):
        iter_load = 0
        iter_forward = 0
        iter_backward = 0
tic = time.time()
        # Loop over the dataloader: each step yields one training subgraph
        # assembled from a batch of METIS clusters.
        tic_start = time.time()
for step, cluster in enumerate(cluster_iterator):
cluster = cluster.to(device)
mask = cluster.ndata['train_mask']
if mask.sum() == 0:
continue
feat = cluster.ndata['feat']
batch_labels = cluster.ndata['labels']
tic_step = time.time()
# Compute loss and prediction
batch_pred = model(cluster, feat)
batch_pred = batch_pred[mask]
batch_labels = batch_labels[mask]
loss = nn.functional.nll_loss(batch_pred, batch_labels)
            optimizer.zero_grad()
            tic_forward = time.time()
            loss.backward()
            optimizer.step()
            tic_backward = time.time()
            iter_load += (tic_step - tic_start)
            iter_forward += (tic_forward - tic_step)
            iter_backward += (tic_backward - tic_forward)
if step % args.log_every == 0:
acc = compute_acc(batch_pred, batch_labels)
                gpu_mem_alloc = th.cuda.max_memory_allocated() / 1024 / 1024 if th.cuda.is_available() else 0
print('Epoch {:05d} | Step {:05d} | Loss {:.4f} | Train Acc {:.4f} | GPU {:.1f} MiB'.format(
epoch, step, loss.item(), acc.item(), gpu_mem_alloc))
tic_start = time.time()
toc = time.time()
        print('Epoch Time(s): {:.4f} Load {:.4f} Forward {:.4f} Backward {:.4f}'.format(
            toc - tic, iter_load, iter_forward, iter_backward))
        # Skip the first 5 warm-up epochs when averaging the epoch time.
        if epoch >= 5:
            avg += toc - tic
if epoch % args.eval_every == 0 and epoch != 0:
eval_acc, test_acc, pred = evaluate(model, g, labels, val_nid, test_nid, args.val_batch_size, device)
model = model.to(device)
if args.save_pred:
np.savetxt(args.save_pred + '%02d' % epoch, pred.argmax(1).cpu().numpy(), '%d')
print('Eval Acc {:.4f}'.format(eval_acc))
if eval_acc > best_eval_acc:
best_eval_acc = eval_acc
best_test_acc = test_acc
print('Best Eval Acc {:.4f} Test Acc {:.4f}'.format(best_eval_acc, best_test_acc))
print('Avg epoch time: {}'.format(avg / (epoch - 4)))
return best_test_acc
if __name__ == '__main__':
argparser = argparse.ArgumentParser("multi-gpu training")
argparser.add_argument('--gpu', type=int, default=0,
help="GPU device ID. Use -1 for CPU training")
argparser.add_argument('--num-epochs', type=int, default=20)
argparser.add_argument('--num-hidden', type=int, default=128)
argparser.add_argument('--num-layers', type=int, default=3)
argparser.add_argument('--num-heads', type=int, default=8)
argparser.add_argument('--batch-size', type=int, default=32)
argparser.add_argument('--val-batch-size', type=int, default=2000)
argparser.add_argument('--log-every', type=int, default=20)
argparser.add_argument('--eval-every', type=int, default=1)
argparser.add_argument('--lr', type=float, default=0.001)
argparser.add_argument('--dropout', type=float, default=0.5)
argparser.add_argument('--save-pred', type=str, default='')
argparser.add_argument('--wd', type=float, default=0)
    argparser.add_argument('--num-partitions', type=int, default=15000)
argparser.add_argument('--num-workers', type=int, default=0)
args = argparser.parse_args()
if args.gpu >= 0:
device = th.device('cuda:%d' % args.gpu)
else:
device = th.device('cpu')
    # load ogbn-products data
data = DglNodePropPredDataset(name='ogbn-products')
splitted_idx = data.get_idx_split()
train_idx, val_idx, test_idx = splitted_idx['train'], splitted_idx['valid'], splitted_idx['test']
graph, labels = data[0]
labels = labels[:, 0]
print('Total edges before adding self-loop {}'.format(graph.number_of_edges()))
graph = dgl.remove_self_loop(graph)
graph = dgl.add_self_loop(graph)
print('Total edges after adding self-loop {}'.format(graph.number_of_edges()))
    # The OGB split covers every node exactly once.
    num_nodes = train_idx.shape[0] + val_idx.shape[0] + test_idx.shape[0]
    assert num_nodes == graph.number_of_nodes()
graph.ndata['labels'] = labels
mask = th.zeros(num_nodes, dtype=th.bool)
mask[train_idx] = True
graph.ndata['train_mask'] = mask
mask = th.zeros(num_nodes, dtype=th.bool)
mask[val_idx] = True
graph.ndata['valid_mask'] = mask
mask = th.zeros(num_nodes, dtype=th.bool)
mask[test_idx] = True
graph.ndata['test_mask'] = mask
    # Touch degree/edge information once to materialize all sparse formats
    # up front, so dataloader workers do not rebuild them.
    graph.in_degrees(0)
    graph.out_degrees(0)
    graph.find_edges(0)
cluster_iter_data = ClusterIter(
'ogbn-products', graph, args.num_partitions, args.batch_size)
    cluster_iterator = DataLoader(cluster_iter_data, batch_size=args.batch_size,
                                  shuffle=True, pin_memory=True, num_workers=0,
                                  collate_fn=partial(subgraph_collate_fn, graph))
in_feats = graph.ndata['feat'].shape[1]
n_classes = (labels.max() + 1).item()
# Pack data
data = train_idx, val_idx, test_idx, in_feats, labels, n_classes, graph, cluster_iterator
# Run 10 times
test_accs = []
for i in range(10):
test_accs.append(run(args, device, data))
print('Average test accuracy:', np.mean(test_accs), '±', np.std(test_accs))

# partition_utils.py
import dgl
from dgl.transform import metis_partition
from dgl import backend as F
def get_partition_list(g, psize):
    """Partition ``g`` into ``psize`` clusters with METIS and return a list
    of numpy arrays holding the original node IDs of each cluster."""
    p_gs = metis_partition(g, psize)
    nids_list = []
    for k, val in p_gs.items():
        nids = val.ndata[dgl.NID]
        nids_list.append(F.asnumpy(nids))
    return nids_list
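
A quick, hypothetical illustration of what `get_partition_list` returns (the random graph is a stand-in, not part of the example):

import dgl
from partition_utils import get_partition_list

g = dgl.rand_graph(1000, 5000)    # stand-in graph for illustration only
parts = get_partition_list(g, 4)  # one numpy array of node IDs per cluster
print([len(p) for p in parts])    # four cluster sizes summing to 1000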

# sampler.py
import os

import numpy as np

import dgl

from partition_utils import get_partition_list
class ClusterIter(object):
    '''The partition sampler given a DGLGraph and a partition number.

    METIS is used as the graph-partition backend.
    '''
    def __init__(self, dn, g, psize, batch_size):
        """Initialize the sampler.

        Parameters
        ----------
        dn : str
            The dataset name, used to cache the partition result on disk.
        g : DGLGraph
            The full graph of the dataset.
        psize : int
            The number of partitions.
        batch_size : int
            The number of partitions in one batch.
        """
        self.psize = psize
        self.batch_size = batch_size
        # Cache the partition assignment per dataset & partition number so
        # METIS only runs once.
        if dn:
            cache_path = os.path.join('./datasets/', dn + '_{}.npy'.format(psize))
            if os.path.exists(cache_path):
                self.par_li = np.load(cache_path, allow_pickle=True)
            else:
                os.makedirs('./datasets/', exist_ok=True)
                self.par_li = get_partition_list(g, psize)
                np.save(cache_path, self.par_li)
        else:
            self.par_li = get_partition_list(g, psize)
def __len__(self):
return self.psize
def __getitem__(self, idx):
return self.par_li[idx]
def subgraph_collate_fn(g, batch):
    """Merge a batch of partitions (node-ID arrays) into one training subgraph."""
    nids = np.concatenate(batch).reshape(-1).astype(np.int64)
    g1 = g.subgraph(nids)
    g1 = dgl.remove_self_loop(g1)
    g1 = dgl.add_self_loop(g1)
    return g1
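
For reference, a minimal sketch of how the sampler composes with a PyTorch DataLoader, mirroring the wiring in `main.py` (the graph and partition counts are stand-ins):

from functools import partial
from torch.utils.data import DataLoader
import dgl
from sampler import ClusterIter, subgraph_collate_fn

g = dgl.rand_graph(1000, 5000)  # stand-in graph
cluster_data = ClusterIter(None, g, psize=100, batch_size=4)  # dn=None skips the on-disk cache
loader = DataLoader(cluster_data, batch_size=4, shuffle=True,
                    collate_fn=partial(subgraph_collate_fn, g))
for subg in loader:
    # each batch merges 4 METIS clusters into one training subgraph
    print(subg.number_of_nodes(), subg.number_of_edges())
    break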