Commit 84702916 authored by zengxy, committed by Da Zheng

[BugFix] Some negative layer_id/block_id values are not converted to the actual ones. (#584)

* [BugFix] Some negative layer_id/block_id values are not converted to the actual ones.

* Add some tests for negative block/layer ids
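
The diff below routes each affected method through `self._get_layer_id(...)` / `self._get_block_id(...)` before any offset arithmetic; the helper bodies themselves are not part of this diff. As a point of reference, here is a minimal sketch of what such normalization helpers typically look like, assuming Python-style negative indexing (the class name `NodeFlowSketch` and the `_num_layers` attribute are illustrative only, not DGL's actual fields):

```python
# Hypothetical sketch -- this diff does not show the helper bodies, so this
# is only an assumption about how negative layer/block ids get normalized.
class NodeFlowSketch:
    def __init__(self, num_layers):
        # A NodeFlow with N blocks has N + 1 layers.
        self._num_layers = num_layers

    def _get_layer_id(self, layer_id):
        # Map a negative layer id (e.g. -1 for the last layer) to its
        # non-negative equivalent, mirroring Python list indexing.
        return layer_id if layer_id >= 0 else self._num_layers + layer_id

    def _get_block_id(self, block_id):
        # Same normalization for block ids; there is one block per pair of
        # adjacent layers, hence num_layers - 1 blocks.
        return block_id if block_id >= 0 else (self._num_layers - 1) + block_id
```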
parent a3f10c21
@@ -276,6 +276,7 @@ class NodeFlow(DGLBaseGraph):
         Tensor
             Node Ids in the NodeFlow.
         """
+        layer_id = self._get_layer_id(layer_id)
         parent_nids = utils.toindex(parent_nids)
         layers = self._layer_offsets
         start = int(layers[layer_id])
@@ -414,6 +415,7 @@ class NodeFlow(DGLBaseGraph):
         Tensor
             The edge ids.
         """
+        block_id = self._get_block_id(block_id)
         layer0_size = self._layer_offsets[block_id + 1] - self._layer_offsets[block_id]
         rst = _CAPI_NodeFlowGetBlockAdj(self._graph._handle, "coo",
                                         int(layer0_size),
@@ -446,6 +448,7 @@ class NodeFlow(DGLBaseGraph):
             A index for data shuffling due to sparse format change. Return None
             if shuffle is not required.
         """
+        block_id = self._get_block_id(block_id)
         fmt = F.get_preferred_sparse_format()
         # We need to extract two layers.
         layer0_size = self._layer_offsets[block_id + 1] - self._layer_offsets[block_id]
@@ -511,6 +514,7 @@ class NodeFlow(DGLBaseGraph):
             A index for data shuffling due to sparse format change. Return None
             if shuffle is not required.
         """
+        block_id = self._get_block_id(block_id)
         src, dst, eid = self.block_edges(block_id)
         src = F.copy_to(src, ctx) # the index of the ctx will be cached
         dst = F.copy_to(dst, ctx) # the index of the ctx will be cached
@@ -730,6 +734,7 @@ class NodeFlow(DGLBaseGraph):
         inplace : bool, optional
             If True, update will be done in place, but autograd will break.
         """
+        block_id = self._get_block_id(block_id)
         if func == "default":
             func = self._apply_edge_funcs[block_id]
         assert func is not None
@@ -797,6 +802,7 @@ class NodeFlow(DGLBaseGraph):
         inplace: bool, optional
             If True, update will be done in place, but autograd will break.
         """
+        block_id = self._get_block_id(block_id)
         if message_func == "default":
             message_func = self._message_funcs[block_id]
         if reduce_func == "default":
...
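
From the caller's side, the point of the fix is that negative ids now resolve like Python list indices in every block-level method, not just some of them. A hedged usage sketch, assuming the DGL 0.x contrib sampling API that the tests below also rely on:

```python
import dgl
import scipy.sparse as sp

# Assumption: DGL 0.x, where NeighborSampler lives in dgl.contrib.sampling
# and requires a readonly graph.
g = dgl.DGLGraph(sp.random(100, 100, density=0.1, format='coo'), readonly=True)
for nf in dgl.contrib.sampling.NeighborSampler(g, batch_size=10,
                                               expand_factor=5, num_hops=2):
    # After this fix the two calls return identical tensors; before it,
    # methods that skipped _get_block_id indexed self._layer_offsets with
    # the raw negative id and could read the wrong block's slice.
    src_neg, dst_neg, eid_neg = nf.block_edges(-1)
    src_pos, dst_pos, eid_pos = nf.block_edges(nf.num_blocks - 1)
    break
```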
@@ -81,6 +81,12 @@ def test_basic():
     nids = nf.map_from_parent_nid(0, parent_nids)
     assert F.array_equal(nids, parent_nids)
+    # should also work for negative layer ids
+    for l in range(-1, -num_layers, -1):
+        nids1 = nf.map_from_parent_nid(l, parent_nids)
+        nids2 = nf.map_from_parent_nid(l + num_layers, parent_nids)
+        assert F.array_equal(nids1, nids2)
+
     g = generate_rand_graph(100)
     nf = create_mini_batch(g, num_layers)
     assert nf.num_layers == num_layers + 1
@@ -126,6 +132,10 @@ def check_apply_edges(create_node_flow):
         nf.apply_block(i, update_func)
         assert F.array_equal(nf.blocks[i].data['h2'], new_feats)
+        # should also work for negative block ids
+        nf.apply_block(-num_layers + i, update_func)
+        assert F.array_equal(nf.blocks[i].data['h2'], new_feats)
+
         eids = nf.block_parent_eid(i)
         srcs, dsts = g.find_edges(eids)
         expected_f_sum = g.ndata["f"][srcs] + g.ndata["f"][dsts]
@@ -137,7 +147,7 @@ def test_apply_edges():
     check_apply_edges(create_mini_batch)
 
-def check_flow_compute(create_node_flow):
+def check_flow_compute(create_node_flow, use_negative_block_id=False):
     num_layers = 2
     g = generate_rand_graph(100)
     nf = create_node_flow(g, num_layers)
@@ -146,7 +156,8 @@ def check_flow_compute(create_node_flow):
     nf.layers[0].data['h'] = nf.layers[0].data['h1']
     # Test the computation on a layer at a time.
     for i in range(num_layers):
-        nf.block_compute(i, fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
+        l = -num_layers + i if use_negative_block_id else i
+        nf.block_compute(l, fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                          lambda nodes: {'h' : nodes.data['t'] + 1})
         g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                      lambda nodes: {'h' : nodes.data['t'] + 1})
@@ -155,8 +166,9 @@ def check_flow_compute(create_node_flow):
     # Test the computation when only a few nodes are active in a layer.
     g.ndata['h'] = g.ndata['h1']
     for i in range(num_layers):
+        l = -num_layers + i if use_negative_block_id else i
         vs = nf.layer_nid(i+1)[0:4]
-        nf.block_compute(i, fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
+        nf.block_compute(l, fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                          lambda nodes: {'h' : nodes.data['t'] + 1}, v=vs)
         g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
                      lambda nodes: {'h' : nodes.data['t'] + 1})
@@ -168,6 +180,8 @@ def check_flow_compute(create_node_flow):
 def test_flow_compute():
     check_flow_compute(create_full_nodeflow)
     check_flow_compute(create_mini_batch)
+    check_flow_compute(create_full_nodeflow, use_negative_block_id=True)
+    check_flow_compute(create_mini_batch, use_negative_block_id=True)
 
 def check_prop_flows(create_node_flow):
@@ -258,13 +272,20 @@ def test_copy():
         nf.block_compute(i, partial(msg_func, ind=i), partial(reduce_func, ind=i))
 
-def test_block_adj_matrix():
+def test_block_edges():
     num_layers = 3
     g = generate_rand_graph(100)
     nf = create_mini_batch(g, num_layers)
     assert nf.num_layers == num_layers + 1
     for i in range(nf.num_blocks):
         src, dst, eid = nf.block_edges(i)
+        # should also work for negative block ids
+        src_by_neg, dst_by_neg, eid_by_neg = nf.block_edges(-nf.num_blocks + i)
+        assert F.array_equal(src, src_by_neg)
+        assert F.array_equal(dst, dst_by_neg)
+        assert F.array_equal(eid, eid_by_neg)
         dest_nodes = utils.toindex(nf.layer_nid(i + 1))
         u, v, _ = nf._graph.in_edges(dest_nodes)
         u = nf._glb2lcl_nid(u.tousertensor(), i)
@@ -272,14 +293,68 @@ def test_block_adj_matrix():
     assert F.array_equal(src, u)
     assert F.array_equal(dst, v)
 
+def test_block_adj_matrix():
+    num_layers = 3
+    g = generate_rand_graph(100)
+    nf = create_mini_batch(g, num_layers)
+    assert nf.num_layers == num_layers + 1
+    for i in range(nf.num_blocks):
+        u, v, _ = nf.block_edges(i)
         adj, _ = nf.block_adjacency_matrix(i, F.cpu())
         adj = F.sparse_to_numpy(adj)
+        # should also work for negative block ids
+        adj_by_neg, _ = nf.block_adjacency_matrix(-nf.num_blocks + i, F.cpu())
+        adj_by_neg = F.sparse_to_numpy(adj_by_neg)
         data = np.ones((len(u)), dtype=np.float32)
         v = utils.toindex(v)
         u = utils.toindex(u)
         coo = sp.sparse.coo_matrix((data, (v.tonumpy(), u.tonumpy())),
                                    shape=adj.shape).todense()
         assert np.array_equal(adj, coo)
+        assert np.array_equal(adj_by_neg, coo)
+
+def test_block_incidence_matrix():
+    num_layers = 3
+    g = generate_rand_graph(100)
+    nf = create_mini_batch(g, num_layers)
+    assert nf.num_layers == num_layers + 1
+    for i in range(nf.num_blocks):
+        typestrs = ["in", "out"] # todo need fix for "both"
+        adjs = []
+        for typestr in typestrs:
+            adj, _ = nf.block_incidence_matrix(i, typestr, F.cpu())
+            adj = F.sparse_to_numpy(adj)
+            adjs.append(adj)
+        # should work for negative block ids
+        adjs_by_neg = []
+        for typestr in typestrs:
+            adj_by_neg, _ = nf.block_incidence_matrix(-nf.num_blocks + i, typestr, F.cpu())
+            adj_by_neg = F.sparse_to_numpy(adj_by_neg)
+            adjs_by_neg.append(adj_by_neg)
+        u, v, e = nf.block_edges(i)
+        u = utils.toindex(u)
+        v = utils.toindex(v)
+        e = utils.toindex(e)
+        expected = []
+        data_in_and_out = np.ones((len(u)), dtype=np.float32)
+        expected.append(
+            sp.sparse.coo_matrix((data_in_and_out, (v.tonumpy(), e.tonumpy())),
+                                 shape=adjs[0].shape).todense()
+        )
+        expected.append(
+            sp.sparse.coo_matrix((data_in_and_out, (u.tonumpy(), e.tonumpy())),
+                                 shape=adjs[1].shape).todense()
+        )
+        for i in range(len(typestrs)):
+            assert np.array_equal(adjs[i], expected[i])
+            assert np.array_equal(adjs_by_neg[i], expected[i])
 if __name__ == '__main__':
@@ -291,3 +366,5 @@ if __name__ == '__main__':
     test_flow_compute()
     test_prop_flows()
     test_self_loop()
+    test_block_edges()
+    test_block_incidence_matrix()
\ No newline at end of file