Unverified commit 20734637, authored by Da Zheng, committed by GitHub
Browse files

[Distributed] Fix partition (#1821)



* fix partition and print timing.

* fix lint.
Co-authored-by: Ubuntu <ubuntu@ip-172-31-19-1.us-west-2.compute.internal>
parent df3683a2
...@@ -23,7 +23,9 @@ def load_reddit(): ...@@ -23,7 +23,9 @@ def load_reddit():
def load_ogb(name): def load_ogb(name):
from ogb.nodeproppred import DglNodePropPredDataset from ogb.nodeproppred import DglNodePropPredDataset
print('load', name)
data = DglNodePropPredDataset(name=name) data = DglNodePropPredDataset(name=name)
print('finish loading', name)
splitted_idx = data.get_idx_split() splitted_idx = data.get_idx_split()
graph, labels = data[0] graph, labels = data[0]
labels = labels[:, 0] labels = labels[:, 0]
...@@ -31,7 +33,7 @@ def load_ogb(name): ...@@ -31,7 +33,7 @@ def load_ogb(name):
graph.ndata['features'] = graph.ndata['feat'] graph.ndata['features'] = graph.ndata['feat']
graph.ndata['labels'] = labels graph.ndata['labels'] = labels
in_feats = graph.ndata['features'].shape[1] in_feats = graph.ndata['features'].shape[1]
num_labels = len(th.unique(labels)) num_labels = len(th.unique(labels[th.logical_not(th.isnan(labels))]))
# Find the node IDs in the training, validation, and test set. # Find the node IDs in the training, validation, and test set.
train_nid, val_nid, test_nid = splitted_idx['train'], splitted_idx['valid'], splitted_idx['test'] train_nid, val_nid, test_nid = splitted_idx['train'], splitted_idx['valid'], splitted_idx['test']
...@@ -44,7 +46,8 @@ def load_ogb(name): ...@@ -44,7 +46,8 @@ def load_ogb(name):
graph.ndata['train_mask'] = train_mask graph.ndata['train_mask'] = train_mask
graph.ndata['val_mask'] = val_mask graph.ndata['val_mask'] = val_mask
graph.ndata['test_mask'] = test_mask graph.ndata['test_mask'] = test_mask
return graph, len(th.unique(graph.ndata['labels'])) print('finish constructing', name)
return graph, num_labels
def inductive_split(g): def inductive_split(g):
"""Split the graph into training graph, validation graph, and test graph by training """Split the graph into training graph, validation graph, and test graph by training
......
...@@ -79,6 +79,7 @@ Two useful functions in this module: ...@@ -79,6 +79,7 @@ Two useful functions in this module:
import json import json
import os import os
import time
import numpy as np import numpy as np
from .. import backend as F from .. import backend as F
...@@ -274,6 +275,7 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method= ...@@ -274,6 +275,7 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
# Let's calculate edge assignment. # Let's calculate edge assignment.
# TODO(zhengda) we should replace int64 with int16. int16 should be sufficient. # TODO(zhengda) we should replace int64 with int16. int16 should be sufficient.
start = time.time()
if not reshuffle: if not reshuffle:
edge_parts = np.zeros((g.number_of_edges(),), dtype=np.int64) - 1 edge_parts = np.zeros((g.number_of_edges(),), dtype=np.int64) - 1
num_edges = 0 num_edges = 0
...@@ -294,6 +296,7 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method= ...@@ -294,6 +296,7 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
ledges_list.append(local_edges) ledges_list.append(local_edges)
assert num_edges == g.number_of_edges() assert num_edges == g.number_of_edges()
assert num_nodes == g.number_of_nodes() assert num_nodes == g.number_of_nodes()
print('Calculate edge assignment: {:.3f} seconds'.format(time.time() - start))
os.makedirs(out_path, mode=0o775, exist_ok=True) os.makedirs(out_path, mode=0o775, exist_ok=True)
tot_num_inner_edges = 0 tot_num_inner_edges = 0
...@@ -317,6 +320,7 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method= ...@@ -317,6 +320,7 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
node_map_val = [g.number_of_nodes()] node_map_val = [g.number_of_nodes()]
edge_map_val = [g.number_of_edges()] edge_map_val = [g.number_of_edges()]
start = time.time()
part_metadata = {'graph_name': graph_name, part_metadata = {'graph_name': graph_name,
'num_nodes': g.number_of_nodes(), 'num_nodes': g.number_of_nodes(),
'num_edges': g.number_of_edges(), 'num_edges': g.number_of_edges(),
...@@ -363,6 +367,7 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method= ...@@ -363,6 +367,7 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
with open('{}/{}.json'.format(out_path, graph_name), 'w') as outfile: with open('{}/{}.json'.format(out_path, graph_name), 'w') as outfile:
json.dump(part_metadata, outfile, sort_keys=True, indent=4) json.dump(part_metadata, outfile, sort_keys=True, indent=4)
print('Save partitions: {:.3f} seconds'.format(time.time() - start))
num_cuts = g.number_of_edges() - tot_num_inner_edges num_cuts = g.number_of_edges() - tot_num_inner_edges
if num_parts == 1: if num_parts == 1:
......
...@@ -2,8 +2,10 @@ ...@@ -2,8 +2,10 @@
from collections.abc import Iterable, Mapping from collections.abc import Iterable, Mapping
from collections import defaultdict from collections import defaultdict
import time
import numpy as np import numpy as np
from scipy import sparse from scipy import sparse
from ._ffi.function import _init_api from ._ffi.function import _init_api
from .graph import DGLGraph from .graph import DGLGraph
from .heterograph import DGLHeteroGraph from .heterograph import DGLHeteroGraph
...@@ -949,6 +951,7 @@ def partition_graph_with_halo(g, node_part, extra_cached_hops, reshuffle=False): ...@@ -949,6 +951,7 @@ def partition_graph_with_halo(g, node_part, extra_cached_hops, reshuffle=False):
assert len(node_part) == g.number_of_nodes() assert len(node_part) == g.number_of_nodes()
node_part = utils.toindex(node_part) node_part = utils.toindex(node_part)
if reshuffle: if reshuffle:
start = time.time()
node_part = node_part.tousertensor() node_part = node_part.tousertensor()
sorted_part, new2old_map = F.sort_1d(node_part) sorted_part, new2old_map = F.sort_1d(node_part)
new_node_ids = np.zeros((g.number_of_nodes(),), dtype=np.int64) new_node_ids = np.zeros((g.number_of_nodes(),), dtype=np.int64)
...@@ -960,10 +963,14 @@ def partition_graph_with_halo(g, node_part, extra_cached_hops, reshuffle=False): ...@@ -960,10 +963,14 @@ def partition_graph_with_halo(g, node_part, extra_cached_hops, reshuffle=False):
orig_eids = _CAPI_DGLReassignEdges(g._graph, True) orig_eids = _CAPI_DGLReassignEdges(g._graph, True)
orig_eids = utils.toindex(orig_eids) orig_eids = utils.toindex(orig_eids)
g.edata['orig_id'] = orig_eids.tousertensor() g.edata['orig_id'] = orig_eids.tousertensor()
print('Reshuffle nodes and edges: {:.3f} seconds'.format(time.time() - start))
start = time.time()
subgs = _CAPI_DGLPartitionWithHalo(g._graph, node_part.todgltensor(), extra_cached_hops) subgs = _CAPI_DGLPartitionWithHalo(g._graph, node_part.todgltensor(), extra_cached_hops)
print('Split the graph: {:.3f} seconds'.format(time.time() - start))
subg_dict = {} subg_dict = {}
node_part = node_part.tousertensor() node_part = node_part.tousertensor()
start = time.time()
for i, subg in enumerate(subgs): for i, subg in enumerate(subgs):
inner_node = _get_halo_subgraph_inner_node(subg) inner_node = _get_halo_subgraph_inner_node(subg)
subg = g._create_subgraph(subg, subg.induced_nodes, subg.induced_edges) subg = g._create_subgraph(subg, subg.induced_nodes, subg.induced_edges)
...@@ -986,6 +993,7 @@ def partition_graph_with_halo(g, node_part, extra_cached_hops, reshuffle=False): ...@@ -986,6 +993,7 @@ def partition_graph_with_halo(g, node_part, extra_cached_hops, reshuffle=False):
inner_edge = F.ones((subg.number_of_edges(),), F.int64, F.cpu()) inner_edge = F.ones((subg.number_of_edges(),), F.int64, F.cpu())
subg.edata['inner_edge'] = inner_edge subg.edata['inner_edge'] = inner_edge
subg_dict[i] = subg subg_dict[i] = subg
print('Construct subgraphs: {:.3f} seconds'.format(time.time() - start))
return subg_dict return subg_dict
def metis_partition_assignment(g, k, balance_ntypes=None, balance_edges=False): def metis_partition_assignment(g, k, balance_ntypes=None, balance_edges=False):
...@@ -1021,7 +1029,9 @@ def metis_partition_assignment(g, k, balance_ntypes=None, balance_edges=False): ...@@ -1021,7 +1029,9 @@ def metis_partition_assignment(g, k, balance_ntypes=None, balance_edges=False):
''' '''
# METIS works only on symmetric graphs. # METIS works only on symmetric graphs.
# The METIS runs on the symmetric graph to generate the node assignment to partitions. # The METIS runs on the symmetric graph to generate the node assignment to partitions.
start = time.time()
sym_g = to_bidirected_stale(g, readonly=True) sym_g = to_bidirected_stale(g, readonly=True)
print('Convert a graph into a bidirected graph: {:.3f} seconds'.format(time.time() - start))
vwgt = [] vwgt = []
# To balance the node types in each partition, we can take advantage of the vertex weights # To balance the node types in each partition, we can take advantage of the vertex weights
# in Metis. When vertex weights are provided, Metis will tries to generate partitions with # in Metis. When vertex weights are provided, Metis will tries to generate partitions with
...@@ -1033,6 +1043,7 @@ def metis_partition_assignment(g, k, balance_ntypes=None, balance_edges=False): ...@@ -1033,6 +1043,7 @@ def metis_partition_assignment(g, k, balance_ntypes=None, balance_edges=False):
# if a node belongs to the first node type, its weight is set to 1; otherwise, 0. # if a node belongs to the first node type, its weight is set to 1; otherwise, 0.
# Similary, we set the second weight for the second node type and so on. The number # Similary, we set the second weight for the second node type and so on. The number
# of weights is the same as the number of node types. # of weights is the same as the number of node types.
start = time.time()
if balance_ntypes is not None: if balance_ntypes is not None:
assert len(balance_ntypes) == g.number_of_nodes(), \ assert len(balance_ntypes) == g.number_of_nodes(), \
"The length of balance_ntypes should be equal to #nodes in the graph" "The length of balance_ntypes should be equal to #nodes in the graph"
...@@ -1051,11 +1062,14 @@ def metis_partition_assignment(g, k, balance_ntypes=None, balance_edges=False): ...@@ -1051,11 +1062,14 @@ def metis_partition_assignment(g, k, balance_ntypes=None, balance_edges=False):
shape = (np.prod(F.shape(vwgt),),) shape = (np.prod(F.shape(vwgt),),)
vwgt = F.reshape(vwgt, shape) vwgt = F.reshape(vwgt, shape)
vwgt = F.zerocopy_to_dgl_ndarray(vwgt) vwgt = F.zerocopy_to_dgl_ndarray(vwgt)
print('Construct multi-constraint weights: {:.3f} seconds'.format(time.time() - start))
else: else:
vwgt = F.zeros((0,), F.int64, F.cpu()) vwgt = F.zeros((0,), F.int64, F.cpu())
vwgt = F.zerocopy_to_dgl_ndarray(vwgt) vwgt = F.zerocopy_to_dgl_ndarray(vwgt)
start = time.time()
node_part = _CAPI_DGLMetisPartition(sym_g._graph, k, vwgt) node_part = _CAPI_DGLMetisPartition(sym_g._graph, k, vwgt)
print('Metis partitioning: {:.3f} seconds'.format(time.time() - start))
if len(node_part) == 0: if len(node_part) == 0:
return None return None
else: else:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.