Unverified Commit 7ad0485a authored by Da Zheng's avatar Da Zheng Committed by GitHub
Browse files

[BUGFIX] handle more negative layer id and block id. (#606)

* handle negative layer id and block id.

* add tests.

* fix.

* fix.
parent e35e860a
......@@ -305,6 +305,7 @@ class NodeFlow(DGLBaseGraph):
Tensor
The degree of the nodes in the specified layer.
"""
layer_id = self._get_layer_id(layer_id)
return self._graph.in_degrees(utils.toindex(self.layer_nid(layer_id))).tousertensor()
def layer_out_degree(self, layer_id):
......@@ -320,6 +321,7 @@ class NodeFlow(DGLBaseGraph):
Tensor
The degree of the nodes in the specified layer.
"""
layer_id = self._get_layer_id(layer_id)
return self._graph.out_degrees(utils.toindex(self.layer_nid(layer_id))).tousertensor()
def layer_nid(self, layer_id):
......@@ -592,7 +594,8 @@ class NodeFlow(DGLBaseGraph):
for i in range(self.num_layers):
self._node_frames[i].set_initializer(initializer, field)
else:
self._node_frames[i].set_initializer(initializer, field)
layer_id = self._get_layer_id(layer_id)
self._node_frames[layer_id].set_initializer(initializer, field)
def set_e_initializer(self, initializer, block_id=ALL, field=None):
"""Set the initializer for empty edge features.
......@@ -617,6 +620,7 @@ class NodeFlow(DGLBaseGraph):
for i in range(self.num_blocks):
self._edge_frames[i].set_initializer(initializer, field)
else:
block_id = self._get_block_id(block_id)
self._edge_frames[block_id].set_initializer(initializer, field)
......@@ -638,6 +642,7 @@ class NodeFlow(DGLBaseGraph):
if is_all(block_id):
self._message_funcs = [func] * self.num_blocks
else:
block_id = self._get_block_id(block_id)
self._message_funcs[block_id] = func
def register_reduce_func(self, func, block_id=ALL):
......@@ -658,6 +663,7 @@ class NodeFlow(DGLBaseGraph):
if is_all(block_id):
self._reduce_funcs = [func] * self.num_blocks
else:
block_id = self._get_block_id(block_id)
self._reduce_funcs[block_id] = func
def register_apply_node_func(self, func, block_id=ALL):
......@@ -678,6 +684,7 @@ class NodeFlow(DGLBaseGraph):
if is_all(block_id):
self._apply_node_funcs = [func] * self.num_blocks
else:
block_id = self._get_block_id(block_id)
self._apply_node_funcs[block_id] = func
def register_apply_edge_func(self, func, block_id=ALL):
......@@ -697,6 +704,7 @@ class NodeFlow(DGLBaseGraph):
if is_all(block_id):
self._apply_edge_funcs = [func] * self.num_blocks
else:
block_id = self._get_block_id(block_id)
self._apply_edge_funcs[block_id] = func
def apply_layer(self, layer_id, func="default", v=ALL, inplace=False):
......@@ -714,6 +722,7 @@ class NodeFlow(DGLBaseGraph):
inplace : bool, optional
If True, update will be done in place, but autograd will break.
"""
layer_id = self._get_layer_id(layer_id)
if func == "default":
func = self._apply_node_funcs[layer_id]
if is_all(v):
......
......@@ -65,6 +65,12 @@ def check_basic(g, nf):
out_deg = nf.layer_out_degree(i - 1)
assert F.asnumpy(F.sum(in_deg, 0) == F.sum(out_deg, 0))
# negative layer Ids.
for i in range(-1, -nf.num_layers, -1):
in_deg = nf.layer_in_degree(i)
out_deg = nf.layer_out_degree(i - 1)
assert F.asnumpy(F.sum(in_deg, 0) == F.sum(out_deg, 0))
def test_basic():
num_layers = 2
......@@ -93,28 +99,31 @@ def test_basic():
check_basic(g, nf)
def check_apply_nodes(create_node_flow):
def check_apply_nodes(create_node_flow, use_negative_block_id):
num_layers = 2
for i in range(num_layers):
l = -num_layers + i if use_negative_block_id else i
g = generate_rand_graph(100)
nf = create_node_flow(g, num_layers)
nf.copy_from_parent()
new_feats = F.randn((nf.layer_size(i), 5))
new_feats = F.randn((nf.layer_size(l), 5))
def update_func(nodes):
return {'h1' : new_feats}
nf.apply_layer(i, update_func)
assert F.array_equal(nf.layers[i].data['h1'], new_feats)
nf.apply_layer(l, update_func)
assert F.array_equal(nf.layers[l].data['h1'], new_feats)
new_feats = F.randn((4, 5))
def update_func1(nodes):
return {'h1' : new_feats}
nf.apply_layer(i, update_func1, v=nf.layer_nid(i)[0:4])
assert F.array_equal(nf.layers[i].data['h1'][0:4], new_feats)
nf.apply_layer(l, update_func1, v=nf.layer_nid(l)[0:4])
assert F.array_equal(nf.layers[l].data['h1'][0:4], new_feats)
def test_apply_nodes():
check_apply_nodes(create_full_nodeflow)
check_apply_nodes(create_mini_batch)
check_apply_nodes(create_full_nodeflow, use_negative_block_id=False)
check_apply_nodes(create_mini_batch, use_negative_block_id=False)
check_apply_nodes(create_full_nodeflow, use_negative_block_id=True)
check_apply_nodes(create_mini_batch, use_negative_block_id=True)
def check_apply_edges(create_node_flow):
......@@ -142,9 +151,38 @@ def check_apply_edges(create_node_flow):
assert F.array_equal(nf.blocks[i].data['f2'], expected_f_sum)
def check_apply_edges1(create_node_flow):
num_layers = 2
for i in range(num_layers):
g = generate_rand_graph(100)
g.ndata["f"] = F.randn((100, 10))
nf = create_node_flow(g, num_layers)
nf.copy_from_parent()
new_feats = F.randn((nf.block_size(i), 5))
def update_func(edges):
return {'h2': new_feats, "f2": edges.src["f"] + edges.dst["f"]}
nf.register_apply_edge_func(update_func, i)
nf.apply_block(i)
assert F.array_equal(nf.blocks[i].data['h2'], new_feats)
# should also work for negative block ids
nf.register_apply_edge_func(update_func, -num_layers + i)
nf.apply_block(-num_layers + i)
assert F.array_equal(nf.blocks[i].data['h2'], new_feats)
eids = nf.block_parent_eid(i)
srcs, dsts = g.find_edges(eids)
expected_f_sum = g.nodes[srcs].data["f"] + g.nodes[dsts].data["f"]
#expected_f_sum = g.ndata["f"][srcs] + g.ndata["f"][dsts]
assert F.array_equal(nf.blocks[i].data['f2'], expected_f_sum)
def test_apply_edges():
check_apply_edges(create_full_nodeflow)
check_apply_edges(create_mini_batch)
check_apply_edges1(create_mini_batch)
def check_flow_compute(create_node_flow, use_negative_block_id=False):
......@@ -177,11 +215,48 @@ def check_flow_compute(create_node_flow, use_negative_block_id=False):
assert F.allclose(data1, data2)
def check_flow_compute1(create_node_flow, use_negative_block_id=False):
num_layers = 2
g = generate_rand_graph(100)
# test the case that we register UDFs per block.
nf = create_node_flow(g, num_layers)
nf.copy_from_parent()
g.ndata['h'] = g.ndata['h1']
nf.layers[0].data['h'] = nf.layers[0].data['h1']
for i in range(num_layers):
l = -num_layers + i if use_negative_block_id else i
nf.register_message_func(fn.copy_src(src='h', out='m'), l)
nf.register_reduce_func(fn.sum(msg='m', out='t'), l)
nf.register_apply_node_func(lambda nodes: {'h' : nodes.data['t'] + 1}, l)
nf.block_compute(l)
g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
lambda nodes: {'h' : nodes.data['t'] + 1})
assert F.allclose(nf.layers[i + 1].data['h'], g.nodes[nf.layer_parent_nid(i + 1)].data['h'])
# test the case that we register UDFs in all blocks.
nf = create_node_flow(g, num_layers)
nf.copy_from_parent()
g.ndata['h'] = g.ndata['h1']
nf.layers[0].data['h'] = nf.layers[0].data['h1']
nf.register_message_func(fn.copy_src(src='h', out='m'))
nf.register_reduce_func(fn.sum(msg='m', out='t'))
nf.register_apply_node_func(lambda nodes: {'h' : nodes.data['t'] + 1})
for i in range(num_layers):
l = -num_layers + i if use_negative_block_id else i
nf.block_compute(l)
g.update_all(fn.copy_src(src='h', out='m'), fn.sum(msg='m', out='t'),
lambda nodes: {'h' : nodes.data['t'] + 1})
assert F.allclose(nf.layers[i + 1].data['h'], g.nodes[nf.layer_parent_nid(i + 1)].data['h'])
def test_flow_compute():
check_flow_compute(create_full_nodeflow)
check_flow_compute(create_mini_batch)
check_flow_compute(create_full_nodeflow, use_negative_block_id=True)
check_flow_compute(create_mini_batch, use_negative_block_id=True)
check_flow_compute1(create_mini_batch)
check_flow_compute1(create_mini_batch, use_negative_block_id=True)
def check_prop_flows(create_node_flow):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment