Unverified Commit 39987bc5 authored by Rhett Ying's avatar Rhett Ying Committed by GitHub
Browse files

[Feature] enable graph partition book support canonical etypes (#4343)

* [Feature] enable graph partition book support canonical etypes

* fix lint

* fix lint

* add todo

* refine according to review comments

* fix lint

* refine naming

* revert PartitionPolicy __init__

* refine docstring

* fix doc string
parent 3685000a
...@@ -84,7 +84,7 @@ Graph partition book ...@@ -84,7 +84,7 @@ Graph partition book
.. currentmodule:: dgl.distributed.graph_partition_book .. currentmodule:: dgl.distributed.graph_partition_book
.. autoclass:: GraphPartitionBook .. autoclass:: GraphPartitionBook
:members: shared_memory, num_partitions, metadata, nid2partid, eid2partid, partid2nids, partid2eids, nid2localnid, eid2localeid, partid, map_to_per_ntype, map_to_per_etype, map_to_homo_nid, map_to_homo_eid :members: shared_memory, num_partitions, metadata, nid2partid, eid2partid, partid2nids, partid2eids, nid2localnid, eid2localeid, partid, map_to_per_ntype, map_to_per_etype, map_to_homo_nid, map_to_homo_eid, canonical_etypes
.. autoclass:: PartitionPolicy .. autoclass:: PartitionPolicy
:members: policy_str, part_id, partition_book, to_local, to_partid, get_part_size, get_size :members: policy_str, part_id, partition_book, to_local, to_partid, get_part_size, get_size
......
...@@ -5,3 +5,6 @@ MAX_QUEUE_SIZE = 20*1024*1024*1024 ...@@ -5,3 +5,6 @@ MAX_QUEUE_SIZE = 20*1024*1024*1024
SERVER_EXIT = "server_exit" SERVER_EXIT = "server_exit"
SERVER_KEEP_ALIVE = "server_keep_alive" SERVER_KEEP_ALIVE = "server_keep_alive"
DEFAULT_NTYPE = '_N'
DEFAULT_ETYPE = (DEFAULT_NTYPE, '_E', DEFAULT_NTYPE)
...@@ -2,16 +2,25 @@ ...@@ -2,16 +2,25 @@
import pickle import pickle
from abc import ABC from abc import ABC
from ast import literal_eval
import numpy as np import numpy as np
from .. import backend as F from .. import backend as F
from ..base import NID, EID from ..base import NID, EID, DGLError, dgl_warning
from .. import utils from .. import utils
from .shared_mem_utils import _to_shared_mem, _get_ndata_path, _get_edata_path, DTYPE_DICT from .shared_mem_utils import _to_shared_mem, _get_ndata_path, _get_edata_path, DTYPE_DICT
from .._ffi.ndarray import empty_shared_mem from .._ffi.ndarray import empty_shared_mem
from ..ndarray import exist_shared_mem_array from ..ndarray import exist_shared_mem_array
from ..partition import NDArrayPartition from ..partition import NDArrayPartition
from .id_map import IdMap from .id_map import IdMap
from .constants import DEFAULT_NTYPE, DEFAULT_ETYPE
def _str_to_tuple(s):
    """Convert the string form of a type name back into its original form.

    Parameters
    ----------
    s : str
        Either a plain type name such as ``'_N'`` or the ``str()`` form of a
        tuple such as ``"('user', 'follows', 'user')"``.

    Returns
    -------
    str or tuple
        The parsed tuple if ``s`` is a tuple literal, otherwise ``s`` unchanged.
    """
    try:
        ret = literal_eval(s)
    except (ValueError, SyntaxError, TypeError):
        # ``literal_eval`` raises ValueError for bare identifiers ('_N') but
        # SyntaxError for text that is not a Python expression ('my type', '').
        # Either way ``s`` is a plain type name and is returned untouched.
        return s
    # Only tuple literals are meaningful here. A type name that happens to
    # look like another literal ('123', 'None') must stay a string.
    return ret if isinstance(ret, tuple) else s
def _move_metadata_to_shared_mem(graph_name, num_nodes, num_edges, part_id, def _move_metadata_to_shared_mem(graph_name, num_nodes, num_edges, part_id,
num_partitions, node_map, edge_map, is_range_part): num_partitions, node_map, edge_map, is_range_part):
...@@ -270,7 +279,7 @@ class GraphPartitionBook(ABC): ...@@ -270,7 +279,7 @@ class GraphPartitionBook(ABC):
---------- ----------
eids : tensor eids : tensor
global edge IDs global edge IDs
etype : str etype : str or (str, str, str)
The edge type The edge type
Returns Returns
...@@ -302,7 +311,7 @@ class GraphPartitionBook(ABC): ...@@ -302,7 +311,7 @@ class GraphPartitionBook(ABC):
---------- ----------
partid : int partid : int
partition id partition id
etype : str etype : str or (str, str, str)
The edge type The edge type
Returns Returns
...@@ -338,7 +347,7 @@ class GraphPartitionBook(ABC): ...@@ -338,7 +347,7 @@ class GraphPartitionBook(ABC):
global edge IDs global edge IDs
partid : int partid : int
partition ID partition ID
etype : str etype : str or (str, str, str)
The edge type The edge type
Returns Returns
...@@ -367,6 +376,16 @@ class GraphPartitionBook(ABC): ...@@ -367,6 +376,16 @@ class GraphPartitionBook(ABC):
"""Get the list of edge types """Get the list of edge types
""" """
@property
def canonical_etypes(self):
"""Get the list of canonical edge types
Returns
-------
list[(str, str, str)]
A list of canonical etypes
"""
@property @property
def is_homogeneous(self): def is_homogeneous(self):
"""check if homogeneous """check if homogeneous
...@@ -424,7 +443,7 @@ class GraphPartitionBook(ABC): ...@@ -424,7 +443,7 @@ class GraphPartitionBook(ABC):
---------- ----------
ids : tensor ids : tensor
Type-wise edge Ids Type-wise edge Ids
etype : str etype : str or (str, str, str)
edge type edge type
Returns Returns
...@@ -528,16 +547,17 @@ class BasicPartitionBook(GraphPartitionBook): ...@@ -528,16 +547,17 @@ class BasicPartitionBook(GraphPartitionBook):
""" """
return self._partition_meta_data return self._partition_meta_data
def _num_nodes(self, ntype='_N'): def _num_nodes(self, ntype=DEFAULT_NTYPE):
""" The total number of nodes """ The total number of nodes
""" """
assert ntype == '_N', 'Base partition book only supports homogeneous graph.' assert ntype == DEFAULT_NTYPE, 'Base partition book only supports homogeneous graph.'
return len(self._nid2partid) return len(self._nid2partid)
def _num_edges(self, etype='_E'): def _num_edges(self, etype=DEFAULT_ETYPE):
""" The total number of edges """ The total number of edges
""" """
assert etype == '_E', 'Base partition book only supports homogeneous graph.' assert etype in (DEFAULT_ETYPE, DEFAULT_ETYPE[1]), \
'Base partition book only supports homogeneous graph.'
return len(self._eid2partid) return len(self._eid2partid)
def map_to_per_ntype(self, ids): def map_to_per_ntype(self, ids):
...@@ -554,55 +574,59 @@ class BasicPartitionBook(GraphPartitionBook): ...@@ -554,55 +574,59 @@ class BasicPartitionBook(GraphPartitionBook):
""" """
return F.zeros((len(ids),), F.int32, F.cpu()), ids return F.zeros((len(ids),), F.int32, F.cpu()), ids
def map_to_homo_nid(self, ids, ntype): def map_to_homo_nid(self, ids, ntype=DEFAULT_NTYPE):
"""Map per-node-type IDs to global node IDs in the homogeneous format. """Map per-node-type IDs to global node IDs in the homogeneous format.
""" """
assert ntype == '_N', 'Base partition book only supports homogeneous graph.' assert ntype == DEFAULT_NTYPE, 'Base partition book only supports homogeneous graph.'
return ids return ids
def map_to_homo_eid(self, ids, etype): def map_to_homo_eid(self, ids, etype=DEFAULT_ETYPE):
"""Map per-edge-type IDs to global edge IDs in the homoenegeous format. """Map per-edge-type IDs to global edge IDs in the homoenegeous format.
""" """
assert etype == '_E', 'Base partition book only supports homogeneous graph.' assert etype in (DEFAULT_ETYPE, DEFAULT_ETYPE[1]), \
'Base partition book only supports homogeneous graph.'
return ids return ids
def nid2partid(self, nids, ntype='_N'): def nid2partid(self, nids, ntype=DEFAULT_NTYPE):
"""From global node IDs to partition IDs """From global node IDs to partition IDs
""" """
assert ntype == '_N', 'Base partition book only supports homogeneous graph.' assert ntype == DEFAULT_NTYPE, 'Base partition book only supports homogeneous graph.'
return F.gather_row(self._nid2partid, nids) return F.gather_row(self._nid2partid, nids)
def eid2partid(self, eids, etype='_E'): def eid2partid(self, eids, etype=DEFAULT_ETYPE):
"""From global edge IDs to partition IDs """From global edge IDs to partition IDs
""" """
assert etype == '_E', 'Base partition book only supports homogeneous graph.' assert etype in (DEFAULT_ETYPE, DEFAULT_ETYPE[1]), \
'Base partition book only supports homogeneous graph.'
return F.gather_row(self._eid2partid, eids) return F.gather_row(self._eid2partid, eids)
def partid2nids(self, partid, ntype='_N'): def partid2nids(self, partid, ntype=DEFAULT_NTYPE):
"""From partition id to global node IDs """From partition id to global node IDs
""" """
assert ntype == '_N', 'Base partition book only supports homogeneous graph.' assert ntype == DEFAULT_NTYPE, 'Base partition book only supports homogeneous graph.'
return self._partid2nids[partid] return self._partid2nids[partid]
def partid2eids(self, partid, etype='_E'): def partid2eids(self, partid, etype=DEFAULT_ETYPE):
"""From partition id to global edge IDs """From partition id to global edge IDs
""" """
assert etype == '_E', 'Base partition book only supports homogeneous graph.' assert etype in (DEFAULT_ETYPE, DEFAULT_ETYPE[1]), \
'Base partition book only supports homogeneous graph.'
return self._partid2eids[partid] return self._partid2eids[partid]
def nid2localnid(self, nids, partid, ntype='_N'): def nid2localnid(self, nids, partid, ntype=DEFAULT_NTYPE):
"""Get local node IDs within the given partition. """Get local node IDs within the given partition.
""" """
assert ntype == '_N', 'Base partition book only supports homogeneous graph.' assert ntype == DEFAULT_NTYPE, 'Base partition book only supports homogeneous graph.'
if partid != self._part_id: if partid != self._part_id:
raise RuntimeError('Now GraphPartitionBook does not support \ raise RuntimeError('Now GraphPartitionBook does not support \
getting remote tensor of nid2localnid.') getting remote tensor of nid2localnid.')
return F.gather_row(self._nidg2l[partid], nids) return F.gather_row(self._nidg2l[partid], nids)
def eid2localeid(self, eids, partid, etype='_E'): def eid2localeid(self, eids, partid, etype=DEFAULT_ETYPE):
"""Get the local edge ids within the given partition. """Get the local edge ids within the given partition.
""" """
assert etype == '_E', 'Base partition book only supports homogeneous graph.' assert etype in (DEFAULT_ETYPE, DEFAULT_ETYPE[1]), \
'Base partition book only supports homogeneous graph.'
if partid != self._part_id: if partid != self._part_id:
raise RuntimeError('Now GraphPartitionBook does not support \ raise RuntimeError('Now GraphPartitionBook does not support \
getting remote tensor of eid2localeid.') getting remote tensor of eid2localeid.')
...@@ -618,13 +642,24 @@ class BasicPartitionBook(GraphPartitionBook): ...@@ -618,13 +642,24 @@ class BasicPartitionBook(GraphPartitionBook):
def ntypes(self): def ntypes(self):
"""Get the list of node types """Get the list of node types
""" """
return ['_N'] return [DEFAULT_NTYPE]
@property @property
def etypes(self): def etypes(self):
"""Get the list of edge types """Get the list of edge types
""" """
return ['_E'] return [DEFAULT_ETYPE[1]]
@property
def canonical_etypes(self):
"""Get the list of canonical edge types
Returns
-------
list[(str, str, str)]
A list of canonical etypes
"""
return [DEFAULT_ETYPE]
class RangePartitionBook(GraphPartitionBook): class RangePartitionBook(GraphPartitionBook):
...@@ -632,7 +667,8 @@ class RangePartitionBook(GraphPartitionBook): ...@@ -632,7 +667,8 @@ class RangePartitionBook(GraphPartitionBook):
This partition book is used if the nodes and edges of a graph partition are assigned This partition book is used if the nodes and edges of a graph partition are assigned
with contiguous IDs. It uses very small amount of memory to store the partition with contiguous IDs. It uses very small amount of memory to store the partition
information. information. Canonical etypes are available only when the keys of argument ``etypes``
are canonical etypes.
Parameters Parameters
---------- ----------
...@@ -646,7 +682,7 @@ class RangePartitionBook(GraphPartitionBook): ...@@ -646,7 +682,7 @@ class RangePartitionBook(GraphPartitionBook):
the number of partitions. Each row has two integers: the starting and the ending IDs the number of partitions. Each row has two integers: the starting and the ending IDs
for a particular node type in a partition. For example, all nodes of type ``"T"`` in for a particular node type in a partition. For example, all nodes of type ``"T"`` in
partition ``i`` has ID range ``node_map["T"][i][0]`` to ``node_map["T"][i][1]``. partition ``i`` has ID range ``node_map["T"][i][0]`` to ``node_map["T"][i][1]``.
edge_map : dict[str, Tensor] edge_map : dict[str, Tensor] or dict[(str, str, str), Tensor]
Global edge ID ranges within partitions for each edge type. The key is the edge type Global edge ID ranges within partitions for each edge type. The key is the edge type
name in string. The value is a tensor of shape :math:`(K, 2)`, where :math:`K` is name in string. The value is a tensor of shape :math:`(K, 2)`, where :math:`K` is
the number of partitions. Each row has two integers: the starting and the ending IDs the number of partitions. Each row has two integers: the starting and the ending IDs
...@@ -654,8 +690,13 @@ class RangePartitionBook(GraphPartitionBook): ...@@ -654,8 +690,13 @@ class RangePartitionBook(GraphPartitionBook):
partition ``i`` has ID range ``edge_map["T"][i][0]`` to ``edge_map["T"][i][1]``. partition ``i`` has ID range ``edge_map["T"][i][0]`` to ``edge_map["T"][i][1]``.
ntypes : dict[str, int] ntypes : dict[str, int]
map ntype strings to ntype IDs. map ntype strings to ntype IDs.
etypes : dict[str, int] etypes : dict[str, int] or dict[(str, str, str), int]
map etype strings to etype IDs. map etype strings to etype IDs.
.. deprecated:: 0.9.1
Single string format for keys of ``edge_map`` and ``etypes`` is deprecated.
``(str, str, str)`` will be the only format supported in the future.
""" """
def __init__(self, part_id, num_parts, node_map, edge_map, ntypes, etypes): def __init__(self, part_id, num_parts, node_map, edge_map, ntypes, etypes):
assert part_id >= 0, 'part_id cannot be a negative number.' assert part_id >= 0, 'part_id cannot be a negative number.'
...@@ -664,14 +705,34 @@ class RangePartitionBook(GraphPartitionBook): ...@@ -664,14 +705,34 @@ class RangePartitionBook(GraphPartitionBook):
self._num_partitions = num_parts self._num_partitions = num_parts
self._ntypes = [None] * len(ntypes) self._ntypes = [None] * len(ntypes)
self._etypes = [None] * len(etypes) self._etypes = [None] * len(etypes)
self._canonical_etypes = [None] * len(etypes)
# map etypes to canonical ones
self._etype2canonical = {}
for ntype in ntypes: for ntype in ntypes:
ntype_id = ntypes[ntype] ntype_id = ntypes[ntype]
self._ntypes[ntype_id] = ntype self._ntypes[ntype_id] = ntype
assert all(ntype is not None for ntype in self._ntypes), \ assert all(ntype is not None for ntype in self._ntypes), \
"The node types have invalid IDs." "The node types have invalid IDs."
for etype in etypes: for etype, etype_id in etypes.items():
etype_id = etypes[etype] if isinstance(etype, tuple):
self._etypes[etype_id] = etype assert len(etype) == 3, \
'Canonical etype should be in format of (str, str, str).'
c_etype = etype
etype = etype[1]
self._etypes[etype_id] = etype
self._canonical_etypes[etype_id] = c_etype
if etype in self._etype2canonical:
# If one etype maps to multiple canonical etypes, empty
# tuple is used to indicate such ambiguity caused by etype.
# See more details in self._to_canonical_etype().
self._etype2canonical[etype] = tuple()
else:
self._etype2canonical[etype] = c_etype
else:
dgl_warning(
"Etype with 'str' format is deprecated. Please use '(str, str, str)'.")
self._etypes[etype_id] = etype
self._canonical_etypes[etype_id] = None
assert all(etype is not None for etype in self._etypes), \ assert all(etype is not None for etype in self._etypes), \
"The edge types have invalid IDs." "The edge types have invalid IDs."
...@@ -686,6 +747,7 @@ class RangePartitionBook(GraphPartitionBook): ...@@ -686,6 +747,7 @@ class RangePartitionBook(GraphPartitionBook):
self._typed_max_node_ids = {} self._typed_max_node_ids = {}
max_node_map = np.zeros((num_parts,), dtype=np.int64) max_node_map = np.zeros((num_parts,), dtype=np.int64)
for key in node_map: for key in node_map:
assert key in ntypes, 'Unexpected ntype: {}.'.format(key)
if not isinstance(node_map[key], np.ndarray): if not isinstance(node_map[key], np.ndarray):
node_map[key] = F.asnumpy(node_map[key]) node_map[key] = F.asnumpy(node_map[key])
assert node_map[key].shape == (num_parts, 2) assert node_map[key].shape == (num_parts, 2)
...@@ -705,6 +767,7 @@ class RangePartitionBook(GraphPartitionBook): ...@@ -705,6 +767,7 @@ class RangePartitionBook(GraphPartitionBook):
self._typed_max_edge_ids = {} self._typed_max_edge_ids = {}
max_edge_map = np.zeros((num_parts,), dtype=np.int64) max_edge_map = np.zeros((num_parts,), dtype=np.int64)
for key in edge_map: for key in edge_map:
assert key in etypes, 'Unexpected etype: {}.'.format(key)
if not isinstance(edge_map[key], np.ndarray): if not isinstance(edge_map[key], np.ndarray):
edge_map[key] = F.asnumpy(edge_map[key]) edge_map[key] = F.asnumpy(edge_map[key])
assert edge_map[key].shape == (num_parts, 2) assert edge_map[key].shape == (num_parts, 2)
...@@ -750,7 +813,8 @@ class RangePartitionBook(GraphPartitionBook): ...@@ -750,7 +813,8 @@ class RangePartitionBook(GraphPartitionBook):
eid_range = [None] * len(self.etypes) eid_range = [None] * len(self.etypes)
for i, etype in enumerate(self.etypes): for i, etype in enumerate(self.etypes):
eid_range[i] = (etype, self._typed_eid_range[etype]) c_etype = self._to_canonical_etype(etype)
eid_range[i] = (c_etype, self._typed_eid_range[c_etype])
eid_range_pickle = list(pickle.dumps(eid_range)) eid_range_pickle = list(pickle.dumps(eid_range))
self._meta = _move_metadata_to_shared_mem(graph_name, self._meta = _move_metadata_to_shared_mem(graph_name,
...@@ -767,21 +831,22 @@ class RangePartitionBook(GraphPartitionBook): ...@@ -767,21 +831,22 @@ class RangePartitionBook(GraphPartitionBook):
return self._num_partitions return self._num_partitions
def _num_nodes(self, ntype='_N'): def _num_nodes(self, ntype=DEFAULT_NTYPE):
""" The total number of nodes """ The total number of nodes
""" """
if ntype == '_N': if ntype == DEFAULT_NTYPE:
return int(self._max_node_ids[-1]) return int(self._max_node_ids[-1])
else: else:
return int(self._typed_max_node_ids[ntype][-1]) return int(self._typed_max_node_ids[ntype][-1])
def _num_edges(self, etype='_E'): def _num_edges(self, etype=DEFAULT_ETYPE):
""" The total number of edges """ The total number of edges
""" """
if etype == '_E': if etype in (DEFAULT_ETYPE, DEFAULT_ETYPE[1]):
return int(self._max_edge_ids[-1]) return int(self._max_edge_ids[-1])
else: else:
return int(self._typed_max_edge_ids[etype][-1]) c_etype = self._to_canonical_etype(etype)
return int(self._typed_max_edge_ids[c_etype][-1])
def metadata(self): def metadata(self):
"""Return the partition meta data. """Return the partition meta data.
...@@ -816,40 +881,42 @@ class RangePartitionBook(GraphPartitionBook): ...@@ -816,40 +881,42 @@ class RangePartitionBook(GraphPartitionBook):
"""Map per-edge-type IDs to global edge IDs in the homoenegeous format. """Map per-edge-type IDs to global edge IDs in the homoenegeous format.
""" """
ids = utils.toindex(ids).tousertensor() ids = utils.toindex(ids).tousertensor()
partids = self.eid2partid(ids, etype) c_etype = self._to_canonical_etype(etype)
typed_max_eids = F.zerocopy_from_numpy(self._typed_max_edge_ids[etype]) partids = self.eid2partid(ids, c_etype)
typed_max_eids = F.zerocopy_from_numpy(self._typed_max_edge_ids[c_etype])
end_diff = F.gather_row(typed_max_eids, partids) - ids end_diff = F.gather_row(typed_max_eids, partids) - ids
typed_eid_range = F.zerocopy_from_numpy(self._typed_eid_range[etype][:, 1]) typed_eid_range = F.zerocopy_from_numpy(self._typed_eid_range[c_etype][:, 1])
return F.gather_row(typed_eid_range, partids) - end_diff return F.gather_row(typed_eid_range, partids) - end_diff
def nid2partid(self, nids, ntype='_N'): def nid2partid(self, nids, ntype=DEFAULT_NTYPE):
"""From global node IDs to partition IDs """From global node IDs to partition IDs
""" """
nids = utils.toindex(nids) nids = utils.toindex(nids)
if ntype == '_N': if ntype == DEFAULT_NTYPE:
ret = np.searchsorted(self._max_node_ids, nids.tonumpy(), side='right') ret = np.searchsorted(self._max_node_ids, nids.tonumpy(), side='right')
else: else:
ret = np.searchsorted(self._typed_max_node_ids[ntype], nids.tonumpy(), side='right') ret = np.searchsorted(self._typed_max_node_ids[ntype], nids.tonumpy(), side='right')
ret = utils.toindex(ret) ret = utils.toindex(ret)
return ret.tousertensor() return ret.tousertensor()
def eid2partid(self, eids, etype='_E'): def eid2partid(self, eids, etype=DEFAULT_ETYPE):
"""From global edge IDs to partition IDs """From global edge IDs to partition IDs
""" """
eids = utils.toindex(eids) eids = utils.toindex(eids)
if etype == '_E': if etype in (DEFAULT_ETYPE, DEFAULT_ETYPE[1]):
ret = np.searchsorted(self._max_edge_ids, eids.tonumpy(), side='right') ret = np.searchsorted(self._max_edge_ids, eids.tonumpy(), side='right')
else: else:
ret = np.searchsorted(self._typed_max_edge_ids[etype], eids.tonumpy(), side='right') c_etype = self._to_canonical_etype(etype)
ret = np.searchsorted(self._typed_max_edge_ids[c_etype], eids.tonumpy(), side='right')
ret = utils.toindex(ret) ret = utils.toindex(ret)
return ret.tousertensor() return ret.tousertensor()
def partid2nids(self, partid, ntype='_N'): def partid2nids(self, partid, ntype=DEFAULT_NTYPE):
"""From partition ID to global node IDs """From partition ID to global node IDs
""" """
# TODO do we need to cache it? # TODO do we need to cache it?
if ntype == '_N': if ntype == DEFAULT_NTYPE:
start = self._max_node_ids[partid - 1] if partid > 0 else 0 start = self._max_node_ids[partid - 1] if partid > 0 else 0
end = self._max_node_ids[partid] end = self._max_node_ids[partid]
return F.arange(start, end) return F.arange(start, end)
...@@ -859,21 +926,22 @@ class RangePartitionBook(GraphPartitionBook): ...@@ -859,21 +926,22 @@ class RangePartitionBook(GraphPartitionBook):
return F.arange(start, end) return F.arange(start, end)
def partid2eids(self, partid, etype='_E'): def partid2eids(self, partid, etype=DEFAULT_ETYPE):
"""From partition ID to global edge IDs """From partition ID to global edge IDs
""" """
# TODO do we need to cache it? # TODO do we need to cache it?
if etype == '_E': if etype in (DEFAULT_ETYPE, DEFAULT_ETYPE[1]):
start = self._max_edge_ids[partid - 1] if partid > 0 else 0 start = self._max_edge_ids[partid - 1] if partid > 0 else 0
end = self._max_edge_ids[partid] end = self._max_edge_ids[partid]
return F.arange(start, end) return F.arange(start, end)
else: else:
start = self._typed_max_edge_ids[etype][partid - 1] if partid > 0 else 0 c_etype = self._to_canonical_etype(etype)
end = self._typed_max_edge_ids[etype][partid] start = self._typed_max_edge_ids[c_etype][partid - 1] if partid > 0 else 0
end = self._typed_max_edge_ids[c_etype][partid]
return F.arange(start, end) return F.arange(start, end)
def nid2localnid(self, nids, partid, ntype='_N'): def nid2localnid(self, nids, partid, ntype=DEFAULT_NTYPE):
"""Get local node IDs within the given partition. """Get local node IDs within the given partition.
""" """
if partid != self._partid: if partid != self._partid:
...@@ -882,14 +950,14 @@ class RangePartitionBook(GraphPartitionBook): ...@@ -882,14 +950,14 @@ class RangePartitionBook(GraphPartitionBook):
nids = utils.toindex(nids) nids = utils.toindex(nids)
nids = nids.tousertensor() nids = nids.tousertensor()
if ntype == '_N': if ntype == DEFAULT_NTYPE:
start = self._max_node_ids[partid - 1] if partid > 0 else 0 start = self._max_node_ids[partid - 1] if partid > 0 else 0
else: else:
start = self._typed_max_node_ids[ntype][partid - 1] if partid > 0 else 0 start = self._typed_max_node_ids[ntype][partid - 1] if partid > 0 else 0
return nids - int(start) return nids - int(start)
def eid2localeid(self, eids, partid, etype='_E'): def eid2localeid(self, eids, partid, etype=DEFAULT_ETYPE):
"""Get the local edge IDs within the given partition. """Get the local edge IDs within the given partition.
""" """
if partid != self._partid: if partid != self._partid:
...@@ -898,10 +966,11 @@ class RangePartitionBook(GraphPartitionBook): ...@@ -898,10 +966,11 @@ class RangePartitionBook(GraphPartitionBook):
eids = utils.toindex(eids) eids = utils.toindex(eids)
eids = eids.tousertensor() eids = eids.tousertensor()
if etype == '_E': if etype in (DEFAULT_ETYPE, DEFAULT_ETYPE[1]):
start = self._max_edge_ids[partid - 1] if partid > 0 else 0 start = self._max_edge_ids[partid - 1] if partid > 0 else 0
else: else:
start = self._typed_max_edge_ids[etype][partid - 1] if partid > 0 else 0 c_etype = self._to_canonical_etype(etype)
start = self._typed_max_edge_ids[c_etype][partid - 1] if partid > 0 else 0
return eids - int(start) return eids - int(start)
...@@ -923,6 +992,41 @@ class RangePartitionBook(GraphPartitionBook): ...@@ -923,6 +992,41 @@ class RangePartitionBook(GraphPartitionBook):
""" """
return self._etypes return self._etypes
@property
def canonical_etypes(self):
"""Get the list of canonical edge types
Returns
-------
list[(str, str, str)] or list[None]
A list of canonical etypes. If keys of ``edge_map`` and ``etypes``
are strings, a list of ``None`` is returned as canonical etypes
are not available.
"""
return self._canonical_etypes
def _to_canonical_etype(self, etype):
    """Resolve an edge type to its canonical ``(srctype, etype, dsttype)`` form.

    A tuple is validated against ``self.canonical_etypes`` and returned
    unchanged. A string is looked up in the etype-to-canonical mapping.
    When that mapping is empty, the string is returned without conversion.

    Raises
    ------
    DGLError
        If the edge type is unknown, or if a string edge type maps to more
        than one canonical edge type.
    """
    if isinstance(etype, tuple):
        if etype in self.canonical_etypes:
            return etype
        raise DGLError('Edge type "{}" does not exist.'.format(etype))

    mapping = self._etype2canonical
    if not mapping:
        # No canonical etypes were recorded: the 'etypes' given at
        # construction were plain strings rather than (str, str, str),
        # so there is nothing to convert to.
        # [TODO] Deprecate support for str etypes.
        return etype

    c_etype = mapping.get(etype)
    if c_etype is None:
        raise DGLError('Edge type "{}" does not exist.'.format(etype))
    if not c_etype:
        # An empty tuple marks an etype shared by several canonical etypes.
        raise DGLError('Edge type "%s" is ambiguous. Please use canonical edge type '
                       'in the form of (srctype, etype, dsttype)' % etype)
    return c_etype
NODE_PART_POLICY = 'node' NODE_PART_POLICY = 'node'
EDGE_PART_POLICY = 'edge' EDGE_PART_POLICY = 'edge'
...@@ -949,12 +1053,13 @@ class PartitionPolicy(object): ...@@ -949,12 +1053,13 @@ class PartitionPolicy(object):
assert policy_str in (EDGE_PART_POLICY, NODE_PART_POLICY), \ assert policy_str in (EDGE_PART_POLICY, NODE_PART_POLICY), \
'policy_str must contain \'edge\' or \'node\'.' 'policy_str must contain \'edge\' or \'node\'.'
if NODE_PART_POLICY == policy_str: if NODE_PART_POLICY == policy_str:
policy_str = NODE_PART_POLICY + ":_N" policy_str = NODE_PART_POLICY + ":" + DEFAULT_NTYPE
else: else:
policy_str = EDGE_PART_POLICY + ":_E" policy_str = EDGE_PART_POLICY + ":" + DEFAULT_ETYPE[1]
self._policy_str = policy_str self._policy_str = policy_str
self._part_id = partition_book.partid self._part_id = partition_book.partid
self._partition_book = partition_book self._partition_book = partition_book
self._type_name = _str_to_tuple(self.policy_str[5:])
@property @property
def policy_str(self): def policy_str(self):
...@@ -967,6 +1072,17 @@ class PartitionPolicy(object): ...@@ -967,6 +1072,17 @@ class PartitionPolicy(object):
""" """
return self._policy_str return self._policy_str
@property
def type_name(self):
"""Get the type name: ntype or etype
Returns
-------
str or (str, str, str)
The ntype or etype.
"""
return self._type_name
@property @property
def part_id(self): def part_id(self):
"""Get partition ID """Get partition ID
...@@ -992,8 +1108,8 @@ class PartitionPolicy(object): ...@@ -992,8 +1108,8 @@ class PartitionPolicy(object):
def get_data_name(self, name): def get_data_name(self, name):
"""Get HeteroDataName """Get HeteroDataName
""" """
is_node = NODE_PART_POLICY in self._policy_str is_node = NODE_PART_POLICY in self.policy_str
return HeteroDataName(is_node, self._policy_str[5:], name) return HeteroDataName(is_node, self.type_name, name)
def to_local(self, id_tensor): def to_local(self, id_tensor):
"""Mapping global ID to local ID. """Mapping global ID to local ID.
...@@ -1008,12 +1124,12 @@ class PartitionPolicy(object): ...@@ -1008,12 +1124,12 @@ class PartitionPolicy(object):
tensor tensor
local ID tensor local ID tensor
""" """
if EDGE_PART_POLICY in self._policy_str: if EDGE_PART_POLICY in self.policy_str:
return self._partition_book.eid2localeid(id_tensor, self._part_id, self._policy_str[5:]) return self._partition_book.eid2localeid(id_tensor, self._part_id, self.type_name)
elif NODE_PART_POLICY in self._policy_str: elif NODE_PART_POLICY in self.policy_str:
return self._partition_book.nid2localnid(id_tensor, self._part_id, self._policy_str[5:]) return self._partition_book.nid2localnid(id_tensor, self._part_id, self.type_name)
else: else:
raise RuntimeError('Cannot support policy: %s ' % self._policy_str) raise RuntimeError('Cannot support policy: %s ' % self.policy_str)
def to_partid(self, id_tensor): def to_partid(self, id_tensor):
"""Mapping global ID to partition ID. """Mapping global ID to partition ID.
...@@ -1028,12 +1144,12 @@ class PartitionPolicy(object): ...@@ -1028,12 +1144,12 @@ class PartitionPolicy(object):
tensor tensor
partition ID partition ID
""" """
if EDGE_PART_POLICY in self._policy_str: if EDGE_PART_POLICY in self.policy_str:
return self._partition_book.eid2partid(id_tensor, self._policy_str[5:]) return self._partition_book.eid2partid(id_tensor, self.type_name)
elif NODE_PART_POLICY in self._policy_str: elif NODE_PART_POLICY in self.policy_str:
return self._partition_book.nid2partid(id_tensor, self._policy_str[5:]) return self._partition_book.nid2partid(id_tensor, self.type_name)
else: else:
raise RuntimeError('Cannot support policy: %s ' % self._policy_str) raise RuntimeError('Cannot support policy: %s ' % self.policy_str)
def get_part_size(self): def get_part_size(self):
"""Get data size of current partition. """Get data size of current partition.
...@@ -1043,12 +1159,12 @@ class PartitionPolicy(object): ...@@ -1043,12 +1159,12 @@ class PartitionPolicy(object):
int int
data size data size
""" """
if EDGE_PART_POLICY in self._policy_str: if EDGE_PART_POLICY in self.policy_str:
return len(self._partition_book.partid2eids(self._part_id, self._policy_str[5:])) return len(self._partition_book.partid2eids(self._part_id, self.type_name))
elif NODE_PART_POLICY in self._policy_str: elif NODE_PART_POLICY in self.policy_str:
return len(self._partition_book.partid2nids(self._part_id, self._policy_str[5:])) return len(self._partition_book.partid2nids(self._part_id, self.type_name))
else: else:
raise RuntimeError('Cannot support policy: %s ' % self._policy_str) raise RuntimeError('Cannot support policy: %s ' % self.policy_str)
def get_size(self): def get_size(self):
"""Get the full size of the data. """Get the full size of the data.
...@@ -1058,24 +1174,26 @@ class PartitionPolicy(object): ...@@ -1058,24 +1174,26 @@ class PartitionPolicy(object):
int int
data size data size
""" """
if EDGE_PART_POLICY in self._policy_str: if EDGE_PART_POLICY in self.policy_str:
return self._partition_book._num_edges(self._policy_str[5:]) return self._partition_book._num_edges(self.type_name)
elif NODE_PART_POLICY in self._policy_str: elif NODE_PART_POLICY in self.policy_str:
return self._partition_book._num_nodes(self._policy_str[5:]) return self._partition_book._num_nodes(self.type_name)
else: else:
raise RuntimeError('Cannot support policy: %s ' % self._policy_str) raise RuntimeError('Cannot support policy: %s ' % self.policy_str)
class NodePartitionPolicy(PartitionPolicy): class NodePartitionPolicy(PartitionPolicy):
'''Partition policy for nodes. '''Partition policy for nodes.
''' '''
def __init__(self, partition_book, ntype='_N'): def __init__(self, partition_book, ntype=DEFAULT_NTYPE):
super(NodePartitionPolicy, self).__init__(NODE_PART_POLICY + ':' + ntype, partition_book) super(NodePartitionPolicy, self).__init__(
NODE_PART_POLICY + ':' + ntype, partition_book)
class EdgePartitionPolicy(PartitionPolicy): class EdgePartitionPolicy(PartitionPolicy):
'''Partition policy for edges. '''Partition policy for edges.
''' '''
def __init__(self, partition_book, etype='_E'): def __init__(self, partition_book, etype=DEFAULT_ETYPE):
super(EdgePartitionPolicy, self).__init__(EDGE_PART_POLICY + ':' + etype, partition_book) super(EdgePartitionPolicy, self).__init__(
EDGE_PART_POLICY + ':' + str(etype), partition_book)
class HeteroDataName(object): class HeteroDataName(object):
''' The data name in a heterogeneous graph. ''' The data name in a heterogeneous graph.
...@@ -1089,16 +1207,22 @@ class HeteroDataName(object): ...@@ -1089,16 +1207,22 @@ class HeteroDataName(object):
---------- ----------
is_node : bool is_node : bool
Indicate whether it's node data or edge data. Indicate whether it's node data or edge data.
entity_type : str entity_type : str or (str, str, str)
The type of the node/edge. The type of the node/edge.
data_name : str data_name : str
The name of the data. The name of the data.
''' '''
def __init__(self, is_node, entity_type, data_name):
    # Keep the policy prefix and the entity type as separate fields; the
    # type is stored exactly as passed in (no string conversion here).
    self._policy = NODE_PART_POLICY if is_node else EDGE_PART_POLICY
    self._entity_type = entity_type
    self.data_name = data_name
@property
def policy_str(self):
    ''' The policy prefix and the stringified entity type, joined by ':'
    '''
    entity_type = self.get_type()
    return ':'.join((self._policy, str(entity_type)))
def is_node(self): def is_node(self):
''' Is this the name of node data ''' Is this the name of node data
''' '''
...@@ -1114,7 +1238,7 @@ class HeteroDataName(object): ...@@ -1114,7 +1238,7 @@ class HeteroDataName(object):
This is only meaningful in a heterogeneous graph. This is only meaningful in a heterogeneous graph.
In homogeneous graph, type is '_N' for a node and '_E' for an edge. In homogeneous graph, type is '_N' for a node and '_E' for an edge.
''' '''
return self.policy_str[5:] return self._entity_type
def get_name(self): def get_name(self):
''' The name of the data. ''' The name of the data.
...@@ -1148,4 +1272,4 @@ def parse_hetero_data_name(name): ...@@ -1148,4 +1272,4 @@ def parse_hetero_data_name(name):
assert len(names) == 3, '{} is not a valid heterograph data name'.format(name) assert len(names) == 3, '{} is not a valid heterograph data name'.format(name)
assert names[0] in (NODE_PART_POLICY, EDGE_PART_POLICY), \ assert names[0] in (NODE_PART_POLICY, EDGE_PART_POLICY), \
'{} is not a valid heterograph data name'.format(name) '{} is not a valid heterograph data name'.format(name)
return HeteroDataName(names[0] == NODE_PART_POLICY, names[1], names[2]) return HeteroDataName(names[0] == NODE_PART_POLICY, _str_to_tuple(names[1]), names[2])
...@@ -3,14 +3,12 @@ import sys ...@@ -3,14 +3,12 @@ import sys
import os import os
import numpy as np import numpy as np
from scipy import sparse as spsp from scipy import sparse as spsp
from numpy.testing import assert_array_equal
from dgl.heterograph_index import create_unitgraph_from_coo
from dgl.distributed import partition_graph, load_partition, load_partition_feats from dgl.distributed import partition_graph, load_partition, load_partition_feats
from dgl.distributed.graph_partition_book import BasicPartitionBook, RangePartitionBook, \
NodePartitionPolicy, EdgePartitionPolicy, HeteroDataName
from dgl import function as fn from dgl import function as fn
import backend as F import backend as F
import unittest import unittest
import pickle
import random
import tempfile import tempfile
def _get_inner_node_mask(graph, ntype_id): def _get_inner_node_mask(graph, ntype_id):
...@@ -426,8 +424,100 @@ def test_hetero_partition(): ...@@ -426,8 +424,100 @@ def test_hetero_partition():
check_hetero_partition(hg, 'random') check_hetero_partition(hg, 'random')
check_hetero_partition(hg, 'metis', 4, 8, load_feats=False) check_hetero_partition(hg, 'metis', 4, 8, load_feats=False)
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
def test_BasicPartitionBook():
    """Check etype metadata and policy type names of a BasicPartitionBook."""
    n_nodes, n_edges = 1000, 5000
    rank, world_size = 0, 2

    # Random partition assignment for every node and every edge.
    nid_to_part = np.random.choice(world_size, n_nodes)
    eid_to_part = np.random.choice(world_size, n_edges)

    full_graph = dgl.rand_graph(n_nodes, n_edges)
    # Subgraph over all nodes; presumably done so the graph carries the
    # original-ID fields the partition book reads -- TODO confirm.
    local_graph = dgl.node_subgraph(
        full_graph, F.arange(0, full_graph.num_nodes()))

    book = BasicPartitionBook(
        rank, world_size, nid_to_part, eid_to_part, local_graph)

    expected_canonical = ('_N', '_E', '_N')
    assert book.etypes == ['_E']
    assert book.canonical_etypes == [expected_canonical]

    # Each policy must report the type name it was constructed with.
    npolicy = NodePartitionPolicy(book, '_N')
    assert npolicy.type_name == '_N'
    epolicy = EdgePartitionPolicy(book, '_E')
    assert epolicy.type_name == '_E'
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
def test_RangePartitionBook():
    """Check RangePartitionBook etype handling for three setups.

    Covers a homogeneous graph, a heterogeneous graph keyed by plain edge
    type names, and a heterogeneous graph keyed by canonical edge types.
    Also checks the type names reported by the node/edge partition policies
    and by ``HeteroDataName``.
    """
    part_id = 0
    num_parts = 2

    # homogeneous
    node_map = {'_N': F.tensor([[0, 1000], [1000, 2000]])}
    edge_map = {'_E': F.tensor([[0, 5000], [5000, 10000]])}
    ntypes = {'_N': 0}
    etypes = {'_E': 0}
    gpb = RangePartitionBook(
        part_id, num_parts, node_map, edge_map, ntypes, etypes)
    assert gpb.etypes == ['_E']
    # Only a plain etype name was supplied, so no canonical etype is expected.
    assert gpb.canonical_etypes == [None]
    assert gpb._to_canonical_etype('_E') == '_E'

    node_policy = NodePartitionPolicy(gpb, '_N')
    assert node_policy.type_name == '_N'
    edge_policy = EdgePartitionPolicy(gpb, '_E')
    assert edge_policy.type_name == '_E'

    # heterogeneous, init via etype
    node_map = {'node1': F.tensor([[0, 1000], [1000, 2000]]), 'node2': F.tensor([
        [0, 1000], [1000, 2000]])}
    edge_map = {'edge1': F.tensor([[0, 5000], [5000, 10000]])}
    ntypes = {'node1': 0, 'node2': 1}
    etypes = {'edge1': 0}
    gpb = RangePartitionBook(
        part_id, num_parts, node_map, edge_map, ntypes, etypes)
    assert gpb.etypes == ['edge1']
    assert gpb.canonical_etypes == [None]
    assert gpb._to_canonical_etype('edge1') == 'edge1'

    node_policy = NodePartitionPolicy(gpb, 'node1')
    assert node_policy.type_name == 'node1'
    edge_policy = EdgePartitionPolicy(gpb, 'edge1')
    assert edge_policy.type_name == 'edge1'

    # heterogeneous, init via canonical etype
    node_map = {'node1': F.tensor([[0, 1000], [1000, 2000]]), 'node2': F.tensor([
        [0, 1000], [1000, 2000]])}
    edge_map = {('node1', 'edge1', 'node2'): F.tensor([[0, 5000], [5000, 10000]])}
    ntypes = {'node1': 0, 'node2': 1}
    etypes = {('node1', 'edge1', 'node2'): 0}
    c_etype = list(etypes.keys())[0]
    gpb = RangePartitionBook(
        part_id, num_parts, node_map, edge_map, ntypes, etypes)
    assert gpb.etypes == ['edge1']
    assert gpb.canonical_etypes == [c_etype]
    assert gpb._to_canonical_etype('edge1') == c_etype
    assert gpb._to_canonical_etype(c_etype) == c_etype

    # Unknown edge types, canonical or plain, must be rejected.
    for unknown_etype in (('node1', 'edge2', 'node2'), 'edge2'):
        expect_except = False
        try:
            gpb._to_canonical_etype(unknown_etype)
        except Exception:
            # Deliberately not a bare ``except``: KeyboardInterrupt and
            # SystemExit must not be counted as the expected failure.
            expect_except = True
        assert expect_except

    node_policy = NodePartitionPolicy(gpb, 'node1')
    assert node_policy.type_name == 'node1'
    edge_policy = EdgePartitionPolicy(gpb, c_etype)
    assert edge_policy.type_name == c_etype

    # HeteroDataName must hand back the entity type exactly as given.
    data_name = HeteroDataName(False, 'edge1', 'edge1')
    assert data_name.get_type() == 'edge1'
    data_name = HeteroDataName(False, c_etype, 'edge1')
    assert data_name.get_type() == c_etype
if __name__ == '__main__':
    # Script entry point: create the partition directory, then run every
    # test in this file in order.
    os.makedirs('/tmp/partition', exist_ok=True)
    test_partition()
    test_hetero_partition()
    test_BasicPartitionBook()
    test_RangePartitionBook()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment