"src/git@developer.sourcefind.cn:OpenDAS/dgl.git" did not exist on "91cb347719da745fe0bd309607ce1c272604acdb"
Commit 51a73504 authored by Zardinality, committed by Mufei Li

[Model] Add Pytorch example for Cluster GCN (#877)

* initial commit of cluster GCN

* update readme

* fix small bugs running ppi

* nearly sota ppi training script&update readme

* rm unused imports&add shebang line to scripts

* minor comments&readme appended

* add rnd seed control&update readme
parent c03046a0
Cluster-GCN: An Efficient Algorithm for Training Deep and Large Graph Convolutional Networks
============
- Paper link: [Cluster-GCN: An Efficient Algorithm for Training Deep and Large Graph Convolutional Networks](https://arxiv.org/abs/1905.07953)
- Author's code repo: [https://github.com/google-research/google-research/blob/master/cluster_gcn/](https://github.com/google-research/google-research/blob/master/cluster_gcn/).
This example reproduces the reported speed and performance as closely as possible on Reddit and PPI. However, the diagonal enhancement is not covered, as the GraphSAGE aggregator already achieves a satisfying F1 score.
Dependencies
------------
- Python 3.7+ (for string formatting features)
- PyTorch 1.1.0+
- metis
- sklearn
* Install the clustering toolkit METIS (Serial Graph Partitioning and Fill-reducing Matrix Ordering, [official website](http://glaros.dtc.umn.edu/gkhome/metis/metis/overview)) and its Python interface:
```
1) Download metis-5.1.0.tar.gz from http://glaros.dtc.umn.edu/gkhome/metis/metis/download and unpack it
2) cd metis-5.1.0
3) make config shared=1 prefix=~/.local/
4) make install
5) export METIS_DLL=~/.local/lib/libmetis.so
6) pip install metis
```
A quick test to check whether metis is installed correctly:
```
>>> import networkx as nx
>>> import metis
>>> G = metis.example_networkx()
>>> (edgecuts, parts) = metis.part_graph(G, 3)
```
Run Experiments
---------------
* For the Reddit data, run the following script:
```
./run_reddit.sh
```
You should see a final test F1 around `Test F1-mic0.9612, Test F1-mac0.9399`.
Note that the first run of the provided script is considerably slower than reported in the paper, presumably due to the data loader. Once the partition allocation has been cached, the overall speed returns to normal (see the sketch at the end of this section). On a machine with a 1080Ti and an Intel(R) Xeon(R) Bronze 3104 CPU @ 1.70GHz, training completes within 45s. After the first epoch, the F1-mic on the validation set should be around `0.93`.
* For the PPI data, run the following script:
```
./run_ppi.sh
```
You should see a final test F1 around `Test F1-mic0.9924, Test F1-mac0.9917`. Training finishes in about 10 minutes.
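For reference, the partition caching works roughly as follows. This is a simplified sketch of the logic in `ClusterIter` (`sampler.py` below); `dataset_name`, `psize` and `train_g` stand in for the sampler's actual arguments:
```
import os
import numpy as np
from partition_utils import get_partition_list

cache = os.path.join('./datasets/', '{}_{}.npy'.format(dataset_name, psize))
if os.path.exists(cache):
    # later runs reuse the cached partition assignment and skip METIS entirely
    par_li = np.load(cache, allow_pickle=True)
else:
    # first run: partition the training graph with METIS (the slow step) and cache the result
    os.makedirs('./datasets/', exist_ok=True)
    par_li = get_partition_list(train_g, psize)
    np.save(cache, par_li)
```
The main training script, `cluster_gcn.py`, which both run scripts invoke, follows.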
import argparse
import os
import time
import random
import numpy as np
import sklearn.preprocessing
import torch
import torch.nn as nn
import torch.nn.functional as F
from dgl import DGLGraph
from dgl.data import register_data_args
from torch.utils.tensorboard import SummaryWriter
from modules import GCNCluster, GraphSAGE
from sampler import ClusterIter
from utils import Logger, evaluate, save_log_dir, load_data
def main(args):
torch.manual_seed(args.rnd_seed)
np.random.seed(args.rnd_seed)
random.seed(args.rnd_seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
multitask_data = set(['ppi', 'amazon', 'amazon-0.1',
'amazon-0.3', 'amazon2M', 'amazon2M-47'])
multitask = args.dataset in multitask_data
# load and preprocess dataset
data = load_data(args)
train_nid = np.nonzero(data.train_mask)[0].astype(np.int64)
test_nid = np.nonzero(data.test_mask)[0].astype(np.int64)
# Normalize features
if args.normalize:
train_feats = data.features[train_nid]
scaler = sklearn.preprocessing.StandardScaler()
scaler.fit(train_feats)
features = scaler.transform(data.features)
else:
features = data.features
features = torch.FloatTensor(features)
if not multitask:
labels = torch.LongTensor(data.labels)
else:
labels = torch.FloatTensor(data.labels)
train_mask = torch.ByteTensor(data.train_mask).type(torch.bool)
val_mask = torch.ByteTensor(data.val_mask).type(torch.bool)
test_mask = torch.ByteTensor(data.test_mask).type(torch.bool)
in_feats = features.shape[1]
n_classes = data.num_labels
n_edges = data.graph.number_of_edges()
n_train_samples = train_mask.sum().item()
n_val_samples = val_mask.sum().item()
n_test_samples = test_mask.sum().item()
print("""----Data statistics------'
#Edges %d
#Classes %d
#Train samples %d
#Val samples %d
#Test samples %d""" %
(n_edges, n_classes,
n_train_samples,
n_val_samples,
n_test_samples))
# create GCN model
g = data.graph
if args.self_loop and not args.dataset.startswith('reddit'):
g.remove_edges_from(g.selfloop_edges())
g.add_edges_from(zip(g.nodes(), g.nodes()))
print("adding self-loop edges")
g = DGLGraph(g, readonly=True)
# set device for dataset tensors
if args.gpu < 0:
cuda = False
else:
cuda = True
torch.cuda.set_device(args.gpu)
features = features.cuda()
labels = labels.cuda()
train_mask = train_mask.cuda()
val_mask = val_mask.cuda()
test_mask = test_mask.cuda()
print(torch.cuda.get_device_name(0))
g.ndata['features'] = features
g.ndata['labels'] = labels
g.ndata['train_mask'] = train_mask
print('labels shape:', labels.shape)
cluster_iterator = ClusterIter(
args.dataset, g, args.psize, args.batch_size, train_nid, use_pp=args.use_pp)
print("features shape, ", features.shape)
model_sel = {'GCN': GCNCluster, 'graphsage': GraphSAGE}
model_class = model_sel[args.model_type]
print('using model:', model_class)
model = model_class(in_feats,
args.n_hidden,
n_classes,
args.n_layers,
F.relu,
args.dropout, args.use_pp)
if cuda:
model.cuda()
# logger and so on
log_dir = save_log_dir(args)
writer = SummaryWriter(log_dir)
logger = Logger(os.path.join(log_dir, 'loggings'))
logger.write(args)
# Loss function
if multitask:
print('Using multi-label loss')
loss_f = nn.BCEWithLogitsLoss()
else:
print('Using multi-class loss')
loss_f = nn.CrossEntropyLoss()
# use optimizer
optimizer = torch.optim.Adam(model.parameters(),
lr=args.lr,
weight_decay=args.weight_decay)
# initialize graph
dur = []
# set train_nids to cuda tensor
if cuda:
train_nid = torch.from_numpy(train_nid).cuda()
print("current memory after model before training",
torch.cuda.memory_allocated(device=train_nid.device) / 1024 / 1024)
start_time = time.time()
best_f1 = -1
for epoch in range(args.n_epochs):
for j, cluster in enumerate(cluster_iterator):
# sync with upper level training graph
cluster.copy_from_parent()
model.train()
# forward
pred = model(cluster)
batch_labels = cluster.ndata['labels']
batch_train_mask = cluster.ndata['train_mask']
loss = loss_f(pred[batch_train_mask],
batch_labels[batch_train_mask])
optimizer.zero_grad()
loss.backward()
optimizer.step()
            # in the PPI case, `log_every` is chosen so that logging happens once per epoch.
            # Lower it when you want more information within one epoch.
if j % args.log_every == 0:
print(f"epoch:{epoch}/{args.n_epochs}, Iteration {j}/{len(cluster_iterator)}:training loss", loss.item())
writer.add_scalar('train/loss', loss.item(),
global_step=j + epoch * len(cluster_iterator))
print("current memory:",
torch.cuda.memory_allocated(device=pred.device) / 1024 / 1024)
# evaluate
if epoch % args.val_every == 0:
val_f1_mic, val_f1_mac = evaluate(
model, g, labels, val_mask, multitask)
print(
"Val F1-mic{:.4f}, Val F1-mac{:.4f}". format(val_f1_mic, val_f1_mac))
if val_f1_mic > best_f1:
best_f1 = val_f1_mic
print('new best val f1:', best_f1)
torch.save(model.state_dict(), os.path.join(
log_dir, 'best_model.pkl'))
writer.add_scalar('val/f1-mic', val_f1_mic, global_step=epoch)
writer.add_scalar('val/f1-mac', val_f1_mac, global_step=epoch)
end_time = time.time()
    print(f'training using time {end_time - start_time:.2f}s')
# test
if args.use_val:
model.load_state_dict(torch.load(os.path.join(
log_dir, 'best_model.pkl')))
test_f1_mic, test_f1_mac = evaluate(
model, g, labels, test_mask, multitask)
print(
"Test F1-mic{:.4f}, Test F1-mac{:.4f}". format(test_f1_mic, test_f1_mac))
writer.add_scalar('test/f1-mic', test_f1_mic)
writer.add_scalar('test/f1-mac', test_f1_mac)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='GCN')
register_data_args(parser)
parser.add_argument("--dropout", type=float, default=0.5,
help="dropout probability")
parser.add_argument("--gpu", type=int, default=-1,
help="gpu")
parser.add_argument("--lr", type=float, default=3e-2,
help="learning rate")
parser.add_argument("--n-epochs", type=int, default=200,
help="number of training epochs")
parser.add_argument("--log-every", type=int, default=100,
help="number of training epochs")
parser.add_argument("--batch-size", type=int, default=20,
help="batch size")
parser.add_argument("--psize", type=int, default=1500,
help="partition number")
parser.add_argument("--test-batch-size", type=int, default=1000,
help="test batch size")
parser.add_argument("--n-hidden", type=int, default=16,
help="number of hidden gcn units")
parser.add_argument("--n-layers", type=int, default=1,
help="number of hidden gcn layers")
parser.add_argument("--val-every", type=int, default=1,
help="number of epoch of doing inference on validation")
parser.add_argument("--rnd-seed", type=int, default=3,
help="number of epoch of doing inference on validation")
parser.add_argument("--self-loop", action='store_true',
help="graph self-loop (default=False)")
parser.add_argument("--use-pp", action='store_true',
help="whether to use percomputation")
parser.add_argument("--normalize", action='store_true',
help="whether to use normalized feature")
parser.add_argument("--use-val", action='store_true',
help="whether to use validated best model to test")
parser.add_argument("--weight-decay", type=float, default=5e-4,
help="Weight for L2 loss")
parser.add_argument("--model-type", type=str, default='GCN',
help="model to be used")
parser.add_argument("--note", type=str, default='none',
help="note for log dir")
args = parser.parse_args()
print(args)
main(args)
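# modules.py: GCN and GraphSAGE layer/model definitions (imported by cluster_gcn.py above)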
import math
import dgl.function as fn
import torch
import torch.nn as nn
class GCNLayer(nn.Module):
def __init__(self,
in_feats,
out_feats,
activation,
dropout,
bias=True,
use_pp=False,
use_lynorm=True):
super(GCNLayer, self).__init__()
self.weight = nn.Parameter(torch.Tensor(in_feats, out_feats))
if bias:
self.bias = nn.Parameter(torch.Tensor(out_feats))
else:
self.bias = None
self.activation = activation
self.use_pp = use_pp
if dropout:
self.dropout = nn.Dropout(p=dropout)
else:
self.dropout = 0.
if use_lynorm:
self.lynorm = nn.LayerNorm(out_feats, elementwise_affine=True)
else:
self.lynorm = lambda x: x
self.reset_parameters()
def reset_parameters(self):
stdv = 1. / math.sqrt(self.weight.size(1))
self.weight.data.uniform_(-stdv, stdv)
if self.bias is not None:
self.bias.data.uniform_(-stdv, stdv)
def forward(self, g):
h = g.ndata['h']
norm = self.get_norm(g)
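        # when use_pp is set, the input features already contain the neighbour aggregation
        # precomputed by ClusterIter.precalc, so message passing is skipped during training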
if not self.use_pp or not self.training:
g.ndata['h'] = h
g.update_all(fn.copy_src(src='h', out='m'),
fn.sum(msg='m', out='h'))
ah = g.ndata.pop('h')
h = self.concat(h, ah, norm)
if self.dropout:
h = self.dropout(h)
h = torch.mm(h, self.weight)
if self.bias is not None:
h = h + self.bias
h = self.lynorm(h)
if self.activation:
h = self.activation(h)
return h
def concat(self, h, ah, norm):
# normalization by square root of dst degree
return ah * norm
def get_norm(self, g):
norm = 1. / g.in_degrees().float().unsqueeze(1)
# .sqrt()
norm[torch.isinf(norm)] = 0
norm = norm.to(self.weight.device)
return norm
class GCNCluster(nn.Module):
def __init__(self,
in_feats,
n_hidden,
n_classes,
n_layers,
activation,
dropout,
use_pp):
super(GCNCluster, self).__init__()
self.layers = nn.ModuleList()
# input layer
self.layers.append(
GCNLayer(in_feats, n_hidden, activation=activation, dropout=dropout, use_pp=use_pp, use_lynorm=True))
# hidden layers
for i in range(n_layers):
self.layers.append(
GCNLayer(n_hidden, n_hidden, activation=activation, dropout=dropout, use_lynorm=True
))
# output layer
self.layers.append(GCNLayer(n_hidden, n_classes,
activation=None, dropout=dropout, use_lynorm=False))
def forward(self, g):
g.ndata['h'] = g.ndata['features']
for i, layer in enumerate(self.layers):
g.ndata['h'] = layer(g)
h = g.ndata.pop('h')
return h
class GCNLayerSAGE(GCNLayer):
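    # GraphSAGE-style layer: keeps the node's own features and concatenates them with the
    # normalized neighbour aggregation, so the weight matrix takes 2 * in_feats inputs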
def __init__(self, *args, **xargs):
super(GCNLayerSAGE, self).__init__(*args, **xargs)
in_feats, out_feats = self.weight.shape
self.weight = nn.Parameter(torch.Tensor(2 * in_feats, out_feats))
self.reset_parameters()
def concat(self, h, ah, norm):
ah = ah * norm
h = torch.cat((h, ah), dim=1)
return h
class GraphSAGE(nn.Module):
def __init__(self,
in_feats,
n_hidden,
n_classes,
n_layers,
activation,
dropout,
use_pp):
super(GraphSAGE, self).__init__()
self.layers = nn.ModuleList()
# input layer
self.layers.append(GCNLayerSAGE(in_feats, n_hidden, activation=activation,
dropout=dropout, use_pp=use_pp, use_lynorm=True))
# hidden layers
for i in range(n_layers - 1):
self.layers.append(
GCNLayerSAGE(n_hidden, n_hidden, activation=activation, dropout=dropout, use_pp=False, use_lynorm=True))
# output layer
self.layers.append(GCNLayerSAGE(n_hidden, n_classes, activation=None,
dropout=dropout, use_pp=False, use_lynorm=False))
def forward(self, g):
h = g.ndata['features']
g.ndata['h'] = h
for layer in self.layers:
g.ndata['h'] = layer(g)
h = g.ndata.pop('h')
return h
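# partition_utils.py: METIS-based graph partitioning helpers (used by sampler.py)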
from time import time
import metis
import numpy as np
from utils import arg_list
def get_partition_list(g, psize):
tmp_time = time()
ng = g.to_networkx()
print("getting adj using time{:.4f}".format(time() - tmp_time))
print("run metis with partition size {}".format(psize))
_, nd_group = metis.part_graph(ng, psize)
print("metis finished in {} seconds.".format(time() - tmp_time))
print("train group {}".format(len(nd_group)))
al = arg_list(nd_group)
return al
def get_subgraph(g, par_arr, i, psize, batch_size):
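    # merge the partitions with indices [i * batch_size, (i + 1) * batch_size) into one DGL subgraph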
par_batch_ind_arr = [par_arr[s] for s in range(
i * batch_size, (i + 1) * batch_size) if s < psize]
g1 = g.subgraph(np.concatenate(
par_batch_ind_arr).reshape(-1).astype(np.int64))
return g1
#!/bin/bash
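# run_ppi.sh: the PPI training configuration described in the README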
python cluster_gcn.py --gpu 0 --dataset ppi --lr 1e-2 --weight-decay 0.0 --psize 50 --batch-size 1 --n-epochs 300 --n-hidden 2048 --n-layers 3 --log-every 100 --use-pp --self-loop --note self-loop-ppi-non-sym-ly3-pp-cluster-2-2-wd-0 --dropout 0.2 --model-type graphsage --use-val --normalize
#!/bin/bash
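# run_reddit.sh: the Reddit training configuration described in the README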
python cluster_gcn.py --gpu 0 --dataset reddit-self-loop --lr 1e-2 --weight-decay 0.0 --psize 1500 --batch-size 20 --n-epochs 30 --n-hidden 128 --n-layers 0 --log-every 100 --use-pp --self-loop --note self-loop-reddit-non-sym-ly3-pp-cluster-2-2-wd-5e-4 --dropout 0.2 --model-type graphsage --use-val --normalize
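# sampler.py: the ClusterIter partition sampler used by cluster_gcn.py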
import os
import random
import dgl.function as fn
import numpy as np
import torch
from partition_utils import *
class ClusterIter(object):
'''
    The partition sampler. Given a DGLGraph and a partition count, it partitions the graph using METIS as the backend.
'''
def __init__(self, dn, g, psize, batch_size, seed_nid, use_pp=True):
"""Initialize the sampler.
        Parameters
        ----------
        dn : str
            The dataset name.
        g : DGLGraph
            The full graph of the dataset.
        psize : int
            The number of partitions.
        batch_size : int
            The number of partitions in one batch.
        seed_nid : np.ndarray
            The training node ids, used to extract the training graph.
        use_pp : bool
            Whether to precompute the aggregated features AX.
        """
self.use_pp = use_pp
self.g = g.subgraph(seed_nid)
self.g.copy_from_parent()
# precalc the aggregated features from training graph only
if use_pp:
self.precalc(self.g)
print('precalculating')
self.psize = psize
self.batch_size = batch_size
        # cache the partitions for a given dataset & partition number
if dn:
fn = os.path.join('./datasets/', dn + '_{}.npy'.format(psize))
if os.path.exists(fn):
self.par_li = np.load(fn, allow_pickle=True)
else:
os.makedirs('./datasets/', exist_ok=True)
self.par_li = get_partition_list(self.g, psize)
np.save(fn, self.par_li)
else:
self.par_li = get_partition_list(self.g, psize)
self.max = int((psize) // batch_size)
random.shuffle(self.par_li)
self.get_fn = get_subgraph
def precalc(self, g):
norm = self.get_norm(g)
g.ndata['norm'] = norm
features = g.ndata['features']
print("features shape, ", features.shape)
with torch.no_grad():
g.update_all(fn.copy_src(src='features', out='m'),
fn.sum(msg='m', out='features'),
None
)
pre_feats = g.ndata['features'] * norm
# use graphsage embedding aggregation style
g.ndata['features'] = torch.cat([features, pre_feats], dim=1)
# use one side normalization
def get_norm(self, g):
norm = 1. / g.in_degrees().float().unsqueeze(1)
norm[torch.isinf(norm)] = 0
norm = norm.to(self.g.ndata['features'].device)
return norm
def __len__(self):
return self.max
def __iter__(self):
self.n = 0
return self
def __next__(self):
if self.n < self.max:
result = self.get_fn(self.g, self.par_li, self.n,
self.psize, self.batch_size)
self.n += 1
return result
else:
random.shuffle(self.par_li)
raise StopIteration
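# utils.py: logging, evaluation, and data-loading helpers used by cluster_gcn.py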
import os
from functools import namedtuple
import dgl
import numpy as np
import torch
from dgl.data import PPIDataset
from dgl.data import load_data as _load_data
from sklearn.metrics import f1_score
class Logger(object):
'''
A custom logger to log stdout to a logging file.
'''
def __init__(self, path):
"""Initialize the logger.
        Parameters
        ----------
        path : str
            The file path to write the log to.
"""
self.path = path
def write(self, s):
with open(self.path, 'a') as f:
f.write(str(s))
print(s)
return
def arg_list(labels):
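    # group node ids by their partition label; returns one index array per partition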
hist, indexes, inverse, counts = np.unique(
labels, return_index=True, return_counts=True, return_inverse=True)
li = []
for h in hist:
li.append(np.argwhere(inverse == h))
return li
def save_log_dir(args):
log_dir = './log/{}/{}'.format(args.dataset, args.note)
os.makedirs(log_dir, exist_ok=True)
return log_dir
def calc_f1(y_true, y_pred, multitask):
if multitask:
y_pred[y_pred > 0] = 1
y_pred[y_pred <= 0] = 0
else:
y_pred = np.argmax(y_pred, axis=1)
return f1_score(y_true, y_pred, average="micro"), \
f1_score(y_true, y_pred, average="macro")
def evaluate(model, g, labels, mask, multitask=False):
model.eval()
with torch.no_grad():
logits = model(g)
logits = logits[mask]
labels = labels[mask]
f1_mic, f1_mac = calc_f1(labels.cpu().numpy(),
logits.cpu().numpy(), multitask)
return f1_mic, f1_mac
def load_data(args):
'''
    Wraps DGL's load_data utility to handle the PPI special case.
'''
if args.dataset != 'ppi':
return _load_data(args)
train_dataset = PPIDataset('train')
val_dataset = PPIDataset('valid')
test_dataset = PPIDataset('test')
PPIDataType = namedtuple('PPIDataset', ['train_mask', 'test_mask',
'val_mask', 'features', 'labels', 'num_labels', 'graph'])
G = dgl.BatchedDGLGraph(
[train_dataset.graph, val_dataset.graph, test_dataset.graph], edge_attrs=None, node_attrs=None)
G = G.to_networkx()
    # hack to work around potential bugs in to_networkx
for (n1, n2, d) in G.edges(data=True):
d.clear()
train_nodes_num = train_dataset.graph.number_of_nodes()
test_nodes_num = test_dataset.graph.number_of_nodes()
val_nodes_num = val_dataset.graph.number_of_nodes()
nodes_num = G.number_of_nodes()
assert(nodes_num == (train_nodes_num + test_nodes_num + val_nodes_num))
# construct mask
mask = np.zeros((nodes_num,), dtype=bool)
train_mask = mask.copy()
train_mask[:train_nodes_num] = True
val_mask = mask.copy()
val_mask[train_nodes_num:-test_nodes_num] = True
test_mask = mask.copy()
test_mask[-test_nodes_num:] = True
# construct features
features = np.concatenate(
[train_dataset.features, val_dataset.features, test_dataset.features], axis=0)
labels = np.concatenate(
[train_dataset.labels, val_dataset.labels, test_dataset.labels], axis=0)
data = PPIDataType(graph=G, train_mask=train_mask, test_mask=test_mask,
val_mask=val_mask, features=features, labels=labels, num_labels=121)
return data