Unverified Commit 33abd275 authored by Jinjing Zhou's avatar Jinjing Zhou Committed by GitHub
Browse files

[HeteroGraph] Metis partition for hetero (#1843)

* metis for hetero

* fix partition
parent 562871e7
"""Module for graph partition utilities."""
import time
import numpy as np
from ._ffi.function import _init_api
from .heterograph import DGLHeteroGraph
from . import backend as F
from . import utils
from .base import EID, NID
__all__ = ["metis_partition", "metis_partition_assignment",
"partition_graph_with_halo"]
def reorder_nodes(g, new_node_ids):
""" Generate a new graph with new node Ids.
We assign each node in the input graph with a new node Id. This results in
a new graph.
Parameters
----------
g : DGLGraph
The input graph
new_node_ids : a tensor
The new node Ids
Returns
-------
DGLGraph
The graph with new node Ids.
"""
assert len(new_node_ids) == g.number_of_nodes(), \
"The number of new node ids must match #nodes in the graph."
new_node_ids = utils.toindex(new_node_ids)
sorted_ids, idx = F.sort_1d(new_node_ids.tousertensor())
assert F.asnumpy(sorted_ids[0]) == 0 \
and F.asnumpy(sorted_ids[-1]) == g.number_of_nodes() - 1, \
"The new node Ids are incorrect."
new_gidx = _CAPI_DGLReorderGraph_Hetero(
g._graph, new_node_ids.todgltensor())
new_g = DGLHeteroGraph(gidx=new_gidx, ntypes=['_N'], etypes=['_E'])
new_g.ndata['orig_id'] = idx
return new_g
def _get_halo_heterosubgraph_inner_node(halo_subg):
return _CAPI_GetHaloSubgraphInnerNodes_Hetero(halo_subg)
def partition_graph_with_halo(g, node_part, extra_cached_hops, reshuffle=False):
'''Partition a graph.
Based on the given node assignments for each partition, the function splits
the input graph into subgraphs. A subgraph may contain HALO nodes which does
not belong to the partition of a subgraph but are connected to the nodes
in the partition within a fixed number of hops.
If `reshuffle` is turned on, the function reshuffles node Ids and edge Ids
of the input graph before partitioning. After reshuffling, all nodes and edges
in a partition fall in a contiguous Id range in the input graph.
The partitioend subgraphs have node data 'orig_id', which stores the node Ids
in the original input graph.
Parameters
------------
g: DGLGraph
The graph to be partitioned
node_part: 1D tensor
Specify which partition a node is assigned to. The length of this tensor
needs to be the same as the number of nodes of the graph. Each element
indicates the partition Id of a node.
extra_cached_hops: int
The number of hops a HALO node can be accessed.
reshuffle : bool
Resuffle nodes so that nodes in the same partition are in the same Id range.
Returns
--------
a dict of DGLGraphs
The key is the partition Id and the value is the DGLGraph of the partition.
'''
assert len(node_part) == g.number_of_nodes()
node_part = utils.toindex(node_part)
if reshuffle:
start = time.time()
node_part = node_part.tousertensor()
sorted_part, new2old_map = F.sort_1d(node_part)
new_node_ids = np.zeros((g.number_of_nodes(),), dtype=np.int64)
new_node_ids[F.asnumpy(new2old_map)] = np.arange(
0, g.number_of_nodes())
g = reorder_nodes(g, new_node_ids)
node_part = utils.toindex(sorted_part)
# We reassign edges in in-CSR. In this way, after partitioning, we can ensure
# that all edges in a partition are in the contiguous Id space.
orig_eids = _CAPI_DGLReassignEdges_Hetero(g._graph, True)
orig_eids = utils.toindex(orig_eids)
orig_eids = orig_eids.tousertensor()
orig_nids = g.ndata['orig_id']
print('Reshuffle nodes and edges: {:.3f} seconds'.format(
time.time() - start))
start = time.time()
subgs = _CAPI_DGLPartitionWithHalo_Hetero(
g._graph, node_part.todgltensor(), extra_cached_hops)
# g is no longer needed. Free memory.
g = None
print('Split the graph: {:.3f} seconds'.format(time.time() - start))
subg_dict = {}
node_part = node_part.tousertensor()
start = time.time()
# This creaets a subgraph from subgraphs returned from the CAPI above.
def create_subgraph(subg, induced_nodes, induced_edges):
subg1 = DGLHeteroGraph(gidx=subg.graph, ntypes=['_N'], etypes=['_E'])
subg1.ndata[NID] = induced_nodes[0].tousertensor()
subg1.edata[EID] = induced_edges[0].tousertensor()
return subg1
for i, subg in enumerate(subgs):
inner_node = _get_halo_heterosubgraph_inner_node(subg)
subg = create_subgraph(subg, subg.induced_nodes, subg.induced_edges)
inner_node = F.zerocopy_from_dlpack(inner_node.to_dlpack())
subg.ndata['inner_node'] = inner_node
subg.ndata['part_id'] = F.gather_row(node_part, subg.ndata[NID])
if reshuffle:
subg.ndata['orig_id'] = F.gather_row(orig_nids, subg.ndata[NID])
subg.edata['orig_id'] = F.gather_row(orig_eids, subg.edata[EID])
if extra_cached_hops >= 1:
inner_edge = F.zeros((subg.number_of_edges(),), F.int8, F.cpu())
inner_nids = F.nonzero_1d(subg.ndata['inner_node'])
# TODO(zhengda) we need to fix utils.toindex() to avoid the dtype cast below.
inner_nids = F.astype(inner_nids, F.int64)
inner_eids = subg.in_edges(inner_nids, form='eid')
inner_edge = F.scatter_row(inner_edge, inner_eids,
F.ones((len(inner_eids),), F.dtype(inner_edge), F.cpu()))
else:
inner_edge = F.ones((subg.number_of_edges(),), F.int8, F.cpu())
subg.edata['inner_edge'] = inner_edge
subg_dict[i] = subg
print('Construct subgraphs: {:.3f} seconds'.format(time.time() - start))
return subg_dict
def metis_partition_assignment(g, k, balance_ntypes=None, balance_edges=False):
''' This assigns nodes to different partitions with Metis partitioning algorithm.
When performing Metis partitioning, we can put some constraint on the partitioning.
Current, it supports two constrants to balance the partitioning. By default, Metis
always tries to balance the number of nodes in each partition.
* `balance_ntypes` balances the number of nodes of different types in each partition.
* `balance_edges` balances the number of edges in each partition.
To balance the node types, a user needs to pass a vector of N elements to indicate
the type of each node. N is the number of nodes in the input graph.
After the partition assignment, we construct partitions.
Parameters
----------
g : DGLGraph
The graph to be partitioned
k : int
The number of partitions.
balance_ntypes : tensor
Node type of each node
balance_edges : bool
Indicate whether to balance the edges.
Returns
-------
a 1-D tensor
A vector with each element that indicates the partition Id of a vertex.
'''
# METIS works only on symmetric graphs.
# The METIS runs on the symmetric graph to generate the node assignment to partitions.
from .transform import to_bidirected # avoid cyclic import
start = time.time()
sym_g = to_bidirected(g, copy_ndata=False)
print('Convert a graph into a bidirected graph: {:.3f} seconds'.format(
time.time() - start))
vwgt = []
# To balance the node types in each partition, we can take advantage of the vertex weights
# in Metis. When vertex weights are provided, Metis will tries to generate partitions with
# balanced vertex weights. A vertex can be assigned with multiple weights. The vertex weights
# are stored in a vector of N * w elements, where N is the number of vertices and w
# is the number of weights per vertex. Metis tries to balance the first weight, and then
# the second weight, and so on.
# When balancing node types, we use the first weight to indicate the first node type.
# if a node belongs to the first node type, its weight is set to 1; otherwise, 0.
# Similary, we set the second weight for the second node type and so on. The number
# of weights is the same as the number of node types.
start = time.time()
if balance_ntypes is not None:
assert len(balance_ntypes) == g.number_of_nodes(), \
"The length of balance_ntypes should be equal to #nodes in the graph"
balance_ntypes = F.tensor(balance_ntypes)
uniq_ntypes = F.unique(balance_ntypes)
for ntype in uniq_ntypes:
vwgt.append(F.astype(balance_ntypes == ntype, F.int64))
# When balancing edges in partitions, we use in-degree as one of the weights.
if balance_edges:
vwgt.append(F.astype(g.in_degrees(), F.int64))
# The vertex weights have to be stored in a vector.
if len(vwgt) > 0:
vwgt = F.stack(vwgt, 1)
shape = (np.prod(F.shape(vwgt),),)
vwgt = F.reshape(vwgt, shape)
vwgt = F.zerocopy_to_dgl_ndarray(vwgt)
print(
'Construct multi-constraint weights: {:.3f} seconds'.format(time.time() - start))
else:
vwgt = F.zeros((0,), F.int64, F.cpu())
vwgt = F.zerocopy_to_dgl_ndarray(vwgt)
start = time.time()
node_part = _CAPI_DGLMetisPartition_Hetero(sym_g._graph, k, vwgt)
print('Metis partitioning: {:.3f} seconds'.format(time.time() - start))
if len(node_part) == 0:
return None
else:
node_part = utils.toindex(node_part)
return node_part.tousertensor()
def metis_partition(g, k, extra_cached_hops=0, reshuffle=False,
balance_ntypes=None, balance_edges=False):
''' This is to partition a graph with Metis partitioning.
Metis assigns vertices to partitions. This API constructs subgraphs with the vertices assigned
to the partitions and their incoming edges. A subgraph may contain HALO nodes which does
not belong to the partition of a subgraph but are connected to the nodes
in the partition within a fixed number of hops.
When performing Metis partitioning, we can put some constraint on the partitioning.
Current, it supports two constrants to balance the partitioning. By default, Metis
always tries to balance the number of nodes in each partition.
* `balance_ntypes` balances the number of nodes of different types in each partition.
* `balance_edges` balances the number of edges in each partition.
To balance the node types, a user needs to pass a vector of N elements to indicate
the type of each node. N is the number of nodes in the input graph.
If `reshuffle` is turned on, the function reshuffles node Ids and edge Ids
of the input graph before partitioning. After reshuffling, all nodes and edges
in a partition fall in a contiguous Id range in the input graph.
The partitioend subgraphs have node data 'orig_id', which stores the node Ids
in the original input graph.
The partitioned subgraph is stored in DGLGraph. The DGLGraph has the `part_id`
node data that indicates the partition a node belongs to. The subgraphs do not contain
the node/edge data in the input graph.
Parameters
------------
g: DGLGraph
The graph to be partitioned
k: int
The number of partitions.
extra_cached_hops: int
The number of hops a HALO node can be accessed.
reshuffle : bool
Resuffle nodes so that nodes in the same partition are in the same Id range.
balance_ntypes : tensor
Node type of each node
balance_edges : bool
Indicate whether to balance the edges.
Returns
--------
a dict of DGLGraphs
The key is the partition Id and the value is the DGLGraph of the partition.
'''
node_part = metis_partition_assignment(g, k, balance_ntypes, balance_edges)
if node_part is None:
return None
# Then we split the original graph into parts based on the METIS partitioning results.
return partition_graph_with_halo(g, node_part, extra_cached_hops, reshuffle)
_init_api("dgl.partition")
......@@ -18,7 +18,9 @@ from .convert import graph, bipartite, heterograph
from . import utils
from .base import EID, NID
from . import ndarray as nd
from .partition import metis_partition_assignment as hetero_metis_partition_assignment
from .partition import partition_graph_with_halo as hetero_partition_graph_with_halo
from .partition import metis_partition as hetero_metis_partition
__all__ = [
'line_graph',
......@@ -948,6 +950,8 @@ def partition_graph_with_halo(g, node_part, extra_cached_hops, reshuffle=False):
a dict of DGLGraphs
The key is the partition Id and the value is the DGLGraph of the partition.
'''
if isinstance(g, DGLHeteroGraph):
return hetero_partition_graph_with_halo(g, node_part, extra_cached_hops, reshuffle)
assert len(node_part) == g.number_of_nodes()
node_part = utils.toindex(node_part)
if reshuffle:
......@@ -1038,6 +1042,8 @@ def metis_partition_assignment(g, k, balance_ntypes=None, balance_edges=False):
a 1-D tensor
A vector with each element that indicates the partition Id of a vertex.
'''
if isinstance(g, DGLHeteroGraph):
return hetero_metis_partition_assignment(g, k, balance_ntypes, balance_edges)
# METIS works only on symmetric graphs.
# The METIS runs on the symmetric graph to generate the node assignment to partitions.
start = time.time()
......@@ -1136,6 +1142,9 @@ def metis_partition(g, k, extra_cached_hops=0, reshuffle=False,
a dict of DGLGraphs
The key is the partition Id and the value is the DGLGraph of the partition.
'''
if isinstance(g, DGLHeteroGraph):
return hetero_metis_partition(g, k, extra_cached_hops, reshuffle,
balance_ntypes, balance_edges)
node_part = metis_partition_assignment(g, k, balance_ntypes, balance_edges)
if node_part is None:
return None
......
/*!
* Copyright (c) 2020 by Contributors
* \file graph/metis_partition.cc
* \brief Call Metis partitioning
*/
#include <dgl/base_heterograph.h>
#include <dgl/packed_func_ext.h>
#include <metis.h>
#include "../heterograph.h"
#include "../unit_graph.h"
using namespace dgl::runtime;
namespace dgl {
namespace transform {
IdArray MetisPartition(UnitGraphPtr g, int k, NDArray vwgt_arr) {
// The index type of Metis needs to be compatible with DGL index type.
CHECK_EQ(sizeof(idx_t), sizeof(int64_t))
<< "Metis only supports int64 graph for now";
// This is a symmetric graph, so in-csr and out-csr are the same.
const auto mat = g->GetCSRMatrix(0);
// const auto mat = g->GetInCSR()->ToCSRMatrix();
idx_t nvtxs = g->NumVertices(0);
idx_t ncon = 1; // # balacing constraints.
idx_t *xadj = static_cast<idx_t *>(mat.indptr->data);
idx_t *adjncy = static_cast<idx_t *>(mat.indices->data);
idx_t nparts = k;
IdArray part_arr = aten::NewIdArray(nvtxs);
idx_t objval = 0;
idx_t *part = static_cast<idx_t *>(part_arr->data);
int64_t vwgt_len = vwgt_arr->shape[0];
CHECK_EQ(sizeof(idx_t), vwgt_arr->dtype.bits / 8)
<< "The vertex weight array doesn't have right type";
CHECK(vwgt_len % g->NumVertices(0) == 0)
<< "The vertex weight array doesn't have right number of elements";
idx_t *vwgt = NULL;
if (vwgt_len > 0) {
ncon = vwgt_len / g->NumVertices(0);
vwgt = static_cast<idx_t *>(vwgt_arr->data);
}
int ret = METIS_PartGraphKway(
&nvtxs, // The number of vertices
&ncon, // The number of balancing constraints.
xadj, // indptr
adjncy, // indices
vwgt, // the weights of the vertices
NULL, // The size of the vertices for computing
// the total communication volume
NULL, // The weights of the edges
&nparts, // The number of partitions.
NULL, // the desired weight for each partition and constraint
NULL, // the allowed load imbalance tolerance
NULL, // the array of options
&objval, // the edge-cut or the total communication volume of
// the partitioning solution
part);
LOG(INFO) << "Partition a graph with " << g->NumVertices(0) << " nodes and "
<< g->NumEdges(0) << " edges into " << k << " parts and get "
<< objval << " edge cuts";
switch (ret) {
case METIS_OK:
return part_arr;
case METIS_ERROR_INPUT:
LOG(FATAL) << "Error in Metis partitioning: input error";
case METIS_ERROR_MEMORY:
LOG(FATAL) << "Error in Metis partitioning: cannot allocate memory";
default:
LOG(FATAL) << "Error in Metis partitioning: other errors";
}
// return an array of 0 elements to indicate the error.
return aten::NullArray();
}
DGL_REGISTER_GLOBAL("partition._CAPI_DGLMetisPartition_Hetero")
.set_body([](DGLArgs args, DGLRetValue *rv) {
HeteroGraphRef g = args[0];
auto hgptr = std::dynamic_pointer_cast<HeteroGraph>(g.sptr());
CHECK(hgptr) << "Invalid HeteroGraph object";
CHECK_EQ(hgptr->relation_graphs().size(), 1)
<< "Metis partition only supports HomoGraph";
auto ugptr = hgptr->relation_graphs()[0];
int k = args[1];
NDArray vwgt = args[2];
*rv = MetisPartition(ugptr, k, vwgt);
});
} // namespace transform
} // namespace dgl
/*!
* Copyright (c) 2020 by Contributors
* \file graph/metis_partition.cc
* \brief Call Metis partitioning
*/
#include <dgl/base_heterograph.h>
#include <dgl/packed_func_ext.h>
#include "../heterograph.h"
#include "../unit_graph.h"
using namespace dgl::runtime;
namespace dgl {
namespace transform {
class HaloHeteroSubgraph : public HeteroSubgraph {
public:
std::vector<IdArray> inner_nodes;
};
HeteroGraphPtr ReorderUnitGraph(UnitGraphPtr ug, IdArray new_order) {
// We only need to reorder one of the graph structure.
// Only to in_csr for now
auto csrmat = ug->GetCSRMatrix(0);
auto new_csrmat = aten::CSRReorder(csrmat, new_order, new_order);
return UnitGraph::CreateFromCSR(ug->NumVertexTypes(), new_csrmat);
}
HaloHeteroSubgraph GetSubgraphWithHalo(std::shared_ptr<HeteroGraph> hg,
IdArray nodes, int num_hops) {
CHECK_EQ(hg->NumBits(), 64) << "halo subgraph only supports 64bits graph";
CHECK_EQ(hg->relation_graphs().size(), 1)
<< "halo subgraph only supports homograph";
CHECK_EQ(nodes->dtype.bits, 64)
<< "halo subgraph only supports 64bits nodes tensor";
const dgl_id_t *nid = static_cast<dgl_id_t *>(nodes->data);
const auto id_len = nodes->shape[0];
// A map contains all nodes in the subgraph.
// The key is the old node Ids, the value indicates whether a node is a inner
// node.
std::unordered_map<dgl_id_t, bool> all_nodes;
// The old Ids of all nodes. We want to preserve the order of the nodes in the
// vector. The first few nodes are the inner nodes in the subgraph.
std::vector<dgl_id_t> old_node_ids(nid, nid + id_len);
std::vector<std::vector<dgl_id_t>> outer_nodes(num_hops);
for (int64_t i = 0; i < id_len; i++) all_nodes[nid[i]] = true;
auto orig_nodes = all_nodes;
std::vector<dgl_id_t> edge_src, edge_dst, edge_eid;
// When we deal with in-edges, we need to do two things:
// * find the edges inside the partition and the edges between partitions.
// * find the nodes outside the partition that connect the partition.
EdgeArray in_edges = hg->InEdges(0, nodes);
auto src = in_edges.src;
auto dst = in_edges.dst;
auto eid = in_edges.id;
auto num_edges = eid->shape[0];
const dgl_id_t *src_data = static_cast<dgl_id_t *>(src->data);
const dgl_id_t *dst_data = static_cast<dgl_id_t *>(dst->data);
const dgl_id_t *eid_data = static_cast<dgl_id_t *>(eid->data);
for (int64_t i = 0; i < num_edges; i++) {
// We check if the source node is in the original node.
auto it1 = orig_nodes.find(src_data[i]);
if (it1 != orig_nodes.end() || num_hops > 0) {
edge_src.push_back(src_data[i]);
edge_dst.push_back(dst_data[i]);
edge_eid.push_back(eid_data[i]);
}
// We need to expand only if the node hasn't been seen before.
auto it = all_nodes.find(src_data[i]);
if (it == all_nodes.end() && num_hops > 0) {
all_nodes[src_data[i]] = false;
old_node_ids.push_back(src_data[i]);
outer_nodes[0].push_back(src_data[i]);
}
}
// Now we need to traverse the graph with the in-edges to access nodes
// and edges more hops away.
for (int k = 1; k < num_hops; k++) {
const std::vector<dgl_id_t> &nodes = outer_nodes[k - 1];
EdgeArray in_edges = hg->InEdges(0, aten::VecToIdArray(nodes));
auto src = in_edges.src;
auto dst = in_edges.dst;
auto eid = in_edges.id;
auto num_edges = eid->shape[0];
const dgl_id_t *src_data = static_cast<dgl_id_t *>(src->data);
const dgl_id_t *dst_data = static_cast<dgl_id_t *>(dst->data);
const dgl_id_t *eid_data = static_cast<dgl_id_t *>(eid->data);
for (int64_t i = 0; i < num_edges; i++) {
edge_src.push_back(src_data[i]);
edge_dst.push_back(dst_data[i]);
edge_eid.push_back(eid_data[i]);
// If we haven't seen this node.
auto it = all_nodes.find(src_data[i]);
if (it == all_nodes.end()) {
all_nodes[src_data[i]] = false;
old_node_ids.push_back(src_data[i]);
outer_nodes[k].push_back(src_data[i]);
}
}
}
// We assign new Ids to the nodes in the subgraph. We ensure that the HALO
// nodes are behind the input nodes.
std::unordered_map<dgl_id_t, dgl_id_t> old2new;
for (size_t i = 0; i < old_node_ids.size(); i++) {
old2new[old_node_ids[i]] = i;
}
num_edges = edge_src.size();
IdArray new_src = IdArray::Empty({num_edges}, DLDataType{kDLInt, 64, 1},
DLContext{kDLCPU, 0});
IdArray new_dst = IdArray::Empty({num_edges}, DLDataType{kDLInt, 64, 1},
DLContext{kDLCPU, 0});
dgl_id_t *new_src_data = static_cast<dgl_id_t *>(new_src->data);
dgl_id_t *new_dst_data = static_cast<dgl_id_t *>(new_dst->data);
for (size_t i = 0; i < edge_src.size(); i++) {
new_src_data[i] = old2new[edge_src[i]];
new_dst_data[i] = old2new[edge_dst[i]];
}
std::vector<int> inner_nodes(old_node_ids.size());
for (size_t i = 0; i < old_node_ids.size(); i++) {
dgl_id_t old_nid = old_node_ids[i];
inner_nodes[i] = all_nodes[old_nid];
}
aten::COOMatrix coo(old_node_ids.size(), old_node_ids.size(), new_src,
new_dst);
HeteroGraphPtr ugptr = UnitGraph::CreateFromCOO(1, coo);
HeteroGraphPtr subg = CreateHeteroGraph(hg->meta_graph(), {ugptr});
HaloHeteroSubgraph halo_subg;
halo_subg.graph = subg;
halo_subg.induced_vertices = {aten::VecToIdArray(old_node_ids)};
halo_subg.induced_edges = {aten::VecToIdArray(edge_eid)};
// TODO(zhengda) we need to switch to 8 bytes afterwards.
halo_subg.inner_nodes = {aten::VecToIdArray<int>(inner_nodes, 32)};
return halo_subg;
}
DGL_REGISTER_GLOBAL("partition._CAPI_DGLReorderGraph_Hetero")
.set_body([](DGLArgs args, DGLRetValue *rv) {
HeteroGraphRef g = args[0];
auto hgptr = std::dynamic_pointer_cast<HeteroGraph>(g.sptr());
CHECK(hgptr) << "Invalid HeteroGraph object";
CHECK_EQ(hgptr->relation_graphs().size(), 1)
<< "Reorder only supports HomoGraph";
auto ugptr = hgptr->relation_graphs()[0];
const IdArray new_order = args[1];
auto reorder_ugptr = ReorderUnitGraph(ugptr, new_order);
std::vector<HeteroGraphPtr> rel_graphs = {reorder_ugptr};
*rv = HeteroGraphRef(std::make_shared<HeteroGraph>(
hgptr->meta_graph(), rel_graphs, hgptr->NumVerticesPerType()));
});
DGL_REGISTER_GLOBAL("partition._CAPI_DGLPartitionWithHalo_Hetero")
.set_body([](DGLArgs args, DGLRetValue *rv) {
HeteroGraphRef g = args[0];
auto hgptr = std::dynamic_pointer_cast<HeteroGraph>(g.sptr());
CHECK(hgptr) << "Invalid HeteroGraph object";
CHECK_EQ(hgptr->relation_graphs().size(), 1)
<< "Metis partition only supports HomoGraph";
auto ugptr = hgptr->relation_graphs()[0];
IdArray node_parts = args[1];
int num_hops = args[2];
CHECK_EQ(node_parts->dtype.bits, 64)
<< "Only supports 64bits tensor for now";
const int64_t *part_data = static_cast<int64_t *>(node_parts->data);
int64_t num_nodes = node_parts->shape[0];
std::unordered_map<int, std::vector<int64_t>> part_map;
for (int64_t i = 0; i < num_nodes; i++) {
dgl_id_t part_id = part_data[i];
auto it = part_map.find(part_id);
if (it == part_map.end()) {
std::vector<int64_t> vec;
vec.push_back(i);
part_map[part_id] = vec;
} else {
it->second.push_back(i);
}
}
std::vector<int> part_ids;
std::vector<std::vector<int64_t>> part_nodes;
int max_part_id = 0;
for (auto it = part_map.begin(); it != part_map.end(); it++) {
max_part_id = std::max(it->first, max_part_id);
part_ids.push_back(it->first);
part_nodes.push_back(it->second);
}
// When we construct subgraphs, we only access in-edges.
// We need to make sure the in-CSR exists. Otherwise, we'll
// try to construct in-CSR in openmp for loop, which will lead
// to some unexpected results.
ugptr->GetInCSR();
std::vector<std::shared_ptr<HaloHeteroSubgraph>> subgs(max_part_id + 1);
int num_partitions = part_nodes.size();
#pragma omp parallel for
for (int i = 0; i < num_partitions; i++) {
auto nodes = aten::VecToIdArray(part_nodes[i]);
HaloHeteroSubgraph subg = GetSubgraphWithHalo(hgptr, nodes, num_hops);
std::shared_ptr<HaloHeteroSubgraph> subg_ptr(
new HaloHeteroSubgraph(subg));
int part_id = part_ids[i];
subgs[part_id] = subg_ptr;
}
List<HeteroSubgraphRef> ret_list;
for (size_t i = 0; i < subgs.size(); i++) {
ret_list.push_back(HeteroSubgraphRef(subgs[i]));
}
*rv = ret_list;
});
// TODO(JJ): What's this?
DGL_REGISTER_GLOBAL("partition._CAPI_DGLReassignEdges_Hetero")
.set_body([](DGLArgs args, DGLRetValue *rv) {
HeteroGraphRef g = args[0];
auto hgptr = std::dynamic_pointer_cast<HeteroGraph>(g.sptr());
CHECK(hgptr) << "Invalid HeteroGraph object";
CHECK_EQ(hgptr->relation_graphs().size(), 1)
<< "Reorder only supports HomoGraph";
auto ugptr = hgptr->relation_graphs()[0];
bool is_incsr = args[1];
auto csrmat = is_incsr ? ugptr->GetCSCMatrix(0) : ugptr->GetCSRMatrix(0);
int64_t num_edges = csrmat.data->shape[0];
IdArray new_data =
IdArray::Empty({num_edges}, csrmat.data->dtype, csrmat.data->ctx);
// Return the original edge Ids.
*rv = new_data;
// TODO(zhengda) I need to invalidate out-CSR and COO.
// Generate new edge Ids.
// TODO(zhengda) after assignment, we actually don't need to store them
// physically.
ATEN_ID_TYPE_SWITCH(new_data->dtype, IdType, {
IdType *typed_new_data = static_cast<IdType *>(new_data->data);
IdType *typed_data = static_cast<IdType *>(csrmat.data->data);
for (int64_t i = 0; i < num_edges; i++) {
typed_new_data[i] = typed_data[i];
typed_data[i] = i;
}
});
});
DGL_REGISTER_GLOBAL("partition._CAPI_GetHaloSubgraphInnerNodes_Hetero")
.set_body([](DGLArgs args, DGLRetValue *rv) {
HeteroSubgraphRef g = args[0];
auto gptr = std::dynamic_pointer_cast<HaloHeteroSubgraph>(g.sptr());
CHECK(gptr) << "The input graph has to be HaloHeteroSubgraph";
*rv = gptr->inner_nodes[0];
});
} // namespace transform
} // namespace dgl
......@@ -490,6 +490,17 @@ def test_metis_partition():
check_metis_partition(g, 2)
check_metis_partition_with_constraint(g)
@unittest.skipIf(F._default_context_str == 'gpu', reason="METIS doesn't support GPU")
def test_hetero_metis_partition():
# TODO(zhengda) Metis fails to partition a small graph.
g = dgl.DGLGraph(create_large_graph_index(1000), readonly=True)
g = dgl.as_heterograph(g)
check_metis_partition(g, 0)
check_metis_partition(g, 1)
check_metis_partition(g, 2)
check_metis_partition_with_constraint(g)
def check_metis_partition_with_constraint(g):
ntypes = np.zeros((g.number_of_nodes(),), dtype=np.int32)
ntypes[0:int(g.number_of_nodes()/4)] = 1
......@@ -999,12 +1010,12 @@ def test_cast():
assert F.array_equal(g2dst, gdst)
if __name__ == '__main__':
test_reorder_nodes()
# test_reorder_nodes()
# test_line_graph()
# test_no_backtracking()
test_reverse()
# test_reverse()
# test_reverse_shared_frames()
test_to_bidirected()
# test_to_bidirected()
# test_simple_graph()
# test_bidirected_graph()
# test_khop_adj()
......@@ -1013,10 +1024,11 @@ if __name__ == '__main__':
# test_remove_self_loop()
# test_add_self_loop()
# test_partition_with_halo()
# test_metis_partition()
test_metis_partition()
test_hetero_metis_partition()
# test_hetero_linegraph('int32')
# test_compact()
test_to_simple("int32")
# test_to_simple("int32")
# test_in_subgraph("int32")
# test_out_subgraph()
# test_to_block("int32")
......
......@@ -17,8 +17,7 @@ def create_random_graph(n):
ig = create_graph_index(arr, readonly=True)
return dgl.DGLGraph(ig)
def check_partition(part_method, reshuffle):
g = create_random_graph(10000)
def check_partition(g, part_method, reshuffle):
g.ndata['labels'] = F.arange(0, g.number_of_nodes())
g.ndata['feats'] = F.tensor(np.random.randn(g.number_of_nodes(), 10))
g.edata['feats'] = F.tensor(np.random.randn(g.number_of_edges(), 10))
......@@ -105,12 +104,22 @@ def check_partition(part_method, reshuffle):
assert np.all(F.asnumpy(eid2pid) == edge_map)
def test_partition():
check_partition('metis', True)
check_partition('metis', False)
check_partition('random', True)
check_partition('random', False)
g = create_random_graph(10000)
check_partition(g, 'metis', True)
check_partition(g, 'metis', False)
check_partition(g, 'random', True)
check_partition(g, 'random', False)
def test_hetero_partition():
g = create_random_graph(10000)
g = dgl.as_heterograph(g)
check_partition(g, 'metis', True)
check_partition(g, 'metis', False)
check_partition(g, 'random', True)
check_partition(g, 'random', False)
if __name__ == '__main__':
os.makedirs('/tmp/partition', exist_ok=True)
test_partition()
test_hetero_partition()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment