Unverified Commit b9ec3e36 authored by Da Zheng, committed by GitHub

[BUGFIX] reshuffle node features in range partitioning. (#1659)

* fix a bug.

* fix test.

* simplify.

* fix test.

* test edge features.

* fix.
parent 8fccdaf6
@@ -221,35 +221,24 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
     # TODO(zhengda) we should replace int64 with int16. int16 should be sufficient.
     if not reshuffle:
         edge_parts = np.zeros((g.number_of_edges(),), dtype=np.int64) - 1
-        num_edges = 0
-        lnodes_list = [] # The node ids of each partition
-        ledges_list = [] # The edge Ids of each partition
-        for part_id in range(num_parts):
-            part = client_parts[part_id]
-            local_nodes = F.boolean_mask(part.ndata[NID], part.ndata['inner_node'])
-            local_edges = F.asnumpy(g.in_edges(local_nodes, form='eid'))
-            edge_parts[local_edges] = part_id
-            num_edges += len(local_edges)
-            lnodes_list.append(local_nodes)
-            ledges_list.append(local_edges)
-        assert num_edges == g.number_of_edges()
-    else:
-        num_edges = 0
-        num_nodes = 0
-        lnodes_list = [] # The node ids of each partition
-        ledges_list = [] # The edge Ids of each partition
-        for part_id in range(num_parts):
-            part = client_parts[part_id]
-            num_local_nodes = F.asnumpy(F.sum(part.ndata['inner_node'], 0))
-            # To get the edges in the input graph, we should use original node Ids.
-            local_nodes = F.boolean_mask(part.ndata['orig_id'], part.ndata['inner_node'])
-            num_local_edges = F.asnumpy(F.sum(g.in_degrees(local_nodes), 0))
-            num_edges += int(num_local_edges)
-            num_nodes += int(num_local_nodes)
-            lnodes_list.append(num_nodes)
-            ledges_list.append(num_edges)
-        assert num_edges == g.number_of_edges()
-        assert num_nodes == g.number_of_nodes()
+    num_edges = 0
+    num_nodes = 0
+    lnodes_list = [] # The node ids of each partition
+    ledges_list = [] # The edge Ids of each partition
+    for part_id in range(num_parts):
+        part = client_parts[part_id]
+        # To get the edges in the input graph, we should use original node Ids.
+        data_name = 'orig_id' if reshuffle else NID
+        local_nodes = F.boolean_mask(part.ndata[data_name], part.ndata['inner_node'])
+        local_edges = g.in_edges(local_nodes, form='eid')
+        if not reshuffle:
+            edge_parts[F.asnumpy(local_edges)] = part_id
+        num_edges += len(local_edges)
+        num_nodes += len(local_nodes)
+        lnodes_list.append(local_nodes)
+        ledges_list.append(local_edges)
+    assert num_edges == g.number_of_edges()
+    assert num_nodes == g.number_of_nodes()

     os.makedirs(out_path, mode=0o775, exist_ok=True)
     tot_num_inner_edges = 0
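The unified loop above gathers, for each partition, the IDs of its inner nodes and of the edges pointing to them, switching between 'orig_id' and NID depending on whether the graph was reshuffled. A minimal numpy sketch of that bookkeeping, with toy arrays standing in for the DGL tensors (all names and values here are illustrative, not part of the commit):

import numpy as np

# Toy stand-ins for one partition's tensors; 'inner_node' marks nodes owned by
# this partition, 'orig_id' holds their IDs in the input graph (what
# g.in_edges() needs when reshuffle=True).
orig_id = np.array([7, 2, 9, 4, 5])
inner_node = np.array([True, True, False, True, False])
local_nodes = orig_id[inner_node]              # analogous to F.boolean_mask(...)

# Edges of the input graph as (src, dst) arrays with implicit edge IDs 0..5.
src = np.array([7, 2, 9, 4, 5, 7])
dst = np.array([2, 4, 7, 5, 9, 4])
eid = np.arange(len(src))

# g.in_edges(local_nodes, form='eid'): IDs of edges whose destination is local.
local_edges = eid[np.isin(dst, local_nodes)]
print(local_nodes)    # [7 2 4]
print(local_edges)    # [0 1 2 5]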
@@ -267,8 +256,8 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
     # With reshuffling, we can ensure that all nodes and edges are reshuffled
     # and are in contiguous Id space.
     if num_parts > 1:
-        node_map_val = lnodes_list
-        edge_map_val = ledges_list
+        node_map_val = np.cumsum([len(lnodes) for lnodes in lnodes_list]).tolist()
+        edge_map_val = np.cumsum([len(ledges) for ledges in ledges_list]).tolist()
     else:
         node_map_val = [g.number_of_nodes()]
         edge_map_val = [g.number_of_edges()]
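With reshuffling, each partition's nodes and edges occupy a contiguous ID range, so the global node/edge maps can be stored as the cumulative boundaries built by the np.cumsum calls above. A small sketch, with hypothetical partition sizes and a made-up nid_to_partid helper, of how such boundaries resolve an ID to its owning partition:

import numpy as np

# Hypothetical partition contents after reshuffling: contiguous node ID ranges.
lnodes_list = [np.arange(0, 3), np.arange(3, 8), np.arange(8, 12)]
# Same expression as in the diff above.
node_map_val = np.cumsum([len(lnodes) for lnodes in lnodes_list]).tolist()   # [3, 8, 12]

def nid_to_partid(nid, boundaries):
    # Owning partition of a reshuffled node ID: first boundary strictly above it.
    return int(np.searchsorted(boundaries, nid, side='right'))

assert nid_to_partid(0, node_map_val) == 0
assert nid_to_partid(7, node_map_val) == 1
assert nid_to_partid(11, node_map_val) == 2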
@@ -288,15 +277,8 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
         node_feats = {}
         edge_feats = {}
         if num_parts > 1:
-            if reshuffle and part_id == 0:
-                local_nodes = F.arange(0, lnodes_list[part_id])
-                local_edges = F.arange(0, ledges_list[part_id])
-            elif reshuffle:
-                local_nodes = F.arange(lnodes_list[part_id - 1], lnodes_list[part_id])
-                local_edges = F.arange(ledges_list[part_id - 1], ledges_list[part_id])
-            else:
-                local_nodes = lnodes_list[part_id]
-                local_edges = ledges_list[part_id]
+            local_nodes = lnodes_list[part_id]
+            local_edges = ledges_list[part_id]
         print('part {} has {} nodes and {} edges.'.format(
             part_id, part.number_of_nodes(), part.number_of_edges()))
         print('{} nodes and {} edges are inside the partition'.format(
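The removed F.arange branches reconstructed each partition's contiguous ID range from cumulative counts; after the change, lnodes_list and ledges_list already store the per-partition ID tensors, so indexing them directly is equivalent. A quick check of that equivalence with hypothetical partition contents (illustration only):

import numpy as np

# Hypothetical per-partition node ID tensors after reshuffling.
lnodes_list = [np.arange(0, 3), np.arange(3, 8), np.arange(8, 12)]
bounds = np.cumsum([len(lnodes) for lnodes in lnodes_list])    # [3, 8, 12]
for part_id, lnodes in enumerate(lnodes_list):
    start = 0 if part_id == 0 else bounds[part_id - 1]
    # The stored IDs equal the range the deleted F.arange(...) calls rebuilt.
    assert np.array_equal(lnodes, np.arange(start, bounds[part_id]))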
@@ -6,6 +6,7 @@ from scipy import sparse as spsp
 from numpy.testing import assert_array_equal
 from dgl.graph_index import create_graph_index
 from dgl.distributed import partition_graph, load_partition
+from dgl import function as fn
 import backend as F
 import unittest
 import pickle
@@ -21,6 +22,9 @@ def check_partition(reshuffle):
     g = create_random_graph(10000)
     g.ndata['labels'] = F.arange(0, g.number_of_nodes())
     g.ndata['feats'] = F.tensor(np.random.randn(g.number_of_nodes(), 10))
+    g.edata['feats'] = F.tensor(np.random.randn(g.number_of_edges(), 10))
+    g.update_all(fn.copy_src('feats', 'msg'), fn.sum('msg', 'h'))
+    g.update_all(fn.copy_edge('feats', 'msg'), fn.sum('msg', 'eh'))
     num_parts = 4
     num_hops = 2
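The two update_all calls give every node reference aggregations to compare against after partitioning: 'h' sums in-neighbor node features (copy_src + sum) and 'eh' sums incoming edge features (copy_edge + sum). Roughly the same computation in plain numpy, on a toy graph (illustration only, not part of the commit):

import numpy as np

# Toy graph: edge e goes src[e] -> dst[e]; 3 nodes, 4 edges, 10-d features.
src = np.array([0, 1, 2, 0])
dst = np.array([1, 2, 0, 2])
nfeat = np.random.randn(3, 10)    # node 'feats'
efeat = np.random.randn(4, 10)    # edge 'feats'

h = np.zeros((3, 10))
eh = np.zeros((3, 10))
np.add.at(h, dst, nfeat[src])     # 'h':  sum of in-neighbor node features
np.add.at(eh, dst, efeat)         # 'eh': sum of incoming edge features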
@@ -47,21 +51,40 @@ def check_partition(reshuffle):
         assert np.all(F.asnumpy(local_eid) == np.arange(0, len(local_eid)))
         # Check the node map.
-        local_nodes = F.asnumpy(F.boolean_mask(part_g.ndata[dgl.NID], part_g.ndata['inner_node']))
-        local_nodes1 = F.asnumpy(gpb.partid2nids(i))
-        assert np.all(np.sort(local_nodes) == np.sort(local_nodes1))
+        local_nodes = F.boolean_mask(part_g.ndata[dgl.NID], part_g.ndata['inner_node'])
+        llocal_nodes = F.nonzero_1d(part_g.ndata['inner_node'])
+        local_nodes1 = gpb.partid2nids(i)
+        assert np.all(np.sort(F.asnumpy(local_nodes)) == np.sort(F.asnumpy(local_nodes1)))
         # Check the edge map.
-        local_edges = F.asnumpy(F.boolean_mask(part_g.edata[dgl.EID], part_g.edata['inner_edge']))
-        local_edges1 = F.asnumpy(gpb.partid2eids(i))
-        assert np.all(np.sort(local_edges) == np.sort(local_edges1))
+        local_edges = F.boolean_mask(part_g.edata[dgl.EID], part_g.edata['inner_edge'])
+        local_edges1 = gpb.partid2eids(i)
+        assert np.all(np.sort(F.asnumpy(local_edges)) == np.sort(F.asnumpy(local_edges1)))
+        if reshuffle:
+            part_g.ndata['feats'] = F.gather_row(g.ndata['feats'], part_g.ndata['orig_id'])
+            part_g.edata['feats'] = F.gather_row(g.edata['feats'], part_g.edata['orig_id'])
+            # when we read node data from the original global graph, we should use orig_id.
+            local_nodes = F.boolean_mask(part_g.ndata['orig_id'], part_g.ndata['inner_node'])
+            local_edges = F.boolean_mask(part_g.edata['orig_id'], part_g.edata['inner_edge'])
+        else:
+            part_g.ndata['feats'] = F.gather_row(g.ndata['feats'], part_g.ndata[dgl.NID])
+            part_g.edata['feats'] = F.gather_row(g.edata['feats'], part_g.edata[dgl.NID])
+        part_g.update_all(fn.copy_src('feats', 'msg'), fn.sum('msg', 'h'))
+        part_g.update_all(fn.copy_edge('feats', 'msg'), fn.sum('msg', 'eh'))
+        assert F.allclose(F.gather_row(g.ndata['h'], local_nodes),
+                          F.gather_row(part_g.ndata['h'], llocal_nodes))
+        assert F.allclose(F.gather_row(g.ndata['eh'], local_nodes),
+                          F.gather_row(part_g.ndata['eh'], llocal_nodes))
         for name in ['labels', 'feats']:
             assert name in node_feats
             assert node_feats[name].shape[0] == len(local_nodes)
-            assert len(local_nodes) == len(node_feats[name])
-            assert np.all(F.asnumpy(g.ndata[name])[local_nodes] == F.asnumpy(node_feats[name]))
-        assert len(edge_feats) == 0
+            assert np.all(F.asnumpy(g.ndata[name])[F.asnumpy(local_nodes)] == F.asnumpy(node_feats[name]))
+        for name in ['feats']:
+            assert name in edge_feats
+            assert edge_feats[name].shape[0] == len(local_edges)
+            assert np.all(F.asnumpy(g.edata[name])[F.asnumpy(local_edges)] == F.asnumpy(edge_feats[name]))
         if reshuffle:
             node_map = []
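When reshuffle=True, partition-local rows map back to the input graph through 'orig_id'; without reshuffling, dgl.NID and dgl.EID already carry the original IDs, so they are used instead. The gather itself is a simple row lookup, sketched here in numpy with toy arrays (illustration only, not part of the commit):

import numpy as np

orig_feats = np.random.randn(6, 4)       # node features of the input graph
orig_id = np.array([5, 0, 3, 1])         # one partition's per-node original IDs
part_feats = orig_feats[orig_id]         # what F.gather_row(g.ndata['feats'], orig_id) does
assert np.allclose(part_feats[2], orig_feats[3])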