Unverified Commit 6e7f19f2 authored by Da Zheng, committed by GitHub

[Distributed] add distributed in-degree and out-degree. (#2918)



* add distributed in-degree and out-degree.

* update comments.

* fix a bug.

* add tests.

* add tests.

* fix a bug.

* fix docstring.

* update doc.

* fix

* fix.
Co-authored-by: Zheng <dzzhen@3c22fba32af5.ant.amazon.com>
Co-authored-by: xiang song(charlie.song) <classicxsong@gmail.com>
parent 50492d56
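For context, here is roughly how the new API is used from a trainer process. This is an editor's hedged sketch, not part of the commit: the graph name and ip-config path are placeholders, following the setup used in the tests at the bottom of this diff.

```python
import dgl
import torch

# Placeholder config path and graph name; in practice these come from your
# distributed launch setup (the tests below build them with partition_graph).
dgl.distributed.initialize("rpc_ip_config.txt")
g = dgl.distributed.DistGraph("my_graph")

print(g.in_degrees(torch.tensor([1, 2])))  # in-degrees of two specific nodes
print(g.out_degrees())                     # out-degrees of every node

dgl.distributed.exit_client()
```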
@@ -17,7 +17,7 @@ Distributed Graph
-----------------
.. autoclass:: DistGraph
-    :members: ndata, edata, idtype, device, ntypes, etypes, number_of_nodes, number_of_edges, node_attr_schemes, edge_attr_schemes, rank, find_edges, get_partition_book, barrier, local_partition, num_nodes, num_edges, get_node_partition_policy, get_edge_partition_policy, get_etype_id, get_ntype_id, nodes, edges
+    :members: ndata, edata, idtype, device, ntypes, etypes, number_of_nodes, number_of_edges, node_attr_schemes, edge_attr_schemes, rank, find_edges, get_partition_book, barrier, local_partition, num_nodes, num_edges, get_node_partition_policy, get_edge_partition_policy, get_etype_id, get_ntype_id, nodes, edges, out_degrees, in_degrees

Distributed Tensor
------------------
...
@@ -29,4 +29,4 @@ from .dist_context import initialize, exit_client
from .kvstore import KVServer, KVClient
from .server_state import ServerState
from .dist_dataloader import DistDataLoader
-from .graph_services import sample_neighbors, in_subgraph, find_edges
+from .graph_services import sample_neighbors, in_subgraph
...
@@ -9,7 +9,7 @@ import numpy as np
from ..heterograph import DGLHeteroGraph
from .. import heterograph_index
from .. import backend as F
-from ..base import NID, EID, NTYPE, ETYPE
+from ..base import NID, EID, NTYPE, ETYPE, ALL, is_all
from .kvstore import KVServer, get_kvstore
from .._ffi.ndarray import empty_shared_mem
from ..frame import infer_scheme
@@ -23,6 +23,8 @@ from . import role
from .server_state import ServerState
from .rpc_server import start_server
from .graph_services import find_edges as dist_find_edges
+from .graph_services import out_degrees as dist_out_degrees
+from .graph_services import in_degrees as dist_in_degrees
from .dist_tensor import DistTensor

INIT_GRAPH = 800001
@@ -745,6 +747,104 @@ class DistGraph:
            return sum([self._gpb._num_edges(etype) for etype in self.etypes])
        return self._gpb._num_edges(etype)

    def out_degrees(self, u=ALL):
        """Return the out-degree(s) of the given nodes.

        It computes the out-degree(s) with respect to the whole distributed graph.
        It does not support heterogeneous graphs yet.

        Parameters
        ----------
        u : node IDs
            The node IDs. The allowed formats are:

            * ``int``: A single node.
            * Int Tensor: Each element is a node ID. The tensor must have the same device type
              and ID data type as the graph's.
            * iterable[int]: Each element is a node ID.

            If not given, return the out-degrees of all the nodes.

        Returns
        -------
        int or Tensor
            The out-degree(s) of the node(s) in a Tensor. The i-th element is the out-degree
            of the i-th input node. If :attr:`u` is an ``int``, return an ``int`` too.

        Examples
        --------
        The following example uses the PyTorch backend.

        >>> import dgl
        >>> import torch

        Query for all nodes.

        >>> g.out_degrees()
        tensor([2, 2, 0, 0])

        Query for nodes 1 and 2.

        >>> g.out_degrees(torch.tensor([1, 2]))
        tensor([2, 0])

        See Also
        --------
        in_degrees
        """
        if is_all(u):
            u = F.arange(0, self.number_of_nodes())
        return dist_out_degrees(self, u)

    def in_degrees(self, v=ALL):
        """Return the in-degree(s) of the given nodes.

        It computes the in-degree(s) with respect to the whole distributed graph.
        It does not support heterogeneous graphs yet.

        Parameters
        ----------
        v : node IDs
            The node IDs. The allowed formats are:

            * ``int``: A single node.
            * Int Tensor: Each element is a node ID. The tensor must have the same device type
              and ID data type as the graph's.
            * iterable[int]: Each element is a node ID.

            If not given, return the in-degrees of all the nodes.

        Returns
        -------
        int or Tensor
            The in-degree(s) of the node(s) in a Tensor. The i-th element is the in-degree
            of the i-th input node. If :attr:`v` is an ``int``, return an ``int`` too.

        Examples
        --------
        The following example uses the PyTorch backend.

        >>> import dgl
        >>> import torch

        Query for all nodes.

        >>> g.in_degrees()
        tensor([0, 2, 1, 1])

        Query for nodes 1 and 2.

        >>> g.in_degrees(torch.tensor([1, 2]))
        tensor([2, 1])

        See Also
        --------
        out_degrees
        """
        if is_all(v):
            v = F.arange(0, self.number_of_nodes())
        return dist_in_degrees(self, v)

    def node_attr_schemes(self):
        """Return the node feature schemes.
...
@@ -15,6 +15,8 @@ __all__ = ['sample_neighbors', 'in_subgraph', 'find_edges']
SAMPLING_SERVICE_ID = 6657
INSUBGRAPH_SERVICE_ID = 6658
EDGES_SERVICE_ID = 6659
+OUTDEGREE_SERVICE_ID = 6660
+INDEGREE_SERVICE_ID = 6661

class SubgraphResponse(Response):
    """The response for sampling and in_subgraph"""

@@ -76,6 +78,20 @@ def _find_edges(local_g, partition_book, seed_edges):
    global_dst = global_nid_mapping[local_dst]
    return global_src, global_dst

def _in_degrees(local_g, partition_book, n):
    """Get the in-degrees of the given nodes in the local partition."""
    # The IDs in n are global node IDs that belong to this partition; map them
    # to partition-local IDs before querying the local graph.
    local_nids = partition_book.nid2localnid(n, partition_book.partid)
    local_nids = F.astype(local_nids, local_g.idtype)
    return local_g.in_degrees(local_nids)

def _out_degrees(local_g, partition_book, n):
    """Get the out-degrees of the given nodes in the local partition."""
    local_nids = partition_book.nid2localnid(n, partition_book.partid)
    local_nids = F.astype(local_nids, local_g.idtype)
    return local_g.out_degrees(local_nids)
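
To make the helpers above concrete: a partition book maps a global node ID to its owning partition and to a local ID within that partition. Here is a minimal, invented stand-in for that mapping, assuming a simple range-based layout (real partition books are built by `partition_graph`):

```python
import torch

# Invented layout: global node IDs 0..5 split across two partitions by range.
offset = torch.tensor([0, 3])  # first global ID owned by each partition

def nid2localnid(global_nids, partid):
    # Valid only for IDs that actually live in `partid`, which is the same
    # precondition _in_degrees/_out_degrees rely on.
    return global_nids - offset[partid]

print(nid2localnid(torch.tensor([3, 5]), 1))  # tensor([0, 2])
```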

def _in_subgraph(local_g, partition_book, seed_nodes):
    """ Get in subgraph from local partition.

@@ -140,6 +156,72 @@ class EdgesRequest(Request):
        return FindEdgeResponse(global_src, global_dst, self.order_id)

class InDegreeRequest(Request):
    """In-degree request."""
    def __init__(self, n, order_id):
        self.n = n
        # order_id identifies which slice of the caller's input this request
        # covers, so the caller can scatter the results back into input order.
        self.order_id = order_id

    # Requests and responses are serialized before being sent over RPC.
    def __setstate__(self, state):
        self.n, self.order_id = state

    def __getstate__(self):
        return self.n, self.order_id

    def process_request(self, server_state):
        local_g = server_state.graph
        partition_book = server_state.partition_book
        deg = _in_degrees(local_g, partition_book, self.n)
        return InDegreeResponse(deg, self.order_id)

class InDegreeResponse(Response):
    """The response for in-degree."""
    def __init__(self, deg, order_id):
        self.val = deg
        self.order_id = order_id

    def __setstate__(self, state):
        self.val, self.order_id = state

    def __getstate__(self):
        return self.val, self.order_id

class OutDegreeRequest(Request):
    """Out-degree request."""
    def __init__(self, n, order_id):
        self.n = n
        self.order_id = order_id

    def __setstate__(self, state):
        self.n, self.order_id = state

    def __getstate__(self):
        return self.n, self.order_id

    def process_request(self, server_state):
        local_g = server_state.graph
        partition_book = server_state.partition_book
        deg = _out_degrees(local_g, partition_book, self.n)
        return OutDegreeResponse(deg, self.order_id)

class OutDegreeResponse(Response):
    """The response for out-degree."""
    def __init__(self, deg, order_id):
        self.val = deg
        self.order_id = order_id

    def __setstate__(self, state):
        self.val, self.order_id = state

    def __getstate__(self):
        return self.val, self.order_id

class InSubgraphRequest(Request):
    """InSubgraph Request"""

@@ -410,11 +492,11 @@ def find_edges(g, edge_ids):
    tensor
        The destination node ID array.
    """
-    def issue_remove_req(edge_ids, order_id):
+    def issue_remote_req(edge_ids, order_id):
        return EdgesRequest(edge_ids, order_id)

    def local_access(local_g, partition_book, edge_ids):
        return _find_edges(local_g, partition_book, edge_ids)
-    return _distributed_edge_access(g, edge_ids, issue_remove_req, local_access)
+    return _distributed_edge_access(g, edge_ids, issue_remote_req, local_access)

def in_subgraph(g, nodes):
    """Return the subgraph induced on the inbound edges of the given nodes.

@@ -452,6 +534,70 @@ def in_subgraph(g, nodes):
        return _in_subgraph(local_g, partition_book, local_nids)
    return _distributed_access(g, nodes, issue_remote_req, local_access)

def _distributed_get_node_property(g, n, issue_remote_req, local_access):
    """Fetch a per-node property (e.g., degrees) from a distributed graph.

    The input nodes are split by partition: local nodes are handled directly,
    remote nodes are sent to their owning machines, and the results are
    scattered back into the order of the input.
    """
    req_list = []
    partition_book = g.get_partition_book()
    n = toindex(n).tousertensor()
    partition_id = partition_book.nid2partid(n)
    local_nids = None
    # reorder_idx[pid] records the positions in the input array of the nodes
    # that belong to partition pid, so results can be written back in order.
    reorder_idx = []
    for pid in range(partition_book.num_partitions()):
        mask = (partition_id == pid)
        nid = F.boolean_mask(n, mask)
        reorder_idx.append(F.nonzero_1d(mask))
        if pid == partition_book.partid and g.local_partition is not None:
            assert local_nids is None
            local_nids = nid
        elif len(nid) != 0:
            req = issue_remote_req(nid, pid)
            req_list.append((pid, req))

    # Send requests to the remote machines.
    msgseq2pos = None
    if len(req_list) > 0:
        msgseq2pos = send_requests_to_machine(req_list)

    # Handle the nodes in the local partition.
    vals = None
    if local_nids is not None:
        local_vals = local_access(g.local_partition, partition_book, local_nids)
        shape = list(F.shape(local_vals))
        shape[0] = len(n)
        vals = F.zeros(shape, F.dtype(local_vals), F.cpu())
        vals = F.scatter_row(vals, reorder_idx[partition_book.partid], local_vals)

    # Receive responses from the remote machines.
    if msgseq2pos is not None:
        results = recv_responses(msgseq2pos)
        if len(results) > 0 and vals is None:
            shape = list(F.shape(results[0].val))
            shape[0] = len(n)
            vals = F.zeros(shape, F.dtype(results[0].val), F.cpu())
        for result in results:
            val = result.val
            vals = F.scatter_row(vals, reorder_idx[result.order_id], val)
    return vals
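
The scatter-back logic above is easy to miss behind the backend wrappers. Below is a minimal, self-contained sketch of the same reordering idea in plain PyTorch; the partition assignment and the per-partition results are invented for illustration:

```python
import torch

n = torch.tensor([7, 3, 9, 1])             # query nodes, in user order
partition_id = torch.tensor([1, 0, 1, 0])  # invented owning partition per node

vals = torch.zeros(len(n), dtype=torch.int64)
for pid in range(2):
    mask = partition_id == pid
    positions = mask.nonzero(as_tuple=True)[0]  # plays the role of reorder_idx[pid]
    # Pretend partition `pid` answered with these values for its nodes.
    result = torch.full((int(mask.sum()),), pid + 1)
    vals[positions] = result  # scatter back into the input order

print(vals)  # tensor([2, 1, 2, 1]) -- aligned with the original query order
```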

def in_degrees(g, v):
    """Get the in-degrees of the given nodes from a distributed graph."""
    def issue_remote_req(v, order_id):
        return InDegreeRequest(v, order_id)
    def local_access(local_g, partition_book, v):
        return _in_degrees(local_g, partition_book, v)
    return _distributed_get_node_property(g, v, issue_remote_req, local_access)

def out_degrees(g, u):
    """Get the out-degrees of the given nodes from a distributed graph."""
    def issue_remote_req(u, order_id):
        return OutDegreeRequest(u, order_id)
    def local_access(local_g, partition_book, u):
        return _out_degrees(local_g, partition_book, u)
    return _distributed_get_node_property(g, u, issue_remote_req, local_access)

register_service(SAMPLING_SERVICE_ID, SamplingRequest, SubgraphResponse)
register_service(EDGES_SERVICE_ID, EdgesRequest, FindEdgeResponse)
register_service(INSUBGRAPH_SERVICE_ID, InSubgraphRequest, SubgraphResponse)
+register_service(OUTDEGREE_SERVICE_ID, OutDegreeRequest, OutDegreeResponse)
+register_service(INDEGREE_SERVICE_ID, InDegreeRequest, InDegreeResponse)
...
@@ -171,16 +171,37 @@ def partition_graph_with_halo(g, node_part, extra_cached_hops, reshuffle=False):
    node_part = node_part.tousertensor()
    start = time.time()

    # This function determines whether an edge belongs to a partition.
    # An edge is assigned to a partition based on its destination node. If its destination node
    # is assigned to a partition, we assign the edge to the partition as well.
    def get_inner_edge(subg, inner_node):
        inner_edge = F.zeros((subg.number_of_edges(),), F.int8, F.cpu())
        inner_nids = F.nonzero_1d(inner_node)
        # TODO(zhengda) we need to fix utils.toindex() to avoid the dtype cast below.
        inner_nids = F.astype(inner_nids, F.int64)
        inner_eids = subg.in_edges(inner_nids, form='eid')
        inner_edge = F.scatter_row(inner_edge, inner_eids,
                                   F.ones((len(inner_eids),), F.dtype(inner_edge), F.cpu()))
        return inner_edge
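
A small worked example of this destination-based marking, on an invented toy graph (assuming a recent DGL where `dgl.graph` builds a homogeneous graph; `in_edges(..., form='eid')` is the same call used above):

```python
import dgl
import torch

# Four edges: 0->1, 1->2, 2->0, 0->2.
g = dgl.graph((torch.tensor([0, 1, 2, 0]), torch.tensor([1, 2, 0, 2])))

inner_nids = torch.tensor([1, 2])  # nodes owned by this partition
inner_eids = g.in_edges(inner_nids, form='eid')
inner_edge = torch.zeros(g.number_of_edges(), dtype=torch.int8)
inner_edge[inner_eids] = 1
print(inner_edge)  # tensor([1, 1, 0, 1]) -- edge 2 ends at node 0, so it is outer
```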

    # This creates a subgraph from the subgraphs returned from the C API above.
-    def create_subgraph(subg, induced_nodes, induced_edges):
+    def create_subgraph(subg, induced_nodes, induced_edges, inner_node):
        subg1 = DGLHeteroGraph(gidx=subg.graph, ntypes=['_N'], etypes=['_E'])
        # If IDs are shuffled, we should also shuffle the edges. This will help us collect
        # edge data from the distributed graph after training.
        if reshuffle:
-            sorted_edges, index = F.sort_1d(induced_edges[0])
+            # When we shuffle edges, we need to make sure that the inner edges are assigned
+            # contiguous edge IDs and that their ID range starts at 0. In other words, we
+            # want to place these edge IDs at the front of the edge list. To ensure that,
+            # we add a value larger than any edge ID to the IDs of the outer edges, so
+            # sorting gives us the order we want.
+            max_eid = F.max(induced_edges[0], 0) + 1
+            inner_edge = get_inner_edge(subg1, inner_node)
+            eid = F.astype(induced_edges[0], F.int64) + max_eid * F.astype(inner_edge == 0, F.int64)
+            _, index = F.sort_1d(eid)
            subg1 = edge_subgraph(subg1, index, preserve_nodes=True)
            subg1.ndata[NID] = induced_nodes[0]
-            subg1.edata[EID] = sorted_edges
+            subg1.edata[EID] = F.gather_row(induced_edges[0], index)
        else:
            subg1.ndata[NID] = induced_nodes[0]
            subg1.edata[EID] = induced_edges[0]
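
To see why adding `max_eid` to the outer edges yields the desired order, here is a worked example with invented IDs, in plain PyTorch:

```python
import torch

induced_eids = torch.tensor([5, 2, 7, 0])  # original edge IDs (invented)
inner_edge = torch.tensor([1, 0, 1, 1])    # the edge at position 1 is an outer edge
max_eid = induced_eids.max() + 1           # 8

# Outer edges are pushed past every inner edge before sorting:
# keys = [5, 10, 7, 0] -> sort order visits positions [3, 0, 2] (inner) first.
keys = induced_eids + max_eid * (inner_edge == 0).long()
index = torch.argsort(keys)
print(induced_eids[index])  # tensor([0, 5, 7, 2]) -- inner edges first, outer last
```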
@@ -188,8 +209,8 @@ def partition_graph_with_halo(g, node_part, extra_cached_hops, reshuffle=False):
    for i, subg in enumerate(subgs):
        inner_node = _get_halo_heterosubgraph_inner_node(subg)
-        subg = create_subgraph(subg, subg.induced_nodes, subg.induced_edges)
        inner_node = F.zerocopy_from_dlpack(inner_node.to_dlpack())
+        subg = create_subgraph(subg, subg.induced_nodes, subg.induced_edges, inner_node)
        subg.ndata['inner_node'] = inner_node
        subg.ndata['part_id'] = F.gather_row(node_part, subg.ndata[NID])
        if reshuffle:
@@ -197,13 +218,7 @@ def partition_graph_with_halo(g, node_part, extra_cached_hops, reshuffle=False):
            subg.edata['orig_id'] = F.gather_row(orig_eids, subg.edata[EID])

        if extra_cached_hops >= 1:
-            inner_edge = F.zeros((subg.number_of_edges(),), F.int8, F.cpu())
-            inner_nids = F.nonzero_1d(subg.ndata['inner_node'])
-            # TODO(zhengda) we need to fix utils.toindex() to avoid the dtype cast below.
-            inner_nids = F.astype(inner_nids, F.int64)
-            inner_eids = subg.in_edges(inner_nids, form='eid')
-            inner_edge = F.scatter_row(inner_edge, inner_eids,
-                                       F.ones((len(inner_eids),), F.dtype(inner_edge), F.cpu()))
+            inner_edge = get_inner_edge(subg, inner_node)
        else:
            inner_edge = F.ones((subg.number_of_edges(),), F.int8, F.cpu())
        subg.edata['inner_edge'] = inner_edge
...
@@ -111,9 +111,14 @@ HaloHeteroSubgraph GetSubgraphWithHalo(std::shared_ptr<HeteroGraph> hg,
  const dgl_id_t *dst_data = static_cast<dgl_id_t *>(dst->data);
  const dgl_id_t *eid_data = static_cast<dgl_id_t *>(eid->data);
  for (int64_t i = 0; i < num_edges; i++) {
+    auto it1 = orig_nodes.find(src_data[i]);
+    // If the source node is in the partition, we already collected this edge
+    // when we iterated over the out-edges above.
+    if (it1 == orig_nodes.end()) {
      edge_src.push_back(src_data[i]);
      edge_dst.push_back(dst_data[i]);
      edge_eid.push_back(eid_data[i]);
+    }
    // If we haven't seen this node.
    auto it = all_nodes.find(src_data[i]);
    if (it == all_nodes.end()) {

  if (num_hops > 0) {
    EdgeArray out_edges = hg->OutEdges(0, nodes);
    auto src = out_edges.src;
    auto dst = out_edges.dst;
    auto eid = out_edges.id;
    auto num_edges = eid->shape[0];
    const dgl_id_t *src_data = static_cast<dgl_id_t *>(src->data);
    const dgl_id_t *dst_data = static_cast<dgl_id_t *>(dst->data);
    const dgl_id_t *eid_data = static_cast<dgl_id_t *>(eid->data);
    for (int64_t i = 0; i < num_edges; i++) {
      // Keep this edge only if it leaves the partition; edges whose
      // destination is inside the partition were already collected.
      auto it1 = orig_nodes.find(dst_data[i]);
      if (it1 == orig_nodes.end()) {
        edge_src.push_back(src_data[i]);
        edge_dst.push_back(dst_data[i]);
        edge_eid.push_back(eid_data[i]);
      }
      // Record the endpoint, but we don't expand along the out-edges.
      auto it = all_nodes.find(dst_data[i]);
      if (it == all_nodes.end()) {
        all_nodes[dst_data[i]] = false;
        old_node_ids.push_back(dst_data[i]);
      }
    }
  }
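  // Editor's note (hedged): keeping these boundary out-edges means an inner
  // node's outgoing edges are stored in its own partition as well; the degree
  // assertions added to test_partition_with_halo below exercise exactly that.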

  // We assign new IDs to the nodes in the subgraph. We ensure that the HALO
  // nodes are behind the input nodes.
  std::unordered_map<dgl_id_t, dgl_id_t> old2new;

@@ -213,11 +244,12 @@ DGL_REGISTER_GLOBAL("partition._CAPI_DGLPartitionWithHalo_Hetero")
    part_ids.push_back(it->first);
    part_nodes.push_back(it->second);
  }
-  // When we construct subgraphs, we only access in-edges.
-  // We need to make sure the in-CSR exists. Otherwise, we'll
-  // try to construct in-CSR in openmp for loop, which will lead
-  // to some unexpected results.
+  // When we construct subgraphs, we need to access both in-edges and out-edges,
+  // so we need to make sure the in-CSR and out-CSR exist. Otherwise, we'll try
+  // to construct them inside the OpenMP for loop, which will lead to unexpected
+  // results.
  ugptr->GetInCSR();
+  ugptr->GetOutCSR();
  std::vector<std::shared_ptr<HaloHeteroSubgraph>> subgs(max_part_id + 1);
  int num_partitions = part_nodes.size();
#pragma omp parallel for
...
@@ -508,7 +508,10 @@ def test_partition_with_halo():
    for part_id, subg in subgs.items():
        node_ids = np.nonzero(node_part == part_id)[0]
        lnode_ids = np.nonzero(F.asnumpy(subg.ndata['inner_node']))[0]
-        assert np.all(np.sort(F.asnumpy(subg.ndata['orig_id'])[lnode_ids]) == node_ids)
+        orig_nids = F.asnumpy(subg.ndata['orig_id'])[lnode_ids]
+        assert np.all(np.sort(orig_nids) == node_ids)
+        assert np.all(F.asnumpy(subg.in_degrees(lnode_ids)) == F.asnumpy(g.in_degrees(orig_nids)))
+        assert np.all(F.asnumpy(subg.out_degrees(lnode_ids)) == F.asnumpy(g.out_degrees(orig_nids)))

@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@unittest.skipIf(F._default_context_str == 'gpu', reason="METIS doesn't support GPU")
@@ -1478,4 +1481,4 @@ def test_remove_selfloop(idtype):
    assert raise_error

if __name__ == '__main__':
-    pass
+    test_partition_with_halo()
...
@@ -2,7 +2,7 @@ import dgl
import unittest
import os
from dgl.data import CitationGraphDataset
-from dgl.distributed import sample_neighbors, find_edges
+from dgl.distributed import sample_neighbors
from dgl.distributed import partition_graph, load_partition, load_partition_book
import sys
import multiprocessing as mp
@@ -43,13 +43,30 @@ def start_find_edges_client(rank, tmpdir, disable_shared_mem, eids):
    dgl.distributed.initialize("rpc_ip_config.txt")
    dist_graph = DistGraph("test_find_edges", gpb=gpb)
    try:
-        u, v = find_edges(dist_graph, eids)
+        u, v = dist_graph.find_edges(eids)
    except Exception as e:
        print(e)
        u, v = None, None
    dgl.distributed.exit_client()
    return u, v

def start_get_degrees_client(rank, tmpdir, disable_shared_mem, nids=None):
    gpb = None
    if disable_shared_mem:
        _, _, _, gpb, _, _, _ = load_partition(tmpdir / 'test_get_degrees.json', rank)
    dgl.distributed.initialize("rpc_ip_config.txt", 1)
    dist_graph = DistGraph("test_get_degrees", gpb=gpb)
    try:
        in_deg = dist_graph.in_degrees(nids)
        all_in_deg = dist_graph.in_degrees()
        out_deg = dist_graph.out_degrees(nids)
        all_out_deg = dist_graph.out_degrees()
    except Exception as e:
        print(e)
        in_deg, out_deg, all_in_deg, all_out_deg = None, None, None, None
    dgl.distributed.exit_client()
    return in_deg, out_deg, all_in_deg, all_out_deg

def check_rpc_sampling(tmpdir, num_server):
    ip_config = open("rpc_ip_config.txt", "w")
    for _ in range(num_server):
@@ -107,8 +124,8 @@ def check_rpc_find_edges_shuffle(tmpdir, num_server):
        time.sleep(1)
        pserver_list.append(p)

-    orig_nid = F.zeros((g.number_of_nodes(),), dtype=F.int64)
-    orig_eid = F.zeros((g.number_of_edges(),), dtype=F.int64)
+    orig_nid = F.zeros((g.number_of_nodes(),), dtype=F.int64, ctx=F.cpu())
+    orig_eid = F.zeros((g.number_of_edges(),), dtype=F.int64, ctx=F.cpu())
    for i in range(num_server):
        part, _, _, _, _, _, _ = load_partition(tmpdir / 'test_find_edges.json', i)
        orig_nid[part.ndata[dgl.NID]] = part.ndata['orig_id']
@@ -123,6 +140,66 @@ def check_rpc_find_edges_shuffle(tmpdir, num_server):
    assert F.array_equal(u, du)
    assert F.array_equal(v, dv)

# Wait for the non-shared-memory graph store.
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@unittest.skipIf(dgl.backend.backend_name == 'tensorflow', reason='Not support tensorflow for now')
@pytest.mark.parametrize("num_server", [1, 2])
def test_rpc_find_edges_shuffle(num_server):
    import tempfile
    os.environ['DGL_DIST_MODE'] = 'distributed'
    with tempfile.TemporaryDirectory() as tmpdirname:
        check_rpc_find_edges_shuffle(Path(tmpdirname), num_server)

def check_rpc_get_degree_shuffle(tmpdir, num_server):
    ip_config = open("rpc_ip_config.txt", "w")
    for _ in range(num_server):
        ip_config.write('{}\n'.format(get_local_usable_addr()))
    ip_config.close()

    g = CitationGraphDataset("cora")[0]
    g.readonly()
    num_parts = num_server

    partition_graph(g, 'test_get_degrees', num_parts, tmpdir,
                    num_hops=1, part_method='metis', reshuffle=True)

    pserver_list = []
    ctx = mp.get_context('spawn')
    for i in range(num_server):
        p = ctx.Process(target=start_server, args=(i, tmpdir, num_server > 1, 'test_get_degrees'))
        p.start()
        time.sleep(1)
        pserver_list.append(p)

    orig_nid = F.zeros((g.number_of_nodes(),), dtype=F.int64, ctx=F.cpu())
    for i in range(num_server):
        part, _, _, _, _, _, _ = load_partition(tmpdir / 'test_get_degrees.json', i)
        orig_nid[part.ndata[dgl.NID]] = part.ndata['orig_id']

    time.sleep(3)
    nids = F.tensor(np.random.randint(g.number_of_nodes(), size=100))
    in_degs, out_degs, all_in_degs, all_out_degs = start_get_degrees_client(0, tmpdir, num_server > 1, nids)

    print("Done get_degree")
    for p in pserver_list:
        p.join()

    print('check results')
    assert F.array_equal(g.in_degrees(orig_nid[nids]), in_degs)
    assert F.array_equal(g.in_degrees(orig_nid), all_in_degs)
    assert F.array_equal(g.out_degrees(orig_nid[nids]), out_degs)
    assert F.array_equal(g.out_degrees(orig_nid), all_out_degs)

# Wait for the non-shared-memory graph store.
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@unittest.skipIf(dgl.backend.backend_name == 'tensorflow', reason='Not support tensorflow for now')
@pytest.mark.parametrize("num_server", [1, 2])
def test_rpc_get_degree_shuffle(num_server):
    import tempfile
    os.environ['DGL_DIST_MODE'] = 'distributed'
    with tempfile.TemporaryDirectory() as tmpdirname:
        check_rpc_get_degree_shuffle(Path(tmpdirname), num_server)

#@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
#@unittest.skipIf(dgl.backend.backend_name == 'tensorflow', reason='Not support tensorflow for now')
@unittest.skip('Only support partition with shuffle')
@@ -160,8 +237,8 @@ def check_rpc_sampling_shuffle(tmpdir, num_server):
    for p in pserver_list:
        p.join()

-    orig_nid = F.zeros((g.number_of_nodes(),), dtype=F.int64)
-    orig_eid = F.zeros((g.number_of_edges(),), dtype=F.int64)
+    orig_nid = F.zeros((g.number_of_nodes(),), dtype=F.int64, ctx=F.cpu())
+    orig_eid = F.zeros((g.number_of_edges(),), dtype=F.int64, ctx=F.cpu())
    for i in range(num_server):
        part, _, _, _, _, _, _ = load_partition(tmpdir / 'test_sampling.json', i)
        orig_nid[part.ndata[dgl.NID]] = part.ndata['orig_id']
@@ -366,8 +443,8 @@ def check_rpc_in_subgraph_shuffle(tmpdir, num_server):
        p.join()

-    orig_nid = F.zeros((g.number_of_nodes(),), dtype=F.int64)
-    orig_eid = F.zeros((g.number_of_edges(),), dtype=F.int64)
+    orig_nid = F.zeros((g.number_of_nodes(),), dtype=F.int64, ctx=F.cpu())
+    orig_eid = F.zeros((g.number_of_edges(),), dtype=F.int64, ctx=F.cpu())
    for i in range(num_server):
        part, _, _, _, _, _, _ = load_partition(tmpdir / 'test_in_subgraph.json', i)
        orig_nid[part.ndata[dgl.NID]] = part.ndata['orig_id']
@@ -404,6 +481,8 @@ if __name__ == "__main__":
        os.environ['DGL_DIST_MODE'] = 'distributed'
        check_rpc_sampling(Path(tmpdirname), 2)
        check_rpc_sampling(Path(tmpdirname), 1)
+        check_rpc_get_degree_shuffle(Path(tmpdirname), 1)
+        check_rpc_get_degree_shuffle(Path(tmpdirname), 2)
        check_rpc_find_edges_shuffle(Path(tmpdirname), 2)
        check_rpc_find_edges_shuffle(Path(tmpdirname), 1)
        check_rpc_in_subgraph_shuffle(Path(tmpdirname), 2)
...
@@ -252,10 +252,12 @@ def check_partition(g, part_method, reshuffle):
        assert len(gpb.partid2eids(i)) == gpb_meta[i]['num_edges']
        part_sizes.append((gpb_meta[i]['num_nodes'], gpb_meta[i]['num_edges']))

-        local_nid = gpb.nid2localnid(F.boolean_mask(part_g.ndata[dgl.NID], part_g.ndata['inner_node']), i)
+        nid = F.boolean_mask(part_g.ndata[dgl.NID], part_g.ndata['inner_node'])
+        local_nid = gpb.nid2localnid(nid, i)
        assert F.dtype(local_nid) in (F.int64, F.int32)
        assert np.all(F.asnumpy(local_nid) == np.arange(0, len(local_nid)))
-        local_eid = gpb.eid2localeid(F.boolean_mask(part_g.edata[dgl.EID], part_g.edata['inner_edge']), i)
+        eid = F.boolean_mask(part_g.edata[dgl.EID], part_g.edata['inner_edge'])
+        local_eid = gpb.eid2localeid(eid, i)
        assert F.dtype(local_eid) in (F.int64, F.int32)
        assert np.all(F.asnumpy(local_eid) == np.arange(0, len(local_eid)))
@@ -265,12 +267,15 @@ def check_partition(g, part_method, reshuffle):
        local_nodes1 = gpb.partid2nids(i)
        assert F.dtype(local_nodes1) in (F.int32, F.int64)
        assert np.all(np.sort(F.asnumpy(local_nodes)) == np.sort(F.asnumpy(local_nodes1)))
+        assert np.all(F.asnumpy(llocal_nodes) == np.arange(len(llocal_nodes)))

        # Check the edge map.
        local_edges = F.boolean_mask(part_g.edata[dgl.EID], part_g.edata['inner_edge'])
+        llocal_edges = F.nonzero_1d(part_g.edata['inner_edge'])
        local_edges1 = gpb.partid2eids(i)
        assert F.dtype(local_edges1) in (F.int32, F.int64)
        assert np.all(np.sort(F.asnumpy(local_edges)) == np.sort(F.asnumpy(local_edges1)))
+        assert np.all(F.asnumpy(llocal_edges) == np.arange(len(llocal_edges)))

        # Verify the mapping between the reshuffled IDs and the original IDs.
        part_src_ids, part_dst_ids = part_g.edges()
...