"src/git@developer.sourcefind.cn:renzhc/diffusers_dcu.git" did not exist on "d9e7857af3217f4a971c1e8754078543732a671a"
Unverified Commit 5f89cd66 authored by Da Zheng, committed by GitHub

Fix the convergence problem in SSE. (#162)

* add a test.

* move file.

* debug

* add degree normalization (see the convergence sketch after this message).

* it can converge now.

* remove hidden_data.

* use readonly graph index.

* use subgraph loader.

* refactor code and remove intermediate data.

* split inference.

* mix cpu and gpu training.

* fix mxnet sparse_matrix constructor.

* convert tensor context.

* set up the prediction model.

* load mxnet csr directly.

* add timing.

* move test_sse.py to sse_batch.py

* fix the tensor version of SSE.

* update README.
parent e515f531
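Note on the fix: convergence comes from two changes visible in the diff below — neighbor features are now averaged instead of summed (divided by in-degree), and the damping factor alpha drops from 0.9 to 0.1, so each SSE step moves the embeddings only a little toward the new estimate. A minimal NumPy sketch of the resulting damped fixed-point iteration; the toy graph, weights, and names are illustrative, not part of the commit:

import numpy as np

rng = np.random.RandomState(0)
n, d = 8, 4
A = (rng.rand(n, n) < 0.4).astype(np.float32)        # toy adjacency matrix
deg = np.maximum(A.sum(axis=1, keepdims=True), 1.0)  # in-degrees, guarded against zero
X = rng.randn(n, d).astype(np.float32)               # fixed input features
W1 = rng.randn(3 * d, d).astype(np.float32) * 0.1    # stand-in for linear1 (relu below)
W2 = rng.randn(d, d).astype(np.float32) * 0.1        # stand-in for linear2
alpha = 0.1                                          # damping; the commit lowers it from 0.9

h = rng.randn(n, d).astype(np.float32)
for _ in range(200):
    accum = (A @ np.concatenate([X, h], axis=1)) / deg  # degree-normalized aggregation
    z = np.concatenate([X, accum], axis=1)              # mirrors NodeUpdate's concat
    hidden = np.maximum(z @ W1, 0.0) @ W2               # linear1 + relu, then linear2
    h_new = (1 - alpha) * h + alpha * hidden            # damped SSE update
    delta = float(np.abs(h_new - h).sum())
    h = h_new
print("residual after 200 iterations:", delta)          # tiny residual => converged

With alpha = 0.9 and an unnormalized sum, the same loop can diverge on graphs with high-degree nodes, which matches the behavior this commit fixes.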
 # Benchmark SSE on multi-GPUs
 # Use a small embedding.
 DGLBACKEND=mxnet python3 -m pyinstrument -o prof.out examples/mxnet/sse/sse_batch.py --graph-file ../../data/5_5_csr.nd --n-epochs 1 --lr 0.0005 --batch-size 1024 --use-spmv 1 --num-parallel-subgraphs 32 --gpu 8
 # Use a large embedding.
 DGLBACKEND=mxnet python3 examples/mxnet/sse/sse_batch.py --graph-file ../../data/5_5_csr.nd --n-epochs 1 --lr 0.0005 --batch-size 2048 --use-spmv 1 --num-parallel-subgraphs 32 --gpu 8 --n-hidden 500
+# test convergence
+DGLBACKEND=mxnet python3 examples/mxnet/sse/sse_batch.py --dataset "pubmed" --n-epochs 100 --lr 0.005 --batch-size 1024 --use-spmv 1
@@ -19,69 +19,110 @@ def gcn_msg(edges):
     return {'m': mx.nd.concat(edges.src['in'], edges.src['h'], dim=1)}

 def gcn_reduce(nodes):
-    return {'accum': mx.nd.sum(nodes.mailbox['m'], 1)}
+    return {'accum': mx.nd.sum(nodes.mailbox['m'], 1) / nodes.mailbox['m'].shape[1]}

 class NodeUpdate(gluon.Block):
-    def __init__(self, out_feats, activation=None, alpha=0.9, **kwargs):
+    def __init__(self, out_feats, activation=None, alpha=0.1, **kwargs):
         super(NodeUpdate, self).__init__(**kwargs)
         self.linear1 = gluon.nn.Dense(out_feats, activation=activation)
         # TODO what is the dimension here?
         self.linear2 = gluon.nn.Dense(out_feats)
         self.alpha = alpha

-    def forward(self, nodes):
-        hidden = mx.nd.concat(nodes.data['in'], nodes.data['accum'], dim=1)
-        hidden = self.linear2(self.linear1(hidden))
-        return {'h': nodes.data['h'] * (1 - self.alpha) + self.alpha * hidden}
+    def forward(self, in_data, hidden_data, accum):
+        tmp = mx.nd.concat(in_data, accum, dim=1)
+        hidden = self.linear2(self.linear1(tmp))
+        return hidden_data * (1 - self.alpha) + self.alpha * hidden
+
+class DGLNodeUpdate(gluon.Block):
+    def __init__(self, update):
+        super(DGLNodeUpdate, self).__init__()
+        self.update = update
+
+    def forward(self, node):
+        return {'h1': self.update(node.data['in'], node.data['h'], node.data['accum'])}

 class SSEUpdateHidden(gluon.Block):
     def __init__(self,
                  n_hidden,
-                 activation,
                  dropout,
-                 use_spmv,
+                 activation,
                  **kwargs):
         super(SSEUpdateHidden, self).__init__(**kwargs)
         with self.name_scope():
             self.layer = NodeUpdate(n_hidden, activation)
         self.dropout = dropout
+
+    def forward(self, g, vertices):
+        if vertices is None:
+            deg = mx.nd.expand_dims(g.in_degrees(np.arange(0, g.number_of_nodes())), 1).astype(np.float32)
+            feat = g.get_n_repr()['in']
+            cat = mx.nd.concat(feat, g.ndata['h'], dim=1)
+            accum = mx.nd.dot(g.adjacency_matrix(), cat) / deg
+            return self.layer(feat, g.ndata['h'], accum)
+        else:
+            deg = mx.nd.expand_dims(g.in_degrees(vertices), 1).astype(np.float32)
+            # We don't need dropout for inference.
+            if self.dropout:
+                # TODO here we apply dropout on all vertex representation.
+                g.ndata['h'] = mx.nd.Dropout(g.ndata['h'], p=self.dropout)
+            feat = g.get_n_repr()['in']
+            cat = mx.nd.concat(feat, g.ndata['h'], dim=1)
+            slices = mx.nd.take(g.adjacency_matrix(), vertices).as_in_context(cat.context)
+            accum = mx.nd.dot(slices, cat) / deg.as_in_context(cat.context)
+            vertices = vertices.as_in_context(g.ndata['in'].context)
+            return self.layer(mx.nd.take(feat, vertices),
+                              mx.nd.take(g.ndata['h'], vertices), accum)
+
+class DGLSSEUpdateHidden(gluon.Block):
+    def __init__(self,
+                 n_hidden,
+                 activation,
+                 dropout,
+                 use_spmv,
+                 **kwargs):
+        super(DGLSSEUpdateHidden, self).__init__(**kwargs)
+        with self.name_scope():
+            self.layer = DGLNodeUpdate(NodeUpdate(n_hidden, activation))
+        self.dropout = dropout
         self.use_spmv = use_spmv

     def forward(self, g, vertices):
         if self.use_spmv:
             feat = g.ndata['in']
-            h = g.ndata['h']
-            g.ndata['cat'] = mx.nd.concat(feat, h, dim=1)
-            msg_func = fn.copy_src(src='cat', out='tmp')
-            reduce_func = fn.sum(msg='tmp', out='accum')
+            g.ndata['cat'] = mx.nd.concat(feat, g.ndata['h'], dim=1)
+            msg_func = fn.copy_src(src='cat', out='m')
+            reduce_func = fn.sum(msg='m', out='accum')
         else:
             msg_func = gcn_msg
             reduce_func = gcn_reduce
+        deg = mx.nd.expand_dims(g.in_degrees(np.arange(0, g.number_of_nodes())), 1).astype(np.float32)
         if vertices is None:
             g.update_all(msg_func, reduce_func, None)
             if self.use_spmv:
                 g.ndata.pop('cat')
+                g.ndata['accum'] = g.ndata['accum'] / deg
             batch_size = 100000
             num_batches = int(math.ceil(g.number_of_nodes() / batch_size))
             for i in range(num_batches):
                 vs = mx.nd.arange(i * batch_size, min((i + 1) * batch_size, g.number_of_nodes()), dtype=np.int64)
                 g.apply_nodes(self.layer, vs, inplace=True)
             g.ndata.pop('accum')
-            ret = g.ndata['h']
+            return g.get_n_repr()['h1']
         else:
             # We don't need dropout for inference.
             if self.dropout:
                 # TODO here we apply dropout on all vertex representation.
-                val = mx.nd.Dropout(g.ndata['h'], p=self.dropout)
-                g.ndata['h'] = val
-            g.pull(vertices, msg_func, reduce_func, self.layer)
-            ctx = g.ndata['h'].context
-            ret = mx.nd.take(g.ndata['h'], vertices.tousertensor().as_in_context(ctx))
+                g.ndata['h'] = mx.nd.Dropout(g.ndata['h'], p=self.dropout)
+            g.pull(vertices, msg_func, reduce_func, None)
             if self.use_spmv:
                 g.ndata.pop('cat')
+                deg = deg.as_in_context(g.ndata['accum'].context)
+                g.ndata['accum'] = g.ndata['accum'] / deg
+            g.apply_nodes(self.layer, vertices)
             g.ndata.pop('accum')
-        return ret
+            return g.ndata['h1'][vertices.as_in_context(g.ndata['h1'].context)]

 class SSEPredict(gluon.Block):
     def __init__(self, update_hidden, out_feats, dropout, **kwargs):
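The use_spmv branch above replaces per-node message passing (copy_src plus fn.sum, then a divide by degree) with a single product against the adjacency matrix. A small NumPy sketch of why the two are equivalent; the toy graph and names are illustrative, not the commit's code:

import numpy as np

rng = np.random.RandomState(1)
n, d = 5, 3
A = (rng.rand(n, n) < 0.5).astype(np.float32)        # A[v, u] = 1 if u is an in-neighbor of v
feat = rng.randn(n, d).astype(np.float32)
deg = np.maximum(A.sum(axis=1, keepdims=True), 1.0)

# Message-passing view: each node sums its in-neighbors' features, then averages.
accum_mp = np.zeros((n, d), dtype=np.float32)
for v in range(n):
    for u in range(n):
        if A[v, u]:
            accum_mp[v] += feat[u]
accum_mp /= deg

# SpMV view: the same aggregation is one matrix product.
accum_spmv = (A @ feat) / deg
print("max abs difference:", float(np.abs(accum_mp - accum_spmv).max()))  # ~0.0

Batching the whole aggregation into one sparse matrix product is what makes the --use-spmv path faster.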
@@ -117,7 +158,7 @@ def main(args, data):
     eval_vs = np.arange(train_size, len(labels), dtype='int64')
     print("train size: " + str(len(train_vs)))
     print("eval size: " + str(len(eval_vs)))
-    labels = data.labels
+    eval_labels = mx.nd.array(data.labels[eval_vs])
     in_feats = features.shape[1]
     n_classes = data.num_labels
     n_edges = data.graph.number_of_edges()
@@ -132,31 +173,41 @@ def main(args, data):
     g.ndata['h'] = mx.nd.random.normal(shape=(g.number_of_nodes(), args.n_hidden),
                                        ctx=mx.cpu(0))
-    update_hidden_infer = SSEUpdateHidden(args.n_hidden, 'relu',
-                                          args.update_dropout, args.use_spmv, prefix='sse')
-    update_hidden_infer.initialize(ctx=mx.cpu(0))
-
-    train_ctxs = []
-    update_hidden_train = SSEUpdateHidden(args.n_hidden, 'relu',
-                                          args.update_dropout, args.use_spmv, prefix='sse')
-    model = SSEPredict(update_hidden_train, args.n_hidden, args.predict_dropout, prefix='app')
+    update_hidden_infer = DGLSSEUpdateHidden(args.n_hidden, 'relu',
+                                             args.update_dropout, args.use_spmv,
+                                             prefix='sse')
+    update_hidden_train = DGLSSEUpdateHidden(args.n_hidden, 'relu',
+                                             args.update_dropout, args.use_spmv,
+                                             prefix='sse')
+    if not args.dgl:
+        update_hidden_infer = SSEUpdateHidden(args.n_hidden, args.update_dropout, 'relu',
+                                              prefix='sse')
+        update_hidden_train = SSEUpdateHidden(args.n_hidden, args.update_dropout, 'relu',
+                                              prefix='sse')
+    model_train = SSEPredict(update_hidden_train, args.n_hidden, args.predict_dropout, prefix='app')
+    model_infer = SSEPredict(update_hidden_infer, args.n_hidden, args.predict_dropout, prefix='app')
+    model_infer.initialize(ctx=mx.cpu(0))
     if args.gpu <= 0:
-        model.initialize(ctx=mx.cpu(0))
-        train_ctxs.append(mx.cpu(0))
+        model_train.initialize(ctx=mx.cpu(0))
     else:
+        train_ctxs = []
         for i in range(args.gpu):
             train_ctxs.append(mx.gpu(i))
-        model.initialize(ctx=train_ctxs)
+        model_train.initialize(ctx=train_ctxs)

     # use optimizer
     num_batches = int(g.number_of_nodes() / args.batch_size)
     scheduler = mx.lr_scheduler.CosineScheduler(args.n_epochs * num_batches,
                                                 args.lr * 10, 0, 0, args.lr/5)
-    trainer = gluon.Trainer(model.collect_params(), 'adam', {'learning_rate': args.lr,
+    trainer = gluon.Trainer(model_train.collect_params(), 'adam', {'learning_rate': args.lr,
                             'lr_scheduler': scheduler}, kvstore=mx.kv.create('device'))

     # compute vertex embedding.
-    update_hidden_infer(g, None)
+    all_hidden = update_hidden_infer(g, None)
+    g.ndata['h'] = all_hidden
+    rets = []
+    rets.append(all_hidden)

     # initialize graph
     dur = []
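The hunk above splits the model into model_train and model_infer: inference stays on CPU over the full graph, while the training parameters are replicated across all GPUs with gradients aggregated through a 'device' kvstore. A minimal Gluon sketch of that initialization pattern; the toy Dense block and the num_gpus knob are stand-ins, not the commit's model:

import mxnet as mx
from mxnet import gluon

num_gpus = 0  # stand-in for args.gpu; 0 falls back to CPU like the args.gpu <= 0 branch
ctxs = [mx.gpu(i) for i in range(num_gpus)] if num_gpus > 0 else [mx.cpu(0)]

net = gluon.nn.Dense(16, activation='relu')        # stand-in for SSEPredict
net.initialize(ctx=ctxs)                           # one parameter copy per context
trainer = gluon.Trainer(net.collect_params(), 'adam',
                        {'learning_rate': 5e-4},
                        kvstore=mx.kv.create('device'))  # aggregate gradients on device

# One data-parallel step: shard the batch, run backward per shard, step once.
batch = mx.nd.random.normal(shape=(8, 4))
shards = gluon.utils.split_and_load(batch, ctxs)
with mx.autograd.record():
    losses = [net(s).sum() for s in shards]
for l in losses:
    l.backward()
trainer.step(batch.shape[0])                       # normalize by the total batch size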
@@ -164,7 +215,9 @@ def main(args, data):
         t0 = time.time()

         train_loss = 0
         i = 0
-        for subg, seeds in dgl.sampling.NeighborSampler(g, args.batch_size, g.number_of_nodes(),
+        num_batches = len(train_vs) / args.batch_size
+        start1 = time.time()
+        for subg, seeds in dgl.contrib.sampling.NeighborSampler(g, args.batch_size, g.number_of_nodes(),
                 neighbor_type='in', num_workers=args.num_parallel_subgraphs, seed_nodes=train_vs,
                 shuffle=True):
             subg.copy_from_parent()
@@ -176,34 +229,53 @@ def main(args, data):
             subg_seeds = subg.map_to_subgraph_nid(seeds)
             with mx.autograd.record():
-                logits = model(subg, subg_seeds)
+                logits = model_train(subg, subg_seeds.tousertensor())
                 batch_labels = mx.nd.array(labels[seeds.asnumpy()], ctx=logits.context)
                 loss = mx.nd.softmax_cross_entropy(logits, batch_labels)
             loss.backward()
             losses.append(loss)
-            i = i + 1
-            if i % args.gpu == 0:
+            i += 1
+            if args.gpu <= 0:
+                trainer.step(seeds.shape[0])
+                train_loss += loss.asnumpy()[0]
+                losses = []
+            elif i % args.gpu == 0:
                 trainer.step(len(seeds) * len(losses))
                 for loss in losses:
                     train_loss += loss.asnumpy()[0]
                 losses = []

-        #logits = model(eval_vs)
-        #eval_loss = mx.nd.softmax_cross_entropy(logits, eval_labels)
-        #eval_loss = eval_loss.asnumpy()[0]
-        eval_loss = 0
+            if i % args.num_parallel_subgraphs == 0:
+                end1 = time.time()
+                print("process " + str(args.num_parallel_subgraphs)
+                      + " subgraphs takes " + str(end1 - start1))
+                start1 = end1

-        # compute vertex embedding.
-        infer_params = update_hidden_infer.collect_params()
+            if i > num_batches / 3:
+                break
+
+        # prediction.
+        logits = model_infer(g, mx.nd.array(eval_vs, dtype=np.int64))
+        eval_loss = mx.nd.softmax_cross_entropy(logits, eval_labels)
+        eval_loss = eval_loss.asnumpy()[0]
+
+        # update the inference model.
+        infer_params = model_infer.collect_params()
         for key in infer_params:
             idx = trainer._param2idx[key]
             trainer._kvstore.pull(idx, out=infer_params[key].data())
-        update_hidden_infer(g, None)
+
+        # Update node embeddings.
+        all_hidden = update_hidden_infer(g, None)
+        g.ndata['h'] = all_hidden
+        rets.append(all_hidden)

         dur.append(time.time() - t0)
         print("Epoch {:05d} | Train Loss {:.4f} | Eval Loss {:.4f} | Time(s) {:.4f} | ETputs(KTEPS) {:.2f}".format(
             epoch, train_loss, eval_loss, np.mean(dur), n_edges / np.mean(dur) / 1000))
+    return rets

 class MXNetGraph(object):
     """A simple graph object that uses scipy matrix."""
     def __init__(self, mat):
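In the rewritten training loop, loss.backward() runs for every subgraph batch but trainer.step is deferred: with GPUs, one step is taken after args.gpu batches and normalized by len(seeds) * len(losses) samples. On a single device, the same defer-then-step pattern needs grad_req='add', since Gluon overwrites gradients by default; a toy sketch under that assumption (hypothetical network and data, not the commit's code):

import mxnet as mx
from mxnet import gluon

net = gluon.nn.Dense(1)
net.initialize(ctx=mx.cpu(0))
for p in net.collect_params().values():
    p.grad_req = 'add'                      # accumulate grads across backward calls
trainer = gluon.Trainer(net.collect_params(), 'adam', {'learning_rate': 1e-3})

accumulate = 2                              # stand-in for args.gpu
seen = 0
for i in range(1, 7):                       # six toy "subgraph" batches
    x = mx.nd.random.normal(shape=(4, 3))
    with mx.autograd.record():
        loss = net(x).sum()
    loss.backward()
    seen += x.shape[0]
    if i % accumulate == 0:
        trainer.step(seen)                  # normalize by samples since the last step
        net.collect_params().zero_grad()    # required with grad_req='add'
        seen = 0

In the commit itself the accumulation happens implicitly, because each of the args.gpu batches appears to run on a different device and the 'device' kvstore sums the per-device gradients at step time.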
@@ -216,7 +288,7 @@ class MXNetGraph(object):
         return self._mat.shape[0]

     def number_of_edges(self):
-        return mx.nd.contrib.getnnz(self._mat)
+        return mx.nd.contrib.getnnz(self._mat).asnumpy()[0]

 class GraphData:
     def __init__(self, csr, num_feats):
@@ -247,14 +319,15 @@ if __name__ == '__main__':
                         help="number of hidden gcn units")
     parser.add_argument("--warmup", type=int, default=10,
                         help="number of iterations to warm up with large learning rate")
-    parser.add_argument("--update-dropout", type=float, default=0.5,
+    parser.add_argument("--update-dropout", type=float, default=0,
                         help="the dropout rate for updating vertex embedding")
-    parser.add_argument("--predict-dropout", type=float, default=0.5,
+    parser.add_argument("--predict-dropout", type=float, default=0,
                         help="the dropout rate for prediction")
     parser.add_argument("--train_percent", type=float, default=0.5,
                         help="the percentage of data used for training")
-    parser.add_argument("--use-spmv", type=bool, default=False,
+    parser.add_argument("--use-spmv", action="store_true",
                         help="use SpMV for faster speed.")
+    parser.add_argument("--dgl", action="store_true")
     parser.add_argument("--num-parallel-subgraphs", type=int, default=1,
                         help="the number of subgraphs to construct in parallel.")
     args = parser.parse_args()
@@ -266,4 +339,7 @@ if __name__ == '__main__':
         csr = None
     else:
         data = load_data(args)
-    main(args, data)
+    rets1 = main(args, data)
+    rets2 = main(args, data)
+    for hidden1, hidden2 in zip(rets1, rets2):
+        print("hidden: " + str(mx.nd.sum(mx.nd.abs(hidden1 - hidden2)).asnumpy()))
@@ -27,11 +27,13 @@ def sparse_matrix(data, index, shape, force_format=False):
             raise TypeError('MXNet backend only supports CSR format,'
                             ' but COO format is forced.')
         coord = index[1]
-        return nd.sparse.csr_matrix((data, (coord[0], coord[1])), tuple(shape))
+        return nd.sparse.csr_matrix((data, (coord[0], coord[1])),
+                                    tuple(shape), ctx=data.context)
     elif fmt == 'csr':
         indices = index[1]
         indptr = index[2]
-        return nd.sparse.csr_matrix((data, indices, indptr), tuple(shape))
+        return nd.sparse.csr_matrix((data, indices, indptr),
+                                    tuple(shape), ctx=data.context)
     else:
         raise TypeError('Invalid format: %s.' % fmt)
...
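The backend change threads the input array's context into nd.sparse.csr_matrix, so a sparse matrix assembled from GPU-resident data lands on the GPU instead of silently defaulting to CPU. A CPU-only sketch of the constructor call with toy values (swap in ctx=mx.gpu(0) on the data array to see the context propagate):

import mxnet as mx
from mxnet import nd

data = nd.array([1.0, 2.0, 3.0])                # nonzero values
indices = nd.array([0, 2, 1], dtype='int64')    # column indices
indptr = nd.array([0, 2, 2, 3], dtype='int64')  # row pointers, one entry per row + 1
csr = nd.sparse.csr_matrix((data, indices, indptr),
                           shape=(3, 3), ctx=data.context)
print(csr.context)                              # follows data.context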