Unverified Commit 28379f92 authored by Chao Ma, committed by GitHub

[DEMO] Reproduce numbers of distributed training in AMLC giant graph paper (#556)

* update

* update

* update

* update num_hops

* fix bug

* update

* report numbers of distributed training in AMLC giant graph paper
parent f99725ad
# Stochastic Training for Graph Convolutional Networks Using Distributed Sampler
* Paper: [Control Variate](https://arxiv.org/abs/1710.10568)
* Paper: [Skip Connection](https://arxiv.org/abs/1809.05343)
* Author's code: [https://github.com/thu-ml/stochastic_gcn](https://github.com/thu-ml/stochastic_gcn)
### Dependencies
- MXNet nightly build
```bash
pip install mxnet --pre
```
For every demo below, replace the `--ip` address with one reachable in your own environment, and pass the same `--num-sampler` value to the trainer and to the sampler. Start the trainer first; once it reports that the sampler receiver is bound and listening (e.g. `Bind to 127.0.0.1:2049` / `Listen on 127.0.0.1:2049, wait sender connect ...`), start the sampler.

### Neighbor Sampling & Skip Connection
#### cora
Test accuracy ~83% with `--num-neighbors 2`, ~84% by training on the full graph
Trainer side:
```
DGLBACKEND=mxnet python3 examples/mxnet/sampling/dis_sampling/train.py --model gcn_ns --dataset cora --self-loop --num-neighbors 2 --batch-size 1000 --test-batch-size 5000 --ip 127.0.0.1:50051 --num-sampler 1
```
Sampler side:
```
DGLBACKEND=mxnet python3 examples/mxnet/sampling/dis_sampling/sampler.py --model gcn_ns --dataset cora --self-loop --num-neighbors 2 --batch-size 1000 --ip 127.0.0.1:50051
```
#### citeseer
Test accuracy ~69% with `--num-neighbors 2`, ~70% by training on the full graph
Trainer side:
```
DGLBACKEND=mxnet python3 examples/mxnet/sampling/dis_sampling/train.py --model gcn_ns --dataset citeseer --self-loop --num-neighbors 2 --batch-size 1000 --test-batch-size 5000 --ip 127.0.0.1:50051 --num-sampler 1
```
Sampler side:
```
DGLBACKEND=mxnet python3 examples/mxnet/sampling/dis_sampling/sampler.py --model gcn_ns --dataset citeseer --self-loop --num-neighbors 2 --batch-size 1000 --ip 127.0.0.1:50051
```
#### pubmed
Test accuracy ~78% with `--num-neighbors 3`, ~77% by training on the full graph
Trainer side:
```
DGLBACKEND=mxnet python3 examples/mxnet/sampling/dis_sampling/train.py --model gcn_ns --dataset pubmed --self-loop --num-neighbors 3 --batch-size 1000 --test-batch-size 5000 --ip 127.0.0.1:50051 --num-sampler 1
```
Sampler side:
```
DGLBACKEND=mxnet python3 examples/mxnet/sampling/dis_sampling/sampler.py --model gcn_ns --dataset pubmed --self-loop --num-neighbors 3 --batch-size 1000 --ip 127.0.0.1:50051
```
#### reddit
Test accuracy ~91% with `--num-neighbors 2` and `--batch-size 1000`, ~93% by training on the full graph
Trainer side:
```
DGLBACKEND=mxnet python3 examples/mxnet/sampling/dis_sampling/train.py --model gcn_ns --dataset reddit-self-loop --num-neighbors 2 --batch-size 1000 --test-batch-size 5000 --n-hidden 64 --ip 127.0.0.1:2049 --num-sampler 1
```
Sampler side:
```
DGLBACKEND=mxnet python3 examples/mxnet/sampling/dis_sampling/sampler.py --model gcn_ns --dataset reddit-self-loop --num-neighbors 2 --batch-size 1000 --ip 127.0.0.1:2049
```
### Control Variate & Skip Connection
#### cora
Test accuracy ~84% with `--num-neighbors 1`, ~84% by training on the full graph
Trainer side:
```
DGLBACKEND=mxnet python3 examples/mxnet/sampling/dis_sampling/train.py --model gcn_cv --dataset cora --self-loop --num-neighbors 1 --batch-size 1000000 --test-batch-size 1000000 --ip 127.0.0.1:50051 --num-sampler 1
```
Sampler side:
```
DGLBACKEND=mxnet python3 examples/mxnet/sampling/dis_sampling/sampler.py --model gcn_cv --dataset cora --self-loop --num-neighbors 1 --batch-size 1000000 --ip 127.0.0.1:50051
```
#### citeseer
Test accuracy ~69% with `--num-neighbors 1`, ~70% by training on the full graph
Trainer Side:
```
DGLBACKEND=mxnet python3 examples/mxnet/sampling/dis_sampling/train.py --model gcn_cv --dataset citeseer --self-loop --num-neighbors 1 --batch-size 1000000 --test-batch-size 1000000 --ip 127.0.0.1:50051 --num-sampler 1
```
Sampler Side:
```
DGLBACKEND=mxnet python3 examples/mxnet/sampling/dis_sampling/sampler.py --model gcn_cv --dataset citeseer --self-loop --num-neighbors 1 --batch-size 1000000 --ip 127.0.0.1:50051
```
#### pubmed
Trainer Side:
```
DGLBACKEND=mxnet python3 examples/mxnet/sampling/dis_sampling/train.py --model gcn_cv --dataset pubmed --self-loop --num-neighbors 1 --batch-size 1000000 --test-batch-size 1000000 --ip 127.0.0.1:50051 --num-sampler 1
```
Sampler Side:
```
DGLBACKEND=mxnet python3 examples/mxnet/sampling/dis_sampling/sampler.py --model gcn_cv --dataset pubmed --self-loop --num-neighbors 1 --batch-size 1000000 --ip 127.0.0.1:50051
```
#### reddit
Test accuracy ~93% with `--num-neighbors 1` and `--batch-size 1000`, ~93% by training on the full graph
Trainer Side:
```
DGLBACKEND=mxnet python3 examples/mxnet/sampling/dis_sampling/train.py --model gcn_cv --dataset reddit-self-loop --num-neighbors 1 --batch-size 10000 --test-batch-size 5000 --n-hidden 64 --ip 127.0.0.1:50051 --num-sampler 1
```
Sampler Side:
```
DGLBACKEND=mxnet python3 examples/mxnet/sampling/dis_sampling/sampler.py --model gcn_cv --dataset reddit-self-loop --num-neighbors 1 --batch-size 10000 --ip 127.0.0.1:50051
```
### Control Variate & GraphSAGE-mean
Following [Control Variate](https://arxiv.org/abs/1710.10568), we use the GraphSAGE-mean pooling architecture, with two linear layers and layer normalization per graph convolution layer.
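For reference, the per-layer block this describes looks roughly like the following minimal Gluon sketch. It is illustrative only: the demo's actual implementation is the `GraphSAGELayer` block in `dis_graphsage_cv.py`, and the class name below is made up.
```python
import mxnet as mx
from mxnet import gluon

class SAGEMeanLayer(gluon.Block):
    # Two linear layers with layer normalization, applied to the concatenation
    # of a node's own features and the mean of its sampled neighbors' features.
    def __init__(self, in_feats, hidden, out_feats, **kwargs):
        super(SAGEMeanLayer, self).__init__(**kwargs)
        with self.name_scope():
            self.dense1 = gluon.nn.Dense(hidden, in_units=in_feats)
            self.norm1 = gluon.nn.LayerNorm(in_channels=hidden)
            self.dense2 = gluon.nn.Dense(out_feats, in_units=hidden)
            self.norm2 = gluon.nn.LayerNorm(in_channels=out_feats)

    def forward(self, h):
        h = mx.nd.relu(self.norm1(self.dense1(h)))
        return mx.nd.relu(self.norm2(self.dense2(h)))
```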
#### reddit
Test accuracy 96.1% with `--num-neighbors 1` and `--batch-size 1000`, ~96.2% in [Control Variate](https://arxiv.org/abs/1710.10568) with `--num-neighbors 2` and `--batch-size 1000`
Trainer side:
```
DGLBACKEND=mxnet python3 examples/mxnet/sampling/dis_sampling/train.py --model graphsage_cv --batch-size 1000 --test-batch-size 5000 --n-epochs 50 --dataset reddit --num-neighbors 1 --n-hidden 128 --dropout 0.2 --weight-decay 0 --ip 127.0.0.1:50051 --num-sampler 1
```
Sampler side:
```
DGLBACKEND=mxnet python3 examples/mxnet/sampling/dis_sampling/sampler.py --model graphsage_cv --batch-size 1000 --dataset reddit --num-neighbors 1 --ip 127.0.0.1:50051
```
import argparse, time, math
import numpy as np
import mxnet as mx
from mxnet import gluon
import dgl
import dgl.function as fn
from dgl import DGLGraph
from dgl.data import register_data_args, load_data
class NodeUpdate(gluon.Block):
def __init__(self, layer_id, in_feats, out_feats, dropout, activation=None, test=False, concat=False):
super(NodeUpdate, self).__init__()
self.layer_id = layer_id
self.dropout = dropout
self.test = test
self.concat = concat
with self.name_scope():
self.dense = gluon.nn.Dense(out_feats, in_units=in_feats)
self.activation = activation
def forward(self, node):
h = node.data['h']
norm = node.data['norm']
if self.test:
h = h * norm
else:
agg_history_str = 'agg_h_{}'.format(self.layer_id-1)
agg_history = node.data[agg_history_str]
subg_norm = node.data['subg_norm']
# control variate
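            # Control-variate estimator: the aggregate over sampled neighbors is
            # rescaled by 1/min(degree, num_neighbors) ('subg_norm') and combined
            # with the stored full-neighborhood history rescaled by 1/degree ('norm').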
h = h * subg_norm + agg_history * norm
if self.dropout:
h = mx.nd.Dropout(h, p=self.dropout)
h = self.dense(h)
if self.concat:
h = mx.nd.concat(h, self.activation(h))
elif self.activation:
h = self.activation(h)
return {'activation': h}
class GCNSampling(gluon.Block):
def __init__(self,
in_feats,
n_hidden,
n_classes,
n_layers,
activation,
dropout,
**kwargs):
super(GCNSampling, self).__init__(**kwargs)
self.dropout = dropout
self.n_layers = n_layers
with self.name_scope():
self.layers = gluon.nn.Sequential()
# input layer
self.dense = gluon.nn.Dense(n_hidden, in_units=in_feats)
self.activation = activation
# hidden layers
for i in range(1, n_layers):
skip_start = (i == self.n_layers-1)
self.layers.add(NodeUpdate(i, n_hidden, n_hidden, dropout, activation, concat=skip_start))
# output layer
self.layers.add(NodeUpdate(n_layers, 2*n_hidden, n_classes, dropout))
def forward(self, nf):
h = nf.layers[0].data['preprocess']
if self.dropout:
h = mx.nd.Dropout(h, p=self.dropout)
h = self.dense(h)
skip_start = (0 == self.n_layers-1)
if skip_start:
h = mx.nd.concat(h, self.activation(h))
else:
h = self.activation(h)
for i, layer in enumerate(self.layers):
new_history = h.copy().detach()
history_str = 'h_{}'.format(i)
history = nf.layers[i].data[history_str]
h = h - history
nf.layers[i].data['h'] = h
nf.block_compute(i,
fn.copy_src(src='h', out='m'),
fn.sum(msg='m', out='h'),
layer)
h = nf.layers[i+1].data.pop('activation')
# update history
if i < nf.num_layers-1:
nf.layers[i].data[history_str] = new_history
return h
class GCNInfer(gluon.Block):
def __init__(self,
in_feats,
n_hidden,
n_classes,
n_layers,
activation,
**kwargs):
super(GCNInfer, self).__init__(**kwargs)
self.n_layers = n_layers
with self.name_scope():
self.layers = gluon.nn.Sequential()
# input layer
self.dense = gluon.nn.Dense(n_hidden, in_units=in_feats)
self.activation = activation
# hidden layers
for i in range(1, n_layers):
skip_start = (i == self.n_layers-1)
self.layers.add(NodeUpdate(i, n_hidden, n_hidden, 0, activation, True, concat=skip_start))
# output layer
self.layers.add(NodeUpdate(n_layers, 2*n_hidden, n_classes, 0, None, True))
def forward(self, nf):
h = nf.layers[0].data['preprocess']
h = self.dense(h)
skip_start = (0 == self.n_layers-1)
if skip_start:
h = mx.nd.concat(h, self.activation(h))
else:
h = self.activation(h)
for i, layer in enumerate(self.layers):
nf.layers[i].data['h'] = h
nf.block_compute(i,
fn.copy_src(src='h', out='m'),
fn.sum(msg='m', out='h'),
layer)
h = nf.layers[i+1].data.pop('activation')
return h
def gcn_cv_train(g, ctx, args, n_classes, train_nid, test_nid, n_test_samples, distributed):
n0_feats = g.nodes[0].data['features']
num_nodes = g.number_of_nodes()
in_feats = n0_feats.shape[1]
g_ctx = n0_feats.context
norm = mx.nd.expand_dims(1./g.in_degrees().astype('float32'), 1)
g.set_n_repr({'norm': norm.as_in_context(g_ctx)})
degs = g.in_degrees().astype('float32').asnumpy()
degs[degs > args.num_neighbors] = args.num_neighbors
g.set_n_repr({'subg_norm': mx.nd.expand_dims(mx.nd.array(1./degs, ctx=g_ctx), 1)})
n_layers = args.n_layers
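    # Precompute the normalized first-layer aggregation over the full graph once,
    # so layer 0 can read node.data['preprocess'] instead of aggregating sampled
    # neighbors at every step.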
g.update_all(fn.copy_src(src='features', out='m'),
fn.sum(msg='m', out='preprocess'),
lambda node : {'preprocess': node.data['preprocess'] * node.data['norm']})
for i in range(n_layers - 1):
g.init_ndata('h_{}'.format(i), (num_nodes, args.n_hidden), 'float32')
g.init_ndata('agg_h_{}'.format(i), (num_nodes, args.n_hidden), 'float32')
g.init_ndata('h_{}'.format(n_layers-1), (num_nodes, 2*args.n_hidden), 'float32')
g.init_ndata('agg_h_{}'.format(n_layers-1), (num_nodes, 2*args.n_hidden), 'float32')
model = GCNSampling(in_feats,
args.n_hidden,
n_classes,
n_layers,
mx.nd.relu,
args.dropout,
prefix='GCN')
model.initialize(ctx=ctx)
loss_fcn = gluon.loss.SoftmaxCELoss()
infer_model = GCNInfer(in_feats,
args.n_hidden,
n_classes,
n_layers,
mx.nd.relu,
prefix='GCN')
infer_model.initialize(ctx=ctx)
# use optimizer
print(model.collect_params())
kv_type = 'dist_sync' if distributed else 'local'
trainer = gluon.Trainer(model.collect_params(), 'adam',
{'learning_rate': args.lr, 'wd': args.weight_decay},
kvstore=mx.kv.create(kv_type))
# Create sampler receiver
sampler = dgl.contrib.sampling.SamplerReceiver(graph=g, addr=args.ip, num_sender=args.num_sampler)
# initialize graph
dur = []
adj = g.adjacency_matrix().as_in_context(g_ctx)
for epoch in range(args.n_epochs):
start = time.time()
if distributed:
msg_head = "Worker {:d}, epoch {:d}".format(g.worker_id, epoch)
else:
msg_head = "epoch {:d}".format(epoch)
for nf in sampler:
for i in range(n_layers):
agg_history_str = 'agg_h_{}'.format(i)
dests = nf.layer_parent_nid(i+1).as_in_context(g_ctx)
# TODO we could use DGLGraph.pull to implement this, but the current
# implementation of pull is very slow. Let's manually do it for now.
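                # Taking the adjacency rows of the destination nodes and multiplying
                # them with the stored history h_i gives each destination's exact
                # full-neighborhood aggregate, saved as agg_h_i for the control variate.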
agg = mx.nd.dot(mx.nd.take(adj, dests), g.nodes[:].data['h_{}'.format(i)])
g.set_n_repr({agg_history_str: agg}, dests)
node_embed_names = [['preprocess', 'h_0']]
for i in range(1, n_layers):
node_embed_names.append(['h_{}'.format(i), 'agg_h_{}'.format(i-1), 'subg_norm', 'norm'])
node_embed_names.append(['agg_h_{}'.format(n_layers-1), 'subg_norm', 'norm'])
nf.copy_from_parent(node_embed_names=node_embed_names, ctx=ctx)
# forward
with mx.autograd.record():
pred = model(nf)
batch_nids = nf.layer_parent_nid(-1)
batch_labels = g.nodes[batch_nids].data['labels'].as_in_context(ctx)
loss = loss_fcn(pred, batch_labels)
loss = loss.sum() / len(batch_nids)
loss.backward()
trainer.step(batch_size=1)
node_embed_names = [['h_{}'.format(i)] for i in range(n_layers)]
node_embed_names.append([])
nf.copy_to_parent(node_embed_names=node_embed_names)
mx.nd.waitall()
print(msg_head + ': training takes ' + str(time.time() - start))
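        # Pull the freshly trained weights from the KVStore into the inference
        # model; the two models share parameter names through the common 'GCN' prefix.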
infer_params = infer_model.collect_params()
for key in infer_params:
idx = trainer._param2idx[key]
trainer._kvstore.pull(idx, out=infer_params[key].data())
num_acc = 0.
num_tests = 0
if not distributed or g.worker_id == 0:
for nf in dgl.contrib.sampling.NeighborSampler(g, args.test_batch_size,
g.number_of_nodes(),
neighbor_type='in',
num_hops=n_layers,
seed_nodes=test_nid):
node_embed_names = [['preprocess']]
for i in range(n_layers):
node_embed_names.append(['norm'])
nf.copy_from_parent(node_embed_names=node_embed_names, ctx=ctx)
pred = infer_model(nf)
batch_nids = nf.layer_parent_nid(-1)
batch_labels = g.nodes[batch_nids].data['labels'].as_in_context(ctx)
num_acc += (pred.argmax(axis=1) == batch_labels).sum().asscalar()
num_tests += nf.layer_size(-1)
if distributed:
g._sync_barrier()
print("Test Accuracy {:.4f}". format(num_acc/num_tests))
break
elif distributed:
g._sync_barrier()
@@ -10,17 +10,15 @@ from dgl.data import register_data_args, load_data

class NodeUpdate(gluon.Block):
    def __init__(self, in_feats, out_feats, activation=None, concat=False):
        super(NodeUpdate, self).__init__()
        self.dense = gluon.nn.Dense(out_feats, in_units=in_feats)
        self.activation = activation
        self.concat = concat

    def forward(self, node):
        h = node.data['h']
        h = h * node.data['norm']
        h = self.dense(h)
        # skip connection
        if self.concat:

@@ -63,9 +61,11 @@ class GCNSampling(gluon.Block):
            if self.dropout:
                h = mx.nd.Dropout(h, p=self.dropout)
            nf.layers[i].data['h'] = h
            degs = nf.layer_in_degree(i + 1).astype('float32').as_in_context(h.context)
            nf.layers[i + 1].data['norm'] = mx.nd.expand_dims(1./degs, 1)
            nf.block_compute(i,
                             fn.copy_src(src='h', out='m'),
                             fn.sum(msg='m', out='h'),
                             layer)

        h = nf.layers[-1].data.pop('activation')

@@ -86,13 +86,13 @@ class GCNInfer(gluon.Block):
            self.layers = gluon.nn.Sequential()
            # input layer
            skip_start = (0 == n_layers-1)
            self.layers.add(NodeUpdate(in_feats, n_hidden, activation, concat=skip_start))
            # hidden layers
            for i in range(1, n_layers):
                skip_start = (i == n_layers-1)
                self.layers.add(NodeUpdate(n_hidden, n_hidden, activation, concat=skip_start))
            # output layer
            self.layers.add(NodeUpdate(2*n_hidden, n_classes))

    def forward(self, nf):

@@ -106,62 +106,17 @@ class GCNInfer(gluon.Block):
                             fn.sum(msg='m', out='h'),
                             layer)

        return nf.layers[-1].data.pop('activation')


def gcn_ns_train(g, ctx, args, n_classes, train_nid, test_nid, n_test_samples):
    n0_feats = g.nodes[0].data['features']
    in_feats = n0_feats.shape[1]
    g_ctx = n0_feats.context

    degs = g.in_degrees().astype('float32').as_in_context(g_ctx)
    norm = mx.nd.expand_dims(1./degs, 1)
    g.set_n_repr({'norm': norm})

    model = GCNSampling(in_feats,
                        args.n_hidden,

@@ -189,19 +144,19 @@ def main(args):
                            {'learning_rate': args.lr, 'wd': args.weight_decay},
                            kvstore=mx.kv.create('local'))

    # Create sampler receiver
    sampler = dgl.contrib.sampling.SamplerReceiver(graph=g, addr=args.ip, num_sender=args.num_sampler)

    # initialize graph
    dur = []
    for epoch in range(args.n_epochs):
        for nf in sampler:
            nf.copy_from_parent(ctx=ctx)
            # forward
            with mx.autograd.record():
                pred = model(nf)
                batch_nids = nf.layer_parent_nid(-1)
                batch_labels = g.nodes[batch_nids].data['labels'].as_in_context(ctx)
                loss = loss_fcn(pred, batch_labels)
                loss = loss.sum() / len(batch_nids)

@@ -215,56 +170,19 @@ def main(args):
            trainer._kvstore.pull(idx, out=infer_params[key].data())

        num_acc = 0.
        num_tests = 0
        for nf in dgl.contrib.sampling.NeighborSampler(g, args.test_batch_size,
                                                       g.number_of_nodes(),
                                                       neighbor_type='in',
                                                       num_hops=args.n_layers+1,
                                                       seed_nodes=test_nid):
            nf.copy_from_parent(ctx=ctx)
            pred = infer_model(nf)
            batch_nids = nf.layer_parent_nid(-1)
            batch_labels = g.nodes[batch_nids].data['labels'].as_in_context(ctx)
            num_acc += (pred.argmax(axis=1) == batch_labels).sum().asscalar()
            num_tests += nf.layer_size(-1)
            break

        print("Test Accuracy {:.4f}". format(num_acc/num_tests))
import argparse, time, math
import numpy as np
import mxnet as mx
from mxnet import gluon
import dgl
import dgl.function as fn
from dgl import DGLGraph
from dgl.data import register_data_args, load_data
class GraphSAGELayer(gluon.Block):
def __init__(self,
in_feats,
hidden,
out_feats,
dropout,
last=False,
**kwargs):
super(GraphSAGELayer, self).__init__(**kwargs)
self.last = last
self.dropout = dropout
with self.name_scope():
self.dense1 = gluon.nn.Dense(hidden, in_units=in_feats)
self.layer_norm1 = gluon.nn.LayerNorm(in_channels=hidden)
self.dense2 = gluon.nn.Dense(out_feats, in_units=hidden)
if not self.last:
self.layer_norm2 = gluon.nn.LayerNorm(in_channels=out_feats)
def forward(self, h):
h = self.dense1(h)
h = self.layer_norm1(h)
h = mx.nd.relu(h)
if self.dropout:
h = mx.nd.Dropout(h, p=self.dropout)
h = self.dense2(h)
if not self.last:
h = self.layer_norm2(h)
h = mx.nd.relu(h)
return h
class NodeUpdate(gluon.Block):
def __init__(self, layer_id, in_feats, out_feats, hidden, dropout,
test=False, last=False):
super(NodeUpdate, self).__init__()
self.layer_id = layer_id
self.dropout = dropout
self.test = test
self.last = last
with self.name_scope():
self.layer = GraphSAGELayer(in_feats, hidden, out_feats, dropout, last)
def forward(self, node):
h = node.data['h']
norm = node.data['norm']
# activation from previous layer of myself
self_h = node.data['self_h']
if self.test:
h = (h - self_h) * norm
# graphsage
h = mx.nd.concat(h, self_h)
else:
agg_history_str = 'agg_h_{}'.format(self.layer_id-1)
agg_history = node.data[agg_history_str]
# normalization constant
subg_norm = node.data['subg_norm']
# delta_h (h - history) from previous layer of myself
self_delta_h = node.data['self_delta_h']
# control variate
h = (h - self_delta_h) * subg_norm + agg_history * norm
# graphsage
h = mx.nd.concat(h, self_h)
if self.dropout:
h = mx.nd.Dropout(h, p=self.dropout)
h = self.layer(h)
return {'activation': h}
class GraphSAGETrain(gluon.Block):
def __init__(self,
in_feats,
n_hidden,
n_classes,
n_layers,
dropout,
**kwargs):
super(GraphSAGETrain, self).__init__(**kwargs)
self.dropout = dropout
with self.name_scope():
self.layers = gluon.nn.Sequential()
# input layer
self.input_layer = GraphSAGELayer(2*in_feats, n_hidden, n_hidden, dropout)
# hidden layers
for i in range(1, n_layers):
self.layers.add(NodeUpdate(i, 2*n_hidden, n_hidden, n_hidden, dropout))
# output layer
self.layers.add(NodeUpdate(n_layers, 2*n_hidden, n_classes, n_hidden, dropout, last=True))
def forward(self, nf):
h = nf.layers[0].data['preprocess']
features = nf.layers[0].data['features']
h = mx.nd.concat(h, features)
if self.dropout:
h = mx.nd.Dropout(h, p=self.dropout)
h = self.input_layer(h)
for i, layer in enumerate(self.layers):
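            # Map the nodes of layer i+1 back to their positions in layer i so each
            # node can read its own previous-layer activation (self_h below).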
parent_nid = dgl.utils.toindex(nf.layer_parent_nid(i+1))
layer_nid = nf.map_from_parent_nid(i, parent_nid).as_in_context(h.context)
self_h = h[layer_nid]
# activation from previous layer of myself, used in graphSAGE
nf.layers[i+1].data['self_h'] = self_h
new_history = h.copy().detach()
history_str = 'h_{}'.format(i)
history = nf.layers[i].data[history_str]
# delta_h used in control variate
delta_h = h - history
# delta_h from previous layer of the nodes in (i+1)-th layer, used in control variate
nf.layers[i+1].data['self_delta_h'] = delta_h[layer_nid]
nf.layers[i].data['h'] = delta_h
nf.block_compute(i,
fn.copy_src(src='h', out='m'),
fn.sum(msg='m', out='h'),
layer)
h = nf.layers[i+1].data.pop('activation')
# update history
if i < nf.num_layers-1:
nf.layers[i].data[history_str] = new_history
return h
class GraphSAGEInfer(gluon.Block):
def __init__(self,
in_feats,
n_hidden,
n_classes,
n_layers,
**kwargs):
super(GraphSAGEInfer, self).__init__(**kwargs)
with self.name_scope():
self.layers = gluon.nn.Sequential()
# input layer
self.input_layer = GraphSAGELayer(2*in_feats, n_hidden, n_hidden, 0)
# hidden layers
for i in range(1, n_layers):
self.layers.add(NodeUpdate(i, 2*n_hidden, n_hidden, n_hidden, 0, True))
# output layer
self.layers.add(NodeUpdate(n_layers, 2*n_hidden, n_classes, n_hidden, 0, True, last=True))
def forward(self, nf):
h = nf.layers[0].data['preprocess']
features = nf.layers[0].data['features']
h = mx.nd.concat(h, features)
h = self.input_layer(h)
for i, layer in enumerate(self.layers):
nf.layers[i].data['h'] = h
parent_nid = dgl.utils.toindex(nf.layer_parent_nid(i+1))
layer_nid = nf.map_from_parent_nid(i, parent_nid).as_in_context(h.context)
# activation from previous layer of the nodes in (i+1)-th layer, used in graphSAGE
self_h = h[layer_nid]
nf.layers[i+1].data['self_h'] = self_h
nf.block_compute(i,
fn.copy_src(src='h', out='m'),
fn.sum(msg='m', out='h'),
layer)
h = nf.layers[i+1].data.pop('activation')
return h
def graphsage_cv_train(g, ctx, args, n_classes, train_nid, test_nid, n_test_samples, distributed):
n0_feats = g.nodes[0].data['features']
num_nodes = g.number_of_nodes()
in_feats = n0_feats.shape[1]
g_ctx = n0_feats.context
norm = mx.nd.expand_dims(1./g.in_degrees().astype('float32'), 1)
g.set_n_repr({'norm': norm.as_in_context(g_ctx)})
degs = g.in_degrees().astype('float32').asnumpy()
degs[degs > args.num_neighbors] = args.num_neighbors
g.set_n_repr({'subg_norm': mx.nd.expand_dims(mx.nd.array(1./degs, ctx=g_ctx), 1)})
n_layers = args.n_layers
g.update_all(fn.copy_src(src='features', out='m'),
fn.sum(msg='m', out='preprocess'),
lambda node : {'preprocess': node.data['preprocess'] * node.data['norm']})
for i in range(n_layers):
g.init_ndata('h_{}'.format(i), (num_nodes, args.n_hidden), 'float32')
g.init_ndata('agg_h_{}'.format(i), (num_nodes, args.n_hidden), 'float32')
model = GraphSAGETrain(in_feats,
args.n_hidden,
n_classes,
n_layers,
args.dropout,
prefix='GraphSAGE')
model.initialize(ctx=ctx)
loss_fcn = gluon.loss.SoftmaxCELoss()
infer_model = GraphSAGEInfer(in_feats,
args.n_hidden,
n_classes,
n_layers,
prefix='GraphSAGE')
infer_model.initialize(ctx=ctx)
# use optimizer
print(model.collect_params())
kv_type = 'dist_sync' if distributed else 'local'
trainer = gluon.Trainer(model.collect_params(), 'adam',
{'learning_rate': args.lr, 'wd': args.weight_decay},
kvstore=mx.kv.create(kv_type))
# Create sampler receiver
sampler = dgl.contrib.sampling.SamplerReceiver(graph=g, addr=args.ip, num_sender=args.num_sampler)
# initialize graph
dur = []
adj = g.adjacency_matrix().as_in_context(g_ctx)
for epoch in range(args.n_epochs):
start = time.time()
if distributed:
msg_head = "Worker {:d}, epoch {:d}".format(g.worker_id, epoch)
else:
msg_head = "epoch {:d}".format(epoch)
for nf in sampler:
for i in range(n_layers):
agg_history_str = 'agg_h_{}'.format(i)
dests = nf.layer_parent_nid(i+1).as_in_context(g_ctx)
# TODO we could use DGLGraph.pull to implement this, but the current
# implementation of pull is very slow. Let's manually do it for now.
agg = mx.nd.dot(mx.nd.take(adj, dests), g.nodes[:].data['h_{}'.format(i)])
g.set_n_repr({agg_history_str: agg}, dests)
node_embed_names = [['preprocess', 'features', 'h_0']]
for i in range(1, n_layers):
node_embed_names.append(['h_{}'.format(i), 'agg_h_{}'.format(i-1), 'subg_norm', 'norm'])
node_embed_names.append(['agg_h_{}'.format(n_layers-1), 'subg_norm', 'norm'])
nf.copy_from_parent(node_embed_names=node_embed_names, ctx=ctx)
# forward
with mx.autograd.record():
pred = model(nf)
batch_nids = nf.layer_parent_nid(-1)
batch_labels = g.nodes[batch_nids].data['labels'].as_in_context(ctx)
loss = loss_fcn(pred, batch_labels)
if distributed:
loss = loss.sum() / (len(batch_nids) * g.num_workers)
else:
loss = loss.sum() / (len(batch_nids))
loss.backward()
trainer.step(batch_size=1)
node_embed_names = [['h_{}'.format(i)] for i in range(n_layers)]
node_embed_names.append([])
nf.copy_to_parent(node_embed_names=node_embed_names)
mx.nd.waitall()
print(msg_head + ': training takes ' + str(time.time() - start))
infer_params = infer_model.collect_params()
for key in infer_params:
idx = trainer._param2idx[key]
trainer._kvstore.pull(idx, out=infer_params[key].data())
num_acc = 0.
num_tests = 0
if not distributed or g.worker_id == 0:
for nf in dgl.contrib.sampling.NeighborSampler(g, args.test_batch_size,
g.number_of_nodes(),
neighbor_type='in',
num_hops=n_layers,
seed_nodes=test_nid,
add_self_loop=True):
node_embed_names = [['preprocess', 'features']]
for i in range(n_layers):
node_embed_names.append(['norm', 'subg_norm'])
nf.copy_from_parent(node_embed_names=node_embed_names, ctx=ctx)
pred = infer_model(nf)
batch_nids = nf.layer_parent_nid(-1)
batch_labels = g.nodes[batch_nids].data['labels'].as_in_context(ctx)
num_acc += (pred.argmax(axis=1) == batch_labels).sum().asscalar()
num_tests += nf.layer_size(-1)
if distributed:
g._sync_barrier()
print(msg_head + ": Test Accuracy {:.4f}". format(num_acc/num_tests))
break
elif distributed:
g._sync_barrier()
DGLBACKEND=mxnet python3 gcn_ns_sampler.py --ip 127.0.0.1:2049 --num-sender=1 --dataset reddit-self-loop --num-neighbors 3 --batch-size 1000 --test-batch-size 5000
DGLBACKEND=mxnet python3 gcn_trainer.py --ip 127.0.0.1:2049 --num-sender=1 --dataset reddit-self-loop --num-neighbors 3 --batch-size 1000 --test-batch-size 5000 --n-hidden 64
@@ -14,6 +14,20 @@ class MySamplerPool(SamplerPool):
    def worker(self, args):
        """User-defined worker function
        """
is_shuffle = True
        self_loop = False
number_hops = 1
if args.model == "gcn_ns":
number_hops = args.n_layers + 1
elif args.model == "gcn_cv":
number_hops = args.n_layers
elif args.model == "graphsage_cv":
            number_hops = args.n_layers
self_loop = True
else:
print("unknown model. Please choose from gcn_ns, gcn_cv, graphsage_cv")
        # Start sender
        namebook = { 0:args.ip }
        sender = dgl.contrib.sampling.SamplerSender(namebook)
@@ -37,8 +51,10 @@ class MySamplerPool(SamplerPool):
        for nf in dgl.contrib.sampling.NeighborSampler(g, args.batch_size,
                                                        args.num_neighbors,
                                                        neighbor_type='in',
                                                        shuffle=is_shuffle,
                                                        num_workers=32,
                                                        num_hops=number_hops,
                                                        add_self_loop=self_loop,
                                                        seed_nodes=train_nid):
            print("send train nodeflow: %d" %(idx))
            sender.send(nf, 0)

@@ -47,17 +63,15 @@ class MySamplerPool(SamplerPool):
def main(args):
    pool = MySamplerPool()
    pool.start(args.num_sampler, args)

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='GCN')
    register_data_args(parser)
    parser.add_argument("--model", type=str,
                        help="select a model. Valid models: gcn_ns, gcn_cv, graphsage_cv")
    parser.add_argument("--batch-size", type=int, default=1000,
                        help="batch size")
    parser.add_argument("--num-neighbors", type=int, default=3,
                        help="number of neighbors to be sampled")
    parser.add_argument("--self-loop", action='store_true',

@@ -65,12 +79,11 @@ if __name__ == '__main__':
    parser.add_argument("--n-layers", type=int, default=1,
                        help="number of hidden gcn layers")
    parser.add_argument("--ip", type=str, default='127.0.0.1:50051',
                        help="IP address")
    parser.add_argument("--num-sampler", type=int, default=1,
                        help="number of sampler")
    args = parser.parse_args()
    print(args)
    main(args)
import argparse, time, math
import numpy as np
import mxnet as mx
from mxnet import gluon
from functools import partial
import dgl
import dgl.function as fn
from dgl import DGLGraph
from dgl.data import register_data_args, load_data
from dis_gcn_ns_sc import gcn_ns_train
from dis_gcn_cv_sc import gcn_cv_train
from dis_graphsage_cv import graphsage_cv_train
def main(args):
# load and preprocess dataset
data = load_data(args)
if args.gpu >= 0:
ctx = mx.gpu(args.gpu)
else:
ctx = mx.cpu()
if args.self_loop and not args.dataset.startswith('reddit'):
data.graph.add_edges_from([(i,i) for i in range(len(data.graph))])
train_nid = mx.nd.array(np.nonzero(data.train_mask)[0]).astype(np.int64)
test_nid = mx.nd.array(np.nonzero(data.test_mask)[0]).astype(np.int64)
features = mx.nd.array(data.features)
labels = mx.nd.array(data.labels)
train_mask = mx.nd.array(data.train_mask)
val_mask = mx.nd.array(data.val_mask)
test_mask = mx.nd.array(data.test_mask)
in_feats = features.shape[1]
n_classes = data.num_labels
n_edges = data.graph.number_of_edges()
n_train_samples = train_mask.sum().asscalar()
n_val_samples = val_mask.sum().asscalar()
n_test_samples = test_mask.sum().asscalar()
print("""----Data statistics------'
#Edges %d
#Classes %d
#Train samples %d
#Val samples %d
#Test samples %d""" %
(n_edges, n_classes,
n_train_samples,
n_val_samples,
n_test_samples))
# create GCN model
g = DGLGraph(data.graph, readonly=True)
g.ndata['features'] = features
g.ndata['labels'] = labels
if args.model == "gcn_ns":
gcn_ns_train(g, ctx, args, n_classes, train_nid, test_nid, n_test_samples)
elif args.model == "gcn_cv":
gcn_cv_train(g, ctx, args, n_classes, train_nid, test_nid, n_test_samples, False)
elif args.model == "graphsage_cv":
graphsage_cv_train(g, ctx, args, n_classes, train_nid, test_nid, n_test_samples, False)
else:
print("unknown model. Please choose from gcn_ns, gcn_cv, graphsage_cv")
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='GCN')
register_data_args(parser)
parser.add_argument("--model", type=str,
help="select a model. Valid models: gcn_ns, gcn_cv, graphsage_cv")
parser.add_argument("--dropout", type=float, default=0.5,
help="dropout probability")
parser.add_argument("--gpu", type=int, default=-1,
help="gpu")
parser.add_argument("--lr", type=float, default=3e-2,
help="learning rate")
parser.add_argument("--n-epochs", type=int, default=200,
help="number of training epochs")
parser.add_argument("--batch-size", type=int, default=1000,
help="batch size")
parser.add_argument("--test-batch-size", type=int, default=1000,
help="test batch size")
parser.add_argument("--num-neighbors", type=int, default=3,
help="number of neighbors to be sampled")
parser.add_argument("--n-hidden", type=int, default=16,
help="number of hidden gcn units")
parser.add_argument("--n-layers", type=int, default=1,
help="number of hidden gcn layers")
parser.add_argument("--self-loop", action='store_true',
help="graph self-loop (default=False)")
parser.add_argument("--weight-decay", type=float, default=5e-4,
help="Weight for L2 loss")
parser.add_argument("--nworkers", type=int, default=1,
help="number of workers")
parser.add_argument("--ip", type=str, default='127.0.0.1:50051',
help="IP address")
parser.add_argument("--num-sampler", type=int, default=1,
help="number of sampler")
args = parser.parse_args()
print(args)
main(args)