Commit 8918cce0 authored by Da Zheng's avatar Da Zheng Committed by Minjie Wang
Browse files

[MXNet] add unit tests for mxnet (#227)

[MXNet] add unit tests for mxnet
parent 6f4898a1
......@@ -3,6 +3,7 @@ os.environ['DGLBACKEND'] = 'mxnet'
import mxnet as mx
import numpy as np
from dgl.graph import DGLGraph
import dgl
import scipy.sparse as spsp
D = 5
......@@ -42,9 +43,14 @@ def generate_graph(grad=False, readonly=False):
csr = spsp.csr_matrix((ones, (row_idx, col_idx)), shape=(10, 10))
g = DGLGraph(csr, readonly=True)
ncol = mx.nd.random.normal(shape=(10, D))
ecol = mx.nd.random.normal(shape=(17, D))
if grad:
ncol.attach_grad()
ecol.attach_grad()
g.ndata['h'] = ncol
g.edata['w'] = ecol
g.set_n_initializer(dgl.init.zero_initializer)
g.set_e_initializer(dgl.init.zero_initializer)
return g
else:
g = DGLGraph()
......@@ -56,9 +62,14 @@ def generate_graph(grad=False, readonly=False):
# add a back flow from 9 to 0
g.add_edge(9, 0)
ncol = mx.nd.random.normal(shape=(10, D))
ecol = mx.nd.random.normal(shape=(17, D))
if grad:
ncol.attach_grad()
ecol.attach_grad()
g.ndata['h'] = ncol
g.edata['w'] = ecol
g.set_n_initializer(dgl.init.zero_initializer)
g.set_e_initializer(dgl.init.zero_initializer)
return g
def test_batch_setter_getter():
......@@ -104,8 +115,9 @@ def test_batch_setter_getter():
g.edata['l'] = mx.nd.zeros((17, D))
assert _pfc(g.edata['l']) == [0.] * 17
# pop edges
old_len = len(g.edata)
assert _pfc(g.pop_e_repr('l')) == [0.] * 17
assert len(g.edata) == 0
assert len(g.edata) == old_len - 1
g.edata['l'] = mx.nd.zeros((17, D))
# set partial edges (many-many)
u = mx.nd.array([0, 0, 2, 5, 9], dtype='int64')
......@@ -191,6 +203,34 @@ def test_batch_recv():
check_batch_recv(True)
check_batch_recv(False)
def test_apply_nodes():
def _upd(nodes):
return {'h' : nodes.data['h'] * 2}
g = generate_graph()
g.register_apply_node_func(_upd)
old = g.ndata['h']
g.apply_nodes()
assert np.allclose((old * 2).asnumpy(), g.ndata['h'].asnumpy())
u = mx.nd.array([0, 3, 4, 6], dtype=np.int64)
g.apply_nodes(lambda nodes : {'h' : nodes.data['h'] * 0.}, u)
h = g.ndata['h'][u].asnumpy()
assert np.allclose(h, np.zeros(shape=(4, D), dtype=h.dtype))
def test_apply_edges():
def _upd(edges):
return {'w' : edges.data['w'] * 2}
g = generate_graph()
g.register_apply_edge_func(_upd)
old = g.edata['w']
g.apply_edges()
assert np.allclose((old * 2).asnumpy(), g.edata['w'].asnumpy())
u = mx.nd.array([0, 0, 0, 4, 5, 6], dtype=np.int64)
v = mx.nd.array([1, 2, 3, 9, 9, 9], dtype=np.int64)
g.apply_edges(lambda edges : {'w' : edges.data['w'] * 0.}, (u, v))
eid = g.edge_ids(u, v)
w = g.edata['w'][eid].asnumpy()
assert np.allclose(w, np.zeros(shape=(6, D), dtype=w.dtype))
def check_update_routines(readonly):
g = generate_graph(readonly=readonly)
g.register_message_func(message_func)
......@@ -258,13 +298,91 @@ def check_reduce_0deg(readonly):
g.update_all(_message, _reduce)
new_repr = g.ndata['h']
assert np.allclose(new_repr[1:].asnumpy(), 2+mx.nd.zeros((4, 5)).asnumpy())
assert np.allclose(new_repr[1:].asnumpy(), 2+np.zeros((4, 5)))
assert np.allclose(new_repr[0].asnumpy(), old_repr.sum(0).asnumpy())
def test_reduce_0deg():
check_reduce_0deg(True)
check_reduce_0deg(False)
def test_recv_0deg_newfld():
# test recv with 0deg nodes; the reducer also creates a new field
g = DGLGraph()
g.add_nodes(2)
g.add_edge(0, 1)
def _message(edges):
return {'m' : edges.src['h']}
def _reduce(nodes):
return {'h1' : nodes.data['h'] + mx.nd.sum(nodes.mailbox['m'], 1)}
def _apply(nodes):
return {'h1' : nodes.data['h1'] * 2}
def _init2(shape, dtype, ctx, ids):
return 2 + mx.nd.zeros(shape=shape, dtype=dtype, ctx=ctx)
g.register_message_func(_message)
g.register_reduce_func(_reduce)
g.register_apply_node_func(_apply)
# test#1: recv both 0deg and non-0deg nodes
old = mx.nd.random.normal(shape=(2, 5))
g.set_n_initializer(_init2, 'h1')
g.ndata['h'] = old
g.send((0, 1))
g.recv([0, 1])
new = g.ndata.pop('h1')
# 0deg check: initialized with the func and got applied
assert np.allclose(new[0].asnumpy(), np.full((5,), 4))
# non-0deg check
assert np.allclose(new[1].asnumpy(), mx.nd.sum(old, 0).asnumpy() * 2)
# test#2: recv only 0deg node
old = mx.nd.random.normal(shape=(2, 5))
g.ndata['h'] = old
g.ndata['h1'] = mx.nd.full((2, 5), -1) # this is necessary
g.send((0, 1))
g.recv(0)
new = g.ndata.pop('h1')
# 0deg check: fallback to apply
assert np.allclose(new[0].asnumpy(), np.full((5,), -2))
# non-0deg check: not changed
assert np.allclose(new[1].asnumpy(), np.full((5,), -1))
def test_update_all_0deg():
# test#1
g = DGLGraph()
g.add_nodes(5)
g.add_edge(1, 0)
g.add_edge(2, 0)
g.add_edge(3, 0)
g.add_edge(4, 0)
def _message(edges):
return {'m' : edges.src['h']}
def _reduce(nodes):
return {'h' : nodes.data['h'] + mx.nd.sum(nodes.mailbox['m'], 1)}
def _apply(nodes):
return {'h' : nodes.data['h'] * 2}
def _init2(shape, dtype, ctx, ids):
return 2 + mx.nd.zeros(shape, dtype=dtype, ctx=ctx)
g.set_n_initializer(_init2, 'h')
old_repr = mx.nd.random.normal(shape=(5, 5))
g.ndata['h'] = old_repr
g.update_all(_message, _reduce, _apply)
new_repr = g.ndata['h']
# the first row of the new_repr should be the sum of all the node
# features; while the 0-deg nodes should be initialized by the
# initializer and applied with UDF.
assert np.allclose(new_repr[1:].asnumpy(), 2*(2+np.zeros((4,5))))
assert np.allclose(new_repr[0].asnumpy(), 2 * mx.nd.sum(old_repr, 0).asnumpy())
# test#2: graph with no edge
g = DGLGraph()
g.add_nodes(5)
g.set_n_initializer(_init2, 'h')
g.ndata['h'] = old_repr
g.update_all(_message, _reduce, _apply)
new_repr = g.ndata['h']
# should fallback to apply
assert np.allclose(new_repr.asnumpy(), 2*old_repr.asnumpy())
def check_pull_0deg(readonly):
if readonly:
row_idx = []
......@@ -326,6 +444,10 @@ if __name__ == '__main__':
test_batch_setter_autograd()
test_batch_send()
test_batch_recv()
test_apply_nodes()
test_apply_edges()
test_update_routines()
test_reduce_0deg()
test_recv_0deg_newfld()
test_update_all_0deg()
test_pull_0deg()
import os
os.environ['DGLBACKEND'] = 'mxnet'
import mxnet as mx
import numpy as np
import dgl
import dgl.function as fn
def generate_graph():
g = dgl.DGLGraph()
g.add_nodes(10) # 10 nodes.
h = mx.nd.arange(1, 11, dtype=np.float32)
g.ndata['h'] = h
# create a graph where 0 is the source and 9 is the sink
for i in range(1, 9):
g.add_edge(0, i)
g.add_edge(i, 9)
# add a back flow from 9 to 0
g.add_edge(9, 0)
h = mx.nd.array([1., 2., 1., 3., 1., 4., 1., 5., 1., 6.,\
1., 7., 1., 8., 1., 9., 10.])
g.edata['h'] = h
return g
def reducer_both(nodes):
return {'h' : mx.nd.sum(nodes.mailbox['m'], 1)}
def test_copy_src():
# copy_src with both fields
g = generate_graph()
g.register_message_func(fn.copy_src(src='h', out='m'))
g.register_reduce_func(reducer_both)
g.update_all()
assert np.allclose(g.ndata['h'].asnumpy(),
np.array([10., 1., 1., 1., 1., 1., 1., 1., 1., 44.]))
def test_copy_edge():
# copy_edge with both fields
g = generate_graph()
g.register_message_func(fn.copy_edge(edge='h', out='m'))
g.register_reduce_func(reducer_both)
g.update_all()
assert np.allclose(g.ndata['h'].asnumpy(),
np.array([10., 1., 1., 1., 1., 1., 1., 1., 1., 44.]))
def test_src_mul_edge():
# src_mul_edge with all fields
g = generate_graph()
g.register_message_func(fn.src_mul_edge(src='h', edge='h', out='m'))
g.register_reduce_func(reducer_both)
g.update_all()
assert np.allclose(g.ndata['h'].asnumpy(),
np.array([100., 1., 1., 1., 1., 1., 1., 1., 1., 284.]))
if __name__ == '__main__':
test_copy_src()
test_copy_edge()
test_src_mul_edge()
import os
os.environ['DGLBACKEND'] = 'mxnet'
import mxnet as mx
import numpy as np
import scipy.sparse as sp
import dgl
import dgl.function as fn
D = 5
def generate_graph():
g = dgl.DGLGraph()
g.add_nodes(10)
# create a graph where 0 is the source and 9 is the sink
for i in range(1, 9):
g.add_edge(0, i)
g.add_edge(i, 9)
# add a back flow from 9 to 0
g.add_edge(9, 0)
g.ndata['f'] = mx.nd.random.normal(shape=(10, D))
g.edata['e'] = mx.nd.random.normal(shape=(17, D))
return g
def test_inplace_recv():
u = mx.nd.array([0, 0, 0, 3, 4, 9], dtype=np.int64)
v = mx.nd.array([1, 2, 3, 9, 9, 0], dtype=np.int64)
def message_func(edges):
return {'m' : edges.src['f'] + edges.dst['f']}
def reduce_func(nodes):
return {'f' : mx.nd.sum(nodes.mailbox['m'], 1)}
def apply_func(nodes):
return {'f' : 2 * nodes.data['f']}
def _test(apply_func):
g = generate_graph()
f = g.ndata['f']
# one out place run to get result
g.send((u, v), message_func)
g.recv(mx.nd.array([0,1,2,3,9], dtype=np.int64),
reduce_func, apply_func)
result = g.get_n_repr()['f']
# inplace deg bucket run
v1 = f.copy()
g.ndata['f'] = v1
g.send((u, v), message_func)
g.recv(mx.nd.array([0,1,2,3,9], dtype=np.int64),
reduce_func, apply_func, inplace=True)
r1 = g.get_n_repr()['f']
# check result
assert np.allclose(r1.asnumpy(), result.asnumpy())
# check inplace
assert np.allclose(v1.asnumpy(), r1.asnumpy())
# inplace e2v
v1 = f.copy()
g.ndata['f'] = v1
g.send((u, v), message_func)
g.recv(mx.nd.array([0,1,2,3,9], dtype=np.int64),
fn.sum(msg='m', out='f'), apply_func, inplace=True)
r1 = g.ndata['f']
# check result
assert np.allclose(r1.asnumpy(), result.asnumpy())
# check inplace
assert np.allclose(v1.asnumpy(), r1.asnumpy())
# test send_and_recv with apply_func
_test(apply_func)
# test send_and_recv without apply_func
_test(None)
def test_inplace_snr():
u = mx.nd.array([0, 0, 0, 3, 4, 9], dtype=np.int64)
v = mx.nd.array([1, 2, 3, 9, 9, 0], dtype=np.int64)
def message_func(edges):
return {'m' : edges.src['f']}
def reduce_func(nodes):
return {'f' : mx.nd.sum(nodes.mailbox['m'], 1)}
def apply_func(nodes):
return {'f' : 2 * nodes.data['f']}
def _test(apply_func):
g = generate_graph()
f = g.ndata['f']
# an out place run to get result
g.send_and_recv((u, v), fn.copy_src(src='f', out='m'),
fn.sum(msg='m', out='f'), apply_func)
result = g.ndata['f']
# inplace deg bucket
v1 = f.copy()
g.ndata['f'] = v1
g.send_and_recv((u, v), message_func, reduce_func, apply_func, inplace=True)
r1 = g.ndata['f']
# check result
assert np.allclose(r1.asnumpy(), result.asnumpy())
# check inplace
assert np.allclose(v1.asnumpy(), r1.asnumpy())
# inplace v2v spmv
v1 = f.copy()
g.ndata['f'] = v1
g.send_and_recv((u, v), fn.copy_src(src='f', out='m'),
fn.sum(msg='m', out='f'), apply_func, inplace=True)
r1 = g.ndata['f']
# check result
assert np.allclose(r1.asnumpy(), result.asnumpy())
# check inplace
assert np.allclose(v1.asnumpy(), r1.asnumpy())
# inplace e2v spmv
v1 = f.copy()
g.ndata['f'] = v1
g.send_and_recv((u, v), message_func,
fn.sum(msg='m', out='f'), apply_func, inplace=True)
r1 = g.ndata['f']
# check result
assert np.allclose(r1.asnumpy(), result.asnumpy())
# check inplace
assert np.allclose(v1.asnumpy(), r1.asnumpy())
# test send_and_recv with apply_func
_test(apply_func)
# test send_and_recv without apply_func
_test(None)
def test_inplace_push():
nodes = mx.nd.array([0, 3, 4, 9], dtype=np.int64)
def message_func(edges):
return {'m' : edges.src['f']}
def reduce_func(nodes):
return {'f' : mx.nd.sum(nodes.mailbox['m'], 1)}
def apply_func(nodes):
return {'f' : 2 * nodes.data['f']}
def _test(apply_func):
g = generate_graph()
f = g.ndata['f']
# an out place run to get result
g.push(nodes,
fn.copy_src(src='f', out='m'), fn.sum(msg='m', out='f'), apply_func)
result = g.ndata['f']
# inplace deg bucket
v1 = f.copy()
g.ndata['f'] = v1
g.push(nodes, message_func, reduce_func, apply_func, inplace=True)
r1 = g.ndata['f']
# check result
assert np.allclose(r1.asnumpy(), result.asnumpy())
# check inplace
assert np.allclose(v1.asnumpy(), r1.asnumpy())
# inplace v2v spmv
v1 = f.copy()
g.ndata['f'] = v1
g.push(nodes, fn.copy_src(src='f', out='m'),
fn.sum(msg='m', out='f'), apply_func, inplace=True)
r1 = g.ndata['f']
# check result
assert np.allclose(r1.asnumpy(), result.asnumpy())
# check inplace
assert np.allclose(v1.asnumpy(), r1.asnumpy())
# inplace e2v spmv
v1 = f.copy()
g.ndata['f'] = v1
g.push(nodes,
message_func, fn.sum(msg='m', out='f'), apply_func, inplace=True)
r1 = g.ndata['f']
# check result
assert np.allclose(r1.asnumpy(), result.asnumpy())
# check inplace
assert np.allclose(v1.asnumpy(), r1.asnumpy())
# test send_and_recv with apply_func
_test(apply_func)
# test send_and_recv without apply_func
_test(None)
def test_inplace_pull():
nodes = mx.nd.array([1, 2, 3, 9], dtype=np.int64)
def message_func(edges):
return {'m' : edges.src['f']}
def reduce_func(nodes):
return {'f' : mx.nd.sum(nodes.mailbox['m'], 1)}
def apply_func(nodes):
return {'f' : 2 * nodes.data['f']}
def _test(apply_func):
g = generate_graph()
f = g.ndata['f']
# an out place run to get result
g.pull(nodes,
fn.copy_src(src='f', out='m'), fn.sum(msg='m', out='f'), apply_func)
result = g.ndata['f']
# inplace deg bucket
v1 = f.copy()
g.ndata['f'] = v1
g.pull(nodes, message_func, reduce_func, apply_func, inplace=True)
r1 = g.ndata['f']
# check result
assert np.allclose(r1.asnumpy(), result.asnumpy())
# check inplace
assert np.allclose(v1.asnumpy(), r1.asnumpy())
# inplace v2v spmv
v1 = f.copy()
g.ndata['f'] = v1
g.pull(nodes, fn.copy_src(src='f', out='m'),
fn.sum(msg='m', out='f'), apply_func, inplace=True)
r1 = g.ndata['f']
# check result
assert np.allclose(r1.asnumpy(), result.asnumpy())
# check inplace
assert np.allclose(v1.asnumpy(), r1.asnumpy())
# inplace e2v spmv
v1 = f.copy()
g.ndata['f'] = v1
g.pull(nodes,
message_func, fn.sum(msg='m', out='f'), apply_func, inplace=True)
r1 = g.ndata['f']
# check result
assert np.allclose(r1.asnumpy(), result.asnumpy())
# check inplace
assert np.allclose(v1.asnumpy(), r1.asnumpy())
# test send_and_recv with apply_func
_test(apply_func)
# test send_and_recv without apply_func
_test(None)
def test_inplace_apply():
def apply_node_func(nodes):
return {'f': nodes.data['f'] * 2}
def apply_edge_func(edges):
return {'e': edges.data['e'] * 2}
g = generate_graph()
nodes = [1, 2, 3, 9]
nf = g.ndata['f']
# out place run
g.apply_nodes(apply_node_func, nodes)
new_nf = g.ndata['f']
# in place run
g.ndata['f'] = nf
g.apply_nodes(apply_node_func, nodes, inplace=True)
# check results correct and in place
assert np.allclose(nf.asnumpy(), new_nf.asnumpy())
# test apply all nodes, should not be done in place
g.ndata['f'] = nf
g.apply_nodes(apply_node_func, inplace=True)
assert np.allclose(nf.asnumpy(), g.ndata['f'].asnumpy()) == False
edges = [3, 5, 7, 10]
ef = g.edata['e']
# out place run
g.apply_edges(apply_edge_func, edges)
new_ef = g.edata['e']
# in place run
g.edata['e'] = ef
g.apply_edges(apply_edge_func, edges, inplace=True)
g.edata['e'] = ef
assert np.allclose(ef.asnumpy(), new_ef.asnumpy())
# test apply all edges, should not be done in place
g.edata['e'] == ef
g.apply_edges(apply_edge_func, inplace=True)
assert np.allclose(ef.asnumpy(), g.edata['e'].asnumpy()) == False
if __name__ == '__main__':
test_inplace_recv()
test_inplace_snr()
test_inplace_push()
test_inplace_pull()
test_inplace_apply()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment