Unverified Commit 41349dce authored by Da Zheng, committed by GitHub

[Feature] Range partition (#1522)



* add reorder immutable graph.

* add python API.

* add reorder for csr.

* remove gk version.

* fix

* add cpp test.

* bug fixes

* fix tests.

* fix bugs and add check

* fix test.

* add omp.

* add comments.

* add coo reorder.

* fix a bug.

* handle reorder for different graph structures.

* fix lint.

* fix.

* add original ids.

* reshuffle nodes before metis partition.

* inner nodes are in contiguous Id range.

* reshuffle nodes/edges when partitioning.

* load partition return graph partition book.

* use inner_node/inner_edges

* add and test range partition book.

* count inner_edge correctly.

* fix lint.

* fix lint.

* fix lint.

* fix errors.

* fix errors.

* fix for TF.

* fix.

* fix.

* change docstring.

* support logical and.

* add comments.

* avoid copy.

* fix

* update docstring.

* fix a bug.

* add range search.

* fix

* fix a bug.

* add more tests.

* load graph partition book.

* support shared memory for range partition book.

* fix a bug.

* fix.

* fix lint.

* remove check

* fix test.

* remove num_nodes and num_edges

* fix lint.

* fix graph partition book.

* address comments.

* use makedirs.

* fix compile
Co-authored-by: xiang song(charlie.song) <classicxsong@gmail.com>
Co-authored-by: Chao Ma <mctt90@gmail.com>
parent 3e72c53a
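The core idea of this change: after reshuffling, the nodes and edges of each partition occupy contiguous Id ranges, so the partition book only has to store one boundary per partition instead of a per-node/per-edge map. A minimal NumPy sketch of the lookup logic (the boundary values are made up for illustration and are not part of the DGL API):

import numpy as np

# Hypothetical range boundaries: partition 0 owns nodes [0, 3000),
# partition 1 owns [3000, 5000), partition 2 owns [5000, 9000), etc.
node_map = np.array([3000, 5000, 9000, 10000])

def nid2partid(nids):
    # side='right' returns the partition whose range end is the first
    # boundary strictly greater than the node Id.
    return np.searchsorted(node_map, nids, side='right')

def partid2nids(partid):
    # The node Ids of a partition form one contiguous range.
    start = node_map[partid - 1] if partid > 0 else 0
    return np.arange(start, node_map[partid])

print(nid2partid(np.array([0, 2999, 3000, 9999])))  # [0 0 1 3]
print(partid2nids(1))                               # [3000 3001 ... 4999]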
......@@ -589,6 +589,14 @@ bool CSRHasDuplicate(CSRMatrix csr);
*/
void CSRSort_(CSRMatrix* csr);
/*!
* \brief Reorder the rows and columns according to the new row and column order.
* \param csr The input CSR matrix.
* \param new_row_ids The new row Ids (indexed by the old row Ids).
* \param new_col_ids The new column Ids (indexed by the old column Ids).
* \return The reordered CSR matrix.
*/
CSRMatrix CSRReorder(CSRMatrix csr, runtime::NDArray new_row_ids, runtime::NDArray new_col_ids);
/*!
* \brief Remove entries from CSR matrix by entry indices (data indices)
* \return A new CSR matrix as well as a mapping from the new CSR entries to the old CSR
......@@ -778,6 +786,14 @@ COOMatrix COOSort(COOMatrix mat, bool sort_column = false);
*/
COOMatrix COORemove(COOMatrix coo, IdArray entries);
/*!
* \brief Reorder the rows and columns according to the new row and column order.
* \param coo The input COO matrix.
* \param new_row_ids The new row Ids (indexed by the old row Ids).
* \param new_col_ids The new column Ids (indexed by the old column Ids).
* \return The reordered COO matrix.
*/
COOMatrix COOReorder(COOMatrix coo, runtime::NDArray new_row_ids, runtime::NDArray new_col_ids);
/*!
* \brief Randomly select a fixed number of non-zero entries along each given row independently.
*
......
......@@ -159,6 +159,14 @@ class GraphOp {
*/
static HaloSubgraph GetSubgraphWithHalo(GraphPtr graph, IdArray nodes, int num_hops);
/*!
* \brief Reorder the nodes in an immutable graph.
* \param ig The input immutable graph.
* \param new_order The new node Ids. The index in `new_order` is the old node Id.
* \return The graph with reordered node Ids.
*/
static GraphPtr ReorderImmutableGraph(ImmutableGraphPtr ig, IdArray new_order);
/*!
* \brief Partition a graph with Metis.
* The partitioning algorithm assigns each vertex to a partition.
......
......@@ -982,6 +982,9 @@ def logical_not(input):
"""
pass
def logical_and(input1, input2):
"""Return the element-wise logical AND of two input tensors.
"""
pass
def clone(input):
"""Return a clone of the input tensor.
......
......@@ -313,6 +313,9 @@ def equal(x, y):
def logical_not(input):
return nd.logical_not(input)
def logical_and(input1, input2):
return nd.logical_and(input1, input2)
def clone(input):
return input.copy()
......
......@@ -249,6 +249,9 @@ def equal(x, y):
def logical_not(input):
return ~input
def logical_and(input1, input2):
return input1 & input2
def clone(input):
return input.clone()
......
......@@ -350,6 +350,9 @@ def equal(x, y):
def logical_not(input):
return ~input
def logical_and(input1, input2):
return tf.math.logical_and(input1, input2)
def clone(input):
# TF tensors are always immutable, so returning the input is safe.
return input
......
......@@ -10,7 +10,7 @@ from ..graph_index import from_shared_mem_graph_index
from .._ffi.ndarray import empty_shared_mem
from ..frame import infer_scheme
from .partition import load_partition
from .graph_partition_book import GraphPartitionBook, PartitionPolicy, get_shared_mem_partition_book
from .graph_partition_book import PartitionPolicy, get_shared_mem_partition_book
from .. import utils
from .shared_mem_utils import _to_shared_mem, _get_ndata_path, _get_edata_path, DTYPE_DICT
from .rpc_client import connect_to_server
......@@ -25,17 +25,16 @@ def _copy_graph_to_shared_mem(g, graph_name):
new_g = DGLGraph(gidx)
# We should share the node/edge data with the clients explicitly instead of putting them
# in the KVStore because some of the node/edge data may be duplicated.
local_node_path = _get_ndata_path(graph_name, 'local_node')
new_g.ndata['local_node'] = _to_shared_mem(g.ndata['local_node'],
local_node_path)
local_edge_path = _get_edata_path(graph_name, 'local_edge')
new_g.edata['local_edge'] = _to_shared_mem(g.edata['local_edge'], local_edge_path)
local_node_path = _get_ndata_path(graph_name, 'inner_node')
new_g.ndata['inner_node'] = _to_shared_mem(g.ndata['inner_node'], local_node_path)
local_edge_path = _get_edata_path(graph_name, 'inner_edge')
new_g.edata['inner_edge'] = _to_shared_mem(g.edata['inner_edge'], local_edge_path)
new_g.ndata[NID] = _to_shared_mem(g.ndata[NID], _get_ndata_path(graph_name, NID))
new_g.edata[EID] = _to_shared_mem(g.edata[EID], _get_edata_path(graph_name, EID))
return new_g
FIELD_DICT = {'local_node': F.int64,
'local_edge': F.int64,
FIELD_DICT = {'inner_node': F.int64,
'inner_edge': F.int64,
NID: F.int64,
EID: F.int64}
......@@ -99,8 +98,8 @@ def _get_graph_from_shared_mem(graph_name):
return gidx
g = DGLGraph(gidx)
g.ndata['local_node'] = _get_shared_mem_ndata(g, graph_name, 'local_node')
g.edata['local_edge'] = _get_shared_mem_edata(g, graph_name, 'local_edge')
g.ndata['inner_node'] = _get_shared_mem_ndata(g, graph_name, 'inner_node')
g.edata['inner_edge'] = _get_shared_mem_edata(g, graph_name, 'inner_edge')
g.ndata[NID] = _get_shared_mem_ndata(g, graph_name, NID)
g.edata[EID] = _get_shared_mem_edata(g, graph_name, EID)
return g
......@@ -271,12 +270,10 @@ class DistGraphServer(KVServer):
self.ip_config = ip_config
# Load graph partition data.
self.client_g, node_feats, edge_feats, self.meta = load_partition(conf_file, server_id)
_, _, node_map, edge_map, num_partitions = self.meta
self.client_g, node_feats, edge_feats, self.gpb = load_partition(conf_file, server_id)
self.client_g = _copy_graph_to_shared_mem(self.client_g, graph_name)
# Init kvstore.
self.gpb = GraphPartitionBook(server_id, num_partitions, node_map, edge_map, self.client_g)
self.gpb.shared_memory(graph_name)
self.add_part_policy(PartitionPolicy('node', server_id, self.gpb))
self.add_part_policy(PartitionPolicy('edge', server_id, self.gpb))
......@@ -332,6 +329,12 @@ class DistGraph:
self._default_init_ndata = _default_init_data
self._default_init_edata = _default_init_data
self._num_nodes = 0
self._num_edges = 0
for part_md in self._gpb.metadata():
self._num_nodes += int(part_md['num_nodes'])
self._num_edges += int(part_md['num_edges'])
def init_ndata(self, ndata_name, shape, dtype):
'''Initialize node data
......@@ -425,11 +428,11 @@ class DistGraph:
def number_of_nodes(self):
"""Return the number of nodes"""
return self._gpb.num_nodes()
return self._num_nodes
def number_of_edges(self):
"""Return the number of edges"""
return self._gpb.num_edges()
return self._num_edges
def node_attr_schemes(self):
"""Return the node feature and embedding schemes."""
......
......@@ -8,13 +8,13 @@ from .. import utils
from .shared_mem_utils import _to_shared_mem, _get_ndata_path, _get_edata_path, DTYPE_DICT
from .._ffi.ndarray import empty_shared_mem
def _move_metadata_to_shared_mam(graph_name, num_nodes, num_edges, part_id,
num_partitions, node_map, edge_map):
''' Move all metadata to the shared memory.
def _move_metadata_to_shared_mem(graph_name, num_nodes, num_edges, part_id,
num_partitions, node_map, edge_map, is_range_part):
''' Move all metadata of the partition book to the shared memory.
We need the metadata to construct the graph partition book.
'''
meta = _to_shared_mem(F.tensor([num_nodes, num_edges,
meta = _to_shared_mem(F.tensor([int(is_range_part), num_nodes, num_edges,
num_partitions, part_id]),
_get_ndata_path(graph_name, 'meta'))
node_map = _to_shared_mem(node_map, _get_ndata_path(graph_name, 'node_map'))
......@@ -27,25 +27,29 @@ def _get_shared_mem_metadata(graph_name):
The metadata includes whether this is a range partition, the numbers of nodes and edges,
the number of partitions and the partition Id. In the future,
we can add more information, especially for heterographs.
'''
shape = (4,)
# The metadata has 5 elements: is_range_part, num_nodes, num_edges, num_partitions, part_id
# We might need to extend the list in the future.
shape = (5,)
dtype = F.int64
dtype = DTYPE_DICT[dtype]
data = empty_shared_mem(_get_ndata_path(graph_name, 'meta'), False, shape, dtype)
dlpack = data.to_dlpack()
meta = F.asnumpy(F.zerocopy_from_dlpack(dlpack))
num_nodes, num_edges, num_partitions, part_id = meta[0], meta[1], meta[2], meta[3]
is_range_part, num_nodes, num_edges, num_partitions, part_id = meta
# Load node map
data = empty_shared_mem(_get_ndata_path(graph_name, 'node_map'), False, (num_nodes,), dtype)
length = num_partitions if is_range_part else num_nodes
data = empty_shared_mem(_get_ndata_path(graph_name, 'node_map'), False, (length,), dtype)
dlpack = data.to_dlpack()
node_map = F.zerocopy_from_dlpack(dlpack)
# Load edge_map
data = empty_shared_mem(_get_edata_path(graph_name, 'edge_map'), False, (num_edges,), dtype)
length = num_partitions if is_range_part else num_edges
data = empty_shared_mem(_get_edata_path(graph_name, 'edge_map'), False, (length,), dtype)
dlpack = data.to_dlpack()
edge_map = F.zerocopy_from_dlpack(dlpack)
return part_id, num_partitions, node_map, edge_map
return is_range_part, part_id, num_partitions, node_map, edge_map
def get_shared_mem_partition_book(graph_name, graph_part):
......@@ -63,11 +67,14 @@ def get_shared_mem_partition_book(graph_name, graph_part):
Returns
-------
GraphPartitionBook
GraphPartitionBook or RangePartitionBook
A graph partition book for a particular partition.
'''
part_id, num_parts, node_map, edge_map = _get_shared_mem_metadata(graph_name)
return GraphPartitionBook(part_id, num_parts, node_map, edge_map, graph_part)
is_range_part, part_id, num_parts, node_map, edge_map = _get_shared_mem_metadata(graph_name)
if is_range_part == 1:
return RangePartitionBook(part_id, num_parts, node_map, edge_map)
else:
return GraphPartitionBook(part_id, num_parts, node_map, edge_map, graph_part)
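For reference, a rough NumPy sketch (not the DGL shared-memory code) of the five-element metadata layout written by _move_metadata_to_shared_mem and of how a reader decides the node/edge map lengths:

import numpy as np

def pack_meta(is_range_part, num_nodes, num_edges, num_partitions, part_id):
    # Matches the layout described above:
    # [is_range_part, num_nodes, num_edges, num_partitions, part_id].
    return np.array([int(is_range_part), num_nodes, num_edges,
                     num_partitions, part_id], dtype=np.int64)

def map_lengths(meta):
    is_range_part, num_nodes, num_edges, num_partitions, _ = meta
    # A range partition book stores one boundary per partition, while the
    # old format stores one partition Id per node/edge.
    node_map_len = num_partitions if is_range_part else num_nodes
    edge_map_len = num_partitions if is_range_part else num_edges
    return node_map_len, edge_map_len

print(map_lengths(pack_meta(True, 10000, 50000, 4, 0)))   # (4, 4)
print(map_lengths(pack_meta(False, 10000, 50000, 4, 0)))  # (10000, 50000)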
class GraphPartitionBook:
"""GraphPartitionBook is used to store parition information.
......@@ -75,7 +82,7 @@ class GraphPartitionBook:
Parameters
----------
part_id : int
partition id of current GraphPartitionBook
partition id of current partition book
num_parts : int
number of total partitions
node_map : tensor
......@@ -94,16 +101,15 @@ class GraphPartitionBook:
self._nid2partid = node_map.tousertensor()
edge_map = utils.toindex(edge_map)
self._eid2partid = edge_map.tousertensor()
self._graph = part_graph
# Get meta data of GraphPartitionBook
# Get meta data of the partition book.
self._partition_meta_data = []
_, nid_count = np.unique(F.asnumpy(self._nid2partid), return_counts=True)
_, eid_count = np.unique(F.asnumpy(self._eid2partid), return_counts=True)
for partid in range(self._num_partitions):
part_info = {}
part_info['machine_id'] = partid
part_info['num_nodes'] = nid_count[partid]
part_info['num_edges'] = eid_count[partid]
part_info['num_nodes'] = int(nid_count[partid])
part_info['num_edges'] = int(eid_count[partid])
self._partition_meta_data.append(part_info)
# Get partid2nids
self._partid2nids = []
......@@ -123,7 +129,7 @@ class GraphPartitionBook:
self._partid2eids.append(part_eids)
# Get nidg2l
self._nidg2l = [None] * self._num_partitions
global_id = self._graph.ndata[NID]
global_id = part_graph.ndata[NID]
max_global_id = np.amax(F.asnumpy(global_id))
# TODO(chao): support int32 index
g2l = F.zeros((max_global_id+1), F.int64, F.context(global_id))
......@@ -131,7 +137,7 @@ class GraphPartitionBook:
self._nidg2l[self._part_id] = g2l
# Get eidg2l
self._eidg2l = [None] * self._num_partitions
global_id = self._graph.edata[EID]
global_id = part_graph.edata[EID]
max_global_id = np.amax(F.asnumpy(global_id))
# TODO(chao): support int32 index
g2l = F.zeros((max_global_id+1), F.int64, F.context(global_id))
......@@ -149,9 +155,9 @@ class GraphPartitionBook:
graph_name : str
The graph name
"""
self._meta, self._nid2partid, self._eid2partid = _move_metadata_to_shared_mam(
graph_name, self.num_nodes(), self.num_edges(), self._part_id, self._num_partitions,
self._nid2partid, self._eid2partid)
self._meta, self._nid2partid, self._eid2partid = _move_metadata_to_shared_mem(
graph_name, self._num_nodes(), self._num_edges(), self._part_id, self._num_partitions,
self._nid2partid, self._eid2partid, False)
def num_partitions(self):
"""Return the number of partitions.
......@@ -169,7 +175,6 @@ class GraphPartitionBook:
The meta data includes:
* The machine ID.
* The machine IP address.
* Number of nodes and edges of each partition.
Examples
......@@ -186,12 +191,12 @@ class GraphPartitionBook:
"""
return self._partition_meta_data
def num_nodes(self):
def _num_nodes(self):
""" The total number of nodes
"""
return len(self._nid2partid)
def num_edges(self):
def _num_edges(self):
""" The total number of edges
"""
return len(self._eid2partid)
......@@ -227,7 +232,7 @@ class GraphPartitionBook:
return F.gather_row(self._eid2partid, eids)
def partid2nids(self, partid):
"""From partition id to node IDs
"""From partition id to global node IDs
Parameters
----------
......@@ -242,7 +247,7 @@ class GraphPartitionBook:
return self._partid2nids[partid]
def partid2eids(self, partid):
"""From partition id to edge IDs
"""From partition id to global edge IDs
Parameters
----------
......@@ -309,11 +314,7 @@ class GraphPartitionBook:
DGLGraph
The graph of the partition.
"""
if partid != self._part_id:
raise RuntimeError('Now GraphPartitionBook does not support \
getting remote partitions.')
return self._graph
#TODO(zhengda) add implementation later.
def get_node_size(self):
"""Get the number of nodes in the current partition.
......@@ -335,6 +336,260 @@ class GraphPartitionBook:
"""
return self._edge_size
class RangePartitionBook:
"""RangePartitionBook is used to store parition information.
Parameters
----------
part_id : int
partition id of current partition book
num_parts : int
number of total partitions
node_map : tensor
the end boundaries of the node Id range of each partition (i.e., the cumulative node counts)
edge_map : tensor
the end boundaries of the edge Id range of each partition (i.e., the cumulative edge counts)
"""
def __init__(self, part_id, num_parts, node_map, edge_map):
assert part_id >= 0, 'part_id cannot be a negative number.'
assert num_parts > 0, 'num_parts must be greater than zero.'
self._partid = part_id
self._num_partitions = num_parts
node_map = utils.toindex(node_map)
edge_map = utils.toindex(edge_map)
self._node_map = node_map.tonumpy()
self._edge_map = edge_map.tonumpy()
# Get meta data of the partition book
self._partition_meta_data = []
for partid in range(self._num_partitions):
nrange_start = node_map[partid - 1] if partid > 0 else 0
nrange_end = node_map[partid]
erange_start = edge_map[partid - 1] if partid > 0 else 0
erange_end = edge_map[partid]
part_info = {}
part_info['machine_id'] = partid
part_info['num_nodes'] = int(nrange_end - nrange_start)
part_info['num_edges'] = int(erange_end - erange_start)
self._partition_meta_data.append(part_info)
def shared_memory(self, graph_name):
"""Move data to shared memory.
Parameters
----------
graph_name : str
The graph name
"""
self._meta = _move_metadata_to_shared_mem(
graph_name, self._num_nodes(), self._num_edges(), self._partid,
self._num_partitions, F.tensor(self._node_map), F.tensor(self._edge_map), True)
def num_partitions(self):
"""Return the number of partitions.
Returns
-------
int
number of partitions
"""
return self._num_partitions
def _num_nodes(self):
""" The total number of nodes
"""
return int(self._node_map[-1])
def _num_edges(self):
""" The total number of edges
"""
return int(self._edge_map[-1])
def metadata(self):
"""Return the partition meta data.
The meta data includes:
* The machine ID.
* Number of nodes and edges of each partition.
Examples
--------
>>> print(g.get_partition_book().metadata())
>>> [{'machine_id' : 0, 'num_nodes' : 3000, 'num_edges' : 5000},
... {'machine_id' : 1, 'num_nodes' : 2000, 'num_edges' : 4888},
... ...]
Returns
-------
list[dict[str, any]]
Meta data of each partition.
"""
return self._partition_meta_data
def nid2partid(self, nids):
"""From global node IDs to partition IDs
Parameters
----------
nids : tensor
global node IDs
Returns
-------
tensor
partition IDs
"""
nids = utils.toindex(nids)
ret = np.searchsorted(self._node_map, nids.tonumpy(), side='right')
ret = utils.toindex(ret)
return ret.tousertensor()
def eid2partid(self, eids):
"""From global edge IDs to partition IDs
Parameters
----------
eids : tensor
global edge IDs
Returns
-------
tensor
partition IDs
"""
eids = utils.toindex(eids)
ret = np.searchsorted(self._edge_map, eids.tonumpy(), side='right')
ret = utils.toindex(ret)
return ret.tousertensor()
def partid2nids(self, partid):
"""From partition id to global node IDs
Parameters
----------
partid : int
partition id
Returns
-------
tensor
node IDs
"""
# TODO do we need to cache it?
start = self._node_map[partid - 1] if partid > 0 else 0
end = self._node_map[partid]
return F.arange(start, end)
def partid2eids(self, partid):
"""From partition id to global edge IDs
Parameters
----------
partid : int
partition id
Returns
-------
tensor
edge IDs
"""
# TODO do we need to cache it?
start = self._edge_map[partid - 1] if partid > 0 else 0
end = self._edge_map[partid]
return F.arange(start, end)
def nid2localnid(self, nids, partid):
"""Get local node IDs within the given partition.
Parameters
----------
nids : tensor
global node IDs
partid : int
partition ID
Returns
-------
tensor
local node IDs
"""
if partid != self._partid:
raise RuntimeError('RangePartitionBook does not support getting remote tensors '
'of nid2localnid for now.')
start = self._node_map[partid - 1] if partid > 0 else 0
return nids - start
def eid2localeid(self, eids, partid):
"""Get the local edge ids within the given partition.
Parameters
----------
eids : tensor
global edge ids
partid : int
partition ID
Returns
-------
tensor
local edge ids
"""
if partid != self._partid:
raise RuntimeError('RangePartitionBook does not support getting remote tensors '
'of eid2localeid for now.')
start = self._edge_map[partid - 1] if partid > 0 else 0
return eids - start
def get_partition(self, partid):
"""Get the graph of one partition.
Parameters
----------
partid : int
Partition ID.
Returns
-------
DGLGraph
The graph of the partition.
"""
#TODO(zhengda) add implementation later.
def get_node_size(self):
"""Get the number of nodes in the current partition.
Returns
-------
int
The number of nodes in the current partition
"""
range_start = self._node_map[self._partid - 1] if self._partid > 0 else 0
range_end = self._node_map[self._partid]
return range_end - range_start
def get_edge_size(self):
"""Get the number of edges in the current partition.
Returns
-------
int
The number of edges in the current partition
"""
range_start = self._edge_map[self._partid - 1] if self._partid > 0 else 0
range_end = self._edge_map[self._partid]
return range_end - range_start
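A rough usage sketch of RangePartitionBook based on the code above; the boundary values are made up, the Id arrays are shown as NumPy arrays for brevity, and the import path assumes the module location of this file:

import numpy as np
from dgl.distributed.graph_partition_book import RangePartitionBook

# Two partitions: nodes [0, 3000) and [3000, 5000); edges [0, 7000) and [7000, 12000).
gpb = RangePartitionBook(part_id=0, num_parts=2,
                         node_map=np.array([3000, 5000]),
                         edge_map=np.array([7000, 12000]))
print(gpb.num_partitions())                     # 2
print(gpb.metadata()[1]['num_nodes'])           # 2000
print(gpb.nid2partid(np.array([10, 4000])))     # partitions [0, 1]
print(gpb.partid2nids(1))                       # the contiguous range [3000, 5000)
print(gpb.nid2localnid(np.array([10, 20]), 0))  # [10, 20]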
class PartitionPolicy(object):
"""Wrapper for GraphPartitionBook and RangePartitionBook.
......
......@@ -85,6 +85,7 @@ from .. import backend as F
from ..base import NID, EID
from ..data.utils import load_graphs, save_graphs, load_tensors, save_tensors
from ..transform import metis_partition_assignment, partition_graph_with_halo
from .graph_partition_book import GraphPartitionBook, RangePartitionBook
def load_partition(conf_file, part_id):
''' Load data of a partition from the data path in the DistGraph server.
......@@ -115,8 +116,8 @@ def load_partition(conf_file, part_id):
All node features.
dict of tensors
All edge features.
(int, int, NumPy ndarray, Numpy ndarray))
The metadata of the global graph: number of nodes, number of edges, node map, edge map.
GraphPartitionBook
The global partition information.
'''
with open(conf_file) as conf_f:
part_metadata = json.load(conf_f)
......@@ -134,24 +135,27 @@ def load_partition(conf_file, part_id):
assert 'num_edges' in part_metadata, "cannot get the number of edges of the global graph."
assert 'node_map' in part_metadata, "cannot get the node map."
assert 'edge_map' in part_metadata, "cannot get the edge map."
node_map = np.load(part_metadata['node_map'], allow_pickle=True)
edge_map = np.load(part_metadata['edge_map'], allow_pickle=True)
meta = (part_metadata['num_nodes'], part_metadata['num_edges'], node_map, edge_map, num_parts)
# If this is a range partitioning, node_map stores a list whose elements are the Id range
# boundaries of the partitions. Otherwise, node_map stores the filename of a NumPy array
# that maps each node Id to a partition Id.
is_range_part = isinstance(part_metadata['node_map'], list)
node_map = part_metadata['node_map'] if is_range_part else np.load(part_metadata['node_map'])
edge_map = part_metadata['edge_map'] if is_range_part else np.load(part_metadata['edge_map'])
assert isinstance(node_map, list) == isinstance(edge_map, list), \
"The node map and edge map need to have the same format"
assert NID in graph.ndata, "the partition graph should contain node mapping to global node Id"
assert EID in graph.edata, "the partition graph should contain edge mapping to global edge Id"
# TODO we need to fix this. DGL backend doesn't support boolean or byte.
# int64 is unnecessary.
node_map = F.zerocopy_from_numpy(node_map)
part_ids = F.gather_row(node_map, graph.ndata[NID])
graph.ndata['local_node'] = F.astype(part_ids == part_id, F.int64)
edge_map = F.zerocopy_from_numpy(edge_map)
part_ids = F.gather_row(edge_map, graph.edata[EID])
graph.edata['local_edge'] = F.astype(part_ids == part_id, F.int64)
return graph, node_feats, edge_feats, meta
if is_range_part:
gpb = RangePartitionBook(part_id, num_parts, np.array(node_map), np.array(edge_map))
else:
gpb = GraphPartitionBook(part_id, num_parts, node_map, edge_map, graph)
return graph, node_feats, edge_feats, gpb
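For illustration, a sketch of the two metadata formats that load_partition distinguishes. Only the fields relevant to the format detection are shown (the per-partition file paths and other keys written by partition_graph below are omitted), and all values are made up:

# Range partitioning (reshuffle=True): 'node_map'/'edge_map' are lists of
# Id range boundaries, one entry per partition.
range_conf = {
    'graph_name': 'test', 'num_parts': 2,
    'num_nodes': 5000, 'num_edges': 12000,
    'node_map': [3000, 5000],
    'edge_map': [7000, 12000],
}

# Without reshuffling: 'node_map'/'edge_map' are filenames of NumPy arrays
# that store one partition Id per node/edge.
plain_conf = {
    'graph_name': 'test', 'num_parts': 2,
    'num_nodes': 5000, 'num_edges': 12000,
    'node_map': '/tmp/partition/node_map.npy',
    'edge_map': '/tmp/partition/edge_map.npy',
}

# load_partition picks the partition book type from the format of 'node_map'.
print(isinstance(range_conf['node_map'], list))  # True -> RangePartitionBook
print(isinstance(plain_conf['node_map'], list))  # False -> GraphPartitionBook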
def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method="metis"):
def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method="metis",
reshuffle=True):
''' Partition a graph for distributed training and store the partitions on files.
The partitioning occurs in three steps: 1) run a partition algorithm (e.g., Metis) to
......@@ -192,31 +196,37 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
The partition method. It supports "random" and "metis".
out_path : str
The path to store the files for all partitioned data.
reshuffle : bool
Reshuffle nodes and edges so that the nodes and edges of each partition fall in
a contiguous Id range.
'''
if num_parts == 1:
client_parts = {0: g}
node_parts = F.zeros((g.number_of_nodes(),), F.int64, F.cpu())
g.ndata[NID] = F.arange(0, g.number_of_nodes())
g.edata[EID] = F.arange(0, g.number_of_edges())
g.ndata['inner_node'] = F.ones((g.number_of_nodes(),), F.int64, F.cpu())
g.edata['inner_edge'] = F.ones((g.number_of_edges(),), F.int64, F.cpu())
g.ndata['orig_id'] = F.arange(0, g.number_of_nodes())
elif part_method == 'metis':
node_parts = metis_partition_assignment(g, num_parts)
client_parts = partition_graph_with_halo(g, node_parts, num_hops)
client_parts = partition_graph_with_halo(g, node_parts, num_hops, reshuffle=reshuffle)
elif part_method == 'random':
node_parts = dgl.random.choice(num_parts, g.number_of_nodes())
client_parts = partition_graph_with_halo(g, node_parts, num_hops)
client_parts = partition_graph_with_halo(g, node_parts, num_hops, reshuffle=reshuffle)
else:
raise Exception('Unknown partitioning method: ' + part_method)
# Let's calculate edge assignment.
# TODO(zhengda) we should replace int64 with int16. int16 should be sufficient.
if num_parts > 1:
if not reshuffle:
edge_parts = np.zeros((g.number_of_edges(),), dtype=np.int64) - 1
num_edges = 0
lnodes_list = [] # The node ids of each partition
ledges_list = [] # The edge Ids of each partition
for part_id in range(num_parts):
part = client_parts[part_id]
local_nodes = F.boolean_mask(part.ndata[NID], part.ndata['inner_node'] == 1)
local_nodes = F.boolean_mask(part.ndata[NID], part.ndata['inner_node'])
local_edges = F.asnumpy(g.in_edges(local_nodes, form='eid'))
edge_parts[local_edges] = part_id
num_edges += len(local_edges)
......@@ -224,23 +234,53 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
ledges_list.append(local_edges)
assert num_edges == g.number_of_edges()
else:
edge_parts = np.zeros((g.number_of_edges(),), dtype=np.int64)
num_edges = 0
num_nodes = 0
lnodes_list = [] # The end boundary of the node Id range of each partition (cumulative count)
ledges_list = [] # The end boundary of the edge Id range of each partition (cumulative count)
for part_id in range(num_parts):
part = client_parts[part_id]
num_local_nodes = F.asnumpy(F.sum(part.ndata['inner_node'], 0))
# To get the edges in the input graph, we should use original node Ids.
local_nodes = F.boolean_mask(part.ndata['orig_id'], part.ndata['inner_node'])
num_local_edges = F.asnumpy(F.sum(g.in_degrees(local_nodes), 0))
num_edges += int(num_local_edges)
num_nodes += int(num_local_nodes)
lnodes_list.append(num_nodes)
ledges_list.append(num_edges)
assert num_edges == g.number_of_edges()
assert num_nodes == g.number_of_nodes()
os.makedirs(out_path, mode=0o775, exist_ok=True)
tot_num_inner_edges = 0
out_path = os.path.abspath(out_path)
node_part_file = os.path.join(out_path, "node_map")
edge_part_file = os.path.join(out_path, "edge_map")
np.save(node_part_file, F.asnumpy(node_parts), allow_pickle=True)
np.save(edge_part_file, edge_parts, allow_pickle=True)
# Without reshuffling, we have to store the entire node/edge mapping in a file.
if not reshuffle:
node_part_file = os.path.join(out_path, "node_map")
edge_part_file = os.path.join(out_path, "edge_map")
np.save(node_part_file, F.asnumpy(node_parts), allow_pickle=False)
np.save(edge_part_file, edge_parts, allow_pickle=False)
node_map_val = node_part_file + ".npy"
edge_map_val = edge_part_file + ".npy"
else:
# With reshuffling, the nodes and edges of each partition fall in a contiguous Id range,
# so we only need to store the range boundaries.
if num_parts > 1:
node_map_val = lnodes_list
edge_map_val = ledges_list
else:
node_map_val = [g.number_of_nodes()]
edge_map_val = [g.number_of_edges()]
part_metadata = {'graph_name': graph_name,
'num_nodes': g.number_of_nodes(),
'num_edges': g.number_of_edges(),
'part_method': part_method,
'num_parts': num_parts,
'halo_hops': num_hops,
'node_map': node_part_file + ".npy",
'edge_map': edge_part_file + ".npy"}
'node_map': node_map_val,
'edge_map': edge_map_val}
for part_id in range(num_parts):
part = client_parts[part_id]
......@@ -248,8 +288,15 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
node_feats = {}
edge_feats = {}
if num_parts > 1:
local_nodes = lnodes_list[part_id]
local_edges = ledges_list[part_id]
if reshuffle and part_id == 0:
local_nodes = F.arange(0, lnodes_list[part_id])
local_edges = F.arange(0, ledges_list[part_id])
elif reshuffle:
local_nodes = F.arange(lnodes_list[part_id - 1], lnodes_list[part_id])
local_edges = F.arange(ledges_list[part_id - 1], ledges_list[part_id])
else:
local_nodes = lnodes_list[part_id]
local_edges = ledges_list[part_id]
print('part {} has {} nodes and {} edges.'.format(
part_id, part.number_of_nodes(), part.number_of_edges()))
print('{} nodes and {} edges are inside the partition'.format(
......
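A small sketch, with made-up counts, of how the cumulative counts collected above become the 'node_map' boundaries and the per-partition Id ranges when reshuffle=True:

import numpy as np

# Made-up inner-node counts of three partitions after reshuffling.
part_node_counts = [3000, 2000, 4000]

# lnodes_list stores the running totals, which become node_map_val.
node_map_val = list(np.cumsum(part_node_counts))   # [3000, 5000, 9000]

# With reshuffling, the inner nodes of partition i are exactly the Id range
# [node_map_val[i - 1], node_map_val[i]).
for part_id, _ in enumerate(part_node_counts):
    start = node_map_val[part_id - 1] if part_id > 0 else 0
    end = node_map_val[part_id]
    print('partition', part_id, 'owns nodes', start, 'to', end - 1)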
......@@ -11,7 +11,6 @@ from . import ndarray as nd
from . import backend as F
from .graph_index import from_coo
from .graph_index import _get_halo_subgraph_inner_node
from .graph_index import _get_halo_subgraph_inner_edge
from .graph import unbatch
from .convert import graph, bipartite
from . import utils
......@@ -549,9 +548,48 @@ def remove_self_loop(g):
new_g.add_edges(src[non_self_edges_idx], dst[non_self_edges_idx])
return new_g
def partition_graph_with_halo(g, node_part, num_hops):
''' This is to partition a graph. Each partition contains HALO nodes
so that we can generate NodeFlow in each partition correctly.
def reorder_nodes(g, new_node_ids):
""" Generate a new graph with new node Ids.
We assign each node in the input graph a new node Id. This results in
a new graph.
Parameters
----------
g : DGLGraph
The input graph
new_node_ids : a tensor
The new node Ids
Returns
-------
DGLGraph
The graph with new node Ids.
"""
assert len(new_node_ids) == g.number_of_nodes(), \
"The number of new node ids must match #nodes in the graph."
new_node_ids = utils.toindex(new_node_ids)
sorted_ids, idx = F.sort_1d(new_node_ids.tousertensor())
assert F.asnumpy(sorted_ids[0]) == 0 \
and F.asnumpy(sorted_ids[-1]) == g.number_of_nodes() - 1, \
"The new node Ids are incorrect."
new_gidx = _CAPI_DGLReorderGraph(g._graph, new_node_ids.todgltensor())
new_g = DGLGraph(new_gidx)
new_g.ndata['orig_id'] = idx
return new_g
def partition_graph_with_halo(g, node_part, extra_cached_hops, reshuffle=False):
'''Partition a graph.
Based on the given node assignments for each partition, the function splits
the input graph into subgraphs. A subgraph may contain HALO nodes, which do
not belong to the partition of the subgraph but are connected to the nodes
in the partition within a fixed number of hops.
If `reshuffle` is turned on, the function reshuffles node Ids and edge Ids
of the input graph before partitioning. After reshuffling, all nodes and edges
in a partition fall in a contiguous Id range in the input graph.
The partitioned subgraphs have node data 'orig_id', which stores the node Ids
in the original input graph.
Parameters
------------
......@@ -563,9 +601,12 @@ def partition_graph_with_halo(g, node_part, num_hops):
needs to be the same as the number of nodes of the graph. Each element
indicates the partition Id of a node.
num_hops: int
extra_cached_hops: int
The number of hops of HALO nodes to cache with each partition.
reshuffle : bool
Reshuffle the nodes so that the nodes of each partition fall in a contiguous Id range.
Returns
--------
a dict of DGLGraphs
......@@ -573,15 +614,41 @@ def partition_graph_with_halo(g, node_part, num_hops):
'''
assert len(node_part) == g.number_of_nodes()
node_part = utils.toindex(node_part)
subgs = _CAPI_DGLPartitionWithHalo(g._graph, node_part.todgltensor(), num_hops)
if reshuffle:
node_part = node_part.tousertensor()
sorted_part, new2old_map = F.sort_1d(node_part)
# new2old_map maps new positions to old node Ids; invert it to get the
# old-to-new mapping that reorder_nodes() expects.
new_node_ids = F.scatter_row(F.zeros((g.number_of_nodes(),), F.dtype(new2old_map), F.cpu()),
new2old_map, F.arange(0, g.number_of_nodes()))
g = reorder_nodes(g, new_node_ids)
node_part = utils.toindex(sorted_part)
# We reassign edge Ids in the in-CSR. In this way, after partitioning, we can ensure
# that all edges in a partition fall in a contiguous Id range.
orig_eids = _CAPI_DGLReassignEdges(g._graph, True)
orig_eids = utils.toindex(orig_eids)
g.edata['orig_id'] = orig_eids.tousertensor()
subgs = _CAPI_DGLPartitionWithHalo(g._graph, node_part.todgltensor(), extra_cached_hops)
subg_dict = {}
node_part = node_part.tousertensor()
for i, subg in enumerate(subgs):
inner_node = _get_halo_subgraph_inner_node(subg)
inner_edge = _get_halo_subgraph_inner_edge(subg)
subg = g._create_subgraph(subg, subg.induced_nodes, subg.induced_edges)
inner_node = F.zerocopy_from_dlpack(inner_node.to_dlpack())
subg.ndata['inner_node'] = inner_node
inner_edge = F.zerocopy_from_dlpack(inner_edge.to_dlpack())
subg.ndata['part_id'] = F.gather_row(node_part, subg.parent_nid)
if reshuffle:
subg.ndata['orig_id'] = F.gather_row(g.ndata['orig_id'], subg.ndata[NID])
subg.edata['orig_id'] = F.gather_row(g.edata['orig_id'], subg.edata[EID])
if extra_cached_hops >= 1:
inner_edge = F.zeros((subg.number_of_edges(),), F.int64, F.cpu())
inner_nids = F.nonzero_1d(subg.ndata['inner_node'])
# TODO(zhengda) we need to fix utils.toindex() to avoid the dtype cast below.
inner_nids = F.astype(inner_nids, F.int64)
inner_eids = subg.in_edges(inner_nids, form='eid')
inner_edge = F.scatter_row(inner_edge, inner_eids,
F.ones((len(inner_eids),), F.dtype(inner_edge), F.cpu()))
else:
inner_edge = F.ones((subg.number_of_edges(),), F.int64, F.cpu())
subg.edata['inner_edge'] = inner_edge
subg_dict[i] = subg
return subg_dict
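As a sketch of the reshuffling step above: the per-node partition assignment is sorted, and the resulting permutation is inverted to obtain the old-to-new node Id mapping that reorder_nodes() expects (the assignment values are made up):

import numpy as np

# Made-up partition assignment of 6 nodes.
node_part = np.array([1, 0, 2, 0, 1, 2])

# Sort the nodes by partition; new2old_map[i] is the old Id of the node
# that receives new Id i.
sorted_part = np.sort(node_part)         # [0 0 1 1 2 2]
new2old_map = np.argsort(node_part)      # [1 3 0 4 2 5]

# Invert the permutation: new_node_ids[old_id] = new_id.
new_node_ids = np.empty_like(new2old_map)
new_node_ids[new2old_map] = np.arange(len(node_part))
print(new_node_ids)                      # [2 0 4 1 3 5]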
......@@ -613,14 +680,23 @@ def metis_partition_assignment(g, k):
node_part = utils.toindex(node_part)
return node_part.tousertensor()
def metis_partition(g, k, extra_cached_hops=0):
def metis_partition(g, k, extra_cached_hops=0, reshuffle=False):
''' This is to partition a graph with Metis partitioning.
Metis assigns vertices to partitions. This API constructs graphs with the vertices assigned
to the partitions and their incoming edges.
Metis assigns vertices to partitions. This API constructs subgraphs with the vertices assigned
to the partitions and their incoming edges. A subgraph may contain HALO nodes, which do
not belong to the partition of the subgraph but are connected to the nodes
in the partition within a fixed number of hops.
The partitioned graph is stored in DGLGraph. The DGLGraph has the `part_id`
node data that indicates the partition a node belongs to.
If `reshuffle` is turned on, the function reshuffles node Ids and edge Ids
of the input graph before partitioning. After reshuffling, all nodes and edges
in a partition fall in a contiguous Id range in the input graph.
The partitioned subgraphs have node data 'orig_id', which stores the node Ids
in the original input graph.
Each partitioned subgraph is stored as a DGLGraph with `part_id`
node data that indicates the partition a node belongs to. The subgraphs do not contain
the node/edge data of the input graph.
Parameters
------------
......@@ -633,6 +709,9 @@ def metis_partition(g, k, extra_cached_hops=0):
extra_cached_hops: int
The number of hops of HALO nodes to cache with each partition.
reshuffle : bool
Reshuffle the nodes so that the nodes of each partition fall in a contiguous Id range.
Returns
--------
a dict of DGLGraphs
......@@ -645,14 +724,8 @@ def metis_partition(g, k, extra_cached_hops=0):
if len(node_part) == 0:
return None
node_part = utils.toindex(node_part)
# Then we split the original graph into parts based on the METIS partitioning results.
parts = partition_graph_with_halo(g, node_part, extra_cached_hops)
node_part = node_part.tousertensor()
for part_id in parts:
part = parts[part_id]
part.ndata['part_id'] = F.gather_row(node_part, part.parent_nid)
return parts
return partition_graph_with_halo(g, node_part, extra_cached_hops, reshuffle)
def compact_graphs(graphs, always_preserve=None):
"""Given a list of graphs with the same set of nodes, find and eliminate the common
......@@ -953,10 +1026,6 @@ def remove_edges(g, edge_ids):
The nodes are preserved.
Note: `remove_edges` is slow especially when removing a small number of edges from
a large graph. It creates a new graph with all remaining edges and return the new graph.
Please use it with caution especially when using it in mini-batch training.
Parameters
----------
graph : DGLHeteroGraph
......
......@@ -441,6 +441,22 @@ void CSRSort_(CSRMatrix* csr) {
});
}
CSRMatrix CSRReorder(CSRMatrix csr, runtime::NDArray new_row_ids, runtime::NDArray new_col_ids) {
CSRMatrix ret;
ATEN_CSR_SWITCH(csr, XPU, IdType, "CSRReorder", {
ret = impl::CSRReorder<XPU, IdType>(csr, new_row_ids, new_col_ids);
});
return ret;
}
COOMatrix COOReorder(COOMatrix coo, runtime::NDArray new_row_ids, runtime::NDArray new_col_ids) {
COOMatrix ret;
ATEN_COO_SWITCH(coo, XPU, IdType, "COOReorder", {
ret = impl::COOReorder<XPU, IdType>(coo, new_row_ids, new_col_ids);
});
return ret;
}
CSRMatrix CSRRemove(CSRMatrix csr, IdArray entries) {
CSRMatrix ret;
ATEN_CSR_SWITCH(csr, XPU, IdType, "CSRRemove", {
......
......@@ -113,6 +113,12 @@ CSRMatrix CSRSliceMatrix(CSRMatrix csr, runtime::NDArray rows, runtime::NDArray
template <DLDeviceType XPU, typename IdType>
void CSRSort_(CSRMatrix* csr);
template <DLDeviceType XPU, typename IdType>
CSRMatrix CSRReorder(CSRMatrix csr, runtime::NDArray new_row_ids, runtime::NDArray new_col_ids);
template <DLDeviceType XPU, typename IdType>
COOMatrix COOReorder(COOMatrix coo, runtime::NDArray new_row_ids, runtime::NDArray new_col_ids);
template <DLDeviceType XPU, typename IdType>
CSRMatrix CSRRemove(CSRMatrix csr, IdArray entries);
......
......@@ -556,6 +556,8 @@ template CSRMatrix CSRSliceMatrix<kDLCPU, int32_t>(
template CSRMatrix CSRSliceMatrix<kDLCPU, int64_t>(
CSRMatrix csr, runtime::NDArray rows, runtime::NDArray cols);
///////////////////////////// CSRSort /////////////////////////////
template <DLDeviceType XPU, typename IdType>
void CSRSort_(CSRMatrix* csr) {
typedef std::pair<IdType, IdType> ShufflePair;
......@@ -597,6 +599,82 @@ void CSRSort_(CSRMatrix* csr) {
template void CSRSort_<kDLCPU, int64_t>(CSRMatrix* csr);
template void CSRSort_<kDLCPU, int32_t>(CSRMatrix* csr);
///////////////////////////// CSRReorder /////////////////////////////
template <DLDeviceType XPU, typename IdType>
CSRMatrix CSRReorder(CSRMatrix csr, runtime::NDArray new_row_id_arr,
runtime::NDArray new_col_id_arr) {
CHECK_SAME_DTYPE(csr.indices, new_row_id_arr);
CHECK_SAME_DTYPE(csr.indices, new_col_id_arr);
// Input CSR
const IdType* in_indptr = static_cast<IdType*>(csr.indptr->data);
const IdType* in_indices = static_cast<IdType*>(csr.indices->data);
const IdType* in_data = static_cast<IdType*>(csr.data->data);
int64_t num_rows = csr.num_rows;
int64_t num_cols = csr.num_cols;
int64_t nnz = csr.indices->shape[0];
CHECK_EQ(nnz, in_indptr[num_rows]);
CHECK_EQ(num_rows, new_row_id_arr->shape[0])
<< "The length of the new row Id array must equal the number of rows of the CSR matrix";
CHECK_EQ(num_cols, new_col_id_arr->shape[0])
<< "The length of the new col Id array must equal the number of columns of the CSR matrix";
// New row/col Ids.
const IdType* new_row_ids = static_cast<IdType*>(new_row_id_arr->data);
const IdType* new_col_ids = static_cast<IdType*>(new_col_id_arr->data);
// Output CSR
NDArray out_indptr_arr = NDArray::Empty({num_rows + 1}, csr.indptr->dtype, csr.indptr->ctx);
NDArray out_indices_arr = NDArray::Empty({nnz}, csr.indices->dtype, csr.indices->ctx);
NDArray out_data_arr = NDArray::Empty({nnz}, csr.data->dtype, csr.data->ctx);
IdType *out_indptr = static_cast<IdType*>(out_indptr_arr->data);
IdType *out_indices = static_cast<IdType*>(out_indices_arr->data);
IdType *out_data = static_cast<IdType*>(out_data_arr->data);
// Compute the length of rows for the new matrix.
std::vector<IdType> new_row_lens(num_rows, -1);
#pragma omp parallel for
for (int64_t i = 0; i < num_rows; i++) {
int64_t new_row_id = new_row_ids[i];
new_row_lens[new_row_id] = in_indptr[i + 1] - in_indptr[i];
}
// Compute the starting location of each row in the new matrix.
out_indptr[0] = 0;
// This is sequential. It should be pretty fast.
for (int64_t i = 0; i < num_rows; i++) {
CHECK_GE(new_row_lens[i], 0);
out_indptr[i + 1] = out_indptr[i] + new_row_lens[i];
}
CHECK_EQ(out_indptr[num_rows], nnz);
// Copy indices and data in the new order.
// We iterate over the rows in the order of the old matrix.
#pragma omp parallel for
for (int64_t i = 0; i < num_rows; i++) {
const IdType *in_row = in_indices + in_indptr[i];
const IdType *in_row_data = in_data + in_indptr[i];
int64_t new_row_id = new_row_ids[i];
IdType *out_row = out_indices + out_indptr[new_row_id];
IdType *out_row_data = out_data + out_indptr[new_row_id];
int64_t row_len = new_row_lens[new_row_id];
// We iterate over the column indices of a row in the order of the old matrix.
for (int64_t j = 0; j < row_len; j++) {
out_row[j] = new_col_ids[in_row[j]];
out_row_data[j] = in_row_data[j];
}
// TODO(zhengda) maybe we should sort the column indices.
}
return CSRMatrix(num_rows, num_cols,
out_indptr_arr, out_indices_arr, out_data_arr);
}
template CSRMatrix CSRReorder<kDLCPU, int64_t>(CSRMatrix csr, runtime::NDArray new_row_ids,
runtime::NDArray new_col_ids);
template CSRMatrix CSRReorder<kDLCPU, int32_t>(CSRMatrix csr, runtime::NDArray new_row_ids,
runtime::NDArray new_col_ids);
} // namespace impl
} // namespace aten
} // namespace dgl
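A compact NumPy/SciPy sketch of the same reordering semantics (new_row_ids[old_row] gives the new row Id, and likewise for columns). It only illustrates the expected result, not the OpenMP implementation above:

import numpy as np
from scipy.sparse import csr_matrix

def csr_reorder_reference(csr, new_row_ids, new_col_ids):
    # Relabel every nonzero (r, c) as (new_row_ids[r], new_col_ids[c]).
    coo = csr.tocoo()
    return csr_matrix((coo.data, (new_row_ids[coo.row], new_col_ids[coo.col])),
                      shape=csr.shape)

# Tiny example: swap the two rows and reverse the column order of a 2x3 matrix.
a = csr_matrix(np.array([[1, 0, 2], [0, 3, 0]]))
b = csr_reorder_reference(a, np.array([1, 0]), np.array([2, 1, 0]))
print(b.toarray())   # [[0 3 0]
                     #  [2 0 1]]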
......@@ -427,6 +427,51 @@ template COOMatrix COOSliceMatrix<kDLCPU, int32_t>(
template COOMatrix COOSliceMatrix<kDLCPU, int64_t>(
COOMatrix coo, runtime::NDArray rows, runtime::NDArray cols);
///////////////////////////// COOReorder /////////////////////////////
template <DLDeviceType XPU, typename IdType>
COOMatrix COOReorder(COOMatrix coo, runtime::NDArray new_row_id_arr,
runtime::NDArray new_col_id_arr) {
CHECK_SAME_DTYPE(coo.row, new_row_id_arr);
CHECK_SAME_DTYPE(coo.col, new_col_id_arr);
// Input COO
const IdType* in_rows = static_cast<IdType*>(coo.row->data);
const IdType* in_cols = static_cast<IdType*>(coo.col->data);
const IdType* in_data = COOHasData(coo) ? static_cast<IdType*>(coo.data->data) : nullptr;
int64_t num_rows = coo.num_rows;
int64_t num_cols = coo.num_cols;
int64_t nnz = coo.row->shape[0];
CHECK_EQ(num_rows, new_row_id_arr->shape[0])
<< "The length of the new row Id array must equal the number of rows of the COO matrix";
CHECK_EQ(num_cols, new_col_id_arr->shape[0])
<< "The length of the new col Id array must equal the number of columns of the COO matrix";
// New row/col Ids.
const IdType* new_row_ids = static_cast<IdType*>(new_row_id_arr->data);
const IdType* new_col_ids = static_cast<IdType*>(new_col_id_arr->data);
// Output COO
NDArray out_row_arr = NDArray::Empty({nnz}, coo.row->dtype, coo.row->ctx);
NDArray out_col_arr = NDArray::Empty({nnz}, coo.col->dtype, coo.col->ctx);
NDArray out_data_arr = COOHasData(coo) ? coo.data : NullArray();
IdType *out_row = static_cast<IdType*>(out_row_arr->data);
IdType *out_col = static_cast<IdType*>(out_col_arr->data);
#pragma omp parallel for
for (int64_t i = 0; i < nnz; i++) {
out_row[i] = new_row_ids[in_rows[i]];
out_col[i] = new_col_ids[in_cols[i]];
}
return COOMatrix(num_rows, num_cols, out_row_arr, out_col_arr, out_data_arr);
}
template COOMatrix COOReorder<kDLCPU, int64_t>(COOMatrix coo, runtime::NDArray new_row_ids,
runtime::NDArray new_col_ids);
template COOMatrix COOReorder<kDLCPU, int32_t>(COOMatrix coo, runtime::NDArray new_row_ids,
runtime::NDArray new_col_ids);
} // namespace impl
} // namespace aten
} // namespace dgl
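The COO version has the same semantics; a short NumPy sketch for clarity:

import numpy as np

def coo_reorder_reference(row, col, new_row_ids, new_col_ids):
    # Each nonzero keeps its position (and hence its data entry); only the
    # row/column Ids are relabeled.
    return new_row_ids[row], new_col_ids[col]

row, col = np.array([0, 0, 1]), np.array([0, 2, 1])
print(coo_reorder_reference(row, col, np.array([1, 0]), np.array([2, 1, 0])))
# (array([1, 1, 0]), array([2, 0, 1]))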
......@@ -403,7 +403,13 @@ GraphPtr GraphOp::ToBidirectedImmutableGraph(GraphPtr g) {
HaloSubgraph GraphOp::GetSubgraphWithHalo(GraphPtr g, IdArray nodes, int num_hops) {
const dgl_id_t *nid = static_cast<dgl_id_t *>(nodes->data);
const auto id_len = nodes->shape[0];
// A map that contains all nodes in the subgraph.
// The key is the old node Id; the value indicates whether the node is an inner
// node.
std::unordered_map<dgl_id_t, bool> all_nodes;
// The old Ids of all nodes. We preserve the insertion order of the nodes in the
// vector; the first nodes are the inner nodes of the subgraph.
std::vector<dgl_id_t> old_node_ids(nid, nid + id_len);
std::vector<std::vector<dgl_id_t>> outer_nodes(num_hops);
for (int64_t i = 0; i < id_len; i++)
all_nodes[nid[i]] = true;
......@@ -436,6 +442,7 @@ HaloSubgraph GraphOp::GetSubgraphWithHalo(GraphPtr g, IdArray nodes, int num_hop
auto it = all_nodes.find(src_data[i]);
if (it == all_nodes.end() && num_hops > 0) {
all_nodes[src_data[i]] = false;
old_node_ids.push_back(src_data[i]);
outer_nodes[0].push_back(src_data[i]);
}
}
......@@ -461,22 +468,14 @@ HaloSubgraph GraphOp::GetSubgraphWithHalo(GraphPtr g, IdArray nodes, int num_hop
auto it = all_nodes.find(src_data[i]);
if (it == all_nodes.end()) {
all_nodes[src_data[i]] = false;
old_node_ids.push_back(src_data[i]);
outer_nodes[k].push_back(src_data[i]);
}
}
}
// We assign new Ids to the nodes in the subgraph. We ensure that nodes
// with smaller Ids in the original graph will also get smaller Ids in
// the subgraph.
// Move all nodes to a vector.
std::vector<dgl_id_t> old_node_ids;
old_node_ids.reserve(all_nodes.size());
for (auto it = all_nodes.begin(); it != all_nodes.end(); it++) {
old_node_ids.push_back(it->first);
}
std::sort(old_node_ids.begin(), old_node_ids.end());
// We assign new Ids to the nodes in the subgraph. We ensure that the HALO
// nodes are placed after the input (inner) nodes.
std::unordered_map<dgl_id_t, dgl_id_t> old2new;
for (size_t i = 0; i < old_node_ids.size(); i++) {
old2new[old_node_ids[i]] = i;
......@@ -508,6 +507,32 @@ HaloSubgraph GraphOp::GetSubgraphWithHalo(GraphPtr g, IdArray nodes, int num_hop
return halo_subg;
}
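The change above replaces the old sorted relabeling with an insertion-order relabeling: the inner (input) nodes keep the Ids 0..len(nodes)-1 and the HALO nodes get the Ids after them. A tiny Python sketch of the mapping, with made-up node Ids:

# Inner nodes as given by the caller, followed by HALO nodes in discovery order.
old_node_ids = [42, 7, 19,   # inner nodes of the partition
                3, 55]       # HALO nodes found while expanding the neighborhood

# New Ids are assigned by position, so inner nodes occupy [0, 3) and
# HALO nodes come after them.
old2new = {old: new for new, old in enumerate(old_node_ids)}
print(old2new)   # {42: 0, 7: 1, 19: 2, 3: 3, 55: 4}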
GraphPtr GraphOp::ReorderImmutableGraph(ImmutableGraphPtr ig, IdArray new_order) {
CSRPtr in_csr, out_csr;
COOPtr coo;
// We only need to reorder one of the graph structures (in-CSR, out-CSR or COO).
if (ig->HasInCSR()) {
in_csr = ig->GetInCSR();
auto csrmat = in_csr->ToCSRMatrix();
auto new_csrmat = aten::CSRReorder(csrmat, new_order, new_order);
in_csr = CSRPtr(new CSR(new_csrmat.indptr, new_csrmat.indices, new_csrmat.data));
} else if (ig->HasOutCSR()) {
out_csr = ig->GetOutCSR();
auto csrmat = out_csr->ToCSRMatrix();
auto new_csrmat = aten::CSRReorder(csrmat, new_order, new_order);
out_csr = CSRPtr(new CSR(new_csrmat.indptr, new_csrmat.indices, new_csrmat.data));
} else {
coo = ig->GetCOO();
auto coomat = coo->ToCOOMatrix();
auto new_coomat = aten::COOReorder(coomat, new_order, new_order);
coo = COOPtr(new COO(ig->NumVertices(), new_coomat.row, new_coomat.col));
}
if (in_csr || out_csr)
return GraphPtr(new ImmutableGraph(in_csr, out_csr));
else
return GraphPtr(new ImmutableGraph(coo));
}
DGL_REGISTER_GLOBAL("transform._CAPI_DGLPartitionWithHalo")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
GraphRef graph = args[0];
......@@ -537,6 +562,7 @@ DGL_REGISTER_GLOBAL("transform._CAPI_DGLPartitionWithHalo")
part_nodes.push_back(it->second);
}
auto graph_ptr = std::dynamic_pointer_cast<ImmutableGraph>(graph.sptr());
CHECK(graph_ptr) << "The input graph has to be an immutable graph";
// When we construct subgraphs, we only access in-edges.
// We need to make sure the in-CSR exists. Otherwise, we'll
// try to construct in-CSR in openmp for loop, which will lead
......@@ -573,6 +599,7 @@ DGL_REGISTER_GLOBAL("graph_index._CAPI_GetHaloSubgraphInnerNodes")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
SubgraphRef g = args[0];
auto gptr = std::dynamic_pointer_cast<HaloSubgraph>(g.sptr());
CHECK(gptr) << "The input graph has to be immutable graph";
*rv = gptr->inner_nodes;
});
......@@ -580,6 +607,7 @@ DGL_REGISTER_GLOBAL("graph_index._CAPI_GetHaloSubgraphInnerEdges")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
SubgraphRef g = args[0];
auto gptr = std::dynamic_pointer_cast<HaloSubgraph>(g.sptr());
CHECK(gptr) << "The input graph has to be immutable graph";
*rv = gptr->inner_edges;
});
......@@ -642,6 +670,42 @@ DGL_REGISTER_GLOBAL("transform._CAPI_DGLToBidirectedMutableGraph")
*rv = GraphOp::ToBidirectedMutableGraph(g.sptr());
});
DGL_REGISTER_GLOBAL("transform._CAPI_DGLReorderGraph")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
GraphRef g = args[0];
const IdArray new_order = args[1];
auto gptr = std::dynamic_pointer_cast<ImmutableGraph>(g.sptr());
CHECK(gptr) << "The input graph has to be immutable graph";
*rv = GraphOp::ReorderImmutableGraph(gptr, new_order);
});
DGL_REGISTER_GLOBAL("transform._CAPI_DGLReassignEdges")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
GraphRef graph = args[0];
bool is_incsr = args[1];
auto gptr = std::dynamic_pointer_cast<ImmutableGraph>(graph.sptr());
CHECK(gptr) << "We can only reassign edge Ids on immutable graphs";
CSRPtr csr = is_incsr ? gptr->GetInCSR() : gptr->GetOutCSR();
auto csrmat = csr->ToCSRMatrix();
int64_t num_edges = csrmat.data->shape[0];
IdArray new_data = IdArray::Empty({num_edges}, csrmat.data->dtype, csrmat.data->ctx);
// Return the original edge Ids.
*rv = new_data;
// TODO(zhengda) I need to invalidate out-CSR and COO.
// Generate new edge Ids.
// TODO(zhengda) after assignment, we actually don't need to store them
// physically.
ATEN_ID_TYPE_SWITCH(new_data->dtype, IdType, {
IdType *typed_new_data = static_cast<IdType*>(new_data->data);
IdType *typed_data = static_cast<IdType*>(csrmat.data->data);
for (int64_t i = 0; i < num_edges; i++) {
typed_new_data[i] = typed_data[i];
typed_data[i] = i;
}
});
});
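A sketch, on a made-up CSR data array, of what the edge-Id reassignment above does: the edge Ids become the positions in the in-CSR, and the previous Ids are returned so the caller can keep them as edata['orig_id']:

import numpy as np

# Made-up original edge Ids stored in the in-CSR 'data' array.
csr_data = np.array([5, 2, 9, 0, 7])

orig_eids = csr_data.copy()              # handed back to the caller
csr_data[:] = np.arange(len(csr_data))   # edges renumbered by CSR position

print(orig_eids)   # [5 2 9 0 7]
print(csr_data)    # [0 1 2 3 4]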
DGL_REGISTER_GLOBAL("transform._CAPI_DGLToBidirectedImmutableGraph")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
GraphRef g = args[0];
......
......@@ -244,23 +244,35 @@ def test_partition_with_halo():
@unittest.skipIf(F._default_context_str == 'gpu', reason="METIS doesn't support GPU")
def test_metis_partition():
# TODO(zhengda) Metis fails to partition a small graph.
g = dgl.DGLGraph(create_large_graph_index(1000), readonly=True)
subgs = dgl.transform.metis_partition(g, 4, 0)
check_metis_partition(g, 0)
check_metis_partition(g, 1)
check_metis_partition(g, 2)
def check_metis_partition(g, extra_hops):
# partitions with 1-hop HALO nodes
subgs = dgl.transform.metis_partition(g, 4, extra_cached_hops=extra_hops)
num_inner_nodes = 0
num_inner_edges = 0
if subgs is not None:
for part_id, subg in subgs.items():
assert np.all(F.asnumpy(subg.ndata['inner_node']) == 1)
assert np.all(F.asnumpy(subg.edata['inner_edge']) == 1)
assert np.all(F.asnumpy(subg.ndata['part_id']) == part_id)
num_inner_nodes += subg.number_of_nodes()
num_inner_edges += subg.number_of_edges()
lnode_ids = np.nonzero(F.asnumpy(subg.ndata['inner_node']))[0]
ledge_ids = np.nonzero(F.asnumpy(subg.edata['inner_edge']))[0]
num_inner_nodes += len(lnode_ids)
num_inner_edges += len(ledge_ids)
assert np.sum(F.asnumpy(subg.ndata['part_id']) == part_id) == len(lnode_ids)
assert num_inner_nodes == g.number_of_nodes()
print(g.number_of_edges() - num_inner_edges)
subgs = dgl.transform.metis_partition(g, 4, 1)
if extra_hops == 0:
return
# partitions with 1-hop HALO nodes and reshuffling nodes
subgs = dgl.transform.metis_partition(g, 4, extra_cached_hops=extra_hops, reshuffle=True)
num_inner_nodes = 0
num_inner_edges = 0
edge_cnts = np.zeros((g.number_of_edges(),))
if subgs is not None:
for part_id, subg in subgs.items():
lnode_ids = np.nonzero(F.asnumpy(subg.ndata['inner_node']))[0]
......@@ -268,9 +280,61 @@ def test_metis_partition():
num_inner_nodes += len(lnode_ids)
num_inner_edges += len(ledge_ids)
assert np.sum(F.asnumpy(subg.ndata['part_id']) == part_id) == len(lnode_ids)
nids = F.asnumpy(subg.ndata[dgl.NID])
# ensure the local node Ids are contiguous.
parent_ids = F.asnumpy(subg.ndata[dgl.NID])
parent_ids = parent_ids[:len(lnode_ids)]
assert np.all(parent_ids == np.arange(parent_ids[0], parent_ids[-1] + 1))
# count the local edges.
parent_ids = F.asnumpy(subg.edata[dgl.EID])[ledge_ids]
edge_cnts[parent_ids] += 1
orig_ids = subg.ndata['orig_id']
inner_node = F.asnumpy(subg.ndata['inner_node'])
for nid in range(subg.number_of_nodes()):
neighs = subg.predecessors(nid)
old_neighs1 = F.gather_row(orig_ids, neighs)
old_nid = F.asnumpy(orig_ids[nid])
old_neighs2 = g.predecessors(old_nid)
# If this is an inner node, it should have the full neighborhood.
if inner_node[nid]:
assert np.all(np.sort(F.asnumpy(old_neighs1)) == np.sort(F.asnumpy(old_neighs2)))
# Each local edge should be counted exactly once across all partitions.
assert np.all(edge_cnts == 1)
assert num_inner_nodes == g.number_of_nodes()
print(g.number_of_edges() - num_inner_edges)
@unittest.skipIf(F._default_context_str == 'gpu', reason="It doesn't support GPU")
def test_reorder_nodes():
g = dgl.DGLGraph(create_large_graph_index(1000), readonly=True)
new_nids = np.random.permutation(g.number_of_nodes())
# TODO(zhengda) we need to test both CSR and COO.
new_g = dgl.transform.reorder_nodes(g, new_nids)
new_in_deg = new_g.in_degrees()
new_out_deg = new_g.out_degrees()
in_deg = g.in_degrees()
out_deg = g.out_degrees()
new_in_deg1 = F.scatter_row(in_deg, F.tensor(new_nids), in_deg)
new_out_deg1 = F.scatter_row(out_deg, F.tensor(new_nids), out_deg)
assert np.all(F.asnumpy(new_in_deg == new_in_deg1))
assert np.all(F.asnumpy(new_out_deg == new_out_deg1))
orig_ids = F.asnumpy(new_g.ndata['orig_id'])
for nid in range(new_g.number_of_nodes()):
neighs = F.asnumpy(new_g.successors(nid))
old_neighs1 = orig_ids[neighs]
old_nid = orig_ids[nid]
old_neighs2 = g.successors(old_nid)
assert np.all(np.sort(old_neighs1) == np.sort(F.asnumpy(old_neighs2)))
neighs = F.asnumpy(new_g.predecessors(nid))
old_neighs1 = orig_ids[neighs]
old_nid = orig_ids[nid]
old_neighs2 = g.predecessors(old_nid)
assert np.all(np.sort(old_neighs1) == np.sort(F.asnumpy(old_neighs2)))
@unittest.skipIf(F._default_context_str == 'gpu', reason="GPU not implemented")
@parametrize_dtype
def test_in_subgraph(index_dtype):
......@@ -615,6 +679,7 @@ def test_cast():
assert F.array_equal(g2dst, gdst)
if __name__ == '__main__':
test_reorder_nodes()
# test_line_graph()
# test_no_backtracking()
# test_reverse()
......@@ -627,7 +692,7 @@ if __name__ == '__main__':
# test_remove_self_loop()
# test_add_self_loop()
# test_partition_with_halo()
# test_metis_partition()
test_metis_partition()
# test_compact()
# test_to_simple()
# test_in_subgraph("int32")
......
......@@ -480,3 +480,37 @@ TEST(SpmatTest, TestCOOSort) {
_TestCOOSort<int32_t>(GPU);
#endif
}
template <typename IDX>
void _TestCSRReorder() {
auto csr = CSR2<IDX>();
auto new_row = aten::VecToIdArray(
std::vector<IDX>({2, 0, 3, 1}), sizeof(IDX)*8, CTX);
auto new_col = aten::VecToIdArray(
std::vector<IDX>({2, 0, 4, 3, 1}), sizeof(IDX)*8, CTX);
auto new_csr = CSRReorder(csr, new_row, new_col);
ASSERT_EQ(new_csr.num_rows, csr.num_rows);
ASSERT_EQ(new_csr.num_cols, csr.num_cols);
}
TEST(SpmatTest, TestCSRReorder) {
_TestCSRReorder<int32_t>();
_TestCSRReorder<int64_t>();
}
template <typename IDX>
void _TestCOOReorder() {
auto coo = COO2<IDX>();
auto new_row = aten::VecToIdArray(
std::vector<IDX>({2, 0, 3, 1}), sizeof(IDX)*8, CTX);
auto new_col = aten::VecToIdArray(
std::vector<IDX>({2, 0, 4, 3, 1}), sizeof(IDX)*8, CTX);
auto new_coo = COOReorder(coo, new_row, new_col);
ASSERT_EQ(new_coo.num_rows, coo.num_rows);
ASSERT_EQ(new_coo.num_cols, coo.num_cols);
}
TEST(SpmatTest, TestCOOReorder) {
_TestCOOReorder<int32_t>();
_TestCOOReorder<int64_t>();
}
......@@ -51,15 +51,13 @@ def create_random_graph(n):
ig = create_graph_index(arr, readonly=True)
return dgl.DGLGraph(ig)
def run_server(graph_name, server_id, num_clients, barrier):
def run_server(graph_name, server_id, num_clients):
g = DistGraphServer(server_id, "kv_ip_config.txt", num_clients, graph_name,
'/tmp/dist_graph/{}.json'.format(graph_name))
barrier.wait()
print('start server', server_id)
g.start()
def run_client(graph_name, barrier, num_nodes, num_edges):
barrier.wait()
def run_client(graph_name, num_nodes, num_edges):
g = DistGraph("kv_ip_config.txt", graph_name)
# Test API
......@@ -132,19 +130,18 @@ def test_server_client():
# let's just test on one partition for now.
# We cannot run multiple servers and clients on the same machine.
barrier = mp.Barrier(2)
serv_ps = []
ctx = mp.get_context('spawn')
for serv_id in range(1):
p = ctx.Process(target=run_server, args=(graph_name, serv_id, 1, barrier))
p = ctx.Process(target=run_server, args=(graph_name, serv_id, 1))
serv_ps.append(p)
p.start()
cli_ps = []
for cli_id in range(1):
print('start client', cli_id)
p = ctx.Process(target=run_client, args=(graph_name, barrier, g.number_of_nodes(),
g.number_of_edges()))
p = ctx.Process(target=run_client, args=(graph_name, g.number_of_nodes(),
g.number_of_edges()))
p.start()
cli_ps.append(p)
......@@ -168,14 +165,8 @@ def test_split():
selected_nodes = np.nonzero(node_mask)[0]
selected_edges = np.nonzero(edge_mask)[0]
for i in range(num_parts):
part_g, node_feats, edge_feats, meta = load_partition('/tmp/dist_graph/dist_graph_test.json', i)
num_nodes, num_edges, node_map, edge_map, num_partitions = meta
gpb = GraphPartitionBook(part_id=i,
num_parts=num_partitions,
node_map=node_map,
edge_map=edge_map,
part_graph=part_g)
local_nids = F.nonzero_1d(part_g.ndata['local_node'])
part_g, node_feats, edge_feats, gpb = load_partition('/tmp/dist_graph/dist_graph_test.json', i)
local_nids = F.nonzero_1d(part_g.ndata['inner_node'])
local_nids = F.gather_row(part_g.ndata[dgl.NID], local_nids)
nodes1 = np.intersect1d(selected_nodes, F.asnumpy(local_nids))
nodes2 = node_split(node_mask, gpb, i)
......@@ -184,7 +175,7 @@ def test_split():
for n in nodes1:
assert n in local_nids
local_eids = F.nonzero_1d(part_g.edata['local_edge'])
local_eids = F.nonzero_1d(part_g.edata['inner_edge'])
local_eids = F.gather_row(part_g.edata[dgl.EID], local_eids)
edges1 = np.intersect1d(selected_edges, F.asnumpy(local_eids))
edges2 = edge_split(edge_mask, gpb, i)
......@@ -200,6 +191,6 @@ def prepare_dist():
ip_config.close()
if __name__ == '__main__':
os.mkdir('/tmp/dist_graph')
test_split()
#test_server_client()
os.makedirs('/tmp/dist_graph', exist_ok=True)
#test_split()
test_server_client()
import dgl
import sys
import os
import numpy as np
from scipy import sparse as spsp
from numpy.testing import assert_array_equal
from dgl.graph_index import create_graph_index
from dgl.distributed import partition_graph, load_partition, GraphPartitionBook
import backend as F
import unittest
import pickle
def create_random_graph(n):
arr = (spsp.random(n, n, density=0.001, format='coo') != 0).astype(np.int64)
ig = create_graph_index(arr, readonly=True)
return dgl.DGLGraph(ig)
@unittest.skipIf(F._default_context_str == 'gpu', reason="METIS doesn't support GPU")
def test_graph_partition_book():
g = create_random_graph(10000)
g.ndata['labels'] = F.arange(0, g.number_of_nodes())
g.ndata['feats'] = F.tensor(np.random.randn(g.number_of_nodes(), 10))
num_parts = 4
num_hops = 2
partition_graph(g, 'gpb_test', num_parts, '/tmp/gpb', num_hops=num_hops, part_method='metis')
for i in range(num_parts):
part_g, node_feats, edge_feats, meta = load_partition('/tmp/gpb/gpb_test.json', i)
num_nodes, num_edges, node_map, edge_map, num_partitions = meta
gpb = GraphPartitionBook(part_id=i,
num_parts=num_partitions,
node_map=node_map,
edge_map=edge_map,
part_graph=part_g)
assert gpb.num_partitions() == num_parts
gpb_meta = gpb.metadata()
assert len(gpb_meta) == num_parts
assert np.all(F.asnumpy(gpb.nid2partid(F.arange(0, len(node_map)))) == node_map)
assert np.all(F.asnumpy(gpb.eid2partid(F.arange(0, len(edge_map)))) == edge_map)
assert len(gpb.partid2nids(i)) == gpb_meta[i]['num_nodes']
assert len(gpb.partid2eids(i)) == gpb_meta[i]['num_edges']
local_nid = gpb.nid2localnid(part_g.ndata[dgl.NID], i)
assert np.all(F.asnumpy(local_nid) == F.asnumpy(F.arange(0, len(local_nid))))
local_eid = gpb.eid2localeid(part_g.edata[dgl.EID], i)
assert np.all(F.asnumpy(local_eid) == F.asnumpy(F.arange(0, len(local_eid))))
if __name__ == '__main__':
os.mkdir('/tmp/gpb')
test_graph_partition_book()
......@@ -17,34 +17,44 @@ def create_random_graph(n):
ig = create_graph_index(arr, readonly=True)
return dgl.DGLGraph(ig)
def test_partition():
def check_partition(reshuffle):
g = create_random_graph(10000)
g.ndata['labels'] = F.arange(0, g.number_of_nodes())
g.ndata['feats'] = F.tensor(np.random.randn(g.number_of_nodes(), 10))
num_parts = 4
num_hops = 2
partition_graph(g, 'test', num_parts, '/tmp/partition', num_hops=num_hops, part_method='metis')
partition_graph(g, 'test', num_parts, '/tmp/partition', num_hops=num_hops,
part_method='metis', reshuffle=reshuffle)
part_sizes = []
for i in range(num_parts):
part_g, node_feats, edge_feats, meta = load_partition('/tmp/partition/test.json', i)
num_nodes, num_edges, node_map, edge_map, num_partitions = meta
part_g, node_feats, edge_feats, gpb = load_partition('/tmp/partition/test.json', i)
# Check the metadata
assert num_nodes == g.number_of_nodes()
assert num_edges == g.number_of_edges()
assert num_partitions == num_parts
assert gpb._num_nodes() == g.number_of_nodes()
assert gpb._num_edges() == g.number_of_edges()
assert gpb.num_partitions() == num_parts
gpb_meta = gpb.metadata()
assert len(gpb_meta) == num_parts
assert len(gpb.partid2nids(i)) == gpb_meta[i]['num_nodes']
assert len(gpb.partid2eids(i)) == gpb_meta[i]['num_edges']
part_sizes.append((gpb_meta[i]['num_nodes'], gpb_meta[i]['num_edges']))
local_nid = gpb.nid2localnid(F.boolean_mask(part_g.ndata[dgl.NID], part_g.ndata['inner_node']), i)
assert np.all(F.asnumpy(local_nid) == np.arange(0, len(local_nid)))
local_eid = gpb.eid2localeid(F.boolean_mask(part_g.edata[dgl.EID], part_g.edata['inner_edge']), i)
assert np.all(F.asnumpy(local_eid) == np.arange(0, len(local_eid)))
# Check the node map.
local_nodes = np.nonzero(node_map == i)[0]
part_ids = node_map[F.asnumpy(part_g.ndata[dgl.NID])]
local_nodes1 = F.asnumpy(part_g.ndata[dgl.NID])[part_ids == i]
assert np.all(local_nodes == local_nodes1)
local_nodes = F.asnumpy(F.boolean_mask(part_g.ndata[dgl.NID], part_g.ndata['inner_node']))
local_nodes1 = F.asnumpy(gpb.partid2nids(i))
assert np.all(np.sort(local_nodes) == np.sort(local_nodes1))
# Check the edge map.
assert np.all(edge_map >= 0)
local_edges = np.nonzero(edge_map == i)[0]
part_ids = edge_map[F.asnumpy(part_g.edata[dgl.EID])]
local_edges1 = F.asnumpy(part_g.edata[dgl.EID])[part_ids == i]
assert np.all(local_edges == np.sort(local_edges1))
local_edges = F.asnumpy(F.boolean_mask(part_g.edata[dgl.EID], part_g.edata['inner_edge']))
local_edges1 = F.asnumpy(gpb.partid2eids(i))
assert np.all(np.sort(local_edges) == np.sort(local_edges1))
for name in ['labels', 'feats']:
assert name in node_feats
......@@ -53,7 +63,22 @@ def test_partition():
assert np.all(F.asnumpy(g.ndata[name])[local_nodes] == F.asnumpy(node_feats[name]))
assert len(edge_feats) == 0
if reshuffle:
node_map = []
edge_map = []
for i, (num_nodes, num_edges) in enumerate(part_sizes):
node_map.append(np.ones(num_nodes) * i)
edge_map.append(np.ones(num_edges) * i)
node_map = np.concatenate(node_map)
edge_map = np.concatenate(edge_map)
assert np.all(F.asnumpy(gpb.nid2partid(F.arange(0, len(node_map)))) == node_map)
assert np.all(F.asnumpy(gpb.eid2partid(F.arange(0, len(edge_map)))) == edge_map)
def test_partition():
check_partition(True)
check_partition(False)
if __name__ == '__main__':
os.mkdir('/tmp/partition')
os.makedirs('/tmp/partition', exist_ok=True)
test_partition()