"torchvision/transforms/_functional_tensor.py" did not exist on "dbbc5c8e93fbd3b301c7f76d3798d3041ca4bdaa"
Unverified Commit 3a2a5031, authored by peizhou001 and committed by GitHub

[API Deprecation] Deprecate candidates in dgl.distributed (#5116)

parent ab0c0ec6
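In short, this commit removes the deprecated ``reshuffle`` argument of ``dgl.distributed.partition_graph`` (node/edge IDs are now always reshuffled), drops ``BasicPartitionBook`` in favor of ``RangePartitionBook``, and removes the graph argument from ``load_partition_book`` and ``get_shared_mem_partition_book``. A minimal before/after sketch of the user-facing call (the graph ``g``, the partition count, and the output path are illustrative):

>>> # Before this commit:
>>> dgl.distributed.partition_graph(g, 'graph_name', 4, out_path='output/', reshuffle=True)
>>> # After this commit, reshuffling always happens and the argument is gone:
>>> dgl.distributed.partition_graph(g, 'graph_name', 4, out_path='output/')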
@@ -430,7 +430,7 @@ class DistGraph:
    The example shows the creation of ``DistGraph`` in the standalone mode.

    >>> dgl.distributed.partition_graph(g, 'graph_name', 1, num_hops=1, part_method='metis',
-    ...                                 out_path='output/', reshuffle=True)
+    ...                                 out_path='output/')
    >>> g = dgl.distributed.DistGraph('graph_name', part_config='output/graph_name.json')

    The example shows the creation of ``DistGraph`` in the distributed mode.
@@ -513,7 +513,7 @@ class DistGraph:
        assert self._client is not None, \
            'Distributed module is not initialized. Please call dgl.distributed.initialize.'
        self._g = _get_graph_from_shared_mem(self.graph_name)
-        self._gpb = get_shared_mem_partition_book(self.graph_name, self._g)
+        self._gpb = get_shared_mem_partition_book(self.graph_name)
        if self._gpb is None:
            self._gpb = gpb
        self._client.map_shared_data(self._gpb)
...
@@ -8,7 +8,7 @@ import numpy as np
from .. import backend as F
from .. import utils
from .._ffi.ndarray import empty_shared_mem
-from ..base import EID, NID, DGLError
+from ..base import DGLError
from ..ndarray import exist_shared_mem_array
from ..partition import NDArrayPartition
from .constants import DEFAULT_ETYPE, DEFAULT_NTYPE
@@ -175,7 +175,7 @@ def _get_shared_mem_metadata(graph_name):
    return is_range_part, part_id, num_partitions, node_map, edge_map

-def get_shared_mem_partition_book(graph_name, graph_part):
+def get_shared_mem_partition_book(graph_name):
    """Get a graph partition book from shared memory.

    A graph partition book of a specific graph can be serialized to shared memory.
@@ -185,8 +185,6 @@ def get_shared_mem_partition_book(graph_name, graph_part):
    ----------
    graph_name : str
        The name of the graph.
-    graph_part : DGLGraph
-        The graph structure of a partition.

    Returns
    -------
@@ -225,9 +223,7 @@ def get_shared_mem_partition_book(graph_name, graph_part):
            part_id, num_parts, node_map, edge_map, ntypes, etypes
        )
    else:
-        return BasicPartitionBook(
-            part_id, num_parts, node_map_data, edge_map_data, graph_part
-        )
+        raise TypeError("Only RangePartitionBook is supported currently.")

def get_node_partition_from_book(book, device):
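With the ``BasicPartitionBook`` fallback gone, a partition book whose shared-memory metadata is not range-based now fails fast instead of being rebuilt from per-ID arrays. A sketch of the new behavior (the graph name is illustrative):

>>> from dgl.distributed.graph_partition_book import get_shared_mem_partition_book
>>> gpb = get_shared_mem_partition_book('graph_name')
>>> # Raises TypeError("Only RangePartitionBook is supported currently.")
>>> # when the serialized partition book is not a RangePartitionBook.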
@@ -278,14 +274,10 @@ class GraphPartitionBook(ABC):
    * the node IDs and the edge IDs that a partition has.
    * the local IDs of nodes and edges in a partition.

-    Currently, there are two classes that implement ``GraphPartitionBook``:
-    ``BasicGraphPartitionBook`` and ``RangePartitionBook``. ``BasicGraphPartitionBook``
-    stores the mappings between every individual node/edge ID and partition ID on
-    every machine, which usually consumes a lot of memory, while ``RangePartitionBook``
-    calculates the mapping between node/edge IDs and partition IDs based on some small
-    metadata because nodes/edges have been relabeled to have IDs in the same partition
-    fall in a contiguous ID range. ``RangePartitionBook`` is usually a preferred way to
-    provide mappings between node/edge IDs and partition IDs.
+    Currently, only one class implements ``GraphPartitionBook``: ``RangePartitionBook``.
+    It calculates the mapping between node/edge IDs and partition IDs based on some
+    small metadata, because nodes/edges have been relabeled so that IDs in the same
+    partition fall in a contiguous ID range.

    A graph partition book is constructed automatically when a graph is partitioned.
    When a graph partition is loaded, a graph partition book is loaded as well.
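For intuition, the range-based lookup that makes ``RangePartitionBook`` cheap can be sketched with plain NumPy; the boundaries below are hypothetical, not DGL's actual data structure. Partition p owns IDs in [starts[p], starts[p+1]), so one binary search per ID recovers the partition:

>>> import numpy as np
>>> starts = np.array([0, 1000, 2500, 4000])  # hypothetical 3-partition boundaries
>>> np.searchsorted(starts, np.array([0, 999, 1000, 3999]), side='right') - 1
array([0, 0, 1, 2])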
@@ -541,262 +533,6 @@ class GraphPartitionBook(ABC):
            Homogeneous edge IDs.
        """
-class BasicPartitionBook(GraphPartitionBook):
-    """This provides the most flexible way to store partition information.
-
-    The partition book maintains the mapping of every single node ID and edge ID to
-    partition IDs. This is very flexible at the cost of large memory consumption.
-    On a large graph, the mapping consumes significant memory and this partition book
-    is not recommended.
-
-    Parameters
-    ----------
-    part_id : int
-        partition ID of current partition book
-    num_parts : int
-        number of total partitions
-    node_map : tensor
-        global node ID mapping to partition ID
-    edge_map : tensor
-        global edge ID mapping to partition ID
-    part_graph : DGLGraph
-        The graph partition structure.
-    """
-
-    def __init__(self, part_id, num_parts, node_map, edge_map, part_graph):
-        assert part_id >= 0, "part_id cannot be a negative number."
-        assert num_parts > 0, "num_parts must be greater than zero."
-        self._part_id = int(part_id)
-        self._num_partitions = int(num_parts)
-        self._nid2partid = F.tensor(node_map)
-        assert (
-            F.dtype(self._nid2partid) == F.int64
-        ), "the node map must be stored in an integer array"
-        self._eid2partid = F.tensor(edge_map)
-        assert (
-            F.dtype(self._eid2partid) == F.int64
-        ), "the edge map must be stored in an integer array"
-        # Get meta data of the partition book.
-        self._partition_meta_data = []
-        _, nid_count = np.unique(
-            F.asnumpy(self._nid2partid), return_counts=True
-        )
-        _, eid_count = np.unique(
-            F.asnumpy(self._eid2partid), return_counts=True
-        )
-        for partid in range(self._num_partitions):
-            part_info = {}
-            part_info["machine_id"] = partid
-            part_info["num_nodes"] = int(nid_count[partid])
-            part_info["num_edges"] = int(eid_count[partid])
-            self._partition_meta_data.append(part_info)
-        # Get partid2nids
-        self._partid2nids = []
-        sorted_nid = F.tensor(np.argsort(F.asnumpy(self._nid2partid)))
-        start = 0
-        for offset in nid_count:
-            part_nids = sorted_nid[start : start + offset]
-            start += offset
-            self._partid2nids.append(part_nids)
-        # Get partid2eids
-        self._partid2eids = []
-        sorted_eid = F.tensor(np.argsort(F.asnumpy(self._eid2partid)))
-        start = 0
-        for offset in eid_count:
-            part_eids = sorted_eid[start : start + offset]
-            start += offset
-            self._partid2eids.append(part_eids)
-        # Get nidg2l
-        self._nidg2l = [None] * self._num_partitions
-        global_id = part_graph.ndata[NID]
-        max_global_id = np.amax(F.asnumpy(global_id))
-        # TODO(chao): support int32 index
-        g2l = F.zeros((max_global_id + 1), F.int64, F.context(global_id))
-        g2l = F.scatter_row(g2l, global_id, F.arange(0, len(global_id)))
-        self._nidg2l[self._part_id] = g2l
-        # Get eidg2l
-        self._eidg2l = [None] * self._num_partitions
-        global_id = part_graph.edata[EID]
-        max_global_id = np.amax(F.asnumpy(global_id))
-        # TODO(chao): support int32 index
-        g2l = F.zeros((max_global_id + 1), F.int64, F.context(global_id))
-        g2l = F.scatter_row(g2l, global_id, F.arange(0, len(global_id)))
-        self._eidg2l[self._part_id] = g2l
-        # node size and edge size
-        self._edge_size = len(self.partid2eids(self._part_id))
-        self._node_size = len(self.partid2nids(self._part_id))
-
-    def shared_memory(self, graph_name):
-        """Move data to shared memory."""
-        (
-            self._meta,
-            self._nid2partid,
-            self._eid2partid,
-        ) = _move_metadata_to_shared_mem(
-            graph_name,
-            self._num_nodes(),
-            self._num_edges(),
-            self._part_id,
-            self._num_partitions,
-            self._nid2partid,
-            self._eid2partid,
-            False,
-        )
-
-    def num_partitions(self):
-        """Return the number of partitions."""
-        return self._num_partitions
-
-    def metadata(self):
-        """Return the partition meta data."""
-        return self._partition_meta_data
-
-    def _num_nodes(self, ntype=DEFAULT_NTYPE):
-        """The total number of nodes"""
-        assert (
-            ntype == DEFAULT_NTYPE
-        ), "Base partition book only supports homogeneous graph."
-        return len(self._nid2partid)
-
-    def _num_edges(self, etype=DEFAULT_ETYPE):
-        """The total number of edges"""
-        assert etype in (
-            DEFAULT_ETYPE,
-            DEFAULT_ETYPE[1],
-        ), "Base partition book only supports homogeneous graph."
-        return len(self._eid2partid)
-
-    def map_to_per_ntype(self, ids):
-        """Map global homogeneous node IDs to node type IDs.
-
-        Returns
-            type_ids, per_type_ids
-        """
-        return F.zeros((len(ids),), F.int32, F.cpu()), ids
-
-    def map_to_per_etype(self, ids):
-        """Map global homogeneous edge IDs to edge type IDs.
-
-        Returns
-            type_ids, per_type_ids
-        """
-        return F.zeros((len(ids),), F.int32, F.cpu()), ids
-
-    def map_to_homo_nid(self, ids, ntype=DEFAULT_NTYPE):
-        """Map per-node-type IDs to global node IDs in the homogeneous format."""
-        assert (
-            ntype == DEFAULT_NTYPE
-        ), "Base partition book only supports homogeneous graph."
-        return ids
-
-    def map_to_homo_eid(self, ids, etype=DEFAULT_ETYPE):
-        """Map per-edge-type IDs to global edge IDs in the homogeneous format."""
-        assert etype in (
-            DEFAULT_ETYPE,
-            DEFAULT_ETYPE[1],
-        ), "Base partition book only supports homogeneous graph."
-        return ids
-
-    def nid2partid(self, nids, ntype=DEFAULT_NTYPE):
-        """From global node IDs to partition IDs"""
-        assert (
-            ntype == DEFAULT_NTYPE
-        ), "Base partition book only supports homogeneous graph."
-        return F.gather_row(self._nid2partid, nids)
-
-    def eid2partid(self, eids, etype=DEFAULT_ETYPE):
-        """From global edge IDs to partition IDs"""
-        assert etype in (
-            DEFAULT_ETYPE,
-            DEFAULT_ETYPE[1],
-        ), "Base partition book only supports homogeneous graph."
-        return F.gather_row(self._eid2partid, eids)
-
-    def partid2nids(self, partid, ntype=DEFAULT_NTYPE):
-        """From partition id to global node IDs"""
-        assert (
-            ntype == DEFAULT_NTYPE
-        ), "Base partition book only supports homogeneous graph."
-        return self._partid2nids[partid]
-
-    def partid2eids(self, partid, etype=DEFAULT_ETYPE):
-        """From partition id to global edge IDs"""
-        assert etype in (
-            DEFAULT_ETYPE,
-            DEFAULT_ETYPE[1],
-        ), "Base partition book only supports homogeneous graph."
-        return self._partid2eids[partid]
-
-    def nid2localnid(self, nids, partid, ntype=DEFAULT_NTYPE):
-        """Get local node IDs within the given partition."""
-        assert (
-            ntype == DEFAULT_NTYPE
-        ), "Base partition book only supports homogeneous graph."
-        if partid != self._part_id:
-            raise RuntimeError(
-                "Now GraphPartitionBook does not support \
-                getting remote tensor of nid2localnid."
-            )
-        return F.gather_row(self._nidg2l[partid], nids)
-
-    def eid2localeid(self, eids, partid, etype=DEFAULT_ETYPE):
-        """Get the local edge ids within the given partition."""
-        assert etype in (
-            DEFAULT_ETYPE,
-            DEFAULT_ETYPE[1],
-        ), "Base partition book only supports homogeneous graph."
-        if partid != self._part_id:
-            raise RuntimeError(
-                "Now GraphPartitionBook does not support \
-                getting remote tensor of eid2localeid."
-            )
-        return F.gather_row(self._eidg2l[partid], eids)
-
-    @property
-    def partid(self):
-        """Get the current partition ID"""
-        return self._part_id
-
-    @property
-    def ntypes(self):
-        """Get the list of node types"""
-        return [DEFAULT_NTYPE]
-
-    @property
-    def etypes(self):
-        """Get the list of edge types"""
-        return [DEFAULT_ETYPE[1]]
-
-    @property
-    def canonical_etypes(self):
-        """Get the list of canonical edge types
-
-        Returns
-        -------
-        list[(str, str, str)]
-            A list of canonical etypes
-        """
-        return [DEFAULT_ETYPE]
-
-    def to_canonical_etype(self, etype):
-        """Convert an edge type to the corresponding canonical edge type.
-
-        Parameters
-        ----------
-        etype : str or (str, str, str)
-            The edge type
-
-        Returns
-        -------
-        (str, str, str)
-            The corresponding canonical edge type
-        """
-        assert etype in (
-            DEFAULT_ETYPE,
-            DEFAULT_ETYPE[1],
-        ), "Base partition book only supports homogeneous graph."
-        return self.canonical_etypes[0]
class RangePartitionBook(GraphPartitionBook):
    """This partition book supports more efficient storage of partition information.
...
@@ -6,7 +6,7 @@ import time
import numpy as np

from .. import backend as F
-from ..base import NID, EID, NTYPE, ETYPE, dgl_warning, DGLError
+from ..base import NID, EID, NTYPE, ETYPE, DGLError
from ..convert import to_homogeneous
from ..random import choice as random_choice
from ..transforms import sort_csr_by_tag, sort_csc_by_tag
@@ -18,7 +18,6 @@ from ..partition import (
)
from .constants import DEFAULT_ETYPE, DEFAULT_NTYPE
from .graph_partition_book import (
-    BasicPartitionBook,
    RangePartitionBook,
    _etype_tuple_to_str,
    _etype_str_to_tuple,
@@ -173,7 +172,7 @@ def load_partition(part_config, part_id, load_feats=True):
    assert NID in graph.ndata, "the partition graph should contain node mapping to global node ID"
    assert EID in graph.edata, "the partition graph should contain edge mapping to global edge ID"

-    gpb, graph_name, ntypes, etypes = load_partition_book(part_config, part_id, graph)
+    gpb, graph_name, ntypes, etypes = load_partition_book(part_config, part_id)
    ntypes_list = list(ntypes.keys())
    etypes_list = list(etypes.keys())
    if 'DGL_DIST_DEBUG' in os.environ:
@@ -268,7 +267,7 @@ def load_partition_feats(part_config, part_id, load_nodes=True, load_edges=True):
    return node_feats, edge_feats
-def load_partition_book(part_config, part_id, graph=None):
+def load_partition_book(part_config, part_id):
    '''Load a graph partition book from the partition config file.

    Parameters
@@ -277,8 +276,6 @@ def load_partition_book(part_config, part_id, graph=None):
        The path of the partition config file.
    part_id : int
        The partition ID.
-    graph : DGLGraph
-        The graph structure

    Returns
    -------
@@ -333,18 +330,15 @@ def load_partition_book(part_config, part_id, graph=None):
    for key in edge_map:
        assert key in etypes, 'The edge type {} is invalid'.format(key)

-    if is_range_part:
-        node_map = _get_part_ranges(node_map)
-        edge_map = _get_part_ranges(edge_map)
-        return RangePartitionBook(part_id, num_parts, node_map, edge_map, ntypes, etypes), \
-               part_metadata['graph_name'], ntypes, etypes
-    else:
-        node_map = np.load(node_map)
-        edge_map = np.load(edge_map)
-        return BasicPartitionBook(part_id, num_parts, node_map, edge_map, graph), \
-               part_metadata['graph_name'], ntypes, etypes
+    if not is_range_part:
+        raise TypeError("Only RangePartitionBook is supported currently.")
+    node_map = _get_part_ranges(node_map)
+    edge_map = _get_part_ranges(edge_map)
+    return RangePartitionBook(part_id, num_parts, node_map, edge_map, ntypes, etypes), \
+           part_metadata['graph_name'], ntypes, etypes
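Callers of ``load_partition_book`` simply drop the trailing graph argument; a sketch of the updated call (the config path is illustrative):

>>> gpb, graph_name, ntypes, etypes = dgl.distributed.load_partition_book(
...     'output/graph_name.json', part_id=0)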
-def _get_orig_ids(g, sim_g, reshuffle, orig_nids, orig_eids):
+def _get_orig_ids(g, sim_g, orig_nids, orig_eids):
    '''Convert/construct the original node IDs and edge IDs.

    It handles multiple cases:
@@ -361,8 +355,6 @@ def _get_orig_ids(g, sim_g, reshuffle, orig_nids, orig_eids):
        The input graph for partitioning.
    sim_g : DGLGraph
        The homogeneous version of the input graph.
-    reshuffle : bool
-        Whether the input graph is reshuffled during partitioning.
    orig_nids : tensor or None
        The original node IDs after the input graph is reshuffled.
    orig_eids : tensor or None
@@ -373,23 +365,17 @@ def _get_orig_ids(g, sim_g, reshuffle, orig_nids, orig_eids):
        tensor or dict of tensors, tensor or dict of tensors
    '''
    is_hetero = not g.is_homogeneous
-    if reshuffle and is_hetero:
+    if is_hetero:
        # Get the type IDs
        orig_ntype = F.gather_row(sim_g.ndata[NTYPE], orig_nids)
        orig_etype = F.gather_row(sim_g.edata[ETYPE], orig_eids)
        # Mapping between shuffled global IDs to original per-type IDs
        orig_nids = F.gather_row(sim_g.ndata[NID], orig_nids)
        orig_eids = F.gather_row(sim_g.edata[EID], orig_eids)
        orig_nids = {ntype: F.boolean_mask(orig_nids, orig_ntype == g.get_ntype_id(ntype)) \
                     for ntype in g.ntypes}
        orig_eids = {etype: F.boolean_mask(orig_eids, orig_etype == g.get_etype_id(etype)) \
                     for etype in g.canonical_etypes}
-    elif not reshuffle and not is_hetero:
-        orig_nids = F.arange(0, sim_g.number_of_nodes())
-        orig_eids = F.arange(0, sim_g.number_of_edges())
-    elif not reshuffle:
-        orig_nids = {ntype: F.arange(0, g.number_of_nodes(ntype)) for ntype in g.ntypes}
-        orig_eids = {etype: F.arange(0, g.number_of_edges(etype)) for etype in g.canonical_etypes}
    return orig_nids, orig_eids
def _set_trainer_ids(g, sim_g, node_parts):
@@ -425,9 +411,8 @@ def _set_trainer_ids(g, sim_g, node_parts):
            g.edges[c_etype].data['trainer_id'] = trainer_id

def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method="metis",
-                    reshuffle=True, balance_ntypes=None, balance_edges=False, return_mapping=False,
-                    num_trainers_per_machine=1, objtype='cut',
-                    graph_formats=None):
+                    balance_ntypes=None, balance_edges=False, return_mapping=False,
+                    num_trainers_per_machine=1, objtype='cut', graph_formats=None):
    ''' Partition a graph for distributed training and store the partitions on files.
    The partitioning occurs in three steps: 1) run a partition algorithm (e.g., Metis) to
@@ -506,16 +491,7 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
    * ``num_edges`` is the number of edges in the global graph.
    * `part-*` stores the data of a partition.
-    If ``reshuffle=False``, node IDs and edge IDs of a partition do not fall into contiguous
-    ID ranges. In this case, DGL stores node/edge mappings (from
-    node/edge IDs to partition IDs) in separate files (node_map.npy and edge_map.npy).
-    The node/edge mappings are stored in numpy files.
-
-    .. warning::
-        this format is deprecated and will not be supported by the next release. In other words,
-        the future release will always shuffle node IDs and edge IDs when partitioning a graph.
-
-    If ``reshuffle=True``, ``node_map`` and ``edge_map`` contains the information
+    As node/edge IDs are reshuffled, ``node_map`` and ``edge_map`` contain the information
    for mapping between global node/edge IDs to partition-local node/edge IDs.
    For heterogeneous graphs, the information in ``node_map`` and ``edge_map`` can also be used
    to compute node types and edge types. The format of the data in ``node_map`` and ``edge_map``
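As a concrete illustration of the range format described here (all numbers are made up), a two-partition book for node types 'user' and 'item' stores one [start, end) ID range per partition and type:

>>> node_map
{'user': [[0, 300], [1000, 1300]],
 'item': [[300, 1000], [1300, 2000]]}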
@@ -583,10 +559,6 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
        The default value is 1.
    part_method : str, optional
        The partition method. It supports "random" and "metis". The default value is "metis".
-    reshuffle : bool, optional
-        Reshuffle nodes and edges so that nodes and edges in a partition are in
-        contiguous ID range. The default value is True. The argument is deprecated
-        and will be removed in the next release.
    balance_ntypes : tensor, optional
        Node type of each node. This is a 1D-array of integers. Its values indicates the node
        type of each node. This argument is used by Metis partition. When the argument is
@@ -597,8 +569,8 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
        Indicate whether to balance the edges in each partition. This argument is used by
        the Metis algorithm.
    return_mapping : bool
-        If `reshuffle=True`, this indicates to return the mapping between shuffled node/edge IDs
-        and the original node/edge IDs.
+        Indicate whether to return the mapping between shuffled node/edge IDs and the original
+        node/edge IDs.
    num_trainers_per_machine : int, optional
        The number of trainers per machine. If is not 1, the whole graph will be first partitioned
        to each trainer, that is num_parts*num_trainers_per_machine parts. And the trainer ids of
@@ -630,7 +602,7 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
    Examples
    --------
    >>> dgl.distributed.partition_graph(g, 'test', 4, num_hops=1, part_method='metis',
-    ...                                 out_path='output/', reshuffle=True,
+    ...                                 out_path='output/',
    ...                                 balance_ntypes=g.ndata['train_mask'],
    ...                                 balance_edges=True)
    >>> (
@@ -679,10 +651,6 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
    if objtype not in ['cut', 'vol']:
        raise ValueError
-    if not reshuffle:
-        dgl_warning("The argument reshuffle will be deprecated in the next release. "
-                    "For heterogeneous graphs, reshuffle must be enabled.")
    if num_parts == 1:
        start = time.time()
        sim_g, balance_ntypes = get_homogeneous(g, balance_ntypes)
@@ -708,11 +676,17 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
        orig_eids = parts[0].edata[EID] = F.arange(0, sim_g.number_of_edges())
        # For one partition, we don't really shuffle nodes and edges. We just need to simulate
        # it and set node data and edge data of orig_id.
-        if reshuffle:
-            parts[0].ndata['orig_id'] = orig_nids
-            parts[0].edata['orig_id'] = orig_eids
+        parts[0].ndata['orig_id'] = orig_nids
+        parts[0].edata['orig_id'] = orig_eids
        if return_mapping:
-            orig_nids, orig_eids = _get_orig_ids(g, sim_g, False, orig_nids, orig_eids)
+            if g.is_homogeneous:
+                orig_nids = F.arange(0, sim_g.number_of_nodes())
+                orig_eids = F.arange(0, sim_g.number_of_edges())
+            else:
+                orig_nids = {ntype: F.arange(0, g.number_of_nodes(ntype))
+                             for ntype in g.ntypes}
+                orig_eids = {etype: F.arange(0, g.number_of_edges(etype))
+                             for etype in g.canonical_etypes}
        parts[0].ndata['inner_node'] = F.ones((sim_g.number_of_nodes(),),
                                              RESERVED_FIELD_DTYPE['inner_node'], F.cpu())
        parts[0].edata['inner_edge'] = F.ones((sim_g.number_of_edges(),),
@@ -749,11 +723,11 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
        node_parts = random_choice(num_parts, sim_g.number_of_nodes())
        start = time.time()
        parts, orig_nids, orig_eids = partition_graph_with_halo(sim_g, node_parts, num_hops,
-                                                               reshuffle=reshuffle)
+                                                               reshuffle=True)
        print('Splitting the graph into partitions takes {:.3f}s, peak mem: {:.3f} GB'.format(
            time.time() - start, get_peak_mem()))
        if return_mapping:
-            orig_nids, orig_eids = _get_orig_ids(g, sim_g, reshuffle, orig_nids, orig_eids)
+            orig_nids, orig_eids = _get_orig_ids(g, sim_g, orig_nids, orig_eids)
    else:
        raise Exception('Unknown partitioning method: ' + part_method)
@@ -763,112 +737,88 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
    # orig_id: the global node IDs in the homogeneous version of input graph.
    # NID: the global node IDs in the reshuffled homogeneous version of the input graph.
    if not g.is_homogeneous:
-        if reshuffle:
-            for name in parts:
+        for name in parts:
            orig_ids = parts[name].ndata['orig_id']
            ntype = F.gather_row(sim_g.ndata[NTYPE], orig_ids)
            parts[name].ndata[NTYPE] = F.astype(ntype, RESERVED_FIELD_DTYPE[NTYPE])
            assert np.all(F.asnumpy(ntype) == F.asnumpy(parts[name].ndata[NTYPE]))
            # Get the original edge types and original edge IDs.
            orig_ids = parts[name].edata['orig_id']
            etype = F.gather_row(sim_g.edata[ETYPE], orig_ids)
            parts[name].edata[ETYPE] = F.astype(etype, RESERVED_FIELD_DTYPE[ETYPE])
            assert np.all(F.asnumpy(etype) == F.asnumpy(parts[name].edata[ETYPE]))

            # Calculate the global node IDs to per-node IDs mapping.
            inner_ntype = F.boolean_mask(parts[name].ndata[NTYPE],
                                         parts[name].ndata['inner_node'] == 1)
            inner_nids = F.boolean_mask(parts[name].ndata[NID],
                                        parts[name].ndata['inner_node'] == 1)
            for ntype in g.ntypes:
                inner_ntype_mask = inner_ntype == g.get_ntype_id(ntype)
                typed_nids = F.boolean_mask(inner_nids, inner_ntype_mask)
                # inner node IDs are in a contiguous ID range.
                expected_range = np.arange(int(F.as_scalar(typed_nids[0])),
                                           int(F.as_scalar(typed_nids[-1])) + 1)
                assert np.all(F.asnumpy(typed_nids) == expected_range)
            # Calculate the global edge IDs to per-edge IDs mapping.
            inner_etype = F.boolean_mask(parts[name].edata[ETYPE],
                                         parts[name].edata['inner_edge'] == 1)
            inner_eids = F.boolean_mask(parts[name].edata[EID],
                                        parts[name].edata['inner_edge'] == 1)
            for etype in g.canonical_etypes:
                inner_etype_mask = inner_etype == g.get_etype_id(etype)
                typed_eids = np.sort(F.asnumpy(F.boolean_mask(inner_eids, inner_etype_mask)))
                assert np.all(typed_eids == np.arange(int(typed_eids[0]),
                                                      int(typed_eids[-1]) + 1))
-        else:
-            raise NotImplementedError('not shuffled case')
-
-    # Let's calculate edge assignment.
-    if not reshuffle:
-        start = time.time()
-        # We only optimize for reshuffled case. So it's fine to use int64 here.
-        edge_parts = np.zeros((g.number_of_edges(),), dtype=np.int64) - 1
-        for part_id in parts:
-            part = parts[part_id]
-            # To get the edges in the input graph, we should use original node IDs.
-            local_edges = F.boolean_mask(part.edata[EID], part.edata['inner_edge'])
-            edge_parts[F.asnumpy(local_edges)] = part_id
-        print('Calculate edge assignment: {:.3f} seconds'.format(time.time() - start))
    os.makedirs(out_path, mode=0o775, exist_ok=True)
    tot_num_inner_edges = 0
    out_path = os.path.abspath(out_path)

-    # Without reshuffling, we have to store the entire node/edge mapping in a file.
-    if not reshuffle:
-        node_part_file = os.path.join(out_path, "node_map")
-        edge_part_file = os.path.join(out_path, "edge_map")
-        np.save(node_part_file, F.asnumpy(node_parts), allow_pickle=False)
-        np.save(edge_part_file, edge_parts, allow_pickle=False)
-        node_map_val = node_part_file + ".npy"
-        edge_map_val = edge_part_file + ".npy"
-    else:
-        # With reshuffling, we can ensure that all nodes and edges are reshuffled
-        # and are in contiguous ID space.
-        if num_parts > 1:
+    # With reshuffling, we can ensure that all nodes and edges are reshuffled
+    # and are in contiguous ID space.
+    if num_parts > 1:
        node_map_val = {}
        edge_map_val = {}
        for ntype in g.ntypes:
            ntype_id = g.get_ntype_id(ntype)
            val = []
            node_map_val[ntype] = []
            for i in parts:
                inner_node_mask = _get_inner_node_mask(parts[i], ntype_id)
                val.append(F.as_scalar(F.sum(F.astype(inner_node_mask, F.int64), 0)))
                inner_nids = F.boolean_mask(parts[i].ndata[NID], inner_node_mask)
                node_map_val[ntype].append([int(F.as_scalar(inner_nids[0])),
                                            int(F.as_scalar(inner_nids[-1])) + 1])
            val = np.cumsum(val).tolist()
            assert val[-1] == g.number_of_nodes(ntype)
        for etype in g.canonical_etypes:
            etype_id = g.get_etype_id(etype)
            val = []
            edge_map_val[etype] = []
            for i in parts:
                inner_edge_mask = _get_inner_edge_mask(parts[i], etype_id)
                val.append(F.as_scalar(F.sum(F.astype(inner_edge_mask, F.int64), 0)))
                inner_eids = np.sort(F.asnumpy(F.boolean_mask(parts[i].edata[EID],
                                                              inner_edge_mask)))
                edge_map_val[etype].append([int(inner_eids[0]), int(inner_eids[-1]) + 1])
            val = np.cumsum(val).tolist()
            assert val[-1] == g.number_of_edges(etype)
    else:
        node_map_val = {}
        edge_map_val = {}
        for ntype in g.ntypes:
            ntype_id = g.get_ntype_id(ntype)
            inner_node_mask = _get_inner_node_mask(parts[0], ntype_id)
            inner_nids = F.boolean_mask(parts[0].ndata[NID], inner_node_mask)
            node_map_val[ntype] = [[int(F.as_scalar(inner_nids[0])),
                                    int(F.as_scalar(inner_nids[-1])) + 1]]
        for etype in g.canonical_etypes:
            etype_id = g.get_etype_id(etype)
            inner_edge_mask = _get_inner_edge_mask(parts[0], etype_id)
            inner_eids = F.boolean_mask(parts[0].edata[EID], inner_edge_mask)
            edge_map_val[etype] = [[int(F.as_scalar(inner_eids[0])),
                                    int(F.as_scalar(inner_eids[-1])) + 1]]
    # Double check that the node IDs in the global ID space are sorted.
    for ntype in node_map_val:
@@ -902,7 +852,7 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
            ntype_id = g.get_ntype_id(ntype)
            # To get the edges in the input graph, we should use original node IDs.
            # Both orig_id and NID stores the per-node-type IDs.
-            ndata_name = 'orig_id' if reshuffle else NID
+            ndata_name = 'orig_id'
            inner_node_mask = _get_inner_node_mask(part, ntype_id)
            # This is global node IDs.
            local_nodes = F.boolean_mask(part.ndata[ndata_name], inner_node_mask)
@@ -924,7 +874,7 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
        for etype in g.canonical_etypes:
            etype_id = g.get_etype_id(etype)
-            edata_name = 'orig_id' if reshuffle else EID
+            edata_name = 'orig_id'
            inner_edge_mask = _get_inner_edge_mask(part, etype_id)
            # This is global edge IDs.
            local_edges = F.boolean_mask(part.edata[edata_name], inner_edge_mask)
@@ -945,46 +895,38 @@ def partition_graph(g, graph_name, num_parts, out_path, num_hops=1, part_method=
                    g.edges[etype].data[name], local_edges)
        else:
            for ntype in g.ntypes:
-                if reshuffle and len(g.ntypes) > 1:
+                if len(g.ntypes) > 1:
                    ndata_name = 'orig_id'
                    ntype_id = g.get_ntype_id(ntype)
                    inner_node_mask = _get_inner_node_mask(part, ntype_id)
                    # This is global node IDs.
                    local_nodes = F.boolean_mask(part.ndata[ndata_name], inner_node_mask)
                    local_nodes = F.gather_row(sim_g.ndata[NID], local_nodes)
-                elif reshuffle:
+                else:
                    local_nodes = sim_g.ndata[NID]
                for name in g.nodes[ntype].data:
                    if name in [NID, 'inner_node']:
                        continue
-                    if reshuffle:
-                        node_feats[ntype + '/' + name] = F.gather_row(g.nodes[ntype].data[name],
-                                                                      local_nodes)
-                    else:
-                        node_feats[ntype + '/' + name] = g.nodes[ntype].data[name]
+                    node_feats[ntype + '/' + name] = F.gather_row(g.nodes[ntype].data[name],
+                                                                  local_nodes)
            for etype in g.canonical_etypes:
-                if reshuffle and not g.is_homogeneous:
+                if not g.is_homogeneous:
                    edata_name = 'orig_id'
                    etype_id = g.get_etype_id(etype)
                    inner_edge_mask = _get_inner_edge_mask(part, etype_id)
                    # This is global edge IDs.
                    local_edges = F.boolean_mask(part.edata[edata_name], inner_edge_mask)
                    local_edges = F.gather_row(sim_g.edata[EID], local_edges)
-                elif reshuffle:
+                else:
                    local_edges = sim_g.edata[EID]
                for name in g.edges[etype].data:
                    if name in [EID, 'inner_edge']:
                        continue
-                    if reshuffle:
-                        edge_feats[_etype_tuple_to_str(etype) + '/' + name] = F.gather_row(
-                            g.edges[etype].data[name], local_edges)
-                    else:
-                        edge_feats[_etype_tuple_to_str(etype) + '/' + name] = \
-                            g.edges[etype].data[name]
+                    edge_feats[_etype_tuple_to_str(etype) + '/' + name] = F.gather_row(
+                        g.edges[etype].data[name], local_edges)
        # delete `orig_id` from ndata/edata
-        if reshuffle:
-            del part.ndata['orig_id']
-            del part.edata['orig_id']
+        del part.ndata['orig_id']
+        del part.edata['orig_id']

        part_dir = os.path.join(out_path, "part" + str(part_id))
        node_feat_file = os.path.join(part_dir, "node_feat.dgl")
...
@@ -682,7 +682,7 @@ elif mode == "client":
    dgl.distributed.initialize(ip_config, net_type=net_type)
    gpb, graph_name, _, _ = load_partition_book(
-        graph_path + "/{}.json".format(graph_name), part_id, None
+        graph_path + "/{}.json".format(graph_name), part_id
    )
    g = dgl.distributed.DistGraph(graph_name, gpb=gpb)
...
@@ -119,7 +119,7 @@ def run_client_empty(
    os.environ["DGL_NUM_SERVER"] = str(server_count)
    dgl.distributed.initialize("kv_ip_config.txt")
    gpb, graph_name, _, _ = load_partition_book(
-        "/tmp/dist_graph/{}.json".format(graph_name), part_id, None
+        "/tmp/dist_graph/{}.json".format(graph_name), part_id
    )
    g = DistGraph(graph_name, gpb=gpb)
    check_dist_graph_empty(g, num_clients, num_nodes, num_edges)
@@ -187,7 +187,7 @@ def run_client(
    os.environ["DGL_GROUP_ID"] = str(group_id)
    dgl.distributed.initialize("kv_ip_config.txt")
    gpb, graph_name, _, _ = load_partition_book(
-        "/tmp/dist_graph/{}.json".format(graph_name), part_id, None
+        "/tmp/dist_graph/{}.json".format(graph_name), part_id
    )
    g = DistGraph(graph_name, gpb=gpb)
    check_dist_graph(g, num_clients, num_nodes, num_edges)
@@ -206,7 +206,7 @@ def run_emb_client(
    os.environ["DGL_GROUP_ID"] = str(group_id)
    dgl.distributed.initialize("kv_ip_config.txt")
    gpb, graph_name, _, _ = load_partition_book(
-        "/tmp/dist_graph/{}.json".format(graph_name), part_id, None
+        "/tmp/dist_graph/{}.json".format(graph_name), part_id
    )
    g = DistGraph(graph_name, gpb=gpb)
    check_dist_emb(g, num_clients, num_nodes, num_edges)
@@ -230,7 +230,7 @@ def run_optim_client(
        backend="gloo", rank=rank, world_size=world_size
    )
    gpb, graph_name, _, _ = load_partition_book(
-        "/tmp/dist_graph/{}.json".format(graph_name), part_id, None
+        "/tmp/dist_graph/{}.json".format(graph_name), part_id
    )
    g = DistGraph(graph_name, gpb=gpb)
    check_dist_optim_store(rank, num_nodes, optimizer_states, save)
@@ -279,7 +279,7 @@ def run_client_hierarchy(
    os.environ["DGL_NUM_SERVER"] = str(server_count)
    dgl.distributed.initialize("kv_ip_config.txt")
    gpb, graph_name, _, _ = load_partition_book(
-        "/tmp/dist_graph/{}.json".format(graph_name), part_id, None
+        "/tmp/dist_graph/{}.json".format(graph_name), part_id
    )
    g = DistGraph(graph_name, gpb=gpb)
    node_mask = F.tensor(node_mask)
@@ -687,7 +687,7 @@ def run_client_hetero(
    os.environ["DGL_NUM_SERVER"] = str(server_count)
    dgl.distributed.initialize("kv_ip_config.txt")
    gpb, graph_name, _, _ = load_partition_book(
-        "/tmp/dist_graph/{}.json".format(graph_name), part_id, None
+        "/tmp/dist_graph/{}.json".format(graph_name), part_id
    )
    g = DistGraph(graph_name, gpb=gpb)
    check_dist_graph_hetero(g, num_clients, num_nodes, num_edges)
...
@@ -101,7 +101,7 @@ def check_rpc_sampling(tmpdir, num_server):
    num_hops = 1
    partition_graph(g, 'test_sampling', num_parts, tmpdir,
-                    num_hops=num_hops, part_method='metis', reshuffle=False)
+                    num_hops=num_hops, part_method='metis')
    pserver_list = []
    ctx = mp.get_context('spawn')
@@ -132,7 +132,7 @@ def check_rpc_find_edges_shuffle(tmpdir, num_server):
    orig_nid, orig_eid = partition_graph(g, 'test_find_edges', num_parts, tmpdir,
                                         num_hops=1, part_method='metis',
-                                         reshuffle=True, return_mapping=True)
+                                         return_mapping=True)
    pserver_list = []
    ctx = mp.get_context('spawn')
@@ -178,7 +178,7 @@ def check_rpc_hetero_find_edges_shuffle(tmpdir, num_server):
    orig_nid, orig_eid = partition_graph(g, 'test_find_edges', num_parts, tmpdir,
                                         num_hops=1, part_method='metis',
-                                         reshuffle=True, return_mapping=True)
+                                         return_mapping=True)
    pserver_list = []
    ctx = mp.get_context('spawn')
@@ -227,7 +227,7 @@ def check_rpc_get_degree_shuffle(tmpdir, num_server):
    num_parts = num_server
    orig_nid, _ = partition_graph(g, 'test_get_degrees', num_parts, tmpdir,
-        num_hops=1, part_method='metis', reshuffle=True, return_mapping=True)
+        num_hops=1, part_method='metis', return_mapping=True)
    pserver_list = []
    ctx = mp.get_context('spawn')
@@ -281,7 +281,7 @@ def check_rpc_sampling_shuffle(tmpdir, num_server, num_groups=1):
    num_hops = 1
    orig_nids, orig_eids = partition_graph(g, 'test_sampling', num_parts, tmpdir,
-        num_hops=num_hops, part_method='metis', reshuffle=True, return_mapping=True)
+        num_hops=num_hops, part_method='metis', return_mapping=True)
    pserver_list = []
    ctx = mp.get_context('spawn')
@@ -379,7 +379,7 @@ def check_rpc_hetero_sampling_shuffle(tmpdir, num_server):
    num_hops = 1
    orig_nid_map, orig_eid_map = partition_graph(g, 'test_sampling', num_parts, tmpdir,
-        num_hops=num_hops, part_method='metis', reshuffle=True, return_mapping=True)
+        num_hops=num_hops, part_method='metis', return_mapping=True)
    pserver_list = []
    ctx = mp.get_context('spawn')
@@ -431,7 +431,7 @@ def check_rpc_hetero_sampling_empty_shuffle(tmpdir, num_server):
    orig_nids, _ = partition_graph(g, 'test_sampling', num_parts, tmpdir,
                                   num_hops=num_hops, part_method='metis',
-                                   reshuffle=True, return_mapping=True)
+                                   return_mapping=True)
    pserver_list = []
    ctx = mp.get_context('spawn')
@@ -461,7 +461,7 @@ def check_rpc_hetero_etype_sampling_shuffle(tmpdir, num_server, graph_formats=None):
    num_hops = 1
    orig_nid_map, orig_eid_map = partition_graph(g, 'test_sampling', num_parts, tmpdir,
-        num_hops=num_hops, part_method='metis', reshuffle=True, return_mapping=True,
+        num_hops=num_hops, part_method='metis', return_mapping=True,
        graph_formats=graph_formats)
    pserver_list = []
@@ -515,7 +515,7 @@ def check_rpc_hetero_etype_sampling_empty_shuffle(tmpdir, num_server):
    orig_nids, _ = partition_graph(g, 'test_sampling', num_parts, tmpdir,
                                   num_hops=num_hops, part_method='metis',
-                                   reshuffle=True, return_mapping=True)
+                                   return_mapping=True)
    pserver_list = []
    ctx = mp.get_context('spawn')
@@ -607,7 +607,7 @@ def check_rpc_bipartite_sampling_empty(tmpdir, num_server):
    num_hops = 1
    orig_nids, _ = partition_graph(g, 'test_sampling', num_parts, tmpdir,
-        num_hops=num_hops, part_method='metis', reshuffle=True, return_mapping=True)
+        num_hops=num_hops, part_method='metis', return_mapping=True)
    pserver_list = []
    ctx = mp.get_context('spawn')
@@ -641,7 +641,7 @@ def check_rpc_bipartite_sampling_shuffle(tmpdir, num_server):
    num_hops = 1
    orig_nid_map, orig_eid_map = partition_graph(g, 'test_sampling', num_parts, tmpdir,
-        num_hops=num_hops, part_method='metis', reshuffle=True, return_mapping=True)
+        num_hops=num_hops, part_method='metis', return_mapping=True)
    pserver_list = []
    ctx = mp.get_context('spawn')
@@ -692,7 +692,7 @@ def check_rpc_bipartite_etype_sampling_empty(tmpdir, num_server):
    num_hops = 1
    orig_nids, _ = partition_graph(g, 'test_sampling', num_parts, tmpdir,
-        num_hops=num_hops, part_method='metis', reshuffle=True, return_mapping=True)
+        num_hops=num_hops, part_method='metis', return_mapping=True)
    pserver_list = []
    ctx = mp.get_context('spawn')
@@ -727,7 +727,7 @@ def check_rpc_bipartite_etype_sampling_shuffle(tmpdir, num_server):
    num_hops = 1
    orig_nid_map, orig_eid_map = partition_graph(g, 'test_sampling', num_parts, tmpdir,
-        num_hops=num_hops, part_method='metis', reshuffle=True, return_mapping=True)
+        num_hops=num_hops, part_method='metis', return_mapping=True)
    pserver_list = []
    ctx = mp.get_context('spawn')
@@ -795,7 +795,7 @@ def test_rpc_sampling_shuffle(num_server):
        check_rpc_bipartite_etype_sampling_empty(Path(tmpdirname), num_server)
        check_rpc_bipartite_etype_sampling_shuffle(Path(tmpdirname), num_server)
-def check_standalone_sampling(tmpdir, reshuffle):
+def check_standalone_sampling(tmpdir):
    g = CitationGraphDataset("cora")[0]
    prob = np.maximum(np.random.randn(g.num_edges()), 0)
    mask = (prob > 0)
@@ -804,7 +804,7 @@ def check_standalone_sampling(tmpdir):
    num_parts = 1
    num_hops = 1
    partition_graph(g, 'test_sampling', num_parts, tmpdir,
-                    num_hops=num_hops, part_method='metis', reshuffle=reshuffle)
+                    num_hops=num_hops, part_method='metis')
    os.environ['DGL_DIST_MODE'] = 'standalone'
    dgl.distributed.initialize("rpc_ip_config.txt")
@@ -829,7 +829,7 @@ def check_standalone_sampling(tmpdir):
    assert (prob[eid] > 0).all()
    dgl.distributed.exit_client()
-def check_standalone_etype_sampling(tmpdir, reshuffle):
+def check_standalone_etype_sampling(tmpdir):
    hg = CitationGraphDataset('cora')[0]
    prob = np.maximum(np.random.randn(hg.num_edges()), 0)
    mask = (prob > 0)
@@ -839,7 +839,7 @@ def check_standalone_etype_sampling(tmpdir):
    num_hops = 1
    partition_graph(hg, 'test_sampling', num_parts, tmpdir,
-                    num_hops=num_hops, part_method='metis', reshuffle=reshuffle)
+                    num_hops=num_hops, part_method='metis')
    os.environ['DGL_DIST_MODE'] = 'standalone'
    dgl.distributed.initialize("rpc_ip_config.txt")
    dist_graph = DistGraph("test_sampling", part_config=tmpdir / 'test_sampling.json')
@@ -863,7 +863,7 @@ def check_standalone_etype_sampling(tmpdir):
    assert (prob[eid] > 0).all()
    dgl.distributed.exit_client()
-def check_standalone_etype_sampling_heterograph(tmpdir, reshuffle):
+def check_standalone_etype_sampling_heterograph(tmpdir):
    hg = CitationGraphDataset('cora')[0]
    num_parts = 1
    num_hops = 1
@@ -872,7 +872,7 @@ def check_standalone_etype_sampling_heterograph(tmpdir):
                             ('paper', 'cite-by', 'paper'): (dst, src)},
                            {'paper': hg.number_of_nodes()})
    partition_graph(new_hg, 'test_hetero_sampling', num_parts, tmpdir,
-                    num_hops=num_hops, part_method='metis', reshuffle=reshuffle)
+                    num_hops=num_hops, part_method='metis')
    os.environ['DGL_DIST_MODE'] = 'standalone'
    dgl.distributed.initialize("rpc_ip_config.txt")
    dist_graph = DistGraph("test_hetero_sampling", part_config=tmpdir / 'test_hetero_sampling.json')
...@@ -892,8 +892,7 @@ def test_standalone_sampling(): ...@@ -892,8 +892,7 @@ def test_standalone_sampling():
import tempfile import tempfile
os.environ['DGL_DIST_MODE'] = 'standalone' os.environ['DGL_DIST_MODE'] = 'standalone'
with tempfile.TemporaryDirectory() as tmpdirname: with tempfile.TemporaryDirectory() as tmpdirname:
check_standalone_sampling(Path(tmpdirname), False) check_standalone_sampling(Path(tmpdirname))
check_standalone_sampling(Path(tmpdirname), True)
def start_in_subgraph_client(rank, tmpdir, disable_shared_mem, nodes): def start_in_subgraph_client(rank, tmpdir, disable_shared_mem, nodes):
gpb = None gpb = None
...@@ -917,7 +916,7 @@ def check_rpc_in_subgraph_shuffle(tmpdir, num_server): ...@@ -917,7 +916,7 @@ def check_rpc_in_subgraph_shuffle(tmpdir, num_server):
num_parts = num_server num_parts = num_server
orig_nid, orig_eid = partition_graph(g, 'test_in_subgraph', num_parts, tmpdir, orig_nid, orig_eid = partition_graph(g, 'test_in_subgraph', num_parts, tmpdir,
num_hops=1, part_method='metis', reshuffle=True, return_mapping=True) num_hops=1, part_method='metis', return_mapping=True)
pserver_list = [] pserver_list = []
ctx = mp.get_context('spawn') ctx = mp.get_context('spawn')
...@@ -964,23 +963,21 @@ def test_standalone_etype_sampling(): ...@@ -964,23 +963,21 @@ def test_standalone_etype_sampling():
import tempfile import tempfile
with tempfile.TemporaryDirectory() as tmpdirname: with tempfile.TemporaryDirectory() as tmpdirname:
os.environ['DGL_DIST_MODE'] = 'standalone' os.environ['DGL_DIST_MODE'] = 'standalone'
check_standalone_etype_sampling_heterograph(Path(tmpdirname), True) check_standalone_etype_sampling_heterograph(Path(tmpdirname))
with tempfile.TemporaryDirectory() as tmpdirname: with tempfile.TemporaryDirectory() as tmpdirname:
os.environ['DGL_DIST_MODE'] = 'standalone' os.environ['DGL_DIST_MODE'] = 'standalone'
check_standalone_etype_sampling(Path(tmpdirname), True) check_standalone_etype_sampling(Path(tmpdirname))
if __name__ == "__main__": if __name__ == "__main__":
import tempfile import tempfile
with tempfile.TemporaryDirectory() as tmpdirname: with tempfile.TemporaryDirectory() as tmpdirname:
os.environ['DGL_DIST_MODE'] = 'standalone' os.environ['DGL_DIST_MODE'] = 'standalone'
check_standalone_etype_sampling_heterograph(Path(tmpdirname), True) check_standalone_etype_sampling_heterograph(Path(tmpdirname))
with tempfile.TemporaryDirectory() as tmpdirname: with tempfile.TemporaryDirectory() as tmpdirname:
os.environ['DGL_DIST_MODE'] = 'standalone' os.environ['DGL_DIST_MODE'] = 'standalone'
check_standalone_etype_sampling(Path(tmpdirname), True) check_standalone_etype_sampling(Path(tmpdirname))
check_standalone_etype_sampling(Path(tmpdirname), False) check_standalone_sampling(Path(tmpdirname))
check_standalone_sampling(Path(tmpdirname), True)
check_standalone_sampling(Path(tmpdirname), False)
os.environ['DGL_DIST_MODE'] = 'distributed' os.environ['DGL_DIST_MODE'] = 'distributed'
check_rpc_sampling(Path(tmpdirname), 2) check_rpc_sampling(Path(tmpdirname), 2)
check_rpc_sampling(Path(tmpdirname), 1) check_rpc_sampling(Path(tmpdirname), 1)
......
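Note on migration: reshuffle is one of the deprecated candidates removed throughout these tests, and node/edge ID reshuffling appears to be unconditional from here on. A minimal sketch of the updated call, assuming a toy graph and illustrative names (only the keyword set is taken from this diff):

    import dgl
    from dgl.distributed import partition_graph

    g = dgl.rand_graph(1000, 5000)  # illustrative toy graph
    # Before: partition_graph(..., reshuffle=True, return_mapping=True)
    # After: drop the reshuffle keyword; return_mapping=True still returns
    # the mapping from shuffled node/edge IDs back to the original IDs.
    orig_nids, orig_eids = partition_graph(
        g, 'demo_graph', 2, '/tmp/demo_parts',
        num_hops=1, part_method='metis', return_mapping=True)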
@@ -162,7 +162,6 @@ def test_standalone():
         test_dir,
         num_hops=num_hops,
         part_method="metis",
-        reshuffle=True,
         return_mapping=True,
     )
     part_config = os.path.join(test_dir, "test_sampling.json")
@@ -262,7 +261,6 @@ def check_neg_dataloader(g, num_server, num_workers):
         test_dir,
         num_hops=num_hops,
         part_method="metis",
-        reshuffle=True,
         return_mapping=True,
     )
     part_config = os.path.join(test_dir, "test_sampling.json")
@@ -317,10 +315,9 @@ def check_neg_dataloader(g, num_server, num_workers):
 @pytest.mark.parametrize("num_server", [3])
 @pytest.mark.parametrize("num_workers", [0, 4])
 @pytest.mark.parametrize("drop_last", [True, False])
-@pytest.mark.parametrize("reshuffle", [True, False])
 @pytest.mark.parametrize("num_groups", [1])
 def test_dist_dataloader(
-    num_server, num_workers, drop_last, reshuffle, num_groups
+    num_server, num_workers, drop_last, num_groups
 ):
     reset_envs()
     # No multiple partitions on single machine for
@@ -343,7 +340,6 @@ def test_dist_dataloader(
         test_dir,
         num_hops=num_hops,
         part_method="metis",
-        reshuffle=reshuffle,
         return_mapping=True,
     )
@@ -560,7 +556,6 @@ def check_dataloader(g, num_server, num_workers, dataloader_type):
         test_dir,
         num_hops=num_hops,
         part_method="metis",
-        reshuffle=True,
         return_mapping=True,
     )
     part_config = os.path.join(test_dir, "test_sampling.json")
...
@@ -18,8 +18,8 @@ if os.name != "nt":
     import struct

 # Create an one-part Graph
-node_map = F.tensor([0, 0, 0, 0, 0, 0], F.int64)
-edge_map = F.tensor([0, 0, 0, 0, 0, 0, 0], F.int64)
+node_map = {'_N': F.tensor([[0, 6]], F.int64)}
+edge_map = {('_N','_E','_N'): F.tensor([[0, 7]], F.int64)}
 global_nid = F.tensor([0, 1, 2, 3, 4, 5], F.int64)
 global_eid = F.tensor([0, 1, 2, 3, 4, 5, 6], F.int64)
@@ -36,8 +36,10 @@ g.add_edges(2, 5) # 6
 g.ndata[dgl.NID] = global_nid
 g.edata[dgl.EID] = global_eid
-gpb = dgl.distributed.graph_partition_book.BasicPartitionBook(
-    part_id=0, num_parts=1, node_map=node_map, edge_map=edge_map, part_graph=g
+gpb = dgl.distributed.graph_partition_book.RangePartitionBook(
+    part_id=0, num_parts=1, node_map=node_map, edge_map=edge_map,
+    ntypes={ntype: i for i, ntype in enumerate(g.ntypes)},
+    etypes={etype: i for i, etype in enumerate(g.canonical_etypes)}
 )

 node_policy = dgl.distributed.PartitionPolicy(
@@ -110,8 +112,8 @@ def test_partition_policy():
         F.asnumpy(eid_partid),
         F.asnumpy(F.tensor([0, 0, 0, 0, 0, 0, 0], F.int64)),
     )
-    assert node_policy.get_part_size() == len(node_map)
-    assert edge_policy.get_part_size() == len(edge_map)
+    assert node_policy.get_part_size() == len(local_nid)
+    assert edge_policy.get_part_size() == len(local_eid)

 def start_server(server_id, num_clients, num_servers):
...
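The swap from BasicPartitionBook to RangePartitionBook also changes the map format: rather than one partition-ID entry per node/edge, each type maps to a (num_parts, 2) tensor of contiguous [start, end) ID ranges, plus explicit ntypes/etypes indices. A minimal sketch under that reading, with two illustrative partitions of a homogeneous graph (constructor keywords as in the diff above; exact constructor behavior may vary by DGL version):

    import torch
    from dgl.distributed.graph_partition_book import RangePartitionBook

    # Partition 0 owns node IDs [0, 3); partition 1 owns [3, 6).
    node_map = {'_N': torch.tensor([[0, 3], [3, 6]])}
    edge_map = {('_N', '_E', '_N'): torch.tensor([[0, 4], [4, 7]])}
    gpb = RangePartitionBook(
        part_id=0, num_parts=2, node_map=node_map, edge_map=edge_map,
        ntypes={'_N': 0}, etypes={('_N', '_E', '_N'): 0})
    # Range lookup maps each global node ID to its owning partition.
    assert gpb.nid2partid(torch.tensor([0, 2, 3, 5])).tolist() == [0, 0, 1, 1]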
@@ -3,18 +3,20 @@ import os
 import backend as F
 import torch as th

 import dgl
+import json
 import numpy as np
 import pytest
+import tempfile
 from dgl import function as fn
 from dgl.distributed import (
     load_partition,
+    load_partition_book,
     load_partition_feats,
     partition_graph,
 )
 from dgl.distributed.graph_partition_book import (
     DEFAULT_ETYPE,
     DEFAULT_NTYPE,
-    BasicPartitionBook,
     EdgePartitionPolicy,
     HeteroDataName,
     NodePartitionPolicy,
@@ -226,7 +228,6 @@ def check_hetero_partition(
         "/tmp/partition",
         num_hops=num_hops,
         part_method=part_method,
-        reshuffle=True,
         return_mapping=True,
         num_trainers_per_machine=num_trainers_per_machine,
         graph_formats=graph_formats,
@@ -328,7 +329,6 @@ def check_hetero_partition(
 def check_partition(
     g,
     part_method,
-    reshuffle,
     num_parts=4,
     num_trainers_per_machine=1,
     load_feats=True,
@@ -352,7 +352,6 @@ def check_partition(
         "/tmp/partition",
         num_hops=num_hops,
         part_method=part_method,
-        reshuffle=reshuffle,
         return_mapping=True,
         num_trainers_per_machine=num_trainers_per_machine,
         graph_formats=graph_formats,
@@ -445,24 +444,16 @@ def check_partition(
         assert F.shape(orig_eids1)[0] == F.shape(orig_eids2)[0]
         assert np.all(F.asnumpy(orig_eids1) == F.asnumpy(orig_eids2))
-        if reshuffle:
-            local_orig_nids = orig_nids[part_g.ndata[dgl.NID]]
-            local_orig_eids = orig_eids[part_g.edata[dgl.EID]]
-            part_g.ndata["feats"] = F.gather_row(
-                g.ndata["feats"], local_orig_nids
-            )
-            part_g.edata["feats"] = F.gather_row(
-                g.edata["feats"], local_orig_eids
-            )
-            local_nodes = orig_nids[local_nodes]
-            local_edges = orig_eids[local_edges]
-        else:
-            part_g.ndata["feats"] = F.gather_row(
-                g.ndata["feats"], part_g.ndata[dgl.NID]
-            )
-            part_g.edata["feats"] = F.gather_row(
-                g.edata["feats"], part_g.edata[dgl.NID]
-            )
+        local_orig_nids = orig_nids[part_g.ndata[dgl.NID]]
+        local_orig_eids = orig_eids[part_g.edata[dgl.EID]]
+        part_g.ndata["feats"] = F.gather_row(
+            g.ndata["feats"], local_orig_nids
+        )
+        part_g.edata["feats"] = F.gather_row(
+            g.edata["feats"], local_orig_eids
+        )
+        local_nodes = orig_nids[local_nodes]
+        local_edges = orig_eids[local_edges]
         part_g.update_all(fn.copy_u("feats", "msg"), fn.sum("msg", "h"))
         part_g.update_all(fn.copy_e("feats", "msg"), fn.sum("msg", "eh"))
@@ -490,41 +481,37 @@ def check_partition(
         assert np.all(F.asnumpy(true_feats) == F.asnumpy(edata))
         # This only works if node/edge IDs are shuffled.
-        if reshuffle:
-            shuffled_labels.append(node_feats["_N/labels"])
-            shuffled_edata.append(edge_feats["_N:_E:_N/feats"])
+        shuffled_labels.append(node_feats["_N/labels"])
+        shuffled_edata.append(edge_feats["_N:_E:_N/feats"])

     # Verify that we can reconstruct node/edge data for original IDs.
-    if reshuffle:
-        shuffled_labels = F.asnumpy(F.cat(shuffled_labels, 0))
-        shuffled_edata = F.asnumpy(F.cat(shuffled_edata, 0))
-        orig_labels = np.zeros(
-            shuffled_labels.shape, dtype=shuffled_labels.dtype
-        )
-        orig_edata = np.zeros(shuffled_edata.shape, dtype=shuffled_edata.dtype)
-        orig_labels[F.asnumpy(orig_nids)] = shuffled_labels
-        orig_edata[F.asnumpy(orig_eids)] = shuffled_edata
-        assert np.all(orig_labels == F.asnumpy(g.ndata["labels"]))
-        assert np.all(orig_edata == F.asnumpy(g.edata["feats"]))
+    shuffled_labels = F.asnumpy(F.cat(shuffled_labels, 0))
+    shuffled_edata = F.asnumpy(F.cat(shuffled_edata, 0))
+    orig_labels = np.zeros(
+        shuffled_labels.shape, dtype=shuffled_labels.dtype
+    )
+    orig_edata = np.zeros(shuffled_edata.shape, dtype=shuffled_edata.dtype)
+    orig_labels[F.asnumpy(orig_nids)] = shuffled_labels
+    orig_edata[F.asnumpy(orig_eids)] = shuffled_edata
+    assert np.all(orig_labels == F.asnumpy(g.ndata["labels"]))
+    assert np.all(orig_edata == F.asnumpy(g.edata["feats"]))

-    if reshuffle:
-        node_map = []
-        edge_map = []
-        for i, (num_nodes, num_edges) in enumerate(part_sizes):
-            node_map.append(np.ones(num_nodes) * i)
-            edge_map.append(np.ones(num_edges) * i)
-        node_map = np.concatenate(node_map)
-        edge_map = np.concatenate(edge_map)
-        nid2pid = gpb.nid2partid(F.arange(0, len(node_map)))
-        assert F.dtype(nid2pid) in (F.int32, F.int64)
-        assert np.all(F.asnumpy(nid2pid) == node_map)
-        eid2pid = gpb.eid2partid(F.arange(0, len(edge_map)))
-        assert F.dtype(eid2pid) in (F.int32, F.int64)
-        assert np.all(F.asnumpy(eid2pid) == edge_map)
+    node_map = []
+    edge_map = []
+    for i, (num_nodes, num_edges) in enumerate(part_sizes):
+        node_map.append(np.ones(num_nodes) * i)
+        edge_map.append(np.ones(num_edges) * i)
+    node_map = np.concatenate(node_map)
+    edge_map = np.concatenate(edge_map)
+    nid2pid = gpb.nid2partid(F.arange(0, len(node_map)))
+    assert F.dtype(nid2pid) in (F.int32, F.int64)
+    assert np.all(F.asnumpy(nid2pid) == node_map)
+    eid2pid = gpb.eid2partid(F.arange(0, len(edge_map)))
+    assert F.dtype(eid2pid) in (F.int32, F.int64)
+    assert np.all(F.asnumpy(eid2pid) == edge_map)
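For readers following the test logic: with reshuffling always on, per-partition features arrive concatenated in shuffled-ID order, and the orig_nids array from return_mapping scatters them back to the original ordering. A self-contained sketch with toy arrays (values illustrative, not taken from the diff):

    import numpy as np

    shuffled_labels = np.array([10, 30, 0, 20])  # indexed by shuffled node ID
    orig_nids = np.array([1, 3, 0, 2])           # shuffled ID i -> original node ID
    orig_labels = np.zeros_like(shuffled_labels)
    orig_labels[orig_nids] = shuffled_labels     # scatter back to original order
    assert orig_labels.tolist() == [0, 10, 20, 30]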
@pytest.mark.parametrize("part_method", ["metis", "random"]) @pytest.mark.parametrize("part_method", ["metis", "random"])
@pytest.mark.parametrize("reshuffle", [True, False])
@pytest.mark.parametrize("num_parts", [1, 4]) @pytest.mark.parametrize("num_parts", [1, 4])
@pytest.mark.parametrize("num_trainers_per_machine", [1, 4]) @pytest.mark.parametrize("num_trainers_per_machine", [1, 4])
@pytest.mark.parametrize("load_feats", [True, False]) @pytest.mark.parametrize("load_feats", [True, False])
...@@ -533,7 +520,6 @@ def check_partition( ...@@ -533,7 +520,6 @@ def check_partition(
) )
def test_partition( def test_partition(
part_method, part_method,
reshuffle,
num_parts, num_parts,
num_trainers_per_machine, num_trainers_per_machine,
load_feats, load_feats,
...@@ -546,7 +532,6 @@ def test_partition( ...@@ -546,7 +532,6 @@ def test_partition(
check_partition( check_partition(
g, g,
part_method, part_method,
reshuffle,
num_parts, num_parts,
num_trainers_per_machine, num_trainers_per_machine,
load_feats, load_feats,
...@@ -563,31 +548,6 @@ def test_partition( ...@@ -563,31 +548,6 @@ def test_partition(
) )
reset_envs() reset_envs()
-def test_BasicPartitionBook():
-    part_id = 0
-    num_parts = 2
-    node_map = np.random.choice(num_parts, 1000)
-    edge_map = np.random.choice(num_parts, 5000)
-    graph = dgl.rand_graph(1000, 5000)
-    graph = dgl.node_subgraph(graph, F.arange(0, graph.num_nodes()))
-    gpb = BasicPartitionBook(part_id, num_parts, node_map, edge_map, graph)
-    c_etype = ("_N", "_E", "_N")
-    assert gpb.etypes == ["_E"]
-    assert gpb.canonical_etypes == [c_etype]
-    node_policy = NodePartitionPolicy(gpb, "_N")
-    assert node_policy.type_name == "_N"
-    expect_except = False
-    try:
-        edge_policy = EdgePartitionPolicy(gpb, "_E")
-    except AssertionError:
-        expect_except = True
-    assert expect_except
-    edge_policy = EdgePartitionPolicy(gpb, c_etype)
-    assert edge_policy.type_name == c_etype
 def test_RangePartitionBook():
     part_id = 1
     num_parts = 2
@@ -699,3 +659,27 @@ def test_RangePartitionBook():
     assert expect_except
     data_name = HeteroDataName(False, c_etype, "feat")
     assert data_name.get_type() == c_etype
+def test_UnknownPartitionBook():
+    node_map = {'_N': {0: 0, 1: 1, 2: 2}}
+    edge_map = {'_N:_E:_N': {0: 0, 1: 1, 2: 2}}
+    part_metadata = {
+        "num_parts": 1,
+        "num_nodes": len(node_map),
+        "num_edges": len(edge_map),
+        "node_map": node_map,
+        "edge_map": edge_map,
+        "graph_name": "test_graph"
+    }
+    with tempfile.TemporaryDirectory() as test_dir:
+        part_config = os.path.join(test_dir, "test_graph.json")
+        with open(part_config, "w") as file:
+            json.dump(part_metadata, file, indent=4)
+        try:
+            load_partition_book(part_config, 0)
+        except Exception as e:
+            if not isinstance(e, TypeError):
+                raise e
@@ -92,7 +92,7 @@ def run_client(graph_name, cli_id, part_id, server_count):
     os.environ["DGL_NUM_SERVER"] = str(server_count)
     dgl.distributed.initialize("optim_ip_config.txt")
     gpb, graph_name, _, _ = load_partition_book(
-        "/tmp/dist_graph/{}.json".format(graph_name), part_id, None
+        "/tmp/dist_graph/{}.json".format(graph_name), part_id
    )
     g = DistGraph(graph_name, gpb=gpb)
     policy = dgl.distributed.PartitionPolicy("node", g.get_partition_book())
...
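With BasicPartitionBook removed, load_partition_book appears to no longer need the partition's graph structure, so the trailing argument is dropped at this call site. A minimal sketch of the updated call (path and graph name are illustrative; the return tuple matches the diff):

    from dgl.distributed import load_partition_book

    # Previously: load_partition_book(part_config, part_id, graph_part)
    gpb, graph_name, ntypes, etypes = load_partition_book(
        '/tmp/dist_graph/demo.json', part_id=0)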