Unverified Commit ba7e7cf9 authored by xiang song(charlie.song)'s avatar xiang song(charlie.song) Committed by GitHub
Browse files

[New Feature] Per edge type sampler for to_homogeneous graphs. (#3131)



* fix.

* fix.

* fix.

* fix.

* Fix test

* Deprecate old DistEmbedding impl, use synchronized embedding impl

* Basic implementation of heterogeneous sampling on homogeneous graphs

* make pass

* Pass C++ test

* Add python test code

* lint

* lint

* Add MultiLayerEtypeNeighborSampler

* Add unit test for single-machine dataloader

* Add dist dataloader test for edge type sampler

* Fix lint

* fix

* support for per etype sample

* Fix some bug and enable distributed training with per edge sample

* fix

* Now distributed training works

* turn off some mxnet

* turn off mxnet for some dist test

* fix

* upd

* upd according to the comments

* Fix

* Fix test and now distributed works.

* upd

* upd

* Fix

* Fix bug

* remove dead code.

* upd

* Fix

* upd

* Fix
Co-authored-by: default avatarUbuntu <ubuntu@ip-172-31-71-112.ec2.internal>
Co-authored-by: default avatarUbuntu <ubuntu@ip-172-31-2-66.ec2.internal>
Co-authored-by: default avatarDa Zheng <zhengda1936@gmail.com>
parent d70beba7
...@@ -174,8 +174,6 @@ def run(proc_id, n_gpus, n_cpus, args, devices, dataset, queue=None): ...@@ -174,8 +174,6 @@ def run(proc_id, n_gpus, n_cpus, args, devices, dataset, queue=None):
world_size=world_size, world_size=world_size,
rank=proc_id) rank=proc_id)
sampler = dgl.dataloading.MultiLayerNeighborSampler(fanouts) sampler = dgl.dataloading.MultiLayerNeighborSampler(fanouts)
loader = dgl.dataloading.NodeDataLoader( loader = dgl.dataloading.NodeDataLoader(
g, g,
...@@ -554,7 +552,7 @@ def main(args, devices): ...@@ -554,7 +552,7 @@ def main(args, devices):
num_classes, num_rels, target_idx, num_classes, num_rels, target_idx,
inv_target, train_idx, val_idx, inv_target, train_idx, val_idx,
test_idx, labels), test_idx, labels),
queue)) queue))
p.start() p.start()
procs.append(p) procs.append(p)
for p in procs: for p in procs:
......
...@@ -220,7 +220,7 @@ COOMatrix COOTranspose(COOMatrix coo); ...@@ -220,7 +220,7 @@ COOMatrix COOTranspose(COOMatrix coo);
* - The function first check whether the input COO matrix is sorted * - The function first check whether the input COO matrix is sorted
* using a linear scan. * using a linear scan.
* - If the COO matrix is row sorted, the conversion can be done very * - If the COO matrix is row sorted, the conversion can be done very
* efficiently in a sequential scan. The result indices and data arrays * efficiently in a sequential scan. The result indices and data arrays
* are directly equal to the column and data arrays from the input. * are directly equal to the column and data arrays from the input.
* - If the COO matrix is further column sorted, the result CSR is * - If the COO matrix is further column sorted, the result CSR is
* also column sorted. * also column sorted.
...@@ -359,6 +359,54 @@ COOMatrix COORowWiseSampling( ...@@ -359,6 +359,54 @@ COOMatrix COORowWiseSampling(
FloatArray prob = FloatArray(), FloatArray prob = FloatArray(),
bool replace = true); bool replace = true);
/*!
 * \brief Randomly select a fixed number of non-zero entries for each edge type
 * along each given row independently.
 *
 * The function performs random choices along each row independently.
 * In each row, num_samples samples are picked for each edge type. (The edge
 * type of each entry is stored in etypes.)
 * The picked indices are returned in the form of a COO matrix.
 *
 * If replace is false and a row has fewer non-zero values of an edge type
 * than num_samples, all of those values are picked.
 *
 * Examples:
 *
 * // coo.num_rows = 4;
 * // coo.num_cols = 4;
 * // coo.rows = [0, 0, 0, 0, 3]
 * // coo.cols = [0, 1, 3, 2, 3]
 * // coo.data = [2, 3, 0, 1, 4]
 * // etype = [0, 0, 0, 2, 1]
 * COOMatrix coo = ...;
 * IdArray rows = ... ; // [0, 3]
 * COOMatrix sampled = COORowWisePerEtypeSampling(coo, rows, etype, 2, FloatArray(), false);
 * // possible sampled coo matrix:
 * // sampled.num_rows = 4
 * // sampled.num_cols = 4
 * // sampled.rows = [0, 0, 0, 3]
 * // sampled.cols = [0, 3, 2, 3]
 * // sampled.data = [2, 0, 1, 4]
 *
 * \param mat Input coo matrix.
 * \param rows Rows to sample from.
 * \param etypes Edge types of each edge.
 * \param num_samples Number of samples per edge type.
 * \param prob Unnormalized probability array. Should be of the same length as the data array.
 *        If an empty array is provided, assume uniform.
 * \param replace True if sample with replacement
 * \return A COOMatrix storing the picked row and col indices. Its data field stores the
 *         the index of the picked elements in the value array.
 */
COOMatrix COORowWisePerEtypeSampling(
    COOMatrix mat,
    IdArray rows,
    IdArray etypes,
    int64_t num_samples,
    FloatArray prob = FloatArray(),
    bool replace = true);
/*! /*!
* \brief Select K non-zero entries with the largest weights along each given row. * \brief Select K non-zero entries with the largest weights along each given row.
* *
...@@ -405,7 +453,7 @@ COOMatrix COORowWiseTopk( ...@@ -405,7 +453,7 @@ COOMatrix COORowWiseTopk(
/*! /*!
* \brief Union two COOMatrix into one COOMatrix. * \brief Union two COOMatrix into one COOMatrix.
* *
* Two Matrix must have the same shape. * Two Matrix must have the same shape.
* *
* Example: * Example:
...@@ -477,7 +525,7 @@ COOMatrix DisjointUnionCoo( ...@@ -477,7 +525,7 @@ COOMatrix DisjointUnionCoo(
* [3, 0, 2], * [3, 0, 2],
* [1, 1, 0], * [1, 1, 0],
* [0, 0, 4]] * [0, 0, 4]]
* *
* B, cnt, edge_map = COOToSimple(A) * B, cnt, edge_map = COOToSimple(A)
* *
* B = [[0, 0, 0], * B = [[0, 0, 0],
...@@ -588,7 +636,7 @@ COOMatrix COOSliceContiguousChunk( ...@@ -588,7 +636,7 @@ COOMatrix COOSliceContiguousChunk(
/*! /*!
* \brief Create a LineGraph of input coo * \brief Create a LineGraph of input coo
* *
* A = [[0, 0, 1], * A = [[0, 0, 1],
* [1, 0, 1], * [1, 0, 1],
* [1, 1, 0]] * [1, 1, 0]]
......
...@@ -386,6 +386,53 @@ COOMatrix CSRRowWiseSampling( ...@@ -386,6 +386,53 @@ COOMatrix CSRRowWiseSampling(
FloatArray prob = FloatArray(), FloatArray prob = FloatArray(),
bool replace = true); bool replace = true);
/*!
 * \brief Randomly select a fixed number of non-zero entries for each edge type
 * along each given row independently.
 *
 * The function performs random choices along each row independently.
 * In each row, num_samples samples are picked for each edge type. (The edge
 * type of each entry is stored in etypes.)
 * The picked indices are returned in the form of a COO matrix.
 *
 * If replace is false and a row has fewer non-zero values of an edge type
 * than num_samples, all of those values are picked.
 *
 * Examples:
 *
 * // csr.num_rows = 4;
 * // csr.num_cols = 4;
 * // csr.indptr = [0, 4, 4, 4, 5]
 * // csr.cols = [0, 1, 3, 2, 3]
 * // csr.data = [2, 3, 0, 1, 4]
 * // etype = [0, 0, 0, 2, 1]
 * CSRMatrix csr = ...;
 * IdArray rows = ... ; // [0, 3]
 * COOMatrix sampled = CSRRowWisePerEtypeSampling(csr, rows, etype, 2, FloatArray(), false);
 * // possible sampled coo matrix:
 * // sampled.num_rows = 4
 * // sampled.num_cols = 4
 * // sampled.rows = [0, 0, 0, 3]
 * // sampled.cols = [0, 3, 2, 3]
 * // sampled.data = [2, 0, 1, 4]
 *
 * \param mat Input CSR matrix.
 * \param rows Rows to sample from.
 * \param etypes Edge types of each edge.
 * \param num_samples Number of samples per edge type.
 * \param prob Unnormalized probability array. Should be of the same length as the data array.
 *        If an empty array is provided, assume uniform.
 * \param replace True if sample with replacement
 * \return A COOMatrix storing the picked row, col and data indices.
 */
COOMatrix CSRRowWisePerEtypeSampling(
    CSRMatrix mat,
    IdArray rows,
    IdArray etypes,
    int64_t num_samples,
    FloatArray prob = FloatArray(),
    bool replace = true);
/*! /*!
* \brief Select K non-zero entries with the largest weights along each given row. * \brief Select K non-zero entries with the largest weights along each given row.
* *
......
...@@ -3,6 +3,7 @@ from .dataloader import BlockSampler ...@@ -3,6 +3,7 @@ from .dataloader import BlockSampler
from .. import sampling, subgraph, distributed from .. import sampling, subgraph, distributed
from .. import ndarray as nd from .. import ndarray as nd
from .. import backend as F from .. import backend as F
from ..base import ETYPE
class MultiLayerNeighborSampler(BlockSampler): class MultiLayerNeighborSampler(BlockSampler):
"""Sampler that builds computational dependency of node representations via """Sampler that builds computational dependency of node representations via
...@@ -78,7 +79,15 @@ class MultiLayerNeighborSampler(BlockSampler): ...@@ -78,7 +79,15 @@ class MultiLayerNeighborSampler(BlockSampler):
# let's use sample_neighbors to replace in_subgraph for now. # let's use sample_neighbors to replace in_subgraph for now.
frontier = distributed.sample_neighbors(g, seed_nodes, -1, replace=False) frontier = distributed.sample_neighbors(g, seed_nodes, -1, replace=False)
else: else:
frontier = distributed.sample_neighbors(g, seed_nodes, fanout, replace=self.replace) if len(g.etypes) > 1: # heterogeneous distributed graph
# The edge type is stored in g.edata[dgl.ETYPE]
assert isinstance(fanout, int), "For distributed training, " \
"we can only sample same number of neighbors for each edge type"
frontier = distributed.sample_etype_neighbors(
g, seed_nodes, ETYPE, fanout, replace=self.replace)
else:
frontier = distributed.sample_neighbors(
g, seed_nodes, fanout, replace=self.replace)
else: else:
if fanout is None: if fanout is None:
frontier = subgraph.in_subgraph(g, seed_nodes) frontier = subgraph.in_subgraph(g, seed_nodes)
......
...@@ -28,4 +28,4 @@ from .dist_context import initialize, exit_client ...@@ -28,4 +28,4 @@ from .dist_context import initialize, exit_client
from .kvstore import KVServer, KVClient from .kvstore import KVServer, KVClient
from .server_state import ServerState from .server_state import ServerState
from .dist_dataloader import DistDataLoader from .dist_dataloader import DistDataLoader
from .graph_services import sample_neighbors, in_subgraph from .graph_services import sample_neighbors, sample_etype_neighbors, in_subgraph
...@@ -73,14 +73,15 @@ def _copy_graph_to_shared_mem(g, graph_name, graph_format): ...@@ -73,14 +73,15 @@ def _copy_graph_to_shared_mem(g, graph_name, graph_format):
new_g.edata['inner_edge'] = _to_shared_mem(g.edata['inner_edge'], new_g.edata['inner_edge'] = _to_shared_mem(g.edata['inner_edge'],
_get_edata_path(graph_name, 'inner_edge')) _get_edata_path(graph_name, 'inner_edge'))
new_g.edata[EID] = _to_shared_mem(g.edata[EID], _get_edata_path(graph_name, EID)) new_g.edata[EID] = _to_shared_mem(g.edata[EID], _get_edata_path(graph_name, EID))
new_g.edata[ETYPE] = _to_shared_mem(g.edata[ETYPE], _get_edata_path(graph_name, ETYPE))
return new_g return new_g
FIELD_DICT = {'inner_node': F.int32, # A flag indicates whether the node is inside a partition. FIELD_DICT = {'inner_node': F.int32, # A flag indicates whether the node is inside a partition.
'inner_edge': F.int32, # A flag indicates whether the edge is inside a partition. 'inner_edge': F.int32, # A flag indicates whether the edge is inside a partition.
NID: F.int64, NID: F.int64,
EID: F.int64, EID: F.int64,
NTYPE: F.int16, NTYPE: F.int32,
ETYPE: F.int16} ETYPE: F.int32}
def _get_shared_mem_ndata(g, graph_name, name): def _get_shared_mem_ndata(g, graph_name, name):
''' Get shared-memory node data from DistGraph server. ''' Get shared-memory node data from DistGraph server.
...@@ -125,6 +126,7 @@ def _get_graph_from_shared_mem(graph_name): ...@@ -125,6 +126,7 @@ def _get_graph_from_shared_mem(graph_name):
g.edata['inner_edge'] = _get_shared_mem_edata(g, graph_name, 'inner_edge') g.edata['inner_edge'] = _get_shared_mem_edata(g, graph_name, 'inner_edge')
g.edata[EID] = _get_shared_mem_edata(g, graph_name, EID) g.edata[EID] = _get_shared_mem_edata(g, graph_name, EID)
g.edata[ETYPE] = _get_shared_mem_edata(g, graph_name, ETYPE)
return g return g
NodeSpace = namedtuple('NodeSpace', ['data']) NodeSpace = namedtuple('NodeSpace', ['data'])
......
...@@ -3,6 +3,7 @@ from collections import namedtuple ...@@ -3,6 +3,7 @@ from collections import namedtuple
from .rpc import Request, Response, send_requests_to_machine, recv_responses from .rpc import Request, Response, send_requests_to_machine, recv_responses
from ..sampling import sample_neighbors as local_sample_neighbors from ..sampling import sample_neighbors as local_sample_neighbors
from ..sampling import sample_etype_neighbors as local_sample_etype_neighbors
from ..subgraph import in_subgraph as local_in_subgraph from ..subgraph import in_subgraph as local_in_subgraph
from .rpc import register_service from .rpc import register_service
from ..convert import graph, heterograph from ..convert import graph, heterograph
...@@ -17,6 +18,7 @@ INSUBGRAPH_SERVICE_ID = 6658 ...@@ -17,6 +18,7 @@ INSUBGRAPH_SERVICE_ID = 6658
EDGES_SERVICE_ID = 6659 EDGES_SERVICE_ID = 6659
OUTDEGREE_SERVICE_ID = 6660 OUTDEGREE_SERVICE_ID = 6660
INDEGREE_SERVICE_ID = 6661 INDEGREE_SERVICE_ID = 6661
ETYPE_SAMPLING_SERVICE_ID = 6662
class SubgraphResponse(Response): class SubgraphResponse(Response):
"""The response for sampling and in_subgraph""" """The response for sampling and in_subgraph"""
...@@ -66,6 +68,27 @@ def _sample_neighbors(local_g, partition_book, seed_nodes, fan_out, edge_dir, pr ...@@ -66,6 +68,27 @@ def _sample_neighbors(local_g, partition_book, seed_nodes, fan_out, edge_dir, pr
global_eids = F.gather_row(local_g.edata[EID], sampled_graph.edata[EID]) global_eids = F.gather_row(local_g.edata[EID], sampled_graph.edata[EID])
return global_src, global_dst, global_eids return global_src, global_dst, global_eids
def _sample_etype_neighbors(local_g, partition_book, seed_nodes, etype_field,
                            fan_out, edge_dir, prob, replace):
    """ Sample per-edge-type neighbors from the local partition.

    The input nodes use global IDs. We need to map the global node IDs to local
    node IDs, perform sampling and map the sampled results to the global IDs
    space again. The sampled results are stored in three vectors that store
    source nodes, destination nodes and edge IDs.
    """
    # Translate global seed node IDs into IDs local to this partition.
    local_ids = partition_book.nid2localnid(seed_nodes, partition_book.partid)
    local_ids = F.astype(local_ids, local_g.idtype)
    sampled_graph = local_sample_etype_neighbors(
        local_g, local_ids, etype_field, fan_out, edge_dir, prob, replace, _dist_training=True)
    # Map the sampled endpoints and edges back to the global ID space.
    global_nid_mapping = local_g.ndata[NID]
    src, dst = sampled_graph.edges()
    global_src, global_dst = F.gather_row(global_nid_mapping, src), \
        F.gather_row(global_nid_mapping, dst)
    global_eids = F.gather_row(local_g.edata[EID], sampled_graph.edata[EID])
    return global_src, global_dst, global_eids
def _find_edges(local_g, partition_book, seed_edges): def _find_edges(local_g, partition_book, seed_edges):
"""Given an edge ID array, return the source """Given an edge ID array, return the source
and destination node ID array ``s`` and ``d`` in the local partition. and destination node ID array ``s`` and ``d`` in the local partition.
...@@ -136,6 +159,38 @@ class SamplingRequest(Request): ...@@ -136,6 +159,38 @@ class SamplingRequest(Request):
self.prob, self.replace) self.prob, self.replace)
return SubgraphResponse(global_src, global_dst, global_eids) return SubgraphResponse(global_src, global_dst, global_eids)
class SamplingRequestEtype(Request):
    """RPC request for per-edge-type neighbor sampling on a remote partition.

    Carries the seed nodes and all sampling arguments to the server owning
    the partition; answered with a SubgraphResponse holding global
    src/dst/eid arrays.
    """
    def __init__(self, nodes, etype_field, fan_out, edge_dir='in', prob=None, replace=False):
        self.seed_nodes = nodes
        self.edge_dir = edge_dir
        self.prob = prob
        self.replace = replace
        self.fan_out = fan_out
        self.etype_field = etype_field

    def __setstate__(self, state):
        # Field order must match __getstate__ exactly — the tuple is what
        # travels over the RPC wire.
        self.seed_nodes, self.edge_dir, self.prob, self.replace, \
            self.fan_out, self.etype_field = state

    def __getstate__(self):
        return self.seed_nodes, self.edge_dir, self.prob, self.replace, \
            self.fan_out, self.etype_field

    def process_request(self, server_state):
        # Executed on the server: sample from the local partition and return
        # the result already translated to the global ID space.
        local_g = server_state.graph
        partition_book = server_state.partition_book
        global_src, global_dst, global_eids = _sample_etype_neighbors(local_g,
                                                                      partition_book,
                                                                      self.seed_nodes,
                                                                      self.etype_field,
                                                                      self.fan_out,
                                                                      self.edge_dir,
                                                                      self.prob,
                                                                      self.replace)
        return SubgraphResponse(global_src, global_dst, global_eids)
class EdgesRequest(Request): class EdgesRequest(Request):
"""Edges Request""" """Edges Request"""
...@@ -327,6 +382,112 @@ def _distributed_access(g, nodes, issue_remote_req, local_access): ...@@ -327,6 +382,112 @@ def _distributed_access(g, nodes, issue_remote_req, local_access):
sampled_graph = merge_graphs(res_list, g.number_of_nodes()) sampled_graph = merge_graphs(res_list, g.number_of_nodes())
return sampled_graph return sampled_graph
def sample_etype_neighbors(g, nodes, etype_field, fanout, edge_dir='in', prob=None, replace=False):
    """Sample from the neighbors of the given nodes from a distributed graph.

    For each node, a number of inbound (or outbound when ``edge_dir == 'out'``) edges
    will be randomly chosen. The returned graph will contain all the nodes in the
    original graph, but only the sampled edges.

    Node/edge features are not preserved. The original IDs of
    the sampled edges are stored as the `dgl.EID` feature in the returned graph.

    This function assumes the input is a homogeneous ``DGLGraph`` with the TRUE edge type
    information stored as the edge data in `etype_field`. The sampled subgraph is also
    stored in the homogeneous graph format. That is, all nodes and edges are assigned
    with unique IDs (in contrast, we typically use a type name and a node/edge ID to
    identify a node or an edge in ``DGLGraph``). We refer to this type of IDs
    as *homogeneous ID*.
    Users can use :func:`dgl.distributed.GraphPartitionBook.map_to_per_ntype`
    and :func:`dgl.distributed.GraphPartitionBook.map_to_per_etype`
    to identify their node/edge types and node/edge IDs of that type.

    Parameters
    ----------
    g : DistGraph
        The distributed graph.
    nodes : tensor or dict
        Node IDs to sample neighbors from. If it's a dict, it should contain only
        one key-value pair to make this API consistent with dgl.sampling.sample_neighbors.
    etype_field : string
        The field in g.edata storing the edge type.
    fanout : int
        The number of edges to be sampled for each node per edge type.
        If -1 is given, all of the neighbors will be selected.
    edge_dir : str, optional
        Determines whether to sample inbound or outbound edges.
        Can take either ``in`` for inbound edges or ``out`` for outbound edges.
    prob : str, optional
        Feature name used as the (unnormalized) probabilities associated with each
        neighboring edge of a node. The feature must have only one element for each
        edge.
        The features must be non-negative floats, and the sum of the features of
        inbound/outbound edges for every node must be positive (though they don't have
        to sum up to one). Otherwise, the result will be undefined.
    replace : bool, optional
        If True, sample with replacement.

        When sampling with replacement, the sampled subgraph could have parallel edges.

        For sampling without replacement, if fanout > the number of neighbors, all the
        neighbors are sampled. If fanout == -1, all neighbors are collected.

    Returns
    -------
    DGLGraph
        A sampled subgraph containing only the sampled neighboring edges. It is on CPU.
    """
    gpb = g.get_partition_book()
    if isinstance(nodes, dict):
        # Translate per-type node IDs into the homogeneous ID space before
        # dispatching the distributed request.
        homo_nids = []
        for ntype in nodes.keys():
            assert ntype in g.ntypes, \
                'The sampled node type {} does not exist in the input graph'.format(ntype)
            if F.is_tensor(nodes[ntype]):
                typed_nodes = nodes[ntype]
            else:
                typed_nodes = toindex(nodes[ntype]).tousertensor()
            homo_nids.append(gpb.map_to_homo_nid(typed_nodes, ntype))
        nodes = F.cat(homo_nids, 0)

    def issue_remote_req(node_ids):
        # Request sent to remote partitions.
        return SamplingRequestEtype(node_ids, etype_field, fanout, edge_dir=edge_dir,
                                    prob=prob, replace=replace)

    def local_access(local_g, partition_book, local_nids):
        # Sampling for seed nodes that live in the local partition.
        return _sample_etype_neighbors(local_g, partition_book, local_nids,
                                       etype_field, fanout, edge_dir, prob, replace)

    frontier = _distributed_access(g, nodes, issue_remote_req, local_access)
    if len(gpb.etypes) > 1:
        # Heterogeneous case: split the homogeneous frontier back into a
        # per-etype heterogeneous graph. Sorting by edge type lets us slice
        # contiguous runs per type below.
        etype_ids, frontier.edata[EID] = gpb.map_to_per_etype(frontier.edata[EID])
        src, dst = frontier.edges()
        etype_ids, idx = F.sort_1d(etype_ids)
        src, dst = F.gather_row(src, idx), F.gather_row(dst, idx)
        eid = F.gather_row(frontier.edata[EID], idx)
        _, src = gpb.map_to_per_ntype(src)
        _, dst = gpb.map_to_per_ntype(dst)
        data_dict = dict()
        edge_ids = {}
        for etid in range(len(g.etypes)):
            etype = g.etypes[etid]
            canonical_etype = g.canonical_etypes[etid]
            type_idx = etype_ids == etid
            # Only materialize relations that actually received sampled edges.
            if F.sum(type_idx, 0) > 0:
                data_dict[canonical_etype] = (F.boolean_mask(src, type_idx), \
                    F.boolean_mask(dst, type_idx))
                edge_ids[etype] = F.boolean_mask(eid, type_idx)
        hg = heterograph(data_dict,
                         {ntype: g.number_of_nodes(ntype) for ntype in g.ntypes},
                         idtype=g.idtype)
        for etype in edge_ids:
            hg.edges[etype].data[EID] = edge_ids[etype]
        return hg
    else:
        return frontier
def sample_neighbors(g, nodes, fanout, edge_dir='in', prob=None, replace=False): def sample_neighbors(g, nodes, fanout, edge_dir='in', prob=None, replace=False):
"""Sample from the neighbors of the given nodes from a distributed graph. """Sample from the neighbors of the given nodes from a distributed graph.
...@@ -623,3 +784,4 @@ register_service(EDGES_SERVICE_ID, EdgesRequest, FindEdgeResponse) ...@@ -623,3 +784,4 @@ register_service(EDGES_SERVICE_ID, EdgesRequest, FindEdgeResponse)
register_service(INSUBGRAPH_SERVICE_ID, InSubgraphRequest, SubgraphResponse) register_service(INSUBGRAPH_SERVICE_ID, InSubgraphRequest, SubgraphResponse)
register_service(OUTDEGREE_SERVICE_ID, OutDegreeRequest, OutDegreeResponse) register_service(OUTDEGREE_SERVICE_ID, OutDegreeRequest, OutDegreeResponse)
register_service(INDEGREE_SERVICE_ID, InDegreeRequest, InDegreeResponse) register_service(INDEGREE_SERVICE_ID, InDegreeRequest, InDegreeResponse)
register_service(ETYPE_SAMPLING_SERVICE_ID, SamplingRequestEtype, SubgraphResponse)
...@@ -8,10 +8,136 @@ from .. import ndarray as nd ...@@ -8,10 +8,136 @@ from .. import ndarray as nd
from .. import utils from .. import utils
__all__ = [ __all__ = [
'sample_etype_neighbors',
'sample_neighbors', 'sample_neighbors',
'sample_neighbors_biased', 'sample_neighbors_biased',
'select_topk'] 'select_topk']
def sample_etype_neighbors(g, nodes, etype_field, fanout, edge_dir='in', prob=None,
                           replace=False, copy_ndata=True, copy_edata=True, _dist_training=False):
    """Sample neighboring edges of the given nodes and return the induced subgraph.

    For each node, a number of inbound (or outbound when ``edge_dir == 'out'``) edges
    will be randomly chosen for each edge type. The graph returned will then contain
    all the nodes in the original graph, but only the sampled edges.

    Node/edge features are not preserved. The original IDs of
    the sampled edges are stored as the `dgl.EID` feature in the returned graph.

    Parameters
    ----------
    g : DGLGraph
        The graph. Can only be in CPU. Should only have one node type and one edge type.
    nodes : tensor or dict
        Node IDs to sample neighbors from.

        This argument can take a single ID tensor or a dictionary of node types and ID tensors.
        If a single tensor is given, the graph must only have one type of nodes.
    etype_field : string
        The field in g.edata storing the edge type.
    fanout : int
        The number of edges to be sampled for each node on each edge type.

        This argument can only take a single int. DGL will sample this number of edges for
        each node for every edge type.

        If -1 is given for a single edge type, all the neighboring edges with that edge
        type will be selected.
    edge_dir : str, optional
        Determines whether to sample inbound or outbound edges.

        Can take either ``in`` for inbound edges or ``out`` for outbound edges.
    prob : str, optional
        Feature name used as the (unnormalized) probabilities associated with each
        neighboring edge of a node. The feature must have only one element for each
        edge.

        The features must be non-negative floats, and the sum of the features of
        inbound/outbound edges for every node must be positive (though they don't have
        to sum up to one). Otherwise, the result will be undefined.

        If :attr:`prob` is not None, GPU sampling is not supported.
    replace : bool, optional
        If True, sample with replacement.
    copy_ndata: bool, optional
        If True, the node features of the new graph are copied from
        the original graph. If False, the new graph will not have any
        node features.

        (Default: True)
    copy_edata: bool, optional
        If True, the edge features of the new graph are copied from
        the original graph. If False, the new graph will not have any
        edge features.

        (Default: True)
    _dist_training : bool, optional
        Internal argument. Do not use.

        (Default: False)

    Returns
    -------
    DGLGraph
        A sampled subgraph containing only the sampled neighboring edges, with the
        same device as the input graph.

    Notes
    -----
    If :attr:`copy_ndata` or :attr:`copy_edata` is True, same tensors are used as
    the node or edge features of the original graph and the new graph.
    As a result, users should avoid performing in-place operations
    on the node features of the new graph to avoid feature corruption.
    """
    if g.device != F.cpu():
        raise DGLError("The graph should be in cpu.")
    if etype_field not in g.edata:
        # BUGFIX: the two string fragments previously concatenated without a
        # separating space ("...edge datarepresenting...").
        raise DGLError("The graph should have {} in the edge data "
                       "representing the edge type.".format(etype_field))
    if not isinstance(fanout, int):
        raise DGLError("The fanout should be an integer")
    if isinstance(nodes, dict):
        assert len(nodes) == 1, "The input graph should not have node types"
        nodes = list(nodes.values())[0]

    nodes = F.to_dgl_nd(utils.prepare_tensor(g, nodes, 'nodes'))
    # treat etypes as int32, it is much cheaper than int64
    # TODO(xiangsx): int8 can be a better choice.
    etypes = F.to_dgl_nd(F.astype(g.edata[etype_field], ty=F.int32))
    if prob is None:
        prob_array = nd.array([], ctx=nd.cpu())
    elif isinstance(prob, nd.NDArray):
        prob_array = prob
    else:
        if prob in g.edata:
            prob_array = F.to_dgl_nd(g.edata[prob])
        else:
            prob_array = F.to_dgl_nd(F.tensor(prob, dtype=F.float32))

    subgidx = _CAPI_DGLSampleNeighborsEType(g._graph, nodes, etypes, fanout,
                                            edge_dir, prob_array, replace)
    induced_edges = subgidx.induced_edges
    ret = DGLHeteroGraph(subgidx.graph, g.ntypes, g.etypes)

    # handle features
    # (TODO) (BarclayII) DGL distributed fails with bus error, freezes, or other
    # incomprehensible errors with lazy feature copy.
    # So in distributed training context, we fall back to old behavior where we
    # only set the edge IDs.
    if not _dist_training:
        if copy_ndata:
            node_frames = utils.extract_node_subframes(g, None)
            utils.set_new_frames(ret, node_frames=node_frames)

        if copy_edata:
            edge_frames = utils.extract_edge_subframes(g, induced_edges)
            utils.set_new_frames(ret, edge_frames=edge_frames)
    else:
        for i, etype in enumerate(ret.canonical_etypes):
            ret.edges[etype].data[EID] = induced_edges[i]
    return ret
def sample_neighbors(g, nodes, fanout, edge_dir='in', prob=None, replace=False, def sample_neighbors(g, nodes, fanout, edge_dir='in', prob=None, replace=False,
copy_ndata=True, copy_edata=True, _dist_training=False): copy_ndata=True, copy_edata=True, _dist_training=False):
"""Sample neighboring edges of the given nodes and return the induced subgraph. """Sample neighboring edges of the given nodes and return the induced subgraph.
......
...@@ -568,6 +568,25 @@ COOMatrix CSRRowWiseSampling( ...@@ -568,6 +568,25 @@ COOMatrix CSRRowWiseSampling(
return ret; return ret;
} }
// Dispatcher for per-edge-type row-wise sampling on a CSR matrix.
// Routes to the uniform kernel when `prob` is empty, otherwise to the
// probability-weighted kernel specialized on the probability float type.
COOMatrix CSRRowWisePerEtypeSampling(
    CSRMatrix mat, IdArray rows, IdArray etypes,
    int64_t num_samples, FloatArray prob, bool replace) {
  COOMatrix ret;
  ATEN_CSR_SWITCH(mat, XPU, IdType, "CSRRowWisePerEtypeSampling", {
    if (IsNullArray(prob)) {
      ret = impl::CSRRowWisePerEtypeSamplingUniform<XPU, IdType>(
          mat, rows, etypes, num_samples, replace);
    } else {
      ATEN_FLOAT_TYPE_SWITCH(prob->dtype, FloatType, "probability", {
        ret = impl::CSRRowWisePerEtypeSampling<XPU, IdType, FloatType>(
            mat, rows, etypes, num_samples, prob, replace);
      });
    }
  });
  return ret;
}
COOMatrix CSRRowWiseTopk( COOMatrix CSRRowWiseTopk(
CSRMatrix mat, IdArray rows, int64_t k, NDArray weight, bool ascending) { CSRMatrix mat, IdArray rows, int64_t k, NDArray weight, bool ascending) {
COOMatrix ret; COOMatrix ret;
...@@ -786,6 +805,24 @@ COOMatrix COORowWiseSampling( ...@@ -786,6 +805,24 @@ COOMatrix COORowWiseSampling(
return ret; return ret;
} }
// Dispatcher for per-edge-type row-wise sampling on a COO matrix.
// Routes to the uniform kernel when `prob` is empty, otherwise to the
// probability-weighted kernel specialized on the probability float type.
COOMatrix COORowWisePerEtypeSampling(
    COOMatrix mat, IdArray rows, IdArray etypes,
    int64_t num_samples, FloatArray prob, bool replace) {
  COOMatrix ret;
  ATEN_COO_SWITCH(mat, XPU, IdType, "COORowWisePerEtypeSampling", {
    if (IsNullArray(prob)) {
      ret = impl::COORowWisePerEtypeSamplingUniform<XPU, IdType>(
          mat, rows, etypes, num_samples, replace);
    } else {
      ATEN_FLOAT_TYPE_SWITCH(prob->dtype, FloatType, "probability", {
        ret = impl::COORowWisePerEtypeSampling<XPU, IdType, FloatType>(
            mat, rows, etypes, num_samples, prob, replace);
      });
    }
  });
  return ret;
}
COOMatrix COORowWiseTopk( COOMatrix COORowWiseTopk(
COOMatrix mat, IdArray rows, int64_t k, FloatArray weight, bool ascending) { COOMatrix mat, IdArray rows, int64_t k, FloatArray weight, bool ascending) {
COOMatrix ret; COOMatrix ret;
......
...@@ -164,10 +164,20 @@ template <DLDeviceType XPU, typename IdType, typename FloatType> ...@@ -164,10 +164,20 @@ template <DLDeviceType XPU, typename IdType, typename FloatType>
COOMatrix CSRRowWiseSampling( COOMatrix CSRRowWiseSampling(
CSRMatrix mat, IdArray rows, int64_t num_samples, FloatArray prob, bool replace); CSRMatrix mat, IdArray rows, int64_t num_samples, FloatArray prob, bool replace);
// Probability-weighted per-edge-type row-wise sampling on a CSR matrix.
// FloatType is the type of probability data.
template <DLDeviceType XPU, typename IdType, typename FloatType>
COOMatrix CSRRowWisePerEtypeSampling(
    CSRMatrix mat, IdArray rows, IdArray etypes,
    int64_t num_samples, FloatArray prob, bool replace);
template <DLDeviceType XPU, typename IdType> template <DLDeviceType XPU, typename IdType>
COOMatrix CSRRowWiseSamplingUniform( COOMatrix CSRRowWiseSamplingUniform(
CSRMatrix mat, IdArray rows, int64_t num_samples, bool replace); CSRMatrix mat, IdArray rows, int64_t num_samples, bool replace);
// Uniform (unweighted) per-edge-type row-wise sampling on a CSR matrix.
template <DLDeviceType XPU, typename IdType>
COOMatrix CSRRowWisePerEtypeSamplingUniform(
    CSRMatrix mat, IdArray rows, IdArray etypes, int64_t num_samples, bool replace);
// FloatType is the type of weight data. // FloatType is the type of weight data.
template <DLDeviceType XPU, typename IdType, typename DType> template <DLDeviceType XPU, typename IdType, typename DType>
COOMatrix CSRRowWiseTopk( COOMatrix CSRRowWiseTopk(
...@@ -250,10 +260,20 @@ template <DLDeviceType XPU, typename IdType, typename FloatType> ...@@ -250,10 +260,20 @@ template <DLDeviceType XPU, typename IdType, typename FloatType>
COOMatrix COORowWiseSampling( COOMatrix COORowWiseSampling(
COOMatrix mat, IdArray rows, int64_t num_samples, FloatArray prob, bool replace); COOMatrix mat, IdArray rows, int64_t num_samples, FloatArray prob, bool replace);
// Probability-weighted per-edge-type row-wise sampling on a COO matrix.
// FloatType is the type of probability data.
template <DLDeviceType XPU, typename IdType, typename FloatType>
COOMatrix COORowWisePerEtypeSampling(
    COOMatrix mat, IdArray rows, IdArray etypes,
    int64_t num_samples, FloatArray prob, bool replace);
template <DLDeviceType XPU, typename IdType> template <DLDeviceType XPU, typename IdType>
COOMatrix COORowWiseSamplingUniform( COOMatrix COORowWiseSamplingUniform(
COOMatrix mat, IdArray rows, int64_t num_samples, bool replace); COOMatrix mat, IdArray rows, int64_t num_samples, bool replace);
// Uniform (unweighted) per-edge-type row-wise sampling on a COO matrix.
template <DLDeviceType XPU, typename IdType>
COOMatrix COORowWisePerEtypeSamplingUniform(
    COOMatrix mat, IdArray rows, IdArray etypes, int64_t num_samples, bool replace);
// FloatType is the type of weight data. // FloatType is the type of weight data.
template <DLDeviceType XPU, typename IdType, typename FloatType> template <DLDeviceType XPU, typename IdType, typename FloatType>
COOMatrix COORowWiseTopk( COOMatrix COORowWiseTopk(
......
...@@ -9,6 +9,8 @@ ...@@ -9,6 +9,8 @@
#include <dgl/array.h> #include <dgl/array.h>
#include <functional> #include <functional>
#include <algorithm> #include <algorithm>
#include <string>
#include <vector>
namespace dgl { namespace dgl {
namespace aten { namespace aten {
...@@ -38,6 +40,30 @@ using PickFn = std::function<void( ...@@ -38,6 +40,30 @@ using PickFn = std::function<void(
const IdxType* col, const IdxType* data, const IdxType* col, const IdxType* data,
IdxType* out_idx)>; IdxType* out_idx)>;
// User-defined function for picking elements from a contiguous range within a
// row, where the range covers all entries of one edge type.
//
// The column index of the candidate at sorted position i is
//   indices[off + et_idx[i]], where i is in [et_offset, et_offset + et_len).
//
// Similarly, its data index is
//   data[off + et_idx[i]]
// The data pointer may be NULL, in which case the data index is simply
//   off + et_idx[i]
//
// *ATTENTION*: This function will be invoked concurrently. Please make sure
// it is thread-safe.
//
// \param off Starting offset of this row.
// \param et_offset Starting offset of this range within the sorted row.
// \param et_len Length of the range.
// \param et_idx A permutation of the row's local indices, sorted by edge type.
// \param data Pointer of the data indices (may be NULL).
// \param out_idx Picked positions, each in [0, et_len), relative to et_offset.
template <typename IdxType>
using RangePickFn = std::function<void(
IdxType off, IdxType et_offset, IdxType et_len,
const std::vector<IdxType> &et_idx, const IdxType* data,
IdxType* out_idx)>;
// Template for picking non-zero values row-wise. The implementation utilizes // Template for picking non-zero values row-wise. The implementation utilizes
// OpenMP parallelization on rows because each row performs computation independently. // OpenMP parallelization on rows because each row performs computation independently.
template <typename IdxType> template <typename IdxType>
...@@ -132,6 +158,133 @@ COOMatrix CSRRowWisePick(CSRMatrix mat, IdArray rows, ...@@ -132,6 +158,133 @@ COOMatrix CSRRowWisePick(CSRMatrix mat, IdArray rows,
picked_row, picked_col, picked_idx); picked_row, picked_col, picked_idx);
} }
// Template for picking non-zero values row-wise, independently for every edge
// type. The entries of each row are grouped by their edge type (read from
// `etypes`, indexed by edge ID) and `pick_fn` is invoked once per group, so
// each edge type contributes up to `num_picks` picks. The implementation
// utilizes OpenMP parallelization on rows because each row performs
// computation independently.
//
// \param mat CSR matrix to pick from.
// \param rows Seed rows to sample from.
// \param etypes Edge type of every edge, indexed by edge ID; must be int32.
// \param num_picks Number of picks per edge type within each row.
// \param replace Whether to pick with replacement.
// \param pick_fn Range-pick callback; invoked concurrently, must be thread-safe.
// \return COO matrix holding all picked entries.
template <typename IdxType>
COOMatrix CSRRowWisePerEtypePick(CSRMatrix mat, IdArray rows, IdArray etypes,
                                 int64_t num_picks, bool replace, RangePickFn<IdxType> pick_fn) {
  using namespace aten;
  const IdxType* indptr = static_cast<IdxType*>(mat.indptr->data);
  const IdxType* indices = static_cast<IdxType*>(mat.indices->data);
  // When the CSR carries a data array it maps positions to edge IDs;
  // otherwise the position itself is the edge ID.
  const IdxType* data = CSRHasData(mat)? static_cast<IdxType*>(mat.data->data) : nullptr;
  const IdxType* rows_data = static_cast<IdxType*>(rows->data);
  const int32_t* etype_data = static_cast<int32_t*>(etypes->data);
  const int64_t num_rows = rows->shape[0];
  const auto& ctx = mat.indptr->ctx;
  // etype_data is dereferenced as int32 below; reject other widths up front.
  CHECK_EQ(etypes->dtype.bits / 8, sizeof(int32_t));

  // Per-row result buffers; each OpenMP task writes only its own slot and the
  // buffers are concatenated after the parallel loop.
  std::vector<IdArray> picked_rows(rows->shape[0]);
  std::vector<IdArray> picked_cols(rows->shape[0]);
  std::vector<IdArray> picked_idxs(rows->shape[0]);

#pragma omp parallel for
  for (int64_t i = 0; i < num_rows; ++i) {
    const IdxType rid = rows_data[i];
    CHECK_LT(rid, mat.num_rows);
    const IdxType off = indptr[rid];
    const IdxType len = indptr[rid + 1] - off;
    // Empty row: contribute empty arrays so the final Concat stays aligned.
    if (len == 0) {
      picked_rows[i] = NewIdArray(0, ctx, sizeof(IdxType) * 8);
      picked_cols[i] = NewIdArray(0, ctx, sizeof(IdxType) * 8);
      picked_idxs[i] = NewIdArray(0, ctx, sizeof(IdxType) * 8);
      continue;
    }

    // Fast path: without replacement, if the whole row has at most num_picks
    // entries then every edge type in it also has at most num_picks entries,
    // so everything is selected and no per-etype grouping is needed.
    if (len <= num_picks && !replace) {
      IdArray rows = Full(rid, len, sizeof(IdxType) * 8, ctx);
      IdArray cols = Full(-1, len, sizeof(IdxType) * 8, ctx);
      IdArray idx = Full(-1, len, sizeof(IdxType) * 8, ctx);
      IdxType* cdata = static_cast<IdxType*>(cols->data);
      IdxType* idata = static_cast<IdxType*>(idx->data);
      for (int64_t j = 0; j < len; ++j) {
        cdata[j] = indices[off + j];
        idata[j] = data ? data[off + j] : off + j;
      }
      picked_rows[i] = rows;
      picked_cols[i] = cols;
      picked_idxs[i] = idx;
    } else {
      // Slow path: group the row's entries by edge type and sample each group
      // independently.
      std::vector<IdxType> rows;
      std::vector<IdxType> cols;
      std::vector<IdxType> idx;
      // et[j] is the edge type of the row's j-th entry.
      std::vector<IdxType> et(len);
      // et_idx is a permutation of [0, len) sorted by edge type, so entries of
      // the same type form contiguous runs.
      std::vector<IdxType> et_idx(len);
      std::iota(et_idx.begin(), et_idx.end(), 0);
      for (int64_t j = 0; j < len; ++j) {
        et[j] = data ? etype_data[data[off+j]] : etype_data[off+j];
      }
      std::sort(et_idx.begin(), et_idx.end(),
                [&et](IdxType i1, IdxType i2) {return et[i1] < et[i2];});
      IdxType cur_et = et[et_idx[0]];
      int64_t et_offset = 0;  // start of the current etype run within et_idx
      int64_t et_len = 1;     // length of the current etype run
      for (int64_t j = 0; j < len; ++j) {
        if ((j+1 == len) || cur_et != et[et_idx[j+1]]) {
          // Reached either
          //   1. the end of the current etype run, or
          //   2. the end of the row;
          // sample from the run [et_offset, et_offset + et_len).
          if (et_len <= num_picks && !replace) {
            // fast path, select all entries of this etype
            for (int64_t k = 0; k < et_len; ++k) {
              rows.push_back(rid);
              cols.push_back(indices[off+et_idx[et_offset+k]]);
              if (data)
                idx.push_back(data[off+et_idx[et_offset+k]]);
              else
                idx.push_back(off+et_idx[et_offset+k]);
            }
          } else {
            IdArray picked_idx = Full(-1, num_picks, sizeof(IdxType) * 8, ctx);
            IdxType* picked_idata = static_cast<IdxType*>(picked_idx->data);
            // Delegate the (possibly weighted) random choice to the caller;
            // picked values are positions in [0, et_len) relative to et_offset.
            pick_fn(off, et_offset,
                    et_len, et_idx,
                    data, picked_idata);
            for (int64_t k = 0; k < num_picks; ++k) {
              const IdxType picked = picked_idata[k];
              rows.push_back(rid);
              cols.push_back(indices[off+et_idx[et_offset+picked]]);
              if (data)
                idx.push_back(data[off+et_idx[et_offset+picked]]);
              else
                idx.push_back(off+et_idx[et_offset+picked]);
            }
          }
          if (j+1 == len)
            break;
          // Advance to the next etype run.
          cur_et = et[et_idx[j+1]];
          et_offset = j+1;
          et_len = 1;
        } else {
          et_len++;
        }
      }
      picked_rows[i] = VecToIdArray(rows, sizeof(IdxType) * 8, ctx);
      picked_cols[i] = VecToIdArray(cols, sizeof(IdxType) * 8, ctx);
      picked_idxs[i] = VecToIdArray(idx, sizeof(IdxType) * 8, ctx);
    }  // end processing one row

    CHECK_EQ(picked_rows[i]->shape[0], picked_cols[i]->shape[0]);
    CHECK_EQ(picked_rows[i]->shape[0], picked_idxs[i]->shape[0]);
  }  // end processing all rows

  IdArray picked_row = Concat(picked_rows);
  IdArray picked_col = Concat(picked_cols);
  IdArray picked_idx = Concat(picked_idxs);

  return COOMatrix(mat.num_rows, mat.num_cols,
                   picked_row, picked_col, picked_idx);
}
// Template for picking non-zero values row-wise. The implementation first slices // Template for picking non-zero values row-wise. The implementation first slices
// out the corresponding rows and then converts it to CSR format. It then performs // out the corresponding rows and then converts it to CSR format. It then performs
// row-wise pick on the CSR matrix and rectifies the returned results. // row-wise pick on the CSR matrix and rectifies the returned results.
...@@ -148,6 +301,23 @@ COOMatrix COORowWisePick(COOMatrix mat, IdArray rows, ...@@ -148,6 +301,23 @@ COOMatrix COORowWisePick(COOMatrix mat, IdArray rows,
picked.data); picked.data);
} }
// Per-edge-type row-wise picking on a COO matrix. The matrix is first sliced
// to the requested rows and converted to CSR, the CSR per-etype kernel is run
// on the slice, and the resulting row IDs are mapped back to the originals.
template <typename IdxType>
COOMatrix COORowWisePerEtypePick(COOMatrix mat, IdArray rows, IdArray etypes,
                                 int64_t num_picks, bool replace, RangePickFn<IdxType> pick_fn) {
  using namespace aten;
  // Slice out the requested rows and convert to CSR for the row-wise kernel.
  const COOMatrix sliced = COOSliceRows(mat, rows);
  const CSRMatrix csr_view = COOToCSR(sliced);
  // Row i of the sliced CSR corresponds to rows[i] of the original matrix.
  const IdArray local_rows = Range(0, rows->shape[0], rows->dtype.bits, rows->ctx);
  const COOMatrix result = CSRRowWisePerEtypePick<IdxType>(
      csr_view, local_rows, etypes, num_picks, replace, pick_fn);
  // Map the local row IDs of the picks back to the original row IDs.
  const IdArray orig_rows = IndexSelect(rows, result.row);
  return COOMatrix(mat.num_rows, mat.num_cols,
                   orig_rows,
                   result.col,
                   result.data);
}
} // namespace impl } // namespace impl
} // namespace aten } // namespace aten
} // namespace dgl } // namespace dgl
......
...@@ -44,6 +44,29 @@ inline PickFn<IdxType> GetSamplingPickFn( ...@@ -44,6 +44,29 @@ inline PickFn<IdxType> GetSamplingPickFn(
return pick_fn; return pick_fn;
} }
// Build a RangePickFn that performs weighted sampling inside one
// etype-contiguous range of a row, using the per-edge probabilities in `prob`.
template <typename IdxType, typename FloatType>
inline RangePickFn<IdxType> GetSamplingRangePickFn(
    int64_t num_samples, FloatArray prob, bool replace) {
  RangePickFn<IdxType> pick_fn = [prob, num_samples, replace]
    (IdxType off, IdxType et_offset, IdxType et_len,
     const std::vector<IdxType> &et_idx,
     const IdxType* data, IdxType* out_idx) {
      const FloatType* all_probs = static_cast<FloatType*>(prob->data);
      // Gather the probability of every candidate in this range into a dense
      // array so Choice() can sample from it directly.
      FloatArray range_probs = FloatArray::Empty({et_len}, prob->dtype, prob->ctx);
      FloatType* range_data = static_cast<FloatType*>(range_probs->data);
      for (int64_t pos = 0; pos < et_len; ++pos) {
        const IdxType eid = data ? data[off + et_idx[et_offset + pos]]
                                 : off + et_idx[et_offset + pos];
        range_data[pos] = all_probs[eid];
      }
      RandomEngine::ThreadLocal()->Choice<IdxType, FloatType>(
          num_samples, range_probs, out_idx, replace);
    };
  return pick_fn;
}
template <typename IdxType> template <typename IdxType>
inline PickFn<IdxType> GetSamplingUniformPickFn( inline PickFn<IdxType> GetSamplingUniformPickFn(
int64_t num_samples, bool replace) { int64_t num_samples, bool replace) {
...@@ -60,6 +83,19 @@ inline PickFn<IdxType> GetSamplingUniformPickFn( ...@@ -60,6 +83,19 @@ inline PickFn<IdxType> GetSamplingUniformPickFn(
return pick_fn; return pick_fn;
} }
// Build a RangePickFn that samples uniformly within one etype-contiguous
// range of a row. Only the range length matters for a uniform draw, so the
// offsets, permutation, and data pointer are ignored.
template <typename IdxType>
inline RangePickFn<IdxType> GetSamplingUniformRangePickFn(
    int64_t num_samples, bool replace) {
  return [num_samples, replace]
    (IdxType off, IdxType et_offset, IdxType et_len,
     const std::vector<IdxType> &et_idx,
     const IdxType* data, IdxType* out_idx) {
      // Draw num_samples positions from [0, et_len).
      RandomEngine::ThreadLocal()->UniformChoice<IdxType>(
          num_samples, et_len, out_idx, replace);
    };
}
template <typename IdxType, typename FloatType> template <typename IdxType, typename FloatType>
inline PickFn<IdxType> GetSamplingBiasedPickFn( inline PickFn<IdxType> GetSamplingBiasedPickFn(
int64_t num_samples, IdArray split, FloatArray bias, bool replace) { int64_t num_samples, IdArray split, FloatArray bias, bool replace) {
...@@ -98,6 +134,23 @@ template COOMatrix CSRRowWiseSampling<kDLCPU, int32_t, double>( ...@@ -98,6 +134,23 @@ template COOMatrix CSRRowWiseSampling<kDLCPU, int32_t, double>(
template COOMatrix CSRRowWiseSampling<kDLCPU, int64_t, double>( template COOMatrix CSRRowWiseSampling<kDLCPU, int64_t, double>(
CSRMatrix, IdArray, int64_t, FloatArray, bool); CSRMatrix, IdArray, int64_t, FloatArray, bool);
// Weighted per-edge-type neighbor sampling on a CSR matrix (CPU).
// `etypes` gives the edge type of every edge (indexed by edge ID) and
// `num_samples` edges are drawn for each edge type within every row.
// FloatType is the type of the probability data in `prob`.
template <DLDeviceType XPU, typename IdxType, typename FloatType>
COOMatrix CSRRowWisePerEtypeSampling(CSRMatrix mat, IdArray rows, IdArray etypes,
                                     int64_t num_samples, FloatArray prob, bool replace) {
  // The weighted variant requires a defined prob array; use
  // CSRRowWisePerEtypeSamplingUniform for the uniform case.
  CHECK(prob.defined());
  auto pick_fn = GetSamplingRangePickFn<IdxType, FloatType>(num_samples, prob, replace);
  return CSRRowWisePerEtypePick(mat, rows, etypes, num_samples, replace, pick_fn);
}

template COOMatrix CSRRowWisePerEtypeSampling<kDLCPU, int32_t, float>(
    CSRMatrix, IdArray, IdArray, int64_t, FloatArray, bool);
template COOMatrix CSRRowWisePerEtypeSampling<kDLCPU, int64_t, float>(
    CSRMatrix, IdArray, IdArray, int64_t, FloatArray, bool);
template COOMatrix CSRRowWisePerEtypeSampling<kDLCPU, int32_t, double>(
    CSRMatrix, IdArray, IdArray, int64_t, FloatArray, bool);
template COOMatrix CSRRowWisePerEtypeSampling<kDLCPU, int64_t, double>(
    CSRMatrix, IdArray, IdArray, int64_t, FloatArray, bool);
template <DLDeviceType XPU, typename IdxType> template <DLDeviceType XPU, typename IdxType>
COOMatrix CSRRowWiseSamplingUniform(CSRMatrix mat, IdArray rows, COOMatrix CSRRowWiseSamplingUniform(CSRMatrix mat, IdArray rows,
int64_t num_samples, bool replace) { int64_t num_samples, bool replace) {
...@@ -110,6 +163,18 @@ template COOMatrix CSRRowWiseSamplingUniform<kDLCPU, int32_t>( ...@@ -110,6 +163,18 @@ template COOMatrix CSRRowWiseSamplingUniform<kDLCPU, int32_t>(
template COOMatrix CSRRowWiseSamplingUniform<kDLCPU, int64_t>( template COOMatrix CSRRowWiseSamplingUniform<kDLCPU, int64_t>(
CSRMatrix, IdArray, int64_t, bool); CSRMatrix, IdArray, int64_t, bool);
// Uniform per-edge-type neighbor sampling on a CSR matrix (CPU):
// `num_samples` edges are drawn uniformly per edge type within every row.
template <DLDeviceType XPU, typename IdxType>
COOMatrix CSRRowWisePerEtypeSamplingUniform(CSRMatrix mat, IdArray rows, IdArray etypes,
                                            int64_t num_samples, bool replace) {
  auto pick_fn = GetSamplingUniformRangePickFn<IdxType>(num_samples, replace);
  return CSRRowWisePerEtypePick(mat, rows, etypes, num_samples, replace, pick_fn);
}

template COOMatrix CSRRowWisePerEtypeSamplingUniform<kDLCPU, int32_t>(
    CSRMatrix, IdArray, IdArray, int64_t, bool);
template COOMatrix CSRRowWisePerEtypeSamplingUniform<kDLCPU, int64_t>(
    CSRMatrix, IdArray, IdArray, int64_t, bool);
template <DLDeviceType XPU, typename IdxType, typename FloatType> template <DLDeviceType XPU, typename IdxType, typename FloatType>
COOMatrix CSRRowWiseSamplingBiased( COOMatrix CSRRowWiseSamplingBiased(
CSRMatrix mat, CSRMatrix mat,
...@@ -156,6 +221,23 @@ template COOMatrix COORowWiseSampling<kDLCPU, int32_t, double>( ...@@ -156,6 +221,23 @@ template COOMatrix COORowWiseSampling<kDLCPU, int32_t, double>(
template COOMatrix COORowWiseSampling<kDLCPU, int64_t, double>( template COOMatrix COORowWiseSampling<kDLCPU, int64_t, double>(
COOMatrix, IdArray, int64_t, FloatArray, bool); COOMatrix, IdArray, int64_t, FloatArray, bool);
// Weighted per-edge-type neighbor sampling on a COO matrix (CPU).
// Mirrors CSRRowWisePerEtypeSampling; FloatType is the probability data type.
template <DLDeviceType XPU, typename IdxType, typename FloatType>
COOMatrix COORowWisePerEtypeSampling(COOMatrix mat, IdArray rows, IdArray etypes,
                                     int64_t num_samples, FloatArray prob, bool replace) {
  // The weighted variant requires a defined prob array; use
  // COORowWisePerEtypeSamplingUniform for the uniform case.
  CHECK(prob.defined());
  auto pick_fn = GetSamplingRangePickFn<IdxType, FloatType>(num_samples, prob, replace);
  return COORowWisePerEtypePick(mat, rows, etypes, num_samples, replace, pick_fn);
}

template COOMatrix COORowWisePerEtypeSampling<kDLCPU, int32_t, float>(
    COOMatrix, IdArray, IdArray, int64_t, FloatArray, bool);
template COOMatrix COORowWisePerEtypeSampling<kDLCPU, int64_t, float>(
    COOMatrix, IdArray, IdArray, int64_t, FloatArray, bool);
template COOMatrix COORowWisePerEtypeSampling<kDLCPU, int32_t, double>(
    COOMatrix, IdArray, IdArray, int64_t, FloatArray, bool);
template COOMatrix COORowWisePerEtypeSampling<kDLCPU, int64_t, double>(
    COOMatrix, IdArray, IdArray, int64_t, FloatArray, bool);
template <DLDeviceType XPU, typename IdxType> template <DLDeviceType XPU, typename IdxType>
COOMatrix COORowWiseSamplingUniform(COOMatrix mat, IdArray rows, COOMatrix COORowWiseSamplingUniform(COOMatrix mat, IdArray rows,
int64_t num_samples, bool replace) { int64_t num_samples, bool replace) {
...@@ -168,6 +250,18 @@ template COOMatrix COORowWiseSamplingUniform<kDLCPU, int32_t>( ...@@ -168,6 +250,18 @@ template COOMatrix COORowWiseSamplingUniform<kDLCPU, int32_t>(
template COOMatrix COORowWiseSamplingUniform<kDLCPU, int64_t>( template COOMatrix COORowWiseSamplingUniform<kDLCPU, int64_t>(
COOMatrix, IdArray, int64_t, bool); COOMatrix, IdArray, int64_t, bool);
// Uniform per-edge-type neighbor sampling on a COO matrix (CPU):
// `num_samples` edges are drawn uniformly per edge type within every row.
template <DLDeviceType XPU, typename IdxType>
COOMatrix COORowWisePerEtypeSamplingUniform(COOMatrix mat, IdArray rows, IdArray etypes,
                                            int64_t num_samples, bool replace) {
  auto pick_fn = GetSamplingUniformRangePickFn<IdxType>(num_samples, replace);
  return COORowWisePerEtypePick(mat, rows, etypes, num_samples, replace, pick_fn);
}

template COOMatrix COORowWisePerEtypeSamplingUniform<kDLCPU, int32_t>(
    COOMatrix, IdArray, IdArray, int64_t, bool);
template COOMatrix COORowWisePerEtypeSamplingUniform<kDLCPU, int64_t>(
    COOMatrix, IdArray, IdArray, int64_t, bool);
} // namespace impl } // namespace impl
} // namespace aten } // namespace aten
} // namespace dgl } // namespace dgl
...@@ -104,6 +104,88 @@ HeteroSubgraph SampleNeighbors( ...@@ -104,6 +104,88 @@ HeteroSubgraph SampleNeighbors(
return ret; return ret;
} }
// Per-edge-type neighbor sampling on a homogeneous graph (exactly one node
// type and one edge type). `etypes` carries the logical edge type of every
// edge, so a graph produced by to_homogeneous() can still be sampled per
// relation: `fanout` edges are drawn for each logical edge type of every seed.
//
// \param hg The homogeneous heterograph.
// \param nodes Seed nodes.
// \param etypes Logical edge type ID per edge.
// \param fanout Number of edges to pick per edge type; -1 keeps all edges.
// \param dir Which edges of the seeds to sample (in- or out-edges).
// \param prob Per-edge probability; an undefined array means uniform sampling.
// \param replace Whether to sample with replacement.
HeteroSubgraph SampleNeighborsEType(
    const HeteroGraphPtr hg,
    const IdArray nodes,
    const IdArray etypes,
    const int64_t fanout,
    EdgeDir dir,
    const IdArray prob,
    bool replace) {
  CHECK_EQ(1, hg->NumVertexTypes())
    << "SampleNeighborsEType only work with homogeneous graph";
  CHECK_EQ(1, hg->NumEdgeTypes())
    << "SampleNeighborsEType only work with homogeneous graph";
  std::vector<HeteroGraphPtr> subrels(1);
  std::vector<IdArray> induced_edges(1);
  const int64_t num_nodes = nodes->shape[0];
  dgl_type_t etype = 0;
  const dgl_type_t src_vtype = 0;
  const dgl_type_t dst_vtype = 0;

  if (num_nodes == 0 || fanout == 0) {
    // Nothing to sample: produce an empty relation graph.
    subrels[etype] = UnitGraph::Empty(1,
      hg->NumVertices(src_vtype),
      hg->NumVertices(dst_vtype),
      hg->DataType(), hg->Context());
    induced_edges[etype] = aten::NullArray();
  } else if (fanout == -1) {
    // Full neighborhood: keep every in/out edge of the seed nodes.
    const auto &earr = (dir == EdgeDir::kOut) ?
      hg->OutEdges(etype, nodes) :
      hg->InEdges(etype, nodes);
    subrels[etype] = UnitGraph::CreateFromCOO(
      1,
      hg->NumVertices(src_vtype),
      hg->NumVertices(dst_vtype),
      earr.src,
      earr.dst);
    induced_edges[etype] = earr.id;
  } else {
    // Sample from the graph; the logical edge type of each edge is stored in
    // `etypes`. Ask for the sparse format that matches the direction, but
    // fall back to whatever format the graph actually has.
    auto req_fmt = (dir == EdgeDir::kOut)? CSR_CODE : CSC_CODE;
    auto avail_fmt = hg->SelectFormat(etype, req_fmt);
    COOMatrix sampled_coo;
    switch (avail_fmt) {
      case SparseFormat::kCOO:
        if (dir == EdgeDir::kIn) {
          // Transpose so rows correspond to the sampled endpoints, sample,
          // then transpose back.
          sampled_coo = aten::COOTranspose(aten::COORowWisePerEtypeSampling(
            aten::COOTranspose(hg->GetCOOMatrix(etype)),
            nodes, etypes, fanout, prob, replace));
        } else {
          sampled_coo = aten::COORowWisePerEtypeSampling(
            hg->GetCOOMatrix(etype), nodes, etypes, fanout, prob, replace);
        }
        break;
      case SparseFormat::kCSR:
        // BUGFIX: this message previously read "Cannot sample out edges on
        // CSC matrix.", i.e. the kCSR/kCSC messages were swapped.
        CHECK(dir == EdgeDir::kOut) << "Cannot sample in edges on CSR matrix.";
        sampled_coo = aten::CSRRowWisePerEtypeSampling(
          hg->GetCSRMatrix(etype), nodes, etypes, fanout, prob, replace);
        break;
      case SparseFormat::kCSC:
        // BUGFIX: this message previously read "Cannot sample in edges on
        // CSR matrix." (swapped with the kCSR case above).
        CHECK(dir == EdgeDir::kIn) << "Cannot sample out edges on CSC matrix.";
        sampled_coo = aten::CSRRowWisePerEtypeSampling(
          hg->GetCSCMatrix(etype), nodes, etypes, fanout, prob, replace);
        sampled_coo = aten::COOTranspose(sampled_coo);
        break;
      default:
        LOG(FATAL) << "Unsupported sparse format.";
    }
    subrels[etype] = UnitGraph::CreateFromCOO(
      1, sampled_coo.num_rows, sampled_coo.num_cols,
      sampled_coo.row, sampled_coo.col);
    induced_edges[etype] = sampled_coo.data;
  }

  HeteroSubgraph ret;
  ret.graph = CreateHeteroGraph(hg->meta_graph(), subrels, hg->NumVerticesPerType());
  ret.induced_vertices.resize(hg->NumVertexTypes());
  ret.induced_edges = std::move(induced_edges);
  return ret;
}
HeteroSubgraph SampleNeighborsTopk( HeteroSubgraph SampleNeighborsTopk(
const HeteroGraphPtr hg, const HeteroGraphPtr hg,
const std::vector<IdArray>& nodes, const std::vector<IdArray>& nodes,
...@@ -269,6 +351,27 @@ HeteroSubgraph SampleNeighborsBiased( ...@@ -269,6 +351,27 @@ HeteroSubgraph SampleNeighborsBiased(
return ret; return ret;
} }
// C API entry point backing the Python per-edge-type neighbor sampler.
// Args: graph ref, seed nodes, per-edge etype array, fanout (per etype),
// direction string ("in"/"out"), per-edge probability array, replace flag.
DGL_REGISTER_GLOBAL("sampling.neighbor._CAPI_DGLSampleNeighborsEType")
.set_body([] (DGLArgs args, DGLRetValue *rv) {
    HeteroGraphRef hg = args[0];
    IdArray nodes = args[1];
    IdArray etypes = args[2];
    const int64_t fanout = args[3];
    const std::string dir_str = args[4];
    IdArray prob = args[5];
    const bool replace = args[6];
    // Validate the direction string before mapping it onto EdgeDir.
    CHECK(dir_str == "in" || dir_str == "out")
      << "Invalid edge direction. Must be \"in\" or \"out\".";
    EdgeDir dir = (dir_str == "in")? EdgeDir::kIn : EdgeDir::kOut;
    std::shared_ptr<HeteroSubgraph> subg(new HeteroSubgraph);
    *subg = sampling::SampleNeighborsEType(
        hg.sptr(), nodes, etypes, fanout, dir, prob, replace);
    *rv = HeteroSubgraphRef(subg);
  });
DGL_REGISTER_GLOBAL("sampling.neighbor._CAPI_DGLSampleNeighbors") DGL_REGISTER_GLOBAL("sampling.neighbor._CAPI_DGLSampleNeighbors")
.set_body([] (DGLArgs args, DGLRetValue *rv) { .set_body([] (DGLArgs args, DGLRetValue *rv) {
HeteroGraphRef hg = args[0]; HeteroGraphRef hg = args[0];
......
...@@ -156,7 +156,7 @@ def test_pinsage_sampling(): ...@@ -156,7 +156,7 @@ def test_pinsage_sampling():
_test_sampler(g, sampler, 'item') _test_sampler(g, sampler, 'item')
sampler = dgl.sampling.RandomWalkNeighborSampler(g, 4, 0.5, 3, 2, ['bought-by', 'bought']) sampler = dgl.sampling.RandomWalkNeighborSampler(g, 4, 0.5, 3, 2, ['bought-by', 'bought'])
_test_sampler(g, sampler, 'item') _test_sampler(g, sampler, 'item')
sampler = dgl.sampling.RandomWalkNeighborSampler(g, 4, 0.5, 3, 2, sampler = dgl.sampling.RandomWalkNeighborSampler(g, 4, 0.5, 3, 2,
[('item', 'bought-by', 'user'), ('user', 'bought', 'item')]) [('item', 'bought-by', 'user'), ('user', 'bought', 'item')])
_test_sampler(g, sampler, 'item') _test_sampler(g, sampler, 'item')
g = dgl.graph(([0, 0, 1, 1, 2, 2, 3, 3], g = dgl.graph(([0, 0, 1, 1, 2, 2, 3, 3],
...@@ -600,6 +600,34 @@ def create_test_graph(num_nodes, num_edges_per_node, bipartite=False): ...@@ -600,6 +600,34 @@ def create_test_graph(num_nodes, num_edges_per_node, bipartite=False):
g = dgl.graph((src, dst)) g = dgl.graph((src, dst))
return g return g
def create_etype_test_graph(num_nodes, num_edges_per_node, rare_cnt):
    """Build a heterograph fixture for per-edge-type sampling tests.

    Every "u" node receives ``num_edges_per_node`` distinct "e_major" in-edges
    and two "e_minor" in-edges (each with a reverse relation); only the first
    ``rare_cnt`` "u" nodes receive "most_zero" in-edges.
    """
    major_src = np.concatenate(
        [np.random.choice(num_nodes, num_edges_per_node, replace=False)
         for _ in range(num_nodes)])
    major_dst = np.repeat(np.arange(num_nodes), num_edges_per_node)

    minor_src = np.concatenate(
        [np.random.choice(num_nodes, 2, replace=False)
         for _ in range(num_nodes)])
    minor_dst = np.repeat(np.arange(num_nodes), 2)

    # Edges that exist only for the first rare_cnt destination nodes.
    rare_src = np.concatenate(
        [np.random.choice(num_nodes, num_edges_per_node, replace=False)
         for _ in range(rare_cnt)])
    rare_dst = np.repeat(np.arange(rare_cnt), num_edges_per_node)

    return dgl.heterograph({
        ("v", "e_major", "u"): (major_src, major_dst),
        ("u", "e_major_rev", "v"): (major_dst, major_src),
        ("v2", "e_minor", "u"): (minor_src, minor_dst),
        ("v2", "most_zero", "u"): (rare_src, rare_dst),
        ("u", "e_minor_rev", "v2"): (minor_dst, minor_src),
    })
@unittest.skipIf(F._default_context_str == 'gpu', reason="GPU sample neighbors not implemented") @unittest.skipIf(F._default_context_str == 'gpu', reason="GPU sample neighbors not implemented")
def test_sample_neighbors_biased_homogeneous(): def test_sample_neighbors_biased_homogeneous():
g = create_test_graph(100, 30) g = create_test_graph(100, 30)
...@@ -689,11 +717,114 @@ def test_sample_neighbors_biased_bipartite(): ...@@ -689,11 +717,114 @@ def test_sample_neighbors_biased_bipartite():
subg = dgl.sampling.sample_neighbors_biased(g_sorted, g.srcnodes(), 5, bias, edge_dir='out', replace=True) subg = dgl.sampling.sample_neighbors_biased(g_sorted, g.srcnodes(), 5, bias, edge_dir='out', replace=True)
check_num(subg.edges()[1], tag) check_num(subg.edges()[1], tag)
@unittest.skipIf(F._default_context_str == 'gpu', reason="GPU sample neighbors not implemented")
def test_sample_neighbors_etype_homogeneous():
    """Per-edge-type sampling on a heterograph converted via to_homogeneous.

    Checks the per-seed edge counts for in- and out-edge sampling across the
    coo/csr/csc formats, with and without replacement.
    """
    num_nodes = 100
    rare_cnt = 4
    # BUGFIX: pass num_nodes instead of the duplicated literal 100 so the
    # fixture stays consistent with the counts asserted in check_num below.
    g = create_etype_test_graph(num_nodes, 30, rare_cnt)
    h_g = dgl.to_homogeneous(g)
    seed_ntype = g.get_ntype_id("u")
    seeds = F.nonzero_1d(h_g.ndata[dgl.NTYPE] == seed_ntype)

    def check_num(nodes, replace):
        # In-edge counts per "u" seed with fanout 10 per etype:
        # e_major (30 edges) and, for the first rare_cnt seeds, most_zero
        # (30 edges) each contribute 10 picks; e_minor has only 2 edges, so
        # it contributes 2 without replacement and 10 with replacement.
        nodes = F.asnumpy(nodes)
        cnt = [sum(nodes == i) for i in range(num_nodes)]
        for i in range(20):
            if i < rare_cnt:
                if replace is False:
                    assert cnt[i] == 22
                else:
                    assert cnt[i] == 30
            else:
                if replace is False:
                    assert cnt[i] == 12
                else:
                    assert cnt[i] == 20

    # graph with coo format
    coo_g = h_g.formats('coo')
    for _ in range(5):
        subg = dgl.sampling.sample_etype_neighbors(coo_g, seeds, dgl.ETYPE, 10, replace=False)
        check_num(subg.edges()[1], False)
    for _ in range(5):
        subg = dgl.sampling.sample_etype_neighbors(coo_g, seeds, dgl.ETYPE, 10, replace=True)
        check_num(subg.edges()[1], True)

    # graph with csr format
    csr_g = h_g.formats('csr')
    csr_g = csr_g.formats(['csr','csc','coo'])
    for _ in range(5):
        subg = dgl.sampling.sample_etype_neighbors(csr_g, seeds, dgl.ETYPE, 10, replace=False)
        check_num(subg.edges()[1], False)
    for _ in range(5):
        subg = dgl.sampling.sample_etype_neighbors(csr_g, seeds, dgl.ETYPE, 10, replace=True)
        check_num(subg.edges()[1], True)

    # graph with csc format
    csc_g = h_g.formats('csc')
    for _ in range(5):
        subg = dgl.sampling.sample_etype_neighbors(csc_g, seeds, dgl.ETYPE, 10, replace=False)
        check_num(subg.edges()[1], False)
    for _ in range(5):
        subg = dgl.sampling.sample_etype_neighbors(csc_g, seeds, dgl.ETYPE, 10, replace=True)
        check_num(subg.edges()[1], True)

    def check_num2(nodes, replace):
        # Out-edge counts per "u" seed with fanout 5 per etype:
        # e_major_rev (30 edges) contributes 5; e_minor_rev has only 2 edges,
        # so it contributes 2 without replacement and 5 with replacement.
        nodes = F.asnumpy(nodes)
        cnt = [sum(nodes == i) for i in range(num_nodes)]
        for i in range(20):
            if replace is False:
                assert cnt[i] == 7
            else:
                assert cnt[i] == 10

    # edge dir out
    # graph with coo format
    coo_g = h_g.formats('coo')
    for _ in range(5):
        subg = dgl.sampling.sample_etype_neighbors(
            coo_g, seeds, dgl.ETYPE, 5, edge_dir='out', replace=False)
        check_num2(subg.edges()[0], False)
    for _ in range(5):
        subg = dgl.sampling.sample_etype_neighbors(
            coo_g, seeds, dgl.ETYPE, 5, edge_dir='out', replace=True)
        check_num2(subg.edges()[0], True)

    # graph with csr format
    csr_g = h_g.formats('csr')
    for _ in range(5):
        subg = dgl.sampling.sample_etype_neighbors(
            csr_g, seeds, dgl.ETYPE, 5, edge_dir='out', replace=False)
        check_num2(subg.edges()[0], False)
    for _ in range(5):
        subg = dgl.sampling.sample_etype_neighbors(
            csr_g, seeds, dgl.ETYPE, 5, edge_dir='out', replace=True)
        check_num2(subg.edges()[0], True)

    # graph with csc format
    csc_g = h_g.formats('csc')
    csc_g = csc_g.formats(['csc','csr','coo'])
    for _ in range(5):
        subg = dgl.sampling.sample_etype_neighbors(
            csc_g, seeds, dgl.ETYPE, 5, edge_dir='out', replace=False)
        check_num2(subg.edges()[0], False)
    for _ in range(5):
        subg = dgl.sampling.sample_etype_neighbors(
            csc_g, seeds, dgl.ETYPE, 5, edge_dir='out', replace=True)
        check_num2(subg.edges()[0], True)
if __name__ == '__main__': if __name__ == '__main__':
test_sample_neighbors_etype_homogeneous()
test_random_walk() test_random_walk()
test_pack_traces() test_pack_traces()
test_pinsage_sampling() test_pinsage_sampling()
# test_sample_neighbors()
test_sample_neighbors_outedge() test_sample_neighbors_outedge()
test_sample_neighbors_topk() test_sample_neighbors_topk()
test_sample_neighbors_topk_outedge() test_sample_neighbors_topk_outedge()
......
...@@ -32,6 +32,29 @@ std::set<ETuple<Idx>> AllEdgeSet(bool has_data) { ...@@ -32,6 +32,29 @@ std::set<ETuple<Idx>> AllEdgeSet(bool has_data) {
} }
} }
// Ground-truth (row, col, edge-id) triples of the per-etype fixture,
// restricted to the seed rows {0, 3}; the edge IDs differ depending on
// whether the fixture carries an explicit data array.
template <typename Idx>
std::set<ETuple<Idx>> AllEdgePerEtypeSet(bool has_data) {
  if (has_data) {
    return std::set<ETuple<Idx>>{
        ETuple<Idx>{0, 0, 2}, ETuple<Idx>{0, 1, 3}, ETuple<Idx>{0, 2, 5},
        ETuple<Idx>{0, 3, 6}, ETuple<Idx>{3, 2, 1}, ETuple<Idx>{3, 3, 4}};
  }
  // Without a data array the edge ID is the entry's position.
  return std::set<ETuple<Idx>>{
      ETuple<Idx>{0, 0, 0}, ETuple<Idx>{0, 1, 1}, ETuple<Idx>{0, 2, 2},
      ETuple<Idx>{0, 3, 3}, ETuple<Idx>{3, 2, 5}, ETuple<Idx>{3, 3, 6}};
}
template <typename Idx> template <typename Idx>
std::set<ETuple<Idx>> ToEdgeSet(COOMatrix mat) { std::set<ETuple<Idx>> ToEdgeSet(COOMatrix mat) {
std::set<ETuple<Idx>> eset; std::set<ETuple<Idx>> eset;
...@@ -59,6 +82,43 @@ void CheckSampledResult(COOMatrix mat, IdArray rows, bool has_data) { ...@@ -59,6 +82,43 @@ void CheckSampledResult(COOMatrix mat, IdArray rows, bool has_data) {
} }
} }
// Validate a per-etype sample drawn with replacement: every picked entry must
// be one of the ground-truth edges of the seed rows, and its row must be one
// of the requested rows. Duplicates are legal here, so no counts are checked.
template <typename Idx>
void CheckSampledPerEtypeReplaceResult(COOMatrix mat, IdArray rows, bool has_data) {
  ASSERT_EQ(mat.num_rows, 4);
  ASSERT_EQ(mat.num_cols, 4);
  Idx* row = static_cast<Idx*>(mat.row->data);
  Idx* col = static_cast<Idx*>(mat.col->data);
  Idx* data = static_cast<Idx*>(mat.data->data);
  const auto& gt = AllEdgePerEtypeSet<Idx>(has_data);
  for (int64_t i = 0; i < mat.row->shape[0]; ++i) {
    ASSERT_TRUE(gt.count(std::make_tuple(row[i], col[i], data[i])));
    ASSERT_TRUE(IsInArray(rows, row[i]));
  }
}
// Validate a per-etype sample drawn without replacement: besides every entry
// being a ground-truth edge of a seed row, row 0 must contribute exactly 3
// edges and row 3 exactly 2 (num_picks = 2, capped by the per-etype sizes).
template <typename Idx>
void CheckSampledPerEtypeResult(COOMatrix mat, IdArray rows, bool has_data) {
  ASSERT_EQ(mat.num_rows, 4);
  ASSERT_EQ(mat.num_cols, 4);
  const Idx* row = static_cast<Idx*>(mat.row->data);
  const Idx* col = static_cast<Idx*>(mat.col->data);
  const Idx* eid = static_cast<Idx*>(mat.data->data);
  const auto& expected = AllEdgePerEtypeSet<Idx>(has_data);
  const int64_t nnz = mat.row->shape[0];
  int row0_edges = 0;
  int row3_edges = 0;
  for (int64_t i = 0; i < nnz; ++i) {
    ASSERT_TRUE(expected.count(std::make_tuple(row[i], col[i], eid[i])));
    ASSERT_TRUE(IsInArray(rows, row[i]));
    if (row[i] == 0)
      ++row0_edges;
    if (row[i] == 3)
      ++row3_edges;
  }
  ASSERT_EQ(row0_edges, 3);
  ASSERT_EQ(row3_edges, 2);
}
template <typename Idx> template <typename Idx>
CSRMatrix CSR(bool has_data) { CSRMatrix CSR(bool has_data) {
IdArray indptr = NDArray::FromVector(std::vector<Idx>({0, 2, 3, 3, 5})); IdArray indptr = NDArray::FromVector(std::vector<Idx>({0, 2, 3, 3, 5}));
...@@ -81,6 +141,28 @@ COOMatrix COO(bool has_data) { ...@@ -81,6 +141,28 @@ COOMatrix COO(bool has_data) {
return COOMatrix(4, 4, row, col); return COOMatrix(4, 4, row, col);
} }
// 4x4 CSR fixture for the per-etype tests. With has_data the entries carry
// explicit edge IDs; otherwise an entry's edge ID is its position.
template <typename Idx>
CSRMatrix CSREtypes(bool has_data) {
  const IdArray indptr = NDArray::FromVector(std::vector<Idx>({0, 4, 5, 5, 7}));
  const IdArray indices = NDArray::FromVector(std::vector<Idx>({0, 1, 2, 3, 1, 2, 3}));
  if (!has_data)
    return CSRMatrix(4, 4, indptr, indices);
  const IdArray data = NDArray::FromVector(std::vector<Idx>({2, 3, 5, 6, 0, 1, 4}));
  return CSRMatrix(4, 4, indptr, indices, data);
}
// COO view of the same 4x4 per-etype fixture as CSREtypes.
template <typename Idx>
COOMatrix COOEtypes(bool has_data) {
  const IdArray row = NDArray::FromVector(std::vector<Idx>({0, 0, 0, 0, 1, 3, 3}));
  const IdArray col = NDArray::FromVector(std::vector<Idx>({0, 1, 2, 3, 1, 2, 3}));
  if (!has_data)
    return COOMatrix(4, 4, row, col);
  const IdArray data = NDArray::FromVector(std::vector<Idx>({2, 3, 5, 6, 0, 1, 4}));
  return COOMatrix(4, 4, row, col, data);
}
template <typename Idx, typename FloatType> template <typename Idx, typename FloatType>
void _TestCSRSampling(bool has_data) { void _TestCSRSampling(bool has_data) {
auto mat = CSR<Idx>(has_data); auto mat = CSR<Idx>(has_data);
...@@ -123,9 +205,6 @@ void _TestCSRSampling(bool has_data) { ...@@ -123,9 +205,6 @@ void _TestCSRSampling(bool has_data) {
} }
} }
TEST(RowwiseTest, TestCSRSampling) { TEST(RowwiseTest, TestCSRSampling) {
_TestCSRSampling<int32_t, float>(true); _TestCSRSampling<int32_t, float>(true);
_TestCSRSampling<int64_t, float>(true); _TestCSRSampling<int64_t, float>(true);
...@@ -176,6 +255,161 @@ TEST(RowwiseTest, TestCSRSamplingUniform) { ...@@ -176,6 +255,161 @@ TEST(RowwiseTest, TestCSRSamplingUniform) {
} }
// Weighted per-etype sampling on the CSR fixture. Seeds are rows {0, 3} and
// num_picks is 2 per edge type. The etypes array is indexed by edge ID, so it
// differs between the has_data fixture (explicit edge IDs) and the plain one.
template <typename Idx, typename FloatType>
void _TestCSRPerEtypeSampling(bool has_data) {
  auto mat = CSREtypes<Idx>(has_data);
  // Equal weights first: every edge is eligible.
  FloatArray prob = NDArray::FromVector(
      std::vector<FloatType>({.5, .5, .5, .5, .5, .5, .5}));
  IdArray rows = NDArray::FromVector(std::vector<Idx>({0, 3}));
  IdArray etypes = has_data ?
      NDArray::FromVector(std::vector<int32_t>({0, 1, 0, 0, 2, 0, 3})) :
      NDArray::FromVector(std::vector<int32_t>({0, 0, 0, 3, 0, 1, 2}));
  // With replacement only membership of the picks can be checked.
  for (int k = 0; k < 10; ++k) {
    auto rst = CSRRowWisePerEtypeSampling(mat, rows, etypes, 2, prob, true);
    CheckSampledPerEtypeReplaceResult<Idx>(rst, rows, has_data);
  }
  // Without replacement the per-etype counts are deterministic:
  // 2 picks from the 3-edge etype of row 0, and 1 pick from each singleton.
  for (int k = 0; k < 10; ++k) {
    auto rst = CSRRowWisePerEtypeSampling(mat, rows, etypes, 2, prob, false);
    CheckSampledPerEtypeResult<Idx>(rst, rows, has_data);
    auto eset = ToEdgeSet<Idx>(rst);
    if (has_data) {
      int counts = 0;
      counts += eset.count(std::make_tuple(0, 0, 2));
      counts += eset.count(std::make_tuple(0, 1, 3));
      counts += eset.count(std::make_tuple(0, 2, 5));
      ASSERT_EQ(counts, 2);
      counts = 0;
      counts += eset.count(std::make_tuple(0, 3, 6));
      ASSERT_EQ(counts, 1);
      // Row 1 is not a seed, so none of its edges may appear.
      counts = 0;
      counts += eset.count(std::make_tuple(1, 1, 0));
      ASSERT_EQ(counts, 0);
      counts = 0;
      counts += eset.count(std::make_tuple(3, 2, 1));
      ASSERT_EQ(counts, 1);
      counts = 0;
      counts += eset.count(std::make_tuple(3, 3, 4));
      ASSERT_EQ(counts, 1);
    } else {
      int counts = 0;
      counts += eset.count(std::make_tuple(0, 0, 0));
      counts += eset.count(std::make_tuple(0, 1, 1));
      counts += eset.count(std::make_tuple(0, 2, 2));
      ASSERT_EQ(counts, 2);
      counts = 0;
      counts += eset.count(std::make_tuple(0, 3, 3));
      ASSERT_EQ(counts, 1);
      // Row 1 is not a seed, so none of its edges may appear.
      counts = 0;
      counts += eset.count(std::make_tuple(1, 1, 4));
      ASSERT_EQ(counts, 0);
      counts = 0;
      counts += eset.count(std::make_tuple(3, 2, 5));
      ASSERT_EQ(counts, 1);
      counts = 0;
      counts += eset.count(std::make_tuple(3, 3, 6));
      ASSERT_EQ(counts, 1);
    }
  }
  // Zero out the probability of selected edges; they must never be picked.
  prob = has_data ?
      NDArray::FromVector(
          std::vector<FloatType>({.0, .5, .0, .5, .5, .0, .5})) :
      NDArray::FromVector(
          std::vector<FloatType>({.0, .5, .0, .5, .0, .5, .5}));
  for (int k = 0; k < 10; ++k) {
    auto rst = CSRRowWisePerEtypeSampling(mat, rows, etypes, 2, prob, true);
    CheckSampledPerEtypeReplaceResult<Idx>(rst, rows, has_data);
    auto eset = ToEdgeSet<Idx>(rst);
    if (has_data) {
      ASSERT_FALSE(eset.count(std::make_tuple(0, 0, 2)));
      ASSERT_FALSE(eset.count(std::make_tuple(0, 2, 5)));
    } else {
      ASSERT_FALSE(eset.count(std::make_tuple(0, 0, 0)));
      ASSERT_FALSE(eset.count(std::make_tuple(0, 2, 2)));
    }
  }
}
// Run the weighted per-etype CSR sampling test for every supported
// (index type, float type) combination, with and without an edge-data array.
TEST(RowwiseTest, TestCSRPerEtypeSampling) {
  _TestCSRPerEtypeSampling<int32_t, float>(true);
  _TestCSRPerEtypeSampling<int64_t, float>(true);
  _TestCSRPerEtypeSampling<int32_t, double>(true);
  _TestCSRPerEtypeSampling<int64_t, double>(true);
  _TestCSRPerEtypeSampling<int32_t, float>(false);
  _TestCSRPerEtypeSampling<int64_t, float>(false);
  _TestCSRPerEtypeSampling<int32_t, double>(false);
  _TestCSRPerEtypeSampling<int64_t, double>(false);
}
template <typename Idx, typename FloatType>
void _TestCSRPerEtypeSamplingUniform(bool has_data) {
  // Uniform (null probability) per-edge-type sampling over the CSR matrix.
  // FloatType is unused here (prob is null); it is kept so this helper is
  // instantiated symmetrically with the weighted variant.
  auto mat = CSREtypes<Idx>(has_data);
  FloatArray prob = aten::NullArray();
  IdArray rows = NDArray::FromVector(std::vector<Idx>({0, 3}));
  // Per-edge type ids; the expected layout depends on whether explicit
  // edge-data ids are attached (has_data).
  IdArray etypes = has_data ?
    NDArray::FromVector(std::vector<int32_t>({0, 1, 0, 0, 2, 0, 3})) :
    NDArray::FromVector(std::vector<int32_t>({0, 0, 0, 3, 0, 1, 2}));
  // With replacement: only the structural invariants are checked.
  for (int rep = 0; rep < 10; ++rep) {
    auto rst = CSRRowWisePerEtypeSampling(mat, rows, etypes, 2, prob, true);
    CheckSampledPerEtypeReplaceResult<Idx>(rst, rows, has_data);
  }
  // Without replacement: additionally pin down how many of each candidate
  // edge group may appear in the sampled edge set.
  for (int rep = 0; rep < 10; ++rep) {
    auto rst = CSRRowWisePerEtypeSampling(mat, rows, etypes, 2, prob, false);
    CheckSampledPerEtypeResult<Idx>(rst, rows, has_data);
    auto eset = ToEdgeSet<Idx>(rst);
    // Sum of occurrences of the given candidate edges in the sampled set.
    auto hits = [&eset](const std::vector<std::tuple<Idx, Idx, Idx>>& cand) {
      int total = 0;
      for (const auto& edge : cand)
        total += static_cast<int>(eset.count(edge));
      return total;
    };
    if (has_data) {
      ASSERT_EQ(hits({std::make_tuple(0, 0, 2),
                      std::make_tuple(0, 1, 3),
                      std::make_tuple(0, 2, 5)}), 2);
      ASSERT_EQ(hits({std::make_tuple(0, 3, 6)}), 1);
      ASSERT_EQ(hits({std::make_tuple(1, 1, 0)}), 0);
      ASSERT_EQ(hits({std::make_tuple(3, 2, 1)}), 1);
      ASSERT_EQ(hits({std::make_tuple(3, 3, 4)}), 1);
    } else {
      ASSERT_EQ(hits({std::make_tuple(0, 0, 0),
                      std::make_tuple(0, 1, 1),
                      std::make_tuple(0, 2, 2)}), 2);
      ASSERT_EQ(hits({std::make_tuple(0, 3, 3)}), 1);
      ASSERT_EQ(hits({std::make_tuple(1, 1, 4)}), 0);
      ASSERT_EQ(hits({std::make_tuple(3, 2, 5)}), 1);
      ASSERT_EQ(hits({std::make_tuple(3, 3, 6)}), 1);
    }
  }
}
TEST(RowwiseTest, TestCSRPerEtypeSamplingUniform) {
  // Instantiate the uniform per-edge-type CSR sampling test for both
  // index widths and both float precisions, with and without edge data.
  for (const bool has_data : {true, false}) {
    _TestCSRPerEtypeSamplingUniform<int32_t, float>(has_data);
    _TestCSRPerEtypeSamplingUniform<int64_t, float>(has_data);
    _TestCSRPerEtypeSamplingUniform<int32_t, double>(has_data);
    _TestCSRPerEtypeSamplingUniform<int64_t, double>(has_data);
  }
}
template <typename Idx, typename FloatType> template <typename Idx, typename FloatType>
void _TestCOOSampling(bool has_data) { void _TestCOOSampling(bool has_data) {
auto mat = COO<Idx>(has_data); auto mat = COO<Idx>(has_data);
...@@ -267,6 +501,160 @@ TEST(RowwiseTest, TestCOOSamplingUniform) { ...@@ -267,6 +501,160 @@ TEST(RowwiseTest, TestCOOSamplingUniform) {
_TestCOOSamplingUniform<int64_t, double>(false); _TestCOOSamplingUniform<int64_t, double>(false);
} }
template <typename Idx, typename FloatType>
void _TestCOOerEtypeSampling(bool has_data) {
  // Weighted per-edge-type sampling over the COO matrix.
  // NOTE(review): "COOerEtype" looks like a typo for "COOPerEtype"; the
  // spelling is kept because the TEST registration uses the same name.
  auto mat = COOEtypes<Idx>(has_data);
  FloatArray prob = NDArray::FromVector(
      std::vector<FloatType>({.5, .5, .5, .5, .5, .5, .5}));
  IdArray rows = NDArray::FromVector(std::vector<Idx>({0, 3}));
  // Per-edge type ids; the expected layout depends on whether explicit
  // edge-data ids are attached (has_data).
  IdArray etypes = has_data ?
    NDArray::FromVector(std::vector<int32_t>({0, 1, 0, 0, 2, 0, 3})) :
    NDArray::FromVector(std::vector<int32_t>({0, 0, 0, 3, 0, 1, 2}));
  // With replacement: only the structural invariants are checked.
  for (int rep = 0; rep < 10; ++rep) {
    auto rst = COORowWisePerEtypeSampling(mat, rows, etypes, 2, prob, true);
    CheckSampledPerEtypeReplaceResult<Idx>(rst, rows, has_data);
  }
  // Without replacement: additionally pin down how many of each candidate
  // edge group may appear in the sampled edge set.
  for (int rep = 0; rep < 10; ++rep) {
    auto rst = COORowWisePerEtypeSampling(mat, rows, etypes, 2, prob, false);
    CheckSampledPerEtypeResult<Idx>(rst, rows, has_data);
    auto eset = ToEdgeSet<Idx>(rst);
    // Sum of occurrences of the given candidate edges in the sampled set.
    auto hits = [&eset](const std::vector<std::tuple<Idx, Idx, Idx>>& cand) {
      int total = 0;
      for (const auto& edge : cand)
        total += static_cast<int>(eset.count(edge));
      return total;
    };
    if (has_data) {
      ASSERT_EQ(hits({std::make_tuple(0, 0, 2),
                      std::make_tuple(0, 1, 3),
                      std::make_tuple(0, 2, 5)}), 2);
      ASSERT_EQ(hits({std::make_tuple(0, 3, 6)}), 1);
      ASSERT_EQ(hits({std::make_tuple(1, 1, 0)}), 0);
      ASSERT_EQ(hits({std::make_tuple(3, 2, 1)}), 1);
      ASSERT_EQ(hits({std::make_tuple(3, 3, 4)}), 1);
    } else {
      ASSERT_EQ(hits({std::make_tuple(0, 0, 0),
                      std::make_tuple(0, 1, 1),
                      std::make_tuple(0, 2, 2)}), 2);
      ASSERT_EQ(hits({std::make_tuple(0, 3, 3)}), 1);
      ASSERT_EQ(hits({std::make_tuple(1, 1, 4)}), 0);
      ASSERT_EQ(hits({std::make_tuple(3, 2, 5)}), 1);
      ASSERT_EQ(hits({std::make_tuple(3, 3, 6)}), 1);
    }
  }
  // Zero out the probability of some edges; even sampling with replacement
  // must never draw them.
  prob = has_data ?
    NDArray::FromVector(
        std::vector<FloatType>({.0, .5, .0, .5, .5, .0, .5})) :
    NDArray::FromVector(
        std::vector<FloatType>({.0, .5, .0, .5, .0, .5, .5}));
  for (int rep = 0; rep < 10; ++rep) {
    auto rst = COORowWisePerEtypeSampling(mat, rows, etypes, 2, prob, true);
    CheckSampledPerEtypeReplaceResult<Idx>(rst, rows, has_data);
    auto eset = ToEdgeSet<Idx>(rst);
    if (has_data) {
      ASSERT_FALSE(eset.count(std::make_tuple(0, 0, 2)));
      ASSERT_FALSE(eset.count(std::make_tuple(0, 2, 5)));
    } else {
      ASSERT_FALSE(eset.count(std::make_tuple(0, 0, 0)));
      ASSERT_FALSE(eset.count(std::make_tuple(0, 2, 2)));
    }
  }
}
TEST(RowwiseTest, TestCOOerEtypeSampling) {
  // Instantiate the weighted per-edge-type COO sampling test for both
  // index widths and both float precisions, with and without edge data.
  for (const bool has_data : {true, false}) {
    _TestCOOerEtypeSampling<int32_t, float>(has_data);
    _TestCOOerEtypeSampling<int64_t, float>(has_data);
    _TestCOOerEtypeSampling<int32_t, double>(has_data);
    _TestCOOerEtypeSampling<int64_t, double>(has_data);
  }
}
template <typename Idx, typename FloatType>
void _TestCOOPerEtypeSamplingUniform(bool has_data) {
  // Uniform (null probability) per-edge-type sampling over the COO matrix.
  // FloatType is unused here (prob is null); it is kept so this helper is
  // instantiated symmetrically with the weighted variant.
  auto mat = COOEtypes<Idx>(has_data);
  FloatArray prob = aten::NullArray();
  IdArray rows = NDArray::FromVector(std::vector<Idx>({0, 3}));
  // Per-edge type ids; the expected layout depends on whether explicit
  // edge-data ids are attached (has_data).
  IdArray etypes = has_data ?
    NDArray::FromVector(std::vector<int32_t>({0, 1, 0, 0, 2, 0, 3})) :
    NDArray::FromVector(std::vector<int32_t>({0, 0, 0, 3, 0, 1, 2}));
  // With replacement: only the structural invariants are checked.
  for (int rep = 0; rep < 10; ++rep) {
    auto rst = COORowWisePerEtypeSampling(mat, rows, etypes, 2, prob, true);
    CheckSampledPerEtypeReplaceResult<Idx>(rst, rows, has_data);
  }
  // Without replacement: additionally pin down how many of each candidate
  // edge group may appear in the sampled edge set.
  for (int rep = 0; rep < 10; ++rep) {
    auto rst = COORowWisePerEtypeSampling(mat, rows, etypes, 2, prob, false);
    CheckSampledPerEtypeResult<Idx>(rst, rows, has_data);
    auto eset = ToEdgeSet<Idx>(rst);
    // Sum of occurrences of the given candidate edges in the sampled set.
    auto hits = [&eset](const std::vector<std::tuple<Idx, Idx, Idx>>& cand) {
      int total = 0;
      for (const auto& edge : cand)
        total += static_cast<int>(eset.count(edge));
      return total;
    };
    if (has_data) {
      ASSERT_EQ(hits({std::make_tuple(0, 0, 2),
                      std::make_tuple(0, 1, 3),
                      std::make_tuple(0, 2, 5)}), 2);
      ASSERT_EQ(hits({std::make_tuple(0, 3, 6)}), 1);
      ASSERT_EQ(hits({std::make_tuple(1, 1, 0)}), 0);
      ASSERT_EQ(hits({std::make_tuple(3, 2, 1)}), 1);
      ASSERT_EQ(hits({std::make_tuple(3, 3, 4)}), 1);
    } else {
      ASSERT_EQ(hits({std::make_tuple(0, 0, 0),
                      std::make_tuple(0, 1, 1),
                      std::make_tuple(0, 2, 2)}), 2);
      ASSERT_EQ(hits({std::make_tuple(0, 3, 3)}), 1);
      ASSERT_EQ(hits({std::make_tuple(1, 1, 4)}), 0);
      ASSERT_EQ(hits({std::make_tuple(3, 2, 5)}), 1);
      ASSERT_EQ(hits({std::make_tuple(3, 3, 6)}), 1);
    }
  }
}
TEST(RowwiseTest, TestCOOPerEtypeSamplingUniform) {
  // Instantiate the uniform per-edge-type COO sampling test for both
  // index widths and both float precisions, with and without edge data.
  for (const bool has_data : {true, false}) {
    _TestCOOPerEtypeSamplingUniform<int32_t, float>(has_data);
    _TestCOOPerEtypeSamplingUniform<int64_t, float>(has_data);
    _TestCOOPerEtypeSamplingUniform<int32_t, double>(has_data);
    _TestCOOPerEtypeSamplingUniform<int64_t, double>(has_data);
  }
}
template <typename Idx, typename FloatType> template <typename Idx, typename FloatType>
void _TestCSRTopk(bool has_data) { void _TestCSRTopk(bool has_data) {
auto mat = CSR<Idx>(has_data); auto mat = CSR<Idx>(has_data);
......
...@@ -583,6 +583,7 @@ def check_server_client_hetero(shared_mem, num_servers, num_clients): ...@@ -583,6 +583,7 @@ def check_server_client_hetero(shared_mem, num_servers, num_clients):
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet') @unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support some of operations in DistGraph") @unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support some of operations in DistGraph")
@unittest.skipIf(dgl.backend.backend_name == "mxnet", reason="Turn off Mxnet support")
def test_server_client(): def test_server_client():
os.environ['DGL_DIST_MODE'] = 'distributed' os.environ['DGL_DIST_MODE'] = 'distributed'
check_server_client_hierarchy(False, 1, 4) check_server_client_hierarchy(False, 1, 4)
...@@ -603,6 +604,7 @@ def test_dist_emb_server_client(): ...@@ -603,6 +604,7 @@ def test_dist_emb_server_client():
check_dist_emb_server_client(True, 2, 2) check_dist_emb_server_client(True, 2, 2)
@unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support some of operations in DistGraph") @unittest.skipIf(dgl.backend.backend_name == "tensorflow", reason="TF doesn't support some of operations in DistGraph")
@unittest.skipIf(dgl.backend.backend_name == "mxnet", reason="Turn off Mxnet support")
def test_standalone(): def test_standalone():
os.environ['DGL_DIST_MODE'] = 'standalone' os.environ['DGL_DIST_MODE'] = 'standalone'
......
...@@ -2,7 +2,8 @@ import dgl ...@@ -2,7 +2,8 @@ import dgl
import unittest import unittest
import os import os
from dgl.data import CitationGraphDataset from dgl.data import CitationGraphDataset
from dgl.distributed import sample_neighbors from dgl.data import WN18Dataset
from dgl.distributed import sample_neighbors, sample_etype_neighbors
from dgl.distributed import partition_graph, load_partition, load_partition_book from dgl.distributed import partition_graph, load_partition, load_partition_book
import sys import sys
import multiprocessing as mp import multiprocessing as mp
...@@ -13,6 +14,7 @@ from utils import get_local_usable_addr ...@@ -13,6 +14,7 @@ from utils import get_local_usable_addr
from pathlib import Path from pathlib import Path
import pytest import pytest
from scipy import sparse as spsp from scipy import sparse as spsp
import random
from dgl.distributed import DistGraphServer, DistGraph from dgl.distributed import DistGraphServer, DistGraph
...@@ -145,6 +147,7 @@ def check_rpc_find_edges_shuffle(tmpdir, num_server): ...@@ -145,6 +147,7 @@ def check_rpc_find_edges_shuffle(tmpdir, num_server):
# Wait non shared memory graph store # Wait non shared memory graph store
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet') @unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@unittest.skipIf(dgl.backend.backend_name == 'tensorflow', reason='Not support tensorflow for now') @unittest.skipIf(dgl.backend.backend_name == 'tensorflow', reason='Not support tensorflow for now')
@unittest.skipIf(dgl.backend.backend_name == "mxnet", reason="Turn off Mxnet support")
@pytest.mark.parametrize("num_server", [1, 2]) @pytest.mark.parametrize("num_server", [1, 2])
def test_rpc_find_edges_shuffle(num_server): def test_rpc_find_edges_shuffle(num_server):
import tempfile import tempfile
...@@ -195,6 +198,7 @@ def check_rpc_get_degree_shuffle(tmpdir, num_server): ...@@ -195,6 +198,7 @@ def check_rpc_get_degree_shuffle(tmpdir, num_server):
# Wait non shared memory graph store # Wait non shared memory graph store
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet') @unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@unittest.skipIf(dgl.backend.backend_name == 'tensorflow', reason='Not support tensorflow for now') @unittest.skipIf(dgl.backend.backend_name == 'tensorflow', reason='Not support tensorflow for now')
@unittest.skipIf(dgl.backend.backend_name == "mxnet", reason="Turn off Mxnet support")
@pytest.mark.parametrize("num_server", [1, 2]) @pytest.mark.parametrize("num_server", [1, 2])
def test_rpc_get_degree_shuffle(num_server): def test_rpc_get_degree_shuffle(num_server):
import tempfile import tempfile
...@@ -255,15 +259,17 @@ def check_rpc_sampling_shuffle(tmpdir, num_server): ...@@ -255,15 +259,17 @@ def check_rpc_sampling_shuffle(tmpdir, num_server):
eids1 = orig_eid[sampled_graph.edata[dgl.EID]] eids1 = orig_eid[sampled_graph.edata[dgl.EID]]
assert np.array_equal(F.asnumpy(eids1), F.asnumpy(eids)) assert np.array_equal(F.asnumpy(eids1), F.asnumpy(eids))
def create_random_hetero(): def create_random_hetero(dense=False):
num_nodes = {'n1': 1010, 'n2': 1000, 'n3': 1020} num_nodes = {'n1': 210, 'n2': 200, 'n3': 220} if dense else \
{'n1': 1010, 'n2': 1000, 'n3': 1020}
etypes = [('n1', 'r1', 'n2'), etypes = [('n1', 'r1', 'n2'),
('n1', 'r2', 'n3'), ('n1', 'r2', 'n3'),
('n2', 'r3', 'n3')] ('n2', 'r3', 'n3')]
edges = {} edges = {}
random.seed(42)
for etype in etypes: for etype in etypes:
src_ntype, _, dst_ntype = etype src_ntype, _, dst_ntype = etype
arr = spsp.random(num_nodes[src_ntype], num_nodes[dst_ntype], density=0.001, format='coo', arr = spsp.random(num_nodes[src_ntype], num_nodes[dst_ntype], density=0.1 if dense else 0.001, format='coo',
random_state=100) random_state=100)
edges[etype] = (arr.row, arr.col) edges[etype] = (arr.row, arr.col)
g = dgl.heterograph(edges, num_nodes) g = dgl.heterograph(edges, num_nodes)
...@@ -292,6 +298,28 @@ def start_hetero_sample_client(rank, tmpdir, disable_shared_mem): ...@@ -292,6 +298,28 @@ def start_hetero_sample_client(rank, tmpdir, disable_shared_mem):
dgl.distributed.exit_client() dgl.distributed.exit_client()
return block, gpb return block, gpb
def start_hetero_etype_sample_client(rank, tmpdir, disable_shared_mem, fanout=3):
    """Client side of the distributed per-edge-type sampling test.

    Connects to the running graph servers, samples up to `fanout`
    in-neighbors per edge type for a fixed set of 'n3' seed nodes, and
    converts the result to a message-passing block.

    Returns:
        (block, gpb): the sampled block (None if sampling raised) and the
        graph partition book.
    """
    gpb = None
    if disable_shared_mem:
        # Without shared memory the client loads the partition book from
        # disk instead of attaching to the server's in-memory copy.
        _, _, _, gpb, _, _, _ = load_partition(tmpdir / 'test_sampling.json', rank)
    dgl.distributed.initialize("rpc_ip_config.txt")
    dist_graph = DistGraph("test_sampling", gpb=gpb)
    # Only 'n1' nodes are expected to carry the 'feat' field.
    assert 'feat' in dist_graph.nodes['n1'].data
    assert 'feat' not in dist_graph.nodes['n2'].data
    assert 'feat' not in dist_graph.nodes['n3'].data
    if gpb is None:
        gpb = dist_graph.get_partition_book()
    try:
        nodes = {'n3': [0, 10, 99, 66, 124, 208]}
        sampled_graph = sample_etype_neighbors(dist_graph, nodes, dgl.ETYPE, fanout)
        block = dgl.to_block(sampled_graph, nodes)
        block.edata[dgl.EID] = sampled_graph.edata[dgl.EID]
    except Exception as e:
        # Best-effort: report the failure and return None so the caller's
        # assertions fail visibly instead of hanging the RPC teardown below.
        print(e)
        block = None
    dgl.distributed.exit_client()
    return block, gpb
def check_rpc_hetero_sampling_shuffle(tmpdir, num_server): def check_rpc_hetero_sampling_shuffle(tmpdir, num_server):
ip_config = open("rpc_ip_config.txt", "w") ip_config = open("rpc_ip_config.txt", "w")
for _ in range(num_server): for _ in range(num_server):
...@@ -350,9 +378,73 @@ def check_rpc_hetero_sampling_shuffle(tmpdir, num_server): ...@@ -350,9 +378,73 @@ def check_rpc_hetero_sampling_shuffle(tmpdir, num_server):
assert np.all(F.asnumpy(orig_src1) == orig_src) assert np.all(F.asnumpy(orig_src1) == orig_src)
assert np.all(F.asnumpy(orig_dst1) == orig_dst) assert np.all(F.asnumpy(orig_dst1) == orig_dst)
def check_rpc_hetero_etype_sampling_shuffle(tmpdir, num_server):
    """Run per-edge-type neighbor sampling against `num_server` graph
    servers over a partitioned dense heterograph, then verify that the
    sampled (shuffled) node/edge ids map back to the original graph.
    """
    # Fix: write the ip config through a context manager so the file is
    # always closed and flushed, even if get_local_usable_addr() raises.
    with open("rpc_ip_config.txt", "w") as ip_config:
        for _ in range(num_server):
            ip_config.write('{}\n'.format(get_local_usable_addr()))

    g = create_random_hetero(dense=True)
    num_parts = num_server
    num_hops = 1

    partition_graph(g, 'test_sampling', num_parts, tmpdir,
                    num_hops=num_hops, part_method='metis', reshuffle=True)

    # Launch one server process per partition; the sleeps give each server
    # time to come up before clients connect.
    pserver_list = []
    ctx = mp.get_context('spawn')
    for i in range(num_server):
        p = ctx.Process(target=start_server, args=(i, tmpdir, num_server > 1, 'test_sampling'))
        p.start()
        time.sleep(1)
        pserver_list.append(p)

    time.sleep(3)
    fanout = 3
    block, gpb = start_hetero_etype_sample_client(0, tmpdir, num_server > 1, fanout)
    print("Done sampling")
    for p in pserver_list:
        p.join()

    # The client samples `fanout` neighbors for each of its 6 'n3' seed
    # nodes per edge type; the dense graph presumably gives every seed at
    # least `fanout` in-neighbors of each type, hence exactly 18 edges.
    src, dst = block.edges(etype=('n1', 'r2', 'n3'))
    assert len(src) == 18
    src, dst = block.edges(etype=('n2', 'r3', 'n3'))
    assert len(src) == 18

    # Rebuild, per node/edge type, the mapping from shuffled ids back to
    # the ids of the original (pre-partition) graph.
    orig_nid_map = {ntype: F.zeros((g.number_of_nodes(ntype),), dtype=F.int64) for ntype in g.ntypes}
    orig_eid_map = {etype: F.zeros((g.number_of_edges(etype),), dtype=F.int64) for etype in g.etypes}
    for i in range(num_server):
        part, _, _, _, _, _, _ = load_partition(tmpdir / 'test_sampling.json', i)
        ntype_ids, type_nids = gpb.map_to_per_ntype(part.ndata[dgl.NID])
        for ntype_id, ntype in enumerate(g.ntypes):
            idx = ntype_ids == ntype_id
            F.scatter_row_inplace(orig_nid_map[ntype], F.boolean_mask(type_nids, idx),
                                  F.boolean_mask(part.ndata['orig_id'], idx))
        etype_ids, type_eids = gpb.map_to_per_etype(part.edata[dgl.EID])
        for etype_id, etype in enumerate(g.etypes):
            idx = etype_ids == etype_id
            F.scatter_row_inplace(orig_eid_map[etype], F.boolean_mask(type_eids, idx),
                                  F.boolean_mask(part.edata['orig_id'], idx))

    for src_type, etype, dst_type in block.canonical_etypes:
        src, dst = block.edges(etype=etype)
        # These are global Ids after shuffling.
        shuffled_src = F.gather_row(block.srcnodes[src_type].data[dgl.NID], src)
        shuffled_dst = F.gather_row(block.dstnodes[dst_type].data[dgl.NID], dst)
        shuffled_eid = block.edges[etype].data[dgl.EID]

        orig_src = F.asnumpy(F.gather_row(orig_nid_map[src_type], shuffled_src))
        orig_dst = F.asnumpy(F.gather_row(orig_nid_map[dst_type], shuffled_dst))
        orig_eid = F.asnumpy(F.gather_row(orig_eid_map[etype], shuffled_eid))

        # Check the node Ids and edge Ids.
        orig_src1, orig_dst1 = g.find_edges(orig_eid, etype=etype)
        assert np.all(F.asnumpy(orig_src1) == orig_src)
        assert np.all(F.asnumpy(orig_dst1) == orig_dst)
# Wait non shared memory graph store # Wait non shared memory graph store
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet') @unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@unittest.skipIf(dgl.backend.backend_name == 'tensorflow', reason='Not support tensorflow for now') @unittest.skipIf(dgl.backend.backend_name == 'tensorflow', reason='Not support tensorflow for now')
@unittest.skipIf(dgl.backend.backend_name == "mxnet", reason="Turn off Mxnet support")
@pytest.mark.parametrize("num_server", [1, 2]) @pytest.mark.parametrize("num_server", [1, 2])
def test_rpc_sampling_shuffle(num_server): def test_rpc_sampling_shuffle(num_server):
import tempfile import tempfile
...@@ -360,6 +452,7 @@ def test_rpc_sampling_shuffle(num_server): ...@@ -360,6 +452,7 @@ def test_rpc_sampling_shuffle(num_server):
with tempfile.TemporaryDirectory() as tmpdirname: with tempfile.TemporaryDirectory() as tmpdirname:
check_rpc_sampling_shuffle(Path(tmpdirname), num_server) check_rpc_sampling_shuffle(Path(tmpdirname), num_server)
check_rpc_hetero_sampling_shuffle(Path(tmpdirname), num_server) check_rpc_hetero_sampling_shuffle(Path(tmpdirname), num_server)
check_rpc_hetero_etype_sampling_shuffle(Path(tmpdirname), num_server)
def check_standalone_sampling(tmpdir, reshuffle): def check_standalone_sampling(tmpdir, reshuffle):
g = CitationGraphDataset("cora")[0] g = CitationGraphDataset("cora")[0]
...@@ -381,6 +474,47 @@ def check_standalone_sampling(tmpdir, reshuffle): ...@@ -381,6 +474,47 @@ def check_standalone_sampling(tmpdir, reshuffle):
F.asnumpy(sampled_graph.edata[dgl.EID]), F.asnumpy(eids)) F.asnumpy(sampled_graph.edata[dgl.EID]), F.asnumpy(eids))
dgl.distributed.exit_client() dgl.distributed.exit_client()
def check_standalone_etype_sampling(tmpdir, reshuffle):
    """Standalone-mode per-etype sampling over a single-partition cora
    graph: every sampled edge must exist in the original graph and carry
    the matching original edge id.
    """
    hg = CitationGraphDataset('cora')[0]
    num_parts = 1
    num_hops = 1
    partition_graph(hg, 'test_sampling', num_parts, tmpdir,
                    num_hops=num_hops, part_method='metis', reshuffle=reshuffle)
    # Standalone mode: client and "server" run in this same process.
    os.environ['DGL_DIST_MODE'] = 'standalone'
    dgl.distributed.initialize("rpc_ip_config.txt")
    dist_graph = DistGraph("test_sampling", part_config=tmpdir / 'test_sampling.json')
    # Sample up to 3 in-neighbors per edge type for 5 seed nodes.
    sampled_graph = sample_etype_neighbors(dist_graph, [0, 10, 99, 66, 1023], dgl.ETYPE, 3)
    src, dst = sampled_graph.edges()
    assert sampled_graph.number_of_nodes() == hg.number_of_nodes()
    assert np.all(F.asnumpy(hg.has_edges_between(src, dst)))
    # The EID field of the sampled graph must match the original edge ids.
    eids = hg.edge_ids(src, dst)
    assert np.array_equal(
        F.asnumpy(sampled_graph.edata[dgl.EID]), F.asnumpy(eids))
    dgl.distributed.exit_client()
def check_standalone_etype_sampling_heterograph(tmpdir, reshuffle):
    """Standalone-mode per-etype sampling over a true heterograph built
    from cora: the 'cite' relation plus its reverse 'cite-by'.
    """
    hg = CitationGraphDataset('cora')[0]
    num_parts = 1
    num_hops = 1
    # Build a two-relation heterograph over the same 'paper' node set.
    src, dst = hg.edges()
    new_hg = dgl.heterograph({('paper', 'cite', 'paper'): (src, dst),
                             ('paper', 'cite-by', 'paper'): (dst, src)},
                             {'paper': hg.number_of_nodes()})
    partition_graph(new_hg, 'test_hetero_sampling', num_parts, tmpdir,
                    num_hops=num_hops, part_method='metis', reshuffle=reshuffle)
    # Standalone mode: client and "server" run in this same process.
    os.environ['DGL_DIST_MODE'] = 'standalone'
    dgl.distributed.initialize("rpc_ip_config.txt")
    dist_graph = DistGraph("test_hetero_sampling", part_config=tmpdir / 'test_hetero_sampling.json')
    # With fanout 1 and 10 seeds, the test expects exactly one sampled
    # edge per seed for each of the two relations.
    sampled_graph = sample_etype_neighbors(dist_graph, [0, 1, 2, 10, 99, 66, 1023, 1024, 2700, 2701], dgl.ETYPE, 1)
    src, dst = sampled_graph.edges(etype=('paper', 'cite', 'paper'))
    assert len(src) == 10
    src, dst = sampled_graph.edges(etype=('paper', 'cite-by', 'paper'))
    assert len(src) == 10
    assert sampled_graph.number_of_nodes() == new_hg.number_of_nodes()
    dgl.distributed.exit_client()
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet') @unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@unittest.skipIf(dgl.backend.backend_name == 'tensorflow', reason='Not support tensorflow for now') @unittest.skipIf(dgl.backend.backend_name == 'tensorflow', reason='Not support tensorflow for now')
def test_standalone_sampling(): def test_standalone_sampling():
...@@ -462,10 +596,31 @@ def test_rpc_in_subgraph(): ...@@ -462,10 +596,31 @@ def test_rpc_in_subgraph():
with tempfile.TemporaryDirectory() as tmpdirname: with tempfile.TemporaryDirectory() as tmpdirname:
check_rpc_in_subgraph_shuffle(Path(tmpdirname), 2) check_rpc_in_subgraph_shuffle(Path(tmpdirname), 2)
@unittest.skipIf(os.name == 'nt', reason='Do not support windows yet')
@unittest.skipIf(dgl.backend.backend_name == 'tensorflow', reason='Not support tensorflow for now')
@unittest.skipIf(dgl.backend.backend_name == "mxnet", reason="Turn off Mxnet support")
def test_standalone_etype_sampling():
    """Entry point for the standalone-mode per-etype sampling checks."""
    import tempfile
    with tempfile.TemporaryDirectory() as tmpdirname:
        os.environ['DGL_DIST_MODE'] = 'standalone'
        check_standalone_etype_sampling_heterograph(Path(tmpdirname), True)
    # Separate temp dir for the homogeneous variant, presumably to keep
    # its partition files apart from the heterograph run above.
    with tempfile.TemporaryDirectory() as tmpdirname:
        os.environ['DGL_DIST_MODE'] = 'standalone'
        check_standalone_etype_sampling(Path(tmpdirname), True)
        check_standalone_etype_sampling(Path(tmpdirname), False)
if __name__ == "__main__": if __name__ == "__main__":
import tempfile import tempfile
with tempfile.TemporaryDirectory() as tmpdirname: with tempfile.TemporaryDirectory() as tmpdirname:
os.environ['DGL_DIST_MODE'] = 'standalone' os.environ['DGL_DIST_MODE'] = 'standalone'
check_standalone_etype_sampling_heterograph(Path(tmpdirname), True)
test_rpc_sampling_shuffle(1)
test_rpc_sampling_shuffle(2)
with tempfile.TemporaryDirectory() as tmpdirname:
os.environ['DGL_DIST_MODE'] = 'standalone'
check_standalone_etype_sampling(Path(tmpdirname), True)
check_standalone_etype_sampling(Path(tmpdirname), False)
check_standalone_sampling(Path(tmpdirname), True) check_standalone_sampling(Path(tmpdirname), True)
check_standalone_sampling(Path(tmpdirname), False) check_standalone_sampling(Path(tmpdirname), False)
os.environ['DGL_DIST_MODE'] = 'distributed' os.environ['DGL_DIST_MODE'] = 'distributed'
......
...@@ -80,7 +80,7 @@ def _check_neighbor_sampling_dataloader(g, nids, dl, mode, collator): ...@@ -80,7 +80,7 @@ def _check_neighbor_sampling_dataloader(g, nids, dl, mode, collator):
@unittest.skipIf(F._default_context_str == 'gpu', reason="GPU sample neighbors not implemented") @unittest.skipIf(F._default_context_str == 'gpu', reason="GPU sample neighbors not implemented")
def test_neighbor_sampler_dataloader(): def test_neighbor_sampler_dataloader():
g = dgl.heterograph({('user', 'follow', 'user'): ([0, 0, 0, 1, 1], [1, 2, 3, 3, 4])}, g = dgl.heterograph({('user', 'follow', 'user'): ([0, 0, 0, 1, 1], [1, 2, 3, 3, 4])},
{'user': 6}).long() {'user': 6}).long()
g = dgl.to_bidirected(g) g = dgl.to_bidirected(g)
g.ndata['feat'] = F.randn((6, 8)) g.ndata['feat'] = F.randn((6, 8))
...@@ -296,7 +296,7 @@ def test_edge_dataloader(): ...@@ -296,7 +296,7 @@ def test_edge_dataloader():
g2, {ety: g2.edges(form='eid', etype=ety) for ety in g2.canonical_etypes}, g2, {ety: g2.edges(form='eid', etype=ety) for ety in g2.canonical_etypes},
sampler, device=F.ctx(), negative_sampler=neg_sampler, sampler, device=F.ctx(), negative_sampler=neg_sampler,
batch_size=batch_size) batch_size=batch_size)
assert isinstance(iter(dataloader), Iterator) assert isinstance(iter(dataloader), Iterator)
for input_nodes, pos_pair_graph, neg_pair_graph, blocks in dataloader: for input_nodes, pos_pair_graph, neg_pair_graph, blocks in dataloader:
_check_device(input_nodes) _check_device(input_nodes)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment