"docs/vscode:/vscode.git/clone" did not exist on "60ffa8425383a058c34dcab48079a36e526ed454"
Unverified Commit ed8e9c44 authored by Rhett Ying, committed by GitHub

[Dist] deprecate etype and always use canonical etype for partition and load (#4777)

* [Dist] deprecate etype and always use canonical etype for partition and load

* enable canonical etypes in dist part pipeline

* resolve rebase conflicts

* fix lint

* fix test failure

* throw exception if outdated part config is loaded

* refine

* refine

* revert unnecessary change

* fix typo
parent 49a4436a
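In short: the distributed partition and load pipeline now keys everything by the canonical edge type (src_type, etype, dst_type) instead of a bare etype string, which is ambiguous when two relations share a name. A minimal sketch of the user-facing shift (toy graph; the names are illustrative, not from this commit):

import dgl
import torch

# Hypothetical toy heterograph.
g = dgl.heterograph({
    ("user", "like", "item"): (torch.tensor([0, 1]), torch.tensor([1, 0])),
})

# A bare etype string can still be resolved when it is unambiguous...
c_etype = g.to_canonical_etype("like")  # ("user", "like", "item")
# ...but partition configs and loaded features are now keyed by the triplet,
# serialized as "user:like:item" in the partition JSON.
assert ":".join(c_etype) == "user:like:item"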
......@@ -22,6 +22,7 @@ from .partition import load_partition, load_partition_feats, load_partition_book
from .graph_partition_book import PartitionPolicy, get_shared_mem_partition_book
from .graph_partition_book import HeteroDataName, parse_hetero_data_name
from .graph_partition_book import NodePartitionPolicy, EdgePartitionPolicy
from .graph_partition_book import _etype_str_to_tuple
from .shared_mem_utils import _to_shared_mem, _get_ndata_path, _get_edata_path, DTYPE_DICT
from . import rpc
from . import role
......@@ -355,10 +356,10 @@ class DistGraphServer(KVServer):
self.gpb.shared_memory(graph_name)
assert self.gpb.partid == self.part_id
for ntype in ntypes:
node_name = HeteroDataName(True, ntype, None)
node_name = HeteroDataName(True, ntype, "")
self.add_part_policy(PartitionPolicy(node_name.policy_str, self.gpb))
for etype in etypes:
edge_name = HeteroDataName(False, etype, None)
edge_name = HeteroDataName(False, etype, "")
self.add_part_policy(PartitionPolicy(edge_name.policy_str, self.gpb))
if not self.is_backup_server():
......@@ -381,7 +382,7 @@ class DistGraphServer(KVServer):
# The feature name has the following format: edge_type + "/" + feature_name to avoid
# feature name collision for different edge types.
etype, feat_name = name.split('/')
etype = _etype_str_to_tuple(etype)
data_name = HeteroDataName(False, etype, feat_name)
self.init_data(name=str(data_name), policy_str=data_name.policy_str,
data_tensor=edge_feats[name])
......@@ -480,7 +481,6 @@ class DistGraph:
'''
def __init__(self, graph_name, gpb=None, part_config=None):
self.graph_name = graph_name
self._gpb_input = gpb
if os.environ.get('DGL_DIST_MODE', 'standalone') == 'standalone':
assert part_config is not None, \
'When running in the standalone mode, the partition config file is required'
......@@ -503,13 +503,14 @@ class DistGraph:
for name in edge_feats:
# The feature name has the following format: edge_type + "/" + feature_name.
etype, feat_name = name.split('/')
etype = _etype_str_to_tuple(etype)
self._client.add_data(str(HeteroDataName(False, etype, feat_name)),
edge_feats[name],
EdgePartitionPolicy(self._gpb, etype=etype))
self._client.map_shared_data(self._gpb)
rpc.set_num_client(1)
else:
self._init()
self._init(gpb)
# Tell the backup servers to load the graph structure from shared memory.
for server_id in range(self._client.num_servers):
rpc.send_request(server_id, InitGraphRequest(graph_name))
......@@ -530,56 +531,25 @@ class DistGraph:
# When we store node/edge types in a list, they are stored in the order of type IDs.
self._ntype_map = {ntype:i for i, ntype in enumerate(self.ntypes)}
self._etype_map = {etype:i for i, etype in enumerate(self.etypes)}
# Get canonical edge types.
# TODO(zhengda) this requires the server to store the graph with coo format.
eid = []
for etype in self.etypes:
type_eid = F.zeros((1,), F.int64, F.cpu())
eid.append(self._gpb.map_to_homo_eid(type_eid, etype))
eid = F.cat(eid, 0)
src, dst = dist_find_edges(self, eid)
src_tids, _ = self._gpb.map_to_per_ntype(src)
dst_tids, _ = self._gpb.map_to_per_ntype(dst)
self._canonical_etypes = []
etype_ids = F.arange(0, len(self.etypes))
for src_tid, etype_id, dst_tid in zip(src_tids, etype_ids, dst_tids):
src_tid = F.as_scalar(src_tid)
etype_id = F.as_scalar(etype_id)
dst_tid = F.as_scalar(dst_tid)
self._canonical_etypes.append((self.ntypes[src_tid], self.etypes[etype_id],
self.ntypes[dst_tid]))
self._etype2canonical = {}
for src_type, etype, dst_type in self._canonical_etypes:
if etype in self._etype2canonical:
self._etype2canonical[etype] = ()
else:
self._etype2canonical[etype] = (src_type, etype, dst_type)
self._etype_map = {etype:i for i, etype in enumerate(self.canonical_etypes)}
def _init(self):
def _init(self, gpb):
self._client = get_kvstore()
assert self._client is not None, \
'Distributed module is not initialized. Please call dgl.distributed.initialize.'
self._g = _get_graph_from_shared_mem(self.graph_name)
self._gpb = get_shared_mem_partition_book(self.graph_name, self._g)
if self._gpb is None:
self._gpb = self._gpb_input
self._gpb = gpb
self._client.map_shared_data(self._gpb)
def __getstate__(self):
return self.graph_name, self._gpb, self._canonical_etypes
return self.graph_name, self._gpb
def __setstate__(self, state):
self.graph_name, self._gpb_input, self._canonical_etypes = state
self._init()
self.graph_name, gpb = state
self._init(gpb)
self._etype2canonical = {}
for src_type, etype, dst_type in self._canonical_etypes:
if etype in self._etype2canonical:
self._etype2canonical[etype] = ()
else:
self._etype2canonical[etype] = (src_type, etype, dst_type)
self._ndata_store = {}
self._edata_store = {}
self._ndata = NodeDataView(self)
......@@ -724,7 +694,6 @@ class DistGraph:
>>> g.etypes
['_E']
"""
# Currently, we only support a graph with one edge type.
return self._gpb.etypes
@property
......@@ -761,7 +730,7 @@ class DistGraph:
('user', 'follows', 'game'),
('user', 'plays', 'game')]
"""
return self._canonical_etypes
return self._gpb.canonical_etypes
def to_canonical_etype(self, etype):
"""Convert an edge type to the corresponding canonical edge type in the graph.
......@@ -806,21 +775,7 @@ class DistGraph:
--------
canonical_etypes
"""
if etype is None:
if len(self.etypes) != 1:
raise DGLError('Edge type name must be specified if there are more than one '
'edge types.')
etype = self.etypes[0]
if isinstance(etype, tuple):
return etype
else:
ret = self._etype2canonical.get(etype, None)
if ret is None:
raise DGLError('Edge type "{}" does not exist.'.format(etype))
if len(ret) != 3:
raise DGLError('Edge type "{}" is ambiguous. Please use canonical edge type '
'in the form of (srctype, etype, dsttype)'.format(etype))
return ret
return self._gpb.to_canonical_etype(etype)
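The delegation above works because the partition book keeps an etype-to-canonical map and refuses to resolve ambiguous strings. A standalone sketch of that rule (hypothetical type names):

# One etype string may belong to several canonical etypes; an empty tuple is
# stored in that case so string lookups can detect the ambiguity and raise.
canonical_etypes = [("user", "follows", "user"), ("user", "follows", "game")]
etype2canonical = {}
for src, rel, dst in canonical_etypes:
    etype2canonical[rel] = () if rel in etype2canonical else (src, rel, dst)
assert etype2canonical["follows"] == ()  # caller must pass the full triplet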
def get_ntype_id(self, ntype):
"""Return the ID of the given node type.
......@@ -864,6 +819,7 @@ class DistGraph:
raise DGLError('Edge type name must be specified if there are more than one '
'edge types.')
return 0
etype = self.to_canonical_etype(etype)
return self._etype_map[etype]
def number_of_nodes(self, ntype=None):
......@@ -928,10 +884,8 @@ class DistGraph:
123718280
"""
if etype is None:
if len(self.etypes) == 1:
return self._gpb._num_edges(self.etypes[0])
else:
return sum([self._gpb._num_edges(etype) for etype in self.etypes])
return sum([self._gpb._num_edges(c_etype)
for c_etype in self.canonical_etypes])
return self._gpb._num_edges(etype)
def out_degrees(self, u=ALL):
......@@ -1131,10 +1085,6 @@ class DistGraph:
gpb = self.get_partition_book()
if len(gpb.etypes) > 1:
# if etype is a canonical edge type (str, str, str), extract the edge type
if isinstance(etype, tuple):
assert len(etype) == 3, 'Invalid canonical etype: {}'.format(etype)
etype = etype[1]
edges = gpb.map_to_homo_eid(edges, etype)
src, dst = dist_find_edges(self, edges)
if len(gpb.ntypes) > 1:
......@@ -1180,14 +1130,10 @@ class DistGraph:
if isinstance(edges, dict):
# TODO(zhengda) we need to directly generate subgraph of all relations with
# one invocation.
if isinstance(list(edges.keys())[0], tuple):
subg = {etype: self.find_edges(edges[etype], etype[1]) for etype in edges}
else:
subg = {}
for etype in edges:
assert len(self._etype2canonical[etype]) == 3, \
'the etype in input edges is ambiguous'
subg[self._etype2canonical[etype]] = self.find_edges(edges[etype], etype)
for etype, edge in edges.items():
etype = self.to_canonical_etype(etype)
subg[etype] = self.find_edges(edge, etype)
num_nodes = {ntype: self.number_of_nodes(ntype) for ntype in self.ntypes}
subg = dgl_heterograph(subg, num_nodes_dict=num_nodes)
for etype in edges:
......@@ -1245,7 +1191,7 @@ class DistGraph:
Parameters
----------
etype : str
etype : str or (str, str, str)
The edge type
Returns
......@@ -1253,6 +1199,7 @@ class DistGraph:
PartitionPolicy
The partition policy for the edge type.
"""
etype = self.to_canonical_etype(etype)
return EdgePartitionPolicy(self.get_partition_book(), etype)
def barrier(self):
......@@ -1292,6 +1239,8 @@ class DistGraph:
def _get_edata_names(self, etype=None):
''' Get the names of all edge data.
'''
if etype is not None:
etype = self.to_canonical_etype(etype)
names = self._client.gdata_name_list()
edata_names = []
for name in names:
......@@ -1570,7 +1519,7 @@ def edge_split(edges, partition_book=None, etype='_E', rank=None, force_even=Tru
A boolean mask vector that indicates input edges.
partition_book : GraphPartitionBook, optional
The graph partition book
etype : str, optional
etype : str or (str, str, str), optional
The edge type of the input edges.
rank : int, optional
The rank of a process. If not given, the rank of the current process is used.
......
......@@ -2,14 +2,13 @@
import pickle
from abc import ABC
from ast import literal_eval
import numpy as np
from .. import backend as F
from .. import utils
from .._ffi.ndarray import empty_shared_mem
from ..base import EID, NID, DGLError, dgl_warning
from ..base import EID, NID, DGLError
from ..ndarray import exist_shared_mem_array
from ..partition import NDArrayPartition
from .constants import DEFAULT_ETYPE, DEFAULT_NTYPE
......@@ -21,14 +20,40 @@ from .shared_mem_utils import (
_to_shared_mem,
)
CANONICAL_ETYPE_DELIMITER = ":"
def _str_to_tuple(s):
try:
ret = literal_eval(s)
except ValueError:
ret = s
return ret
def _etype_tuple_to_str(c_etype):
'''Convert canonical etype from tuple to string.
Examples
--------
>>> c_etype = ('user', 'like', 'item')
>>> c_etype_str = _etype_tuple_to_str(c_etype)
>>> print(c_etype_str)
'user:like:item'
'''
assert isinstance(c_etype, tuple) and len(c_etype) == 3, \
"Passed-in canonical etype should be in format of (str, str, str). " \
f"But got {c_etype}."
return CANONICAL_ETYPE_DELIMITER.join(c_etype)
def _etype_str_to_tuple(c_etype):
'''Convert canonical etype from string to tuple.
Examples
--------
>>> c_etype_str = 'user:like:item'
>>> c_etype = _etype_str_to_tuple(c_etype_str)
>>> print(c_etype)
('user', 'like', 'item')
'''
ret = tuple(c_etype.split(CANONICAL_ETYPE_DELIMITER))
assert len(ret) == 3, \
"Passed-in canonical etype should be in format of 'str:str:str'. " \
f"But got {c_etype}."
return ret
def _move_metadata_to_shared_mem(
graph_name,
......@@ -437,6 +462,20 @@ class GraphPartitionBook(ABC):
A list of canonical etypes
"""
def to_canonical_etype(self, etype):
"""Convert an edge type to the corresponding canonical edge type.
Parameters
----------
etype : str or (str, str, str)
The edge type
Returns
-------
(str, str, str)
The corresponding canonical edge type
"""
@property
def is_homogeneous(self):
"""check if homogeneous"""
......@@ -494,7 +533,7 @@ class GraphPartitionBook(ABC):
ids : tensor
Type-wise edge Ids
etype : str or (str, str, str)
edge type
The edge type
Returns
-------
......@@ -738,14 +777,32 @@ class BasicPartitionBook(GraphPartitionBook):
"""
return [DEFAULT_ETYPE]
def to_canonical_etype(self, etype):
"""Convert an edge type to the corresponding canonical edge type.
Parameters
----------
etype : str or (str, str, str)
The edge type
Returns
-------
(str, str, str)
The corresponding canonical edge type
"""
assert etype in (
DEFAULT_ETYPE,
DEFAULT_ETYPE[1],
), "Base partition book only supports homogeneous graph."
return self.canonical_etypes
class RangePartitionBook(GraphPartitionBook):
"""This partition book supports more efficient storage of partition information.
This partition book is used if the nodes and edges of a graph partition are assigned
with contiguous IDs. It uses a very small amount of memory to store the partition
information. Canonical etypes are available only when the keys of argument ``etypes``
are canonical etypes.
information.
Parameters
----------
......@@ -759,7 +816,7 @@ class RangePartitionBook(GraphPartitionBook):
the number of partitions. Each row has two integers: the starting and the ending IDs
for a particular node type in a partition. For example, all nodes of type ``"T"`` in
partition ``i`` has ID range ``node_map["T"][i][0]`` to ``node_map["T"][i][1]``.
edge_map : dict[str, Tensor] or dict[(str, str, str), Tensor]
edge_map : dict[(str, str, str), Tensor]
Global edge ID ranges within partitions for each edge type. The key is the edge type
name in string. The value is a tensor of shape :math:`(K, 2)`, where :math:`K` is
the number of partitions. Each row has two integers: the starting and the ending IDs
......@@ -767,13 +824,9 @@ class RangePartitionBook(GraphPartitionBook):
partition ``i`` has ID range ``edge_map["T"][i][0]`` to ``edge_map["T"][i][1]``.
ntypes : dict[str, int]
map ntype strings to ntype IDs.
etypes : dict[str, int] or dict[(str, str, str), int]
map etype strings to etype IDs.
etypes : dict[(str, str, str), int]
map canonical etypes to etype IDs.
.. deprecated:: 0.9.1
Single string format for keys of ``edge_map`` and ``etypes`` is deprecated.
``(str, str, str)`` will be the only format supported in the future.
"""
def __init__(self, part_id, num_parts, node_map, edge_map, ntypes, etypes):
......@@ -792,28 +845,19 @@ class RangePartitionBook(GraphPartitionBook):
assert all(
ntype is not None for ntype in self._ntypes
), "The node types have invalid IDs."
for etype, etype_id in etypes.items():
if isinstance(etype, tuple):
assert (
len(etype) == 3
), "Canonical etype should be in format of (str, str, str)."
c_etype = etype
etype = etype[1]
for c_etype, etype_id in etypes.items():
assert isinstance(c_etype, tuple) and len(c_etype) == 3, \
f"Expect canonical edge type in a triplet of string, but got {c_etype}."
etype = c_etype[1]
self._etypes[etype_id] = etype
self._canonical_etypes[etype_id] = c_etype
if etype in self._etype2canonical:
# If one etype maps to multiple canonical etypes, empty
# tuple is used to indicate such ambiguity casued by etype.
# See more details in self._to_canonical_etype().
# See more details in self.to_canonical_etype().
self._etype2canonical[etype] = tuple()
else:
self._etype2canonical[etype] = c_etype
else:
dgl_warning(
"Etype with 'str' format is deprecated. Please use '(str, str, str)'."
)
self._etypes[etype_id] = etype
self._canonical_etypes[etype_id] = None
assert all(
etype is not None for etype in self._etypes
), "The edge types have invalid IDs."
......@@ -912,10 +956,9 @@ class RangePartitionBook(GraphPartitionBook):
nid_range[i] = (ntype, self._typed_nid_range[ntype])
nid_range_pickle = list(pickle.dumps(nid_range))
eid_range = [None] * len(self.etypes)
for i, etype in enumerate(self.etypes):
c_etype = self._to_canonical_etype(etype)
eid_range[i] = (c_etype, self._typed_eid_range[c_etype])
eid_range = [None] * len(self.canonical_etypes)
for i, etype in enumerate(self.canonical_etypes):
eid_range[i] = (etype, self._typed_eid_range[etype])
eid_range_pickle = list(pickle.dumps(eid_range))
self._meta = _move_metadata_to_shared_mem(
......@@ -945,7 +988,7 @@ class RangePartitionBook(GraphPartitionBook):
if etype in (DEFAULT_ETYPE, DEFAULT_ETYPE[1]):
return int(self._max_edge_ids[-1])
else:
c_etype = self._to_canonical_etype(etype)
c_etype = self.to_canonical_etype(etype)
return int(self._typed_max_edge_ids[c_etype][-1])
def metadata(self):
......@@ -980,7 +1023,7 @@ class RangePartitionBook(GraphPartitionBook):
def map_to_homo_eid(self, ids, etype):
"""Map per-edge-type IDs to global edge IDs in the homoenegeous format."""
ids = utils.toindex(ids).tousertensor()
c_etype = self._to_canonical_etype(etype)
c_etype = self.to_canonical_etype(etype)
partids = self.eid2partid(ids, c_etype)
typed_max_eids = F.zerocopy_from_numpy(
self._typed_max_edge_ids[c_etype]
......@@ -1013,7 +1056,7 @@ class RangePartitionBook(GraphPartitionBook):
self._max_edge_ids, eids.tonumpy(), side="right"
)
else:
c_etype = self._to_canonical_etype(etype)
c_etype = self.to_canonical_etype(etype)
ret = np.searchsorted(
self._typed_max_edge_ids[c_etype], eids.tonumpy(), side="right"
)
......@@ -1042,7 +1085,7 @@ class RangePartitionBook(GraphPartitionBook):
end = self._max_edge_ids[partid]
return F.arange(start, end)
else:
c_etype = self._to_canonical_etype(etype)
c_etype = self.to_canonical_etype(etype)
start = (
self._typed_max_edge_ids[c_etype][partid - 1]
if partid > 0
......@@ -1082,7 +1125,7 @@ class RangePartitionBook(GraphPartitionBook):
if etype in (DEFAULT_ETYPE, DEFAULT_ETYPE[1]):
start = self._max_edge_ids[partid - 1] if partid > 0 else 0
else:
c_etype = self._to_canonical_etype(etype)
c_etype = self.to_canonical_etype(etype)
start = (
self._typed_max_edge_ids[c_etype][partid - 1]
if partid > 0
......@@ -1134,20 +1177,23 @@ class RangePartitionBook(GraphPartitionBook):
"""
return self._local_etype_offset
def _to_canonical_etype(self, etype):
def to_canonical_etype(self, etype):
"""Convert an edge type to the corresponding canonical edge type.
If canonical etype is not available, no conversion is applied.
Parameters
----------
etype : str or (str, str, str)
The edge type
Returns
-------
(str, str, str)
The corresponding canonical edge type
"""
if isinstance(etype, tuple):
if etype not in self.canonical_etypes:
raise DGLError('Edge type "{}" does not exist.'.format(etype))
return etype
if not self._etype2canonical:
# canonical etype is not available, no conversion is applied.
# This is the case that 'etypes' passed in when instantiating
# are in format of str instead of (str, str, str).
# [TODO] Deprecate support for str etypes.
return etype
ret = self._etype2canonical.get(etype, None)
if ret is None:
raise DGLError('Edge type "{}" does not exist.'.format(etype))
......@@ -1161,6 +1207,7 @@ class RangePartitionBook(GraphPartitionBook):
NODE_PART_POLICY = "node"
EDGE_PART_POLICY = "edge"
POLICY_DELIMITER = '~'
class PartitionPolicy(object):
......@@ -1176,26 +1223,27 @@ class PartitionPolicy(object):
Parameters
----------
policy_str : str
Partition policy name, e.g., 'edge:_E' or 'node:_N'.
Partition policy name, e.g., 'edge~_N:_E:_N' or 'node~_N'.
partition_book : GraphPartitionBook
A graph partition book
"""
def __init__(self, policy_str, partition_book):
splits = policy_str.split(":")
if len(splits) == 1:
if POLICY_DELIMITER not in policy_str:
assert policy_str in (
EDGE_PART_POLICY,
NODE_PART_POLICY,
), "policy_str must contain 'edge' or 'node'."
if NODE_PART_POLICY == policy_str:
policy_str = NODE_PART_POLICY + ":" + DEFAULT_NTYPE
policy_str = NODE_PART_POLICY + POLICY_DELIMITER + DEFAULT_NTYPE
else:
policy_str = EDGE_PART_POLICY + ":" + DEFAULT_ETYPE[1]
policy_str = EDGE_PART_POLICY + POLICY_DELIMITER + DEFAULT_ETYPE[1]
self._policy_str = policy_str
self._part_id = partition_book.partid
self._partition_book = partition_book
self._type_name = _str_to_tuple(self.policy_str[5:])
part_policy, self._type_name = policy_str.split(POLICY_DELIMITER, 1)
if part_policy == EDGE_PART_POLICY:
self._type_name = _etype_str_to_tuple(self._type_name)
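The switch from ':' to '~' as the policy delimiter matters because the serialized canonical etype itself contains ':'; splitting once on '~' keeps the two layers apart. A self-contained sketch of the parse:

POLICY_DELIMITER = "~"
CANONICAL_ETYPE_DELIMITER = ":"

policy_str = "edge~_N:_E:_N"
part_policy, type_name = policy_str.split(POLICY_DELIMITER, 1)
assert part_policy == "edge"
assert tuple(type_name.split(CANONICAL_ETYPE_DELIMITER)) == ("_N", "_E", "_N")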
@property
def policy_str(self):
......@@ -1330,7 +1378,7 @@ class NodePartitionPolicy(PartitionPolicy):
def __init__(self, partition_book, ntype=DEFAULT_NTYPE):
super(NodePartitionPolicy, self).__init__(
NODE_PART_POLICY + ":" + ntype, partition_book
NODE_PART_POLICY + POLICY_DELIMITER + ntype, partition_book
)
......@@ -1338,8 +1386,11 @@ class EdgePartitionPolicy(PartitionPolicy):
"""Partition policy for edges."""
def __init__(self, partition_book, etype=DEFAULT_ETYPE):
assert isinstance(etype, tuple) and len(etype) == 3, \
f"Expect canonical edge type in a triplet of string, but got {etype}."
super(EdgePartitionPolicy, self).__init__(
EDGE_PART_POLICY + ":" + str(etype), partition_book
EDGE_PART_POLICY + POLICY_DELIMITER + _etype_tuple_to_str(etype),
partition_book
)
......@@ -1363,26 +1414,32 @@ class HeteroDataName(object):
def __init__(self, is_node, entity_type, data_name):
self._policy = NODE_PART_POLICY if is_node else EDGE_PART_POLICY
if not is_node:
assert isinstance(entity_type, tuple) and len(entity_type) == 3, \
f"Expect canonical edge type in a triplet of string, but got {entity_type}."
self._entity_type = entity_type
self.data_name = data_name
@property
def policy_str(self):
"""concatenate policy and entity type into string"""
return self._policy + ":" + str(self.get_type())
entity_type = self.get_type()
if self.is_edge():
entity_type = _etype_tuple_to_str(entity_type)
return self._policy + POLICY_DELIMITER + entity_type
def is_node(self):
"""Is this the name of node data"""
return NODE_PART_POLICY in self.policy_str
return self._policy == NODE_PART_POLICY
def is_edge(self):
"""Is this the name of edge data"""
return EDGE_PART_POLICY in self.policy_str
return self._policy == EDGE_PART_POLICY
def get_type(self):
"""The type of the node/edge.
This is only meaningful in a heterogeneous graph.
In homogeneous graph, type is '_N' for a node and '_E' for an edge.
In a homogeneous graph, the type is '_N' for a node and '_N:_E:_N' for an edge.
"""
return self._entity_type
......@@ -1395,7 +1452,7 @@ class HeteroDataName(object):
The full name is used as the key in the KVStore.
"""
return self.policy_str + ":" + self.data_name
return self.policy_str + POLICY_DELIMITER + self.data_name
def parse_hetero_data_name(name):
......@@ -1414,7 +1471,7 @@ def parse_hetero_data_name(name):
-------
HeteroDataName
"""
names = name.split(":")
names = name.split(POLICY_DELIMITER)
assert len(names) == 3, "{} is not a valid heterograph data name".format(
name
)
......@@ -1422,6 +1479,10 @@ def parse_hetero_data_name(name):
NODE_PART_POLICY,
EDGE_PART_POLICY,
), "{} is not a valid heterograph data name".format(name)
is_node = names[0] == NODE_PART_POLICY
entity_type = names[1]
if not is_node:
entity_type = _etype_str_to_tuple(entity_type)
return HeteroDataName(
names[0] == NODE_PART_POLICY, _str_to_tuple(names[1]), names[2]
is_node, entity_type, names[2]
)
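Putting the pieces together, a full data name now splits cleanly on '~' even though the embedded edge type contains ':'. A quick sketch of the expected layout:

name = "edge~_N:_E:_N~feat"  # <policy>~<entity type>~<data name>
policy, entity, data_name = name.split("~")
assert policy == "edge"
assert tuple(entity.split(":")) == ("_N", "_E", "_N")
assert data_name == "feat"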
......@@ -1141,7 +1141,7 @@ class KVClient(object):
for ntype in partition_book.ntypes:
policy = NodePartitionPolicy(partition_book, ntype)
self._all_possible_part_policy[policy.policy_str] = policy
for etype in partition_book.etypes:
for etype in partition_book.canonical_etypes:
policy = EdgePartitionPolicy(partition_book, etype)
self._all_possible_part_policy[policy.policy_str] = policy
......
......@@ -6,13 +6,23 @@ import time
import numpy as np
from .. import backend as F
from ..base import NID, EID, NTYPE, ETYPE, dgl_warning
from ..base import NID, EID, NTYPE, ETYPE, dgl_warning, DGLError
from ..convert import to_homogeneous
from ..random import choice as random_choice
from ..transforms import sort_csr_by_tag, sort_csc_by_tag
from ..data.utils import load_graphs, save_graphs, load_tensors, save_tensors
from ..partition import metis_partition_assignment, partition_graph_with_halo, get_peak_mem
from .graph_partition_book import BasicPartitionBook, RangePartitionBook
from ..partition import (
metis_partition_assignment,
partition_graph_with_halo,
get_peak_mem,
)
from .constants import DEFAULT_ETYPE, DEFAULT_NTYPE
from .graph_partition_book import (
BasicPartitionBook,
RangePartitionBook,
_etype_tuple_to_str,
_etype_str_to_tuple,
)
RESERVED_FIELD_DTYPE = {
'inner_node': F.uint8, # A flag indicates whether the node is inside a partition.
......@@ -24,6 +34,43 @@ RESERVED_FIELD_DTYPE = {
ETYPE: F.int32
}
def _format_part_metadata(part_metadata, formatter):
'''Format etypes with the specified formatter.
'''
for key in ['edge_map', 'etypes']:
if key not in part_metadata:
continue
orig_data = part_metadata[key]
if not isinstance(orig_data, dict):
continue
new_data = {}
for etype, data in orig_data.items():
etype = formatter(etype)
new_data[etype] = data
part_metadata[key] = new_data
return part_metadata
def _load_part_config(part_config):
'''Load the partition config and convert etype keys from strings to tuples.
'''
try:
with open(part_config) as f:
part_metadata = _format_part_metadata(json.load(f),
_etype_str_to_tuple)
except AssertionError as e:
raise DGLError(f"Failed to load partition config due to {e}. "
"Probably caused by outdated config. If so, please refer to "
"https://github.com/dmlc/dgl/tree/master/tools#change-edge-"
"type-to-canonical-edge-type-for-partition-configuration-json")
return part_metadata
def _dump_part_config(part_config, part_metadata):
'''Convert etype keys to strings and dump the partition config.
'''
part_metadata = _format_part_metadata(part_metadata, _etype_tuple_to_str)
with open(part_config, 'w') as outfile:
json.dump(part_metadata, outfile, sort_keys=True, indent=4)
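The round trip these helpers implement is plain key re-formatting, since JSON cannot use tuples as keys. A standalone sketch (no DGL imports needed):

etypes = {("user", "like", "item"): 0}
# _dump_part_config direction: tuple keys become JSON-friendly strings.
dumped = {":".join(k): v for k, v in etypes.items()}
# _load_part_config direction: string keys become tuples again.
loaded = {tuple(k.split(":")): v for k, v in dumped.items()}
assert loaded == etypes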
def _save_graphs(filename, g_list, formats=None, sort_etypes=False):
'''Preprocess partitions before saving:
1. format data types.
......@@ -102,7 +149,7 @@ def load_partition(part_config, part_id, load_feats=True):
The graph partition structure.
Dict[str, Tensor]
Node features.
Dict[str, Tensor]
Dict[(str, str, str), Tensor]
Edge features.
GraphPartitionBook
The graph partition information.
......@@ -110,7 +157,7 @@ def load_partition(part_config, part_id, load_feats=True):
The graph name
List[str]
The node types
List[str]
List[(str, str, str)]
The edge types
'''
config_path = os.path.dirname(part_config)
......@@ -207,7 +254,7 @@ def load_partition_feats(part_config, part_id, load_nodes=True, load_edges=True)
for name in node_feats:
feat = node_feats[name]
if name.find('/') == -1:
name = '_N/' + name
name = DEFAULT_NTYPE + '/' + name
new_feats[name] = feat
node_feats = new_feats
if edge_feats is not None:
......@@ -215,7 +262,7 @@ def load_partition_feats(part_config, part_id, load_nodes=True, load_edges=True)
for name in edge_feats:
feat = edge_feats[name]
if name.find('/') == -1:
name = '_E/' + name
name = _etype_tuple_to_str(DEFAULT_ETYPE) + '/' + name
new_feats[name] = feat
edge_feats = new_feats
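With this change, feature keys follow one convention for defaults and user types alike: node keys stay ntype + '/' + name, while edge keys use the serialized canonical etype. A sketch with the built-in defaults:

DEFAULT_NTYPE = "_N"
DEFAULT_ETYPE = ("_N", "_E", "_N")

node_key = DEFAULT_NTYPE + "/feat"            # "_N/feat"
edge_key = ":".join(DEFAULT_ETYPE) + "/feat"  # "_N:_E:_N/feat"
assert edge_key == "_N:_E:_N/feat"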
......@@ -244,8 +291,7 @@ def load_partition_book(part_config, part_id, graph=None):
dict
The edge types
'''
with open(part_config) as conf_f:
part_metadata = json.load(conf_f)
part_metadata = _load_part_config(part_config)
assert 'num_parts' in part_metadata, 'num_parts does not exist.'
assert part_metadata['num_parts'] > part_id, \
'part {} is out of range (#parts: {})'.format(part_id, part_metadata['num_parts'])
......@@ -267,14 +313,14 @@ def load_partition_book(part_config, part_id, graph=None):
break
elif isinstance(node_map, list):
is_range_part = True
node_map = {'_N': node_map}
node_map = {DEFAULT_NTYPE: node_map}
else:
is_range_part = False
if isinstance(edge_map, list):
edge_map = {'_E': edge_map}
edge_map = {DEFAULT_ETYPE: edge_map}
ntypes = {'_N': 0}
etypes = {'_E': 0}
ntypes = {DEFAULT_NTYPE: 0}
etypes = {DEFAULT_ETYPE: 0}
if 'ntypes' in part_metadata:
ntypes = part_metadata['ntypes']
if 'etypes' in part_metadata:
......@@ -337,13 +383,13 @@ def _get_orig_ids(g, sim_g, reshuffle, orig_nids, orig_eids):
orig_nids = {ntype: F.boolean_mask(orig_nids, orig_ntype == g.get_ntype_id(ntype)) \
for ntype in g.ntypes}
orig_eids = {etype: F.boolean_mask(orig_eids, orig_etype == g.get_etype_id(etype)) \
for etype in g.etypes}
for etype in g.canonical_etypes}
elif not reshuffle and not is_hetero:
orig_nids = F.arange(0, sim_g.number_of_nodes())
orig_eids = F.arange(0, sim_g.number_of_edges())
elif not reshuffle:
orig_nids = {ntype: F.arange(0, g.number_of_nodes(ntype)) for ntype in g.ntypes}
orig_eids = {etype: F.arange(0, g.number_of_edges(etype)) for etype in g.etypes}
orig_eids = {etype: F.arange(0, g.number_of_edges(etype)) for etype in g.canonical_etypes}
return orig_nids, orig_eids
def _set_trainer_ids(g, sim_g, node_parts):
......@@ -371,10 +417,12 @@ def _set_trainer_ids(g, sim_g, node_parts):
trainer_id = F.zeros((len(orig_nid),), F.dtype(node_parts), F.cpu())
F.scatter_row_inplace(trainer_id, orig_nid, F.boolean_mask(node_parts, type_idx))
g.nodes[ntype].data['trainer_id'] = trainer_id
for _, etype, dst_type in g.canonical_etypes:
for c_etype in g.canonical_etypes:
# An edge is assigned to a partition based on its destination node.
trainer_id = F.gather_row(g.nodes[dst_type].data['trainer_id'], g.edges(etype=etype)[1])
g.edges[etype].data['trainer_id'] = trainer_id
_, _, dst_type = c_etype
trainer_id = F.gather_row(g.nodes[dst_type].data['trainer_id'],
g.edges(etype=c_etype)[1])
g.edges[c_etype].data['trainer_id'] = trainer_id
def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method="metis",
reshuffle=True, balance_ntypes=None, balance_edges=False, return_mapping=False,
......@@ -421,15 +469,15 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
"num_parts" : 2,
"halo_hops" : 1,
"node_map": {
"_U": [ [ 0, 1261310 ],
"_N": [ [ 0, 1261310 ],
[ 1261310, 2449029 ] ]
},
"edge_map": {
"_V": [ [ 0, 62539528 ],
"_N:_E:_N": [ [ 0, 62539528 ],
[ 62539528, 123718280 ] ]
},
"etypes": { "_V": 0 },
"ntypes": { "_U": 0 },
"etypes": { "_N:_E:_N": 0 },
"ntypes": { "_N": 0 },
"num_nodes" : 1000000,
"num_edges" : 52000000,
"part-0" : {
......@@ -483,8 +531,8 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
},
Essentially, ``node_map`` and ``edge_map`` are dictionaries. The keys are
node/edge types. The values are lists of pairs containing the start and end of
the ID range for the corresponding types in a partition.
node types and canonical edge types respectively. The values are lists of pairs
containing the start and end of the ID range for the corresponding types in a partition.
The length of the list is the number of
partitions; each element in the list is a tuple that stores the start and the end of
an ID range for a particular node/edge type in the partition.
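For example, reading partition 1's edge ID range out of the sample config above (values copied from the JSON; the count is just end minus start):

edge_map = {"_N:_E:_N": [[0, 62539528], [62539528, 123718280]]}
start, end = edge_map["_N:_E:_N"][1]  # partition 1
assert end - start == 61178752        # edges owned by partition 1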
......@@ -744,7 +792,7 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
parts[name].edata['inner_edge'] == 1)
inner_eids = F.boolean_mask(parts[name].edata[EID],
parts[name].edata['inner_edge'] == 1)
for etype in g.etypes:
for etype in g.canonical_etypes:
inner_etype_mask = inner_etype == g.get_etype_id(etype)
typed_eids = np.sort(F.asnumpy(F.boolean_mask(inner_eids, inner_etype_mask)))
assert np.all(typed_eids == np.arange(int(typed_eids[0]),
......@@ -794,7 +842,7 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
int(F.as_scalar(inner_nids[-1])) + 1])
val = np.cumsum(val).tolist()
assert val[-1] == g.number_of_nodes(ntype)
for etype in g.etypes:
for etype in g.canonical_etypes:
etype_id = g.get_etype_id(etype)
val = []
edge_map_val[etype] = []
......@@ -815,7 +863,7 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
inner_nids = F.boolean_mask(parts[0].ndata[NID], inner_node_mask)
node_map_val[ntype] = [[int(F.as_scalar(inner_nids[0])),
int(F.as_scalar(inner_nids[-1])) + 1]]
for etype in g.etypes:
for etype in g.canonical_etypes:
etype_id = g.get_etype_id(etype)
inner_edge_mask = _get_inner_edge_mask(parts[0], etype_id)
inner_eids = F.boolean_mask(parts[0].edata[EID], inner_edge_mask)
......@@ -832,7 +880,7 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
start = time.time()
ntypes = {ntype:g.get_ntype_id(ntype) for ntype in g.ntypes}
etypes = {etype:g.get_etype_id(etype) for etype in g.etypes}
etypes = {etype:g.get_etype_id(etype) for etype in g.canonical_etypes}
part_metadata = {'graph_name': graph_name,
'num_nodes': g.number_of_nodes(),
'num_edges': g.number_of_edges(),
......@@ -874,7 +922,7 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
node_feats[ntype + '/' + name] = F.gather_row(g.nodes[ntype].data[name],
local_nodes)
for etype in g.etypes:
for etype in g.canonical_etypes:
etype_id = g.get_etype_id(etype)
edata_name = 'orig_id' if reshuffle else EID
inner_edge_mask = _get_inner_edge_mask(part, etype_id)
......@@ -893,8 +941,8 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
for name in g.edges[etype].data:
if name in [EID, 'inner_edge']:
continue
edge_feats[etype + '/' + name] = F.gather_row(g.edges[etype].data[name],
local_edges)
edge_feats[_etype_tuple_to_str(etype) + '/' + name] = F.gather_row(
g.edges[etype].data[name], local_edges)
else:
for ntype in g.ntypes:
if reshuffle and len(g.ntypes) > 1:
......@@ -914,7 +962,7 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
local_nodes)
else:
node_feats[ntype + '/' + name] = g.nodes[ntype].data[name]
for etype in g.etypes:
for etype in g.canonical_etypes:
if reshuffle and not g.is_homogeneous:
edata_name = 'orig_id'
etype_id = g.get_etype_id(etype)
......@@ -928,10 +976,11 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
if name in [EID, 'inner_edge']:
continue
if reshuffle:
edge_feats[etype + '/' + name] = F.gather_row(g.edges[etype].data[name],
local_edges)
edge_feats[_etype_tuple_to_str(etype) + '/' + name] = F.gather_row(
g.edges[etype].data[name], local_edges)
else:
edge_feats[etype + '/' + name] = g.edges[etype].data[name]
edge_feats[_etype_tuple_to_str(etype) + '/' + name] = \
g.edges[etype].data[name]
# delete `orig_id` from ndata/edata
if reshuffle:
del part.ndata['orig_id']
......@@ -955,8 +1004,7 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
print('Save partitions: {:.3f} seconds, peak memory: {:.3f} GB'.format(
time.time() - start, get_peak_mem()))
with open('{}/{}.json'.format(out_path, graph_name), 'w') as outfile:
json.dump(part_metadata, outfile, sort_keys=True, indent=4)
_dump_part_config(f'{out_path}/{graph_name}.json', part_metadata)
num_cuts = sim_g.number_of_edges() - tot_num_inner_edges
if num_parts == 1:
......
......@@ -13,10 +13,12 @@ from dgl.partition import NDArrayPartition
)
@parametrize_idtype
def test_get_node_partition_from_book(idtype):
node_map = {"type_n": F.tensor([[0, 3], [4, 5], [6, 10]], dtype=idtype)}
edge_map = {"type_e": F.tensor([[0, 9], [10, 15], [16, 25]], dtype=idtype)}
node_map = {'_N': F.tensor([[0, 3], [4, 5], [6, 10]], dtype=idtype)}
edge_map = {('_N', '_E', '_N'): F.tensor([[0, 9], [10, 15], [16, 25]], dtype=idtype)}
ntypes = {ntype: i for i, ntype in enumerate(node_map)}
etypes = {etype: i for i, etype in enumerate(edge_map)}
book = gpb.RangePartitionBook(
0, 3, node_map, edge_map, {"type_n": 0}, {"type_e": 0}
0, 3, node_map, edge_map, ntypes, etypes
)
partition = gpb.get_node_partition_from_book(book, F.ctx())
assert partition.num_parts() == 3
......
......@@ -190,15 +190,16 @@ def check_rpc_hetero_find_edges_shuffle(tmpdir, num_server):
time.sleep(1)
pserver_list.append(p)
eids = F.tensor(np.random.randint(g.num_edges('r12'), size=100))
test_etype = g.to_canonical_etype('r12')
eids = F.tensor(np.random.randint(g.num_edges(test_etype), size=100))
expect_except = False
try:
_, _ = g.find_edges(orig_eid['r12'][eids], etype=('n1', 'r12'))
_, _ = g.find_edges(orig_eid[test_etype][eids], etype=('n1', 'r12'))
except:
expect_except = True
assert expect_except
u, v = g.find_edges(orig_eid['r12'][eids], etype='r12')
u1, v1 = g.find_edges(orig_eid['r12'][eids], etype=('n1', 'r12', 'n2'))
u, v = g.find_edges(orig_eid[test_etype][eids], etype='r12')
u1, v1 = g.find_edges(orig_eid[test_etype][eids], etype=('n1', 'r12', 'n2'))
assert F.array_equal(u, u1)
assert F.array_equal(v, v1)
du, dv = start_find_edges_client(0, tmpdir, num_server > 1, eids, etype='r12')
......@@ -394,7 +395,8 @@ def check_rpc_hetero_sampling_shuffle(tmpdir, num_server):
for p in pserver_list:
p.join()
for src_type, etype, dst_type in block.canonical_etypes:
for c_etype in block.canonical_etypes:
src_type, etype, dst_type = c_etype
src, dst = block.edges(etype=etype)
# These are global Ids after shuffling.
shuffled_src = F.gather_row(block.srcnodes[src_type].data[dgl.NID], src)
......@@ -403,7 +405,7 @@ def check_rpc_hetero_sampling_shuffle(tmpdir, num_server):
orig_src = F.asnumpy(F.gather_row(orig_nid_map[src_type], shuffled_src))
orig_dst = F.asnumpy(F.gather_row(orig_nid_map[dst_type], shuffled_dst))
orig_eid = F.asnumpy(F.gather_row(orig_eid_map[etype], shuffled_eid))
orig_eid = F.asnumpy(F.gather_row(orig_eid_map[c_etype], shuffled_eid))
# Check the node Ids and edge Ids.
orig_src1, orig_dst1 = g.find_edges(orig_eid, etype=etype)
......@@ -484,7 +486,8 @@ def check_rpc_hetero_etype_sampling_shuffle(tmpdir, num_server, graph_formats=No
src, dst = block.edges(etype=('n2', 'r23', 'n3'))
assert len(src) == 18
for src_type, etype, dst_type in block.canonical_etypes:
for c_etype in block.canonical_etypes:
src_type, etype, dst_type = c_etype
src, dst = block.edges(etype=etype)
# These are global Ids after shuffling.
shuffled_src = F.gather_row(block.srcnodes[src_type].data[dgl.NID], src)
......@@ -493,7 +496,7 @@ def check_rpc_hetero_etype_sampling_shuffle(tmpdir, num_server, graph_formats=No
orig_src = F.asnumpy(F.gather_row(orig_nid_map[src_type], shuffled_src))
orig_dst = F.asnumpy(F.gather_row(orig_nid_map[dst_type], shuffled_dst))
orig_eid = F.asnumpy(F.gather_row(orig_eid_map[etype], shuffled_eid))
orig_eid = F.asnumpy(F.gather_row(orig_eid_map[c_etype], shuffled_eid))
# Check the node Ids and edge Ids.
orig_src1, orig_dst1 = g.find_edges(orig_eid, etype=etype)
......@@ -652,7 +655,8 @@ def check_rpc_bipartite_sampling_shuffle(tmpdir, num_server):
for p in pserver_list:
p.join()
for src_type, etype, dst_type in block.canonical_etypes:
for c_etype in block.canonical_etypes:
src_type, etype, dst_type = c_etype
src, dst = block.edges(etype=etype)
# These are global Ids after shuffling.
shuffled_src = F.gather_row(
......@@ -665,7 +669,7 @@ def check_rpc_bipartite_sampling_shuffle(tmpdir, num_server):
orig_nid_map[src_type], shuffled_src))
orig_dst = F.asnumpy(F.gather_row(
orig_nid_map[dst_type], shuffled_dst))
orig_eid = F.asnumpy(F.gather_row(orig_eid_map[etype], shuffled_eid))
orig_eid = F.asnumpy(F.gather_row(orig_eid_map[c_etype], shuffled_eid))
# Check the node Ids and edge Ids.
orig_src1, orig_dst1 = g.find_edges(orig_eid, etype=etype)
......@@ -736,7 +740,8 @@ def check_rpc_bipartite_etype_sampling_shuffle(tmpdir, num_server):
for p in pserver_list:
p.join()
for src_type, etype, dst_type in block.canonical_etypes:
for c_etype in block.canonical_etypes:
src_type, etype, dst_type = c_etype
src, dst = block.edges(etype=etype)
# These are global Ids after shuffling.
shuffled_src = F.gather_row(
......@@ -749,7 +754,7 @@ def check_rpc_bipartite_etype_sampling_shuffle(tmpdir, num_server):
orig_nid_map[src_type], shuffled_src))
orig_dst = F.asnumpy(F.gather_row(
orig_nid_map[dst_type], shuffled_dst))
orig_eid = F.asnumpy(F.gather_row(orig_eid_map[etype], shuffled_eid))
orig_eid = F.asnumpy(F.gather_row(orig_eid_map[c_etype], shuffled_eid))
# Check the node Ids and edge Ids.
orig_src1, orig_dst1 = g.find_edges(orig_eid, etype=etype)
......
......@@ -41,11 +41,11 @@ gpb = dgl.distributed.graph_partition_book.BasicPartitionBook(
)
node_policy = dgl.distributed.PartitionPolicy(
policy_str="node:_N", partition_book=gpb
policy_str="node~_N", partition_book=gpb
)
edge_policy = dgl.distributed.PartitionPolicy(
policy_str="edge:_E", partition_book=gpb
policy_str="edge~_N:_E:_N", partition_book=gpb
)
data_0 = F.tensor(
......@@ -127,15 +127,15 @@ def start_server(server_id, num_clients, num_servers):
kvserver.add_part_policy(node_policy)
kvserver.add_part_policy(edge_policy)
if kvserver.is_backup_server():
kvserver.init_data("data_0", "node:_N")
kvserver.init_data("data_0_1", "node:_N")
kvserver.init_data("data_0_2", "node:_N")
kvserver.init_data("data_0_3", "node:_N")
kvserver.init_data("data_0", "node~_N")
kvserver.init_data("data_0_1", "node~_N")
kvserver.init_data("data_0_2", "node~_N")
kvserver.init_data("data_0_3", "node~_N")
else:
kvserver.init_data("data_0", "node:_N", data_0)
kvserver.init_data("data_0_1", "node:_N", data_0_1)
kvserver.init_data("data_0_2", "node:_N", data_0_2)
kvserver.init_data("data_0_3", "node:_N", data_0_3)
kvserver.init_data("data_0", "node~_N", data_0)
kvserver.init_data("data_0_1", "node~_N", data_0_1)
kvserver.init_data("data_0_2", "node~_N", data_0_2)
kvserver.init_data("data_0_3", "node~_N", data_0_3)
# start server
server_state = dgl.distributed.ServerState(
kv_store=kvserver, local_g=None, partition_book=None
......@@ -159,9 +159,9 @@ def start_server_mul_role(server_id, num_clients, num_servers):
)
kvserver.add_part_policy(node_policy)
if kvserver.is_backup_server():
kvserver.init_data("data_0", "node:_N")
kvserver.init_data("data_0", "node~_N")
else:
kvserver.init_data("data_0", "node:_N", data_0)
kvserver.init_data("data_0", "node~_N", data_0)
# start server
server_state = dgl.distributed.ServerState(
kv_store=kvserver, local_g=None, partition_book=None
......@@ -214,37 +214,37 @@ def start_client(num_clients, num_servers):
dtype, shape, policy = meta
assert dtype == F.dtype(data_0)
assert shape == F.shape(data_0)
assert policy.policy_str == "node:_N"
assert policy.policy_str == "node~_N"
meta = kvclient.get_data_meta("data_0_1")
dtype, shape, policy = meta
assert dtype == F.dtype(data_0_1)
assert shape == F.shape(data_0_1)
assert policy.policy_str == "node:_N"
assert policy.policy_str == "node~_N"
meta = kvclient.get_data_meta("data_0_2")
dtype, shape, policy = meta
assert dtype == F.dtype(data_0_2)
assert shape == F.shape(data_0_2)
assert policy.policy_str == "node:_N"
assert policy.policy_str == "node~_N"
meta = kvclient.get_data_meta("data_0_3")
dtype, shape, policy = meta
assert dtype == F.dtype(data_0_3)
assert shape == F.shape(data_0_3)
assert policy.policy_str == "node:_N"
assert policy.policy_str == "node~_N"
meta = kvclient.get_data_meta("data_1")
dtype, shape, policy = meta
assert dtype == F.dtype(data_1)
assert shape == F.shape(data_1)
assert policy.policy_str == "edge:_E"
assert policy.policy_str == "edge~_N:_E:_N"
meta = kvclient.get_data_meta("data_2")
dtype, shape, policy = meta
assert dtype == F.dtype(data_2)
assert shape == F.shape(data_2)
assert policy.policy_str == "node:_N"
assert policy.policy_str == "node~_N"
# Test push and pull
id_tensor = F.tensor([0, 2, 4], F.int64)
......
......@@ -5,17 +5,25 @@ import dgl
import numpy as np
import pytest
from dgl import function as fn
from dgl.distributed import (load_partition, load_partition_feats,
partition_graph)
from dgl.distributed.graph_partition_book import (BasicPartitionBook,
from dgl.distributed import (
load_partition,
load_partition_feats,
partition_graph,
)
from dgl.distributed.graph_partition_book import (
DEFAULT_ETYPE,
DEFAULT_NTYPE,
BasicPartitionBook,
EdgePartitionPolicy,
HeteroDataName,
NodePartitionPolicy,
RangePartitionBook)
RangePartitionBook,
_etype_tuple_to_str,
)
from dgl.distributed.partition import (
RESERVED_FIELD_DTYPE,
_get_inner_edge_mask,
_get_inner_node_mask,
_get_inner_edge_mask
)
from scipy import sparse as spsp
from utils import reset_envs
......@@ -47,7 +55,12 @@ def create_random_graph(n):
def create_random_hetero():
num_nodes = {"n1": 1000, "n2": 1010, "n3": 1020}
etypes = [("n1", "r1", "n2"), ("n1", "r2", "n3"), ("n2", "r3", "n3")]
etypes = [
("n1", "r1", "n2"),
("n2", "r1", "n1"),
("n1", "r2", "n3"),
("n2", "r3", "n3"),
]
edges = {}
for etype in etypes:
src_ntype, _, dst_ntype = etype
......@@ -64,16 +77,16 @@ def create_random_hetero():
def verify_hetero_graph(g, parts):
num_nodes = {ntype: 0 for ntype in g.ntypes}
num_edges = {etype: 0 for etype in g.etypes}
num_edges = {etype: 0 for etype in g.canonical_etypes}
for part in parts:
assert len(g.ntypes) == len(F.unique(part.ndata[dgl.NTYPE]))
assert len(g.etypes) == len(F.unique(part.edata[dgl.ETYPE]))
assert len(g.canonical_etypes) == len(F.unique(part.edata[dgl.ETYPE]))
for ntype in g.ntypes:
ntype_id = g.get_ntype_id(ntype)
inner_node_mask = _get_inner_node_mask(part, ntype_id)
num_inner_nodes = F.sum(F.astype(inner_node_mask, F.int64), 0)
num_nodes[ntype] += num_inner_nodes
for etype in g.etypes:
for etype in g.canonical_etypes:
etype_id = g.get_etype_id(etype)
inner_edge_mask = _get_inner_edge_mask(part, etype_id)
num_inner_edges = F.sum(F.astype(inner_edge_mask, F.int64), 0)
......@@ -87,7 +100,7 @@ def verify_hetero_graph(g, parts):
)
assert g.number_of_nodes(ntype) == num_nodes[ntype]
# Verify the number of edges are correct.
for etype in g.etypes:
for etype in g.canonical_etypes:
print(
"edge {}: {}, {}".format(
etype, g.number_of_edges(etype), num_edges[etype]
......@@ -96,12 +109,12 @@ def verify_hetero_graph(g, parts):
assert g.number_of_edges(etype) == num_edges[etype]
nids = {ntype: [] for ntype in g.ntypes}
eids = {etype: [] for etype in g.etypes}
eids = {etype: [] for etype in g.canonical_etypes}
for part in parts:
_, _, eid = part.edges(form="all")
etype_arr = F.gather_row(part.edata[dgl.ETYPE], eid)
eid_type = F.gather_row(part.edata[dgl.EID], eid)
for etype in g.etypes:
for etype in g.canonical_etypes:
etype_id = g.get_etype_id(etype)
eids[etype].append(F.boolean_mask(eid_type, etype_arr == etype_id))
# Make sure edge Ids fall into a range.
......@@ -163,7 +176,7 @@ def verify_graph_feats(
ndata = F.gather_row(node_feats[ntype + "/" + name], local_nids)
assert np.all(F.asnumpy(ndata == true_feats))
for etype in g.etypes:
for etype in g.canonical_etypes:
etype_id = g.get_etype_id(etype)
inner_edge_mask = _get_inner_edge_mask(part, etype_id)
inner_eids = F.boolean_mask(part.edata[dgl.EID], inner_edge_mask)
......@@ -179,7 +192,9 @@ def verify_graph_feats(
if name in [dgl.EID, "inner_edge"]:
continue
true_feats = F.gather_row(g.edges[etype].data[name], orig_id)
edata = F.gather_row(edge_feats[etype + "/" + name], local_eids)
edata = F.gather_row(
edge_feats[_etype_tuple_to_str(etype) + "/" + name], local_eids
)
assert np.all(F.asnumpy(edata == true_feats))
......@@ -191,14 +206,16 @@ def check_hetero_partition(
load_feats=True,
graph_formats=None,
):
hg.nodes["n1"].data["labels"] = F.arange(0, hg.number_of_nodes("n1"))
hg.nodes["n1"].data["feats"] = F.tensor(
np.random.randn(hg.number_of_nodes("n1"), 10), F.float32
test_ntype = "n1"
test_etype = ("n1", "r1", "n2")
hg.nodes[test_ntype].data["labels"] = F.arange(0, hg.num_nodes(test_ntype))
hg.nodes[test_ntype].data["feats"] = F.tensor(
np.random.randn(hg.num_nodes(test_ntype), 10), F.float32
)
hg.edges["r1"].data["feats"] = F.tensor(
np.random.randn(hg.number_of_edges("r1"), 10), F.float32
hg.edges[test_etype].data["feats"] = F.tensor(
np.random.randn(hg.num_edges(test_etype), 10), F.float32
)
hg.edges["r1"].data["labels"] = F.arange(0, hg.number_of_edges("r1"))
hg.edges[test_etype].data["labels"] = F.arange(0, hg.num_edges(test_etype))
num_hops = 1
orig_nids, orig_eids = partition_graph(
......@@ -214,10 +231,10 @@ def check_hetero_partition(
graph_formats=graph_formats,
)
assert len(orig_nids) == len(hg.ntypes)
assert len(orig_eids) == len(hg.etypes)
assert len(orig_eids) == len(hg.canonical_etypes)
for ntype in hg.ntypes:
assert len(orig_nids[ntype]) == hg.number_of_nodes(ntype)
for etype in hg.etypes:
for etype in hg.canonical_etypes:
assert len(orig_eids[etype]) == hg.number_of_edges(etype)
parts = []
shuffled_labels = []
......@@ -243,8 +260,8 @@ def check_hetero_partition(
)
assert np.all(F.asnumpy(part_ids) == i)
for etype in hg.etypes:
name = etype + "/trainer_id"
for etype in hg.canonical_etypes:
name = _etype_tuple_to_str(etype) + "/trainer_id"
assert name in edge_feats
part_ids = F.floor_div(
edge_feats[name], num_trainers_per_machine
......@@ -262,7 +279,7 @@ def check_hetero_partition(
dst_ntype_ids, part_dst_ids = gpb.map_to_per_ntype(part_dst_ids)
etype_ids, part_eids = gpb.map_to_per_etype(part_eids)
# These are original per-type IDs.
for etype_id, etype in enumerate(hg.etypes):
for etype_id, etype in enumerate(hg.canonical_etypes):
part_src_ids1 = F.boolean_mask(part_src_ids, etype_ids == etype_id)
src_ntype_ids1 = F.boolean_mask(
src_ntype_ids, etype_ids == etype_id
......@@ -287,8 +304,10 @@ def check_hetero_partition(
hg, gpb, part_g, node_feats, edge_feats, orig_nids, orig_eids
)
shuffled_labels.append(node_feats["n1/labels"])
shuffled_elabels.append(edge_feats["r1/labels"])
shuffled_labels.append(node_feats[test_ntype + "/labels"])
shuffled_elabels.append(
edge_feats[_etype_tuple_to_str(test_etype) + "/labels"]
)
verify_hetero_graph(hg, parts)
shuffled_labels = F.asnumpy(F.cat(shuffled_labels, 0))
......@@ -297,10 +316,12 @@ def check_hetero_partition(
orig_elabels = np.zeros(
shuffled_elabels.shape, dtype=shuffled_elabels.dtype
)
orig_labels[F.asnumpy(orig_nids["n1"])] = shuffled_labels
orig_elabels[F.asnumpy(orig_eids["r1"])] = shuffled_elabels
assert np.all(orig_labels == F.asnumpy(hg.nodes["n1"].data["labels"]))
assert np.all(orig_elabels == F.asnumpy(hg.edges["r1"].data["labels"]))
orig_labels[F.asnumpy(orig_nids[test_ntype])] = shuffled_labels
orig_elabels[F.asnumpy(orig_eids[test_etype])] = shuffled_elabels
assert np.all(orig_labels == F.asnumpy(hg.nodes[test_ntype].data["labels"]))
assert np.all(
orig_elabels == F.asnumpy(hg.edges[test_etype].data["labels"])
)
def check_partition(
......@@ -339,7 +360,7 @@ def check_partition(
shuffled_labels = []
shuffled_edata = []
for i in range(num_parts):
part_g, node_feats, edge_feats, gpb, _, ntypes, etypes = load_partition(
part_g, node_feats, edge_feats, gpb, _, _, _ = load_partition(
"/tmp/partition/test.json", i, load_feats=load_feats
)
_verify_partition_data_types(part_g)
......@@ -359,8 +380,8 @@ def check_partition(
)
assert np.all(F.asnumpy(part_ids) == i)
for etype in g.etypes:
name = etype + "/trainer_id"
for etype in g.canonical_etypes:
name = _etype_tuple_to_str(etype) + "/trainer_id"
assert name in edge_feats
part_ids = F.floor_div(
edge_feats[name], num_trainers_per_machine
......@@ -460,16 +481,17 @@ def check_partition(
ndata = F.gather_row(node_feats["_N/" + name], local_nid)
assert np.all(F.asnumpy(true_feats) == F.asnumpy(ndata))
for name in ["feats"]:
assert "_E/" + name in edge_feats
assert edge_feats["_E/" + name].shape[0] == len(local_edges)
efeat_name = _etype_tuple_to_str(DEFAULT_ETYPE) + "/" + name
assert efeat_name in edge_feats
assert edge_feats[efeat_name].shape[0] == len(local_edges)
true_feats = F.gather_row(g.edata[name], local_edges)
edata = F.gather_row(edge_feats["_E/" + name], local_eid)
edata = F.gather_row(edge_feats[efeat_name], local_eid)
assert np.all(F.asnumpy(true_feats) == F.asnumpy(edata))
# This only works if node/edge IDs are shuffled.
if reshuffle:
shuffled_labels.append(node_feats["_N/labels"])
shuffled_edata.append(edge_feats["_E/feats"])
shuffled_edata.append(edge_feats["_N:_E:_N/feats"])
# Verify that we can reconstruct node/edge data for original IDs.
if reshuffle:
......@@ -555,31 +577,38 @@ def test_BasicPartitionBook():
node_policy = NodePartitionPolicy(gpb, "_N")
assert node_policy.type_name == "_N"
expect_except = False
try:
edge_policy = EdgePartitionPolicy(gpb, "_E")
assert edge_policy.type_name == "_E"
except AssertionError:
expect_except = True
assert expect_except
edge_policy = EdgePartitionPolicy(gpb, c_etype)
assert edge_policy.type_name == c_etype
def test_RangePartitionBook():
part_id = 0
num_parts = 2
# homogeneous
node_map = {"_N": F.tensor([[0, 1000], [1000, 2000]])}
edge_map = {"_E": F.tensor([[0, 5000], [5000, 10000]])}
ntypes = {"_N": 0}
etypes = {"_E": 0}
node_map = {DEFAULT_NTYPE: F.tensor([[0, 1000], [1000, 2000]])}
edge_map = {DEFAULT_ETYPE: F.tensor([[0, 5000], [5000, 10000]])}
ntypes = {DEFAULT_NTYPE: 0}
etypes = {DEFAULT_ETYPE: 0}
gpb = RangePartitionBook(
part_id, num_parts, node_map, edge_map, ntypes, etypes
)
assert gpb.etypes == ["_E"]
assert gpb.canonical_etypes == [None]
assert gpb._to_canonical_etype("_E") == "_E"
assert gpb.etypes == [DEFAULT_ETYPE[1]]
assert gpb.canonical_etypes == [DEFAULT_ETYPE]
assert gpb.to_canonical_etype(DEFAULT_ETYPE[1]) == DEFAULT_ETYPE
node_policy = NodePartitionPolicy(gpb, "_N")
assert node_policy.type_name == "_N"
edge_policy = EdgePartitionPolicy(gpb, "_E")
assert edge_policy.type_name == "_E"
node_policy = NodePartitionPolicy(gpb, DEFAULT_NTYPE)
assert node_policy.type_name == DEFAULT_NTYPE
edge_policy = EdgePartitionPolicy(gpb, DEFAULT_ETYPE)
assert edge_policy.type_name == DEFAULT_ETYPE
# heterogeneous, init via etype
# Init via etype is not supported
node_map = {
"node1": F.tensor([[0, 1000], [1000, 2000]]),
"node2": F.tensor([[0, 1000], [1000, 2000]]),
......@@ -587,17 +616,20 @@ def test_RangePartitionBook():
edge_map = {"edge1": F.tensor([[0, 5000], [5000, 10000]])}
ntypes = {"node1": 0, "node2": 1}
etypes = {"edge1": 0}
gpb = RangePartitionBook(
expect_except = False
try:
RangePartitionBook(
part_id, num_parts, node_map, edge_map, ntypes, etypes
)
assert gpb.etypes == ["edge1"]
assert gpb.canonical_etypes == [None]
assert gpb._to_canonical_etype("edge1") == "edge1"
node_policy = NodePartitionPolicy(gpb, "node1")
assert node_policy.type_name == "node1"
edge_policy = EdgePartitionPolicy(gpb, "edge1")
assert edge_policy.type_name == "edge1"
except AssertionError:
expect_except = True
assert expect_except
expect_except = False
try:
EdgePartitionPolicy(gpb, "edge1")
except AssertionError:
expect_except = True
assert expect_except
# heterogeneous, init via canonical etype
node_map = {
......@@ -615,18 +647,18 @@ def test_RangePartitionBook():
)
assert gpb.etypes == ["edge1"]
assert gpb.canonical_etypes == [c_etype]
assert gpb._to_canonical_etype("edge1") == c_etype
assert gpb._to_canonical_etype(c_etype) == c_etype
assert gpb.to_canonical_etype("edge1") == c_etype
assert gpb.to_canonical_etype(c_etype) == c_etype
expect_except = False
try:
gpb._to_canonical_etype(("node1", "edge2", "node2"))
except dgl.DGLError:
gpb.to_canonical_etype(("node1", "edge2", "node2"))
except:
expect_except = True
assert expect_except
expect_except = False
try:
gpb._to_canonical_etype("edge2")
except dgl.DGLError:
gpb.to_canonical_etype("edge2")
except:
expect_except = True
assert expect_except
......@@ -635,7 +667,11 @@ def test_RangePartitionBook():
edge_policy = EdgePartitionPolicy(gpb, c_etype)
assert edge_policy.type_name == c_etype
data_name = HeteroDataName(False, "edge1", "edge1")
assert data_name.get_type() == "edge1"
data_name = HeteroDataName(False, c_etype, "edge1")
expect_except = False
try:
HeteroDataName(False, "edge1", "feat")
except:
expect_except = True
assert expect_except
data_name = HeteroDataName(False, c_etype, "feat")
assert data_name.get_type() == c_etype
......@@ -53,6 +53,9 @@ def create_chunked_dataset(
('author', 'affiliated_with', 'institution'): rand_edges(
num_authors, num_institutions, num_affiliate_edges
),
('institution', 'writes', 'paper'): rand_edges(
num_institutions, num_papers, num_write_edges
),
}
src, dst = data_dict[('author', 'writes', 'paper')]
data_dict[('paper', 'rev_writes', 'author')] = (dst, src)
......@@ -65,7 +68,7 @@ def create_chunked_dataset(
paper_label = np.random.choice(num_classes, num_papers)
paper_year = np.random.choice(2022, num_papers)
paper_orig_ids = np.arange(0, num_papers)
writes_orig_ids = np.arange(0, g.num_edges('writes'))
writes_orig_ids = np.arange(0, num_write_edges)
# masks.
if include_masks:
......@@ -84,11 +87,12 @@ def create_chunked_dataset(
# Edge features.
cite_count = np.random.choice(10, num_cite_edges)
write_year = np.random.choice(2022, num_write_edges)
write2_year = np.random.choice(2022, num_write_edges)
# Save features.
input_dir = os.path.join(root_dir, 'data_test')
os.makedirs(input_dir)
for sub_d in ['paper', 'cites', 'writes']:
for sub_d in ['paper', 'cites', 'writes', 'writes2']:
os.makedirs(os.path.join(input_dir, sub_d))
paper_feat_path = os.path.join(input_dir, 'paper/feat.npy')
......@@ -119,13 +123,18 @@ def create_chunked_dataset(
write_year_path = os.path.join(input_dir, 'writes/year.npy')
with open(write_year_path, 'wb') as f:
np.save(f, write_year)
g.edges['writes'].data['year'] = torch.from_numpy(write_year)
g.edges[('author', 'writes', 'paper')].data['year'] = torch.from_numpy(write_year)
g.edges['rev_writes'].data['year'] = torch.from_numpy(write_year)
writes_orig_ids_path = os.path.join(input_dir, 'writes/orig_ids.npy')
with open(writes_orig_ids_path, 'wb') as f:
np.save(f, writes_orig_ids)
g.edges['writes'].data['orig_ids'] = torch.from_numpy(writes_orig_ids)
g.edges[('author', 'writes', 'paper')].data['orig_ids'] = torch.from_numpy(writes_orig_ids)
write2_year_path = os.path.join(input_dir, 'writes2/year.npy')
with open(write2_year_path, 'wb') as f:
np.save(f, write2_year)
g.edges[('institution', 'writes', 'paper')].data['year'] = torch.from_numpy(write2_year)
node_data = None
if include_masks:
......@@ -206,11 +215,14 @@ def create_chunked_dataset(
edge_data = {
'cites': {'count': cite_count_path},
'writes': {
('author', 'writes', 'paper'): {
'year': write_year_path,
'orig_ids': writes_orig_ids_path
},
'rev_writes': {'year': write_year_path},
('institution', 'writes', 'paper'): {
'year': write2_year_path,
},
}
output_dir = os.path.join(root_dir, 'chunked-data')
......
......@@ -16,6 +16,7 @@ from dgl.distributed.partition import (
load_partition,
_get_inner_node_mask,
_get_inner_edge_mask,
_etype_tuple_to_str,
)
......@@ -58,8 +59,7 @@ def _verify_graph_feats(
ndata = node_feats[ntype + "/" + name][local_nids]
assert torch.equal(ndata, true_feats)
for c_etype in g.canonical_etypes:
etype = c_etype[1]
for etype in g.canonical_etypes:
etype_id = g.get_etype_id(etype)
inner_edge_mask = _get_inner_edge_mask(part, etype_id)
inner_eids = part.edata[dgl.EID][inner_edge_mask]
......@@ -68,14 +68,14 @@ def _verify_graph_feats(
assert np.all(etype_ids.numpy() == etype_id)
assert np.all(partid.numpy() == gpb.partid)
orig_id = orig_eids[etype][inner_type_eids]
orig_id = orig_eids[_etype_tuple_to_str(etype)][inner_type_eids]
local_eids = gpb.eid2localeid(inner_type_eids, gpb.partid, etype)
for name in g.edges[etype].data:
if name in [dgl.EID, "inner_edge"]:
continue
true_feats = g.edges[etype].data[name][orig_id]
edata = edge_feats[etype + "/" + name][local_eids]
edata = edge_feats[_etype_tuple_to_str(etype) + "/" + name][local_eids]
assert torch.equal(edata, true_feats)
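The string keys follow the `src:rel:dst` convention used for feature and file names throughout this change. A minimal sketch of the two converters (the real helpers live in `dgl.distributed.partition`; error messages elided):

```python
def _etype_tuple_to_str(c_etype):
    # ('author', 'writes', 'paper') -> 'author:writes:paper'
    assert isinstance(c_etype, tuple) and len(c_etype) == 3
    return ':'.join(c_etype)

def _etype_str_to_tuple(c_etype_str):
    # 'author:writes:paper' -> ('author', 'writes', 'paper')
    c_etype = tuple(c_etype_str.split(':'))
    assert len(c_etype) == 3
    return c_etype

assert _etype_str_to_tuple(_etype_tuple_to_str(('a', 'r', 'b'))) == ('a', 'r', 'b')
```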
......@@ -86,14 +86,6 @@ def test_chunk_graph(num_chunks):
g = create_chunked_dataset(root_dir, num_chunks)
num_cite_edges = g.number_of_edges("cites")
num_write_edges = g.number_of_edges("writes")
num_affiliate_edges = g.number_of_edges("affiliated_with")
num_institutions = g.number_of_nodes("institution")
num_authors = g.number_of_nodes("author")
num_papers = g.number_of_nodes("paper")
# check metadata.json
output_dir = os.path.join(root_dir, "chunked-data")
json_file = os.path.join(output_dir, "metadata.json")
......@@ -105,66 +97,46 @@ def test_chunk_graph(num_chunks):
# check edge_index
output_edge_index_dir = os.path.join(output_dir, "edge_index")
for utype, etype, vtype in g.canonical_etypes:
fname = ":".join([utype, etype, vtype])
for c_etype in g.canonical_etypes:
c_etype_str = _etype_tuple_to_str(c_etype)
for i in range(num_chunks):
chunk_f_name = os.path.join(
output_edge_index_dir, fname + str(i) + ".txt"
fname = os.path.join(
output_edge_index_dir, f'{c_etype_str}{i}.txt'
)
assert os.path.isfile(chunk_f_name)
with open(chunk_f_name, "r") as f:
assert os.path.isfile(fname)
with open(fname, "r") as f:
header = f.readline()
num1, num2 = header.rstrip().split(" ")
assert isinstance(int(num1), int)
assert isinstance(int(num2), int)
# check node_data
output_node_data_dir = os.path.join(output_dir, "node_data", "paper")
for feat in ["feat", "label", "year", "orig_ids"]:
feat_data = []
# check node/edge_data
def test_data(sub_dir, feat, expected_data, expected_shape):
data = []
for i in range(num_chunks):
chunk_f_name = "{}-{}.npy".format(feat, i)
chunk_f_name = os.path.join(output_node_data_dir, chunk_f_name)
assert os.path.isfile(chunk_f_name)
feat_array = np.load(chunk_f_name)
assert feat_array.shape[0] == num_papers // num_chunks
feat_data.append(feat_array)
feat_data = np.concatenate(feat_data, 0)
assert torch.equal(torch.from_numpy(feat_data), g.nodes['paper'].data[feat])
# check edge_data
num_edges = {
"paper:cites:paper": num_cite_edges,
"author:writes:paper": num_write_edges,
"paper:rev_writes:author": num_write_edges,
}
fname = os.path.join(sub_dir, f'{feat}-{i}.npy')
assert os.path.isfile(fname)
feat_array = np.load(fname)
assert feat_array.shape[0] == expected_shape
data.append(feat_array)
data = np.concatenate(data, 0)
assert torch.equal(torch.from_numpy(data), expected_data)
output_node_data_dir = os.path.join(output_dir, "node_data")
for ntype in g.ntypes:
sub_dir = os.path.join(output_node_data_dir, ntype)
for feat, data in g.nodes[ntype].data.items():
test_data(sub_dir, feat, data, g.num_nodes(ntype) // num_chunks)
output_edge_data_dir = os.path.join(output_dir, "edge_data")
for etype, feat in [
["paper:cites:paper", "count"],
["author:writes:paper", "year"],
["author:writes:paper", "orig_ids"],
["paper:rev_writes:author", "year"],
]:
feat_data = []
output_edge_sub_dir = os.path.join(output_edge_data_dir, etype)
for i in range(num_chunks):
chunk_f_name = "{}-{}.npy".format(feat, i)
chunk_f_name = os.path.join(output_edge_sub_dir, chunk_f_name)
assert os.path.isfile(chunk_f_name)
feat_array = np.load(chunk_f_name)
assert feat_array.shape[0] == num_edges[etype] // num_chunks
feat_data.append(feat_array)
feat_data = np.concatenate(feat_data, 0)
assert torch.equal(torch.from_numpy(feat_data),
g.edges[etype.split(':')[1]].data[feat])
@pytest.mark.parametrize("num_chunks", [1, 3, 8])
@pytest.mark.parametrize("num_parts", [1, 3, 8])
@pytest.mark.parametrize(
"graph_formats", [None, "csc", "coo,csc", "coo,csc,csr"]
)
def test_part_pipeline(num_chunks, num_parts, graph_formats):
for c_etype in g.canonical_etypes:
c_etype_str = _etype_tuple_to_str(c_etype)
sub_dir = os.path.join(output_edge_data_dir, c_etype_str)
for feat, data in g.edges[c_etype].data.items():
test_data(sub_dir, feat, data, g.num_edges(c_etype) // num_chunks)
def _test_pipeline(num_chunks, num_parts, graph_formats=None):
if num_chunks < num_parts:
# num_parts should be less than or equal to num_chunks
return
......@@ -235,3 +207,17 @@ def test_part_pipeline(num_chunks, num_parts, graph_formats):
_verify_graph_feats(
g, gpb, part_g, node_feats, edge_feats, orig_nids, orig_eids
)
@pytest.mark.parametrize("num_chunks", [1, 3, 4, 8])
@pytest.mark.parametrize("num_parts", [1, 3, 4, 8])
def test_pipeline_basics(num_chunks, num_parts):
_test_pipeline(num_chunks, num_parts)
@pytest.mark.parametrize(
"graph_formats", [None, "csc", "coo,csc", "coo,csc,csr"]
)
def test_pipeline_formats(graph_formats):
_test_pipeline(4, 4, graph_formats)
......@@ -10,7 +10,6 @@ from utils import array_readwriter, setdir
import dgl
def chunk_numpy_array(arr, fmt_meta, chunk_sizes, path_fmt):
paths = []
offset = 0
......@@ -150,7 +149,7 @@ def _chunk_graph(g, name, ndata_paths, edata_paths, num_chunks, output_path):
metadata_path = "metadata.json"
with open(metadata_path, "w") as f:
json.dump(metadata, f)
json.dump(metadata, f, sort_keys=True, indent=4)
logging.info("Saved metadata in %s" % os.path.abspath(metadata_path))
......@@ -170,9 +169,9 @@ def chunk_graph(g, name, ndata_paths, edata_paths, num_chunks, output_path):
ndata_paths : dict[str, pathlike] or dict[ntype, dict[str, pathlike]]
The dictionary of paths pointing to the corresponding numpy array file for each
node data key.
edata_paths : dict[str, pathlike] or dict[etype, dict[str, pathlike]]
edata_paths : dict[etype, pathlike] or dict[etype, dict[str, pathlike]]
The dictionary of paths pointing to the corresponding numpy array file for each
edge data key.
edge data key. ``etype`` can be a canonical tuple or a non-canonical string.
num_chunks : int
The number of chunks
output_path : pathlike
......
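Concretely, `edata_paths` accepts both key styles, and a non-canonical name stays legal only while it is unambiguous; the paths below are illustrative:

```python
edata_paths = {
    # Non-canonical string key: fine, only one relation is named 'cites'.
    'cites': {'count': 'data_test/cites/count.npy'},
    # Canonical tuple keys: required once two relations share 'writes'.
    ('author', 'writes', 'paper'): {'year': 'data_test/writes/year.npy'},
    ('institution', 'writes', 'paper'): {'year': 'data_test/writes2/year.npy'},
}
```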
......@@ -14,7 +14,11 @@ from pyarrow import csv
import constants
from utils import get_idranges, memory_snapshot, read_json
from dgl.distributed.partition import RESERVED_FIELD_DTYPE
from dgl.distributed.partition import (
RESERVED_FIELD_DTYPE,
_etype_str_to_tuple,
_etype_tuple_to_str,
)
def create_dgl_object(schema, part_id, node_data, edge_data, edgeid_offset,
......@@ -121,10 +125,10 @@ def create_dgl_object(schema, part_id, node_data, edge_data, edgeid_offset,
etypes = [(key, global_eid_ranges[key][0, 0]) for key in global_eid_ranges]
etypes.sort(key=lambda e: e[1])
etypes = [e[0] for e in etypes]
etypes_map = {e.split(":")[1]: i for i, e in enumerate(etypes)}
etypes_map = {_etype_str_to_tuple(e): i for i, e in enumerate(etypes)}
node_map_val = {ntype: [] for ntype in ntypes}
edge_map_val = {etype.split(":")[1]: [] for etype in etypes}
edge_map_val = {_etype_str_to_tuple(etype): [] for etype in etypes}
memory_snapshot("CreateDGLObj_AssignNodeData", part_id)
shuffle_global_nids = node_data[constants.SHUFFLE_GLOBAL_NID]
......@@ -194,10 +198,10 @@ def create_dgl_object(schema, part_id, node_data, edge_data, edgeid_offset,
# Determine the edge ID range of different edge types.
edge_id_start = edgeid_offset
for etype_name in global_eid_ranges:
tokens = etype_name.split(":")
assert len(tokens) == 3
etype_id = etypes_map[tokens[1]]
edge_map_val[tokens[1]].append([edge_id_start,
etype = _etype_str_to_tuple(etype_name)
assert len(etype) == 3
etype_id = etypes_map[etype]
edge_map_val[etype].append([edge_id_start,
edge_id_start + np.sum(etype_ids == etype_id)])
edge_id_start += np.sum(etype_ids == etype_id)
memory_snapshot("CreateDGLObj_UniqueNodeIds: ", part_id)
......@@ -303,7 +307,7 @@ def create_dgl_object(schema, part_id, node_data, edge_data, edgeid_offset,
for etype, etype_id in etypes_map.items():
mask = th.logical_and(part_graph.edata[dgl.ETYPE] == etype_id,
part_graph.edata['inner_edge'])
orig_eids[etype] = th.as_tensor(global_edge_id[mask])
orig_eids[_etype_tuple_to_str(etype)] = th.as_tensor(global_edge_id[mask])
return part_graph, node_map_val, edge_map_val, ntypes_map, etypes_map, \
......
......@@ -10,7 +10,9 @@ import torch
from pyarrow import csv
import constants
from dgl.distributed.partition import (
_dump_part_config
)
def read_ntype_partition_files(schema_map, input_dir):
"""
......@@ -234,8 +236,7 @@ def write_metadata_json(metadata_list, output_dir, graph_name):
for i in range(len(metadata_list)):
graph_metadata["part-{}".format(i)] = metadata_list[i]["part-{}".format(i)]
with open('{}/metadata.json'.format(output_dir), 'w') as outfile:
json.dump(graph_metadata, outfile, sort_keys=False, indent=4)
_dump_part_config(f'{output_dir}/metadata.json', graph_metadata)
def augment_edge_data(edge_data, lookup_service, edge_tids, rank, world_size):
"""
......@@ -377,15 +378,6 @@ def write_edge_features(edge_features, edge_file):
edge_file : string
File in which the edge information is serialized
"""
# TODO[Rui]: Below is a temporary fix for etype and will be
# further refined in the near future as we'll shift to canonical
# etypes entirely.
def format_etype(etype):
etype, name = etype.split('/')
etype = etype.split(':')[1]
return etype + '/' + name
edge_features = {format_etype(etype):
data for etype, data in edge_features.items()}
dgl.data.utils.save_tensors(edge_file, edge_features)
def write_graph_dgl(graph_file, graph_obj, formats, sort_etypes):
......
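With canonical etype strings used end to end, the keys reaching `write_edge_features` already have their final `'src:rel:dst/feat_name'` form, so the removed `format_etype` rewrite is no longer needed and the dict is saved verbatim; for example (tensor and file name are illustrative):

```python
import torch
import dgl

edge_features = {'author:writes:paper/year': torch.zeros(10, dtype=torch.int64)}
dgl.data.utils.save_tensors('edge_feat.dgl', edge_features)
```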