Unverified Commit 1d80d91a authored by Rhett Ying, committed by GitHub

[BugFix] call g.is_homogeneous instead of len(g.etypes) (#3793)

parent 7f1a5e08
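The fix swaps the `len(g.etypes)`-based checks for the `DGLGraph.is_homogeneous` property: a graph with a single edge type can still have several node types, so counting edge types alone misclassifies it as homogeneous. A minimal sketch of the distinction the patch relies on (not part of the commit; assumes the PyTorch backend):

```python
import dgl
import torch

# One edge type ('like') but two node types ('user', 'item'): the old
# `len(g.etypes) == 1` test would send this graph down the homogeneous path.
hg = dgl.heterograph({
    ('user', 'like', 'item'): (torch.tensor([0, 1]), torch.tensor([1, 2]))
})
print(len(hg.etypes))     # 1
print(hg.is_homogeneous)  # False -- correctly treated as heterogeneous

# A graph with one node type and one edge type passes both checks.
g = dgl.graph((torch.tensor([0, 1]), torch.tensor([1, 2])))
print(g.is_homogeneous)   # True
```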
@@ -241,7 +241,7 @@ def _get_orig_ids(g, sim_g, reshuffle, orig_nids, orig_eids):
     -------
     tensor or dict of tensors, tensor or dict of tensors
     '''
-    is_hetero = len(g.etypes) > 1 or len(g.ntypes) > 1
+    is_hetero = not g.is_homogeneous
     if reshuffle and is_hetero:
         # Get the type IDs
         orig_ntype = F.gather_row(sim_g.ndata[NTYPE], orig_nids)
@@ -275,7 +275,7 @@ def _set_trainer_ids(g, sim_g, node_parts):
     node_parts : tensor
         The node partition ID for each node in `sim_g`.
     '''
-    if len(g.etypes) == 1:
+    if g.is_homogeneous:
         g.ndata['trainer_id'] = node_parts
         # An edge is assigned to a partition based on its destination node.
         g.edata['trainer_id'] = F.gather_row(node_parts, g.edges()[1])
@@ -497,7 +497,7 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
     ...                 'output/test.json', 0)
     '''
     def get_homogeneous(g, balance_ntypes):
-        if len(g.etypes) == 1:
+        if g.is_homogeneous:
             sim_g = to_homogeneous(g)
             if isinstance(balance_ntypes, dict):
                 assert len(balance_ntypes) == 1
@@ -612,7 +612,7 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
     # NTYPE: the node type.
     # orig_id: the global node IDs in the homogeneous version of input graph.
     # NID: the global node IDs in the reshuffled homogeneous version of the input graph.
-    if len(g.etypes) > 1:
+    if not g.is_homogeneous:
         if reshuffle:
             for name in parts:
                 orig_ids = parts[name].ndata['orig_id']
@@ -778,7 +778,7 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
             inner_edge_mask = _get_inner_edge_mask(part, etype_id)
             # This is global edge IDs.
             local_edges = F.boolean_mask(part.edata[edata_name], inner_edge_mask)
-            if len(g.etypes) > 1:
+            if not g.is_homogeneous:
                 local_edges = F.gather_row(sim_g.edata[EID], local_edges)
             print('part {} has {} edges of type {} and {} are inside the partition'.format(
                 part_id, F.as_scalar(F.sum(part.edata[ETYPE] == etype_id, 0)),
@@ -813,7 +813,7 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
             else:
                 node_feats[ntype + '/' + name] = g.nodes[ntype].data[name]
         for etype in g.etypes:
-            if reshuffle and len(g.etypes) > 1:
+            if reshuffle and not g.is_homogeneous:
                 edata_name = 'orig_id'
                 etype_id = g.get_etype_id(etype)
                 inner_edge_mask = _get_inner_edge_mask(part, etype_id)
@@ -831,7 +831,7 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
             else:
                 edge_feats[etype + '/' + name] = g.edges[etype].data[name]
         # Some adjustment for heterogeneous graphs.
-        if len(g.etypes) > 1:
+        if not g.is_homogeneous:
             part.ndata['orig_id'] = F.gather_row(sim_g.ndata[NID], part.ndata['orig_id'])
             part.edata['orig_id'] = F.gather_row(sim_g.edata[EID], part.edata['orig_id'])
...
@@ -11,6 +11,7 @@ import backend as F
 import unittest
 import pickle
 import random
+import tempfile
 
 def _get_inner_node_mask(graph, ntype_id):
     if dgl.NTYPE in graph.ndata:
@@ -374,6 +375,24 @@ def check_partition(g, part_method, reshuffle, num_parts=4, num_trainers_per_mac
         assert F.dtype(eid2pid) in (F.int32, F.int64)
         assert np.all(F.asnumpy(eid2pid) == edge_map)
 
+def check_hetero_partition_single_etype(num_trainers):
+    user_ids = np.arange(1000)
+    item_ids = np.arange(2000)
+    num_edges = 3 * 1000
+    src_ids = np.random.choice(user_ids, size=num_edges)
+    dst_ids = np.random.choice(item_ids, size=num_edges)
+    hg = dgl.heterograph({('user', 'like', 'item'): (src_ids, dst_ids)})
+    with tempfile.TemporaryDirectory() as test_dir:
+        orig_nids, orig_eids = partition_graph(
+            hg, 'test', 2, test_dir, num_trainers_per_machine=num_trainers, return_mapping=True)
+        assert len(orig_nids) == len(hg.ntypes)
+        assert len(orig_eids) == len(hg.etypes)
+        for ntype in hg.ntypes:
+            assert len(orig_nids[ntype]) == hg.number_of_nodes(ntype)
+        for etype in hg.etypes:
+            assert len(orig_eids[etype]) == hg.number_of_edges(etype)
+
 @unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
 def test_partition():
     g = create_random_graph(1000)
@@ -387,6 +406,8 @@ def test_partition():
 @unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
 @unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support some of operations in DistGraph")
 def test_hetero_partition():
+    check_hetero_partition_single_etype(1)
+    check_hetero_partition_single_etype(4)
     hg = create_random_hetero()
     check_hetero_partition(hg, 'metis')
     check_hetero_partition(hg, 'metis', 1, 8)
...