"...git@developer.sourcefind.cn:renzhc/diffusers_dcu.git" did not exist on "fb54499614f9603bfaa4c026202c5783841b3a80"
Unverified Commit bf264d00 authored by Muhammed Fatih BALIN, Committed by GitHub

[Feature] (La)yer-Neigh(bor) sampling implementation (#4668)



* adding LABOR sampling

* add ladies and pladies samplers

* fix compile error after rebase

* add reference for ladies sampler

* Improve ladies implementation.

* weighted labor sampling initial implementation draft
fix indentation and small bug in ladies script

* importance_sampling currently doesn't work with weights

* fix weighted importance sampling

* move labor example into its own folder

* lint fixes

* Improve documentation

* remove examples from the main PR

* fix linting by not using c++17 features

* fix documentation of labor_sampler.py

* update documentation for labor.py

* reformat the labor.py file with black

* fix linting errors

* replace exception use with if

* fix typo in error comment

* fixing win64 build for ci

* fixing weighted implementation, works now.

* fix bug in the weighted case and importance_sampling==0

* address part of the reviews

* remove unused code paths from cuda

* remove unused code path from cpu side

* remove extra features of labor making use of random seed.

* fix exclude_edges bug

* remove pcg and seed logic from cpu implementation, seed logic should still work for cuda.

* minor style change

* refactor CPU implementation, take out the importance_sampling probability computation into a function.

* improve CUDAWorkspaceAllocator

* refactor importance_sampling part out to a function

* minor optimization

* fix linting issue

* Revert "remove pcg and seed logic from cpu implementation, seed logic should still work for cuda."

This reverts commit c250e07ac6d7e13f57e79e8a2c2f098d777378c2.

* Revert "remove extra features of labor making use of random seed."

This reverts commit 7f99034353080308f4783f27d9a08bea343fb796.

* fix the documentation

* disable NIDs

* improve the documentation in the code

* use the stream argument in pcg32 instead of skipping ahead t times, can discard the use of hashmap now since it is faster this way.

* fix linting issue

* address another round of reviews

* further optimize CPU LABOR sampling implementation

* fix linting error

* update the comment

* reformat

* rename and rephrase comment

* fix formatting according to new linting specs

* fix compile error due to renaming, fix linting.

* lint

* rename DGLHeteroGraph to DGLGraph to match master

* replace other occurrences of DGLHeteroGraph to DGLGraph
Co-authored-by: default avatarMuhammed Fatih BALIN <m.f.balin@gmail.com>
Co-authored-by: default avatarKaan Sancak <kaansnck@gmail.com>
Co-authored-by: default avatarQuan Gan <coin2028@hotmail.com>
parent 59f3d6e0
@@ -34,3 +34,6 @@
[submodule "third_party/thrust"]
    path = third_party/thrust
    url = https://github.com/NVIDIA/thrust.git
[submodule "third_party/pcg"]
    path = third_party/pcg
    url = https://github.com/imneme/pcg-cpp.git
@@ -204,7 +204,7 @@ target_include_directories(dgl PRIVATE "third_party/METIS/include/")
target_include_directories(dgl PRIVATE "tensoradapter/include")
target_include_directories(dgl PRIVATE "third_party/nanoflann/include")
target_include_directories(dgl PRIVATE "third_party/libxsmm/include")
target_include_directories(dgl PRIVATE "third_party/pcg/include")

# For serialization
if (USE_HDFS)
...
@@ -64,3 +64,4 @@ Contributors
* [Jiahui Liu](https://github.com/paoxiaode) from Nvidia
* [Neil Dickson](https://github.com/ndickson-nvidia) from Nvidia
* [Chang Liu](https://github.com/chang-l) from Nvidia
* [Muhammed Fatih Balin](https://github.com/mfbalin) from Nvidia and Georgia Tech
@@ -47,6 +47,17 @@ IdArray NewIdArray(
    int64_t length, DGLContext ctx = DGLContext{kDGLCPU, 0},
    uint8_t nbits = 64);
/**
* @brief Create a new float array with given length
* @param length The array length
* @param ctx The array context
* @param nbits The number of bits of the float type
* @return float array
*/
FloatArray NewFloatArray(int64_t length,
    DGLContext ctx = DGLContext{kDGLCPU, 0},
    uint8_t nbits = 32);
/**
* @brief Create a new id array using the given vector data
* @param vec The vector data
...
/**
* Copyright (c) 2020-2022 by Contributors
* @file dgl/aten/coo.h
* @brief Common COO operations required by DGL.
*/
@@ -379,6 +379,66 @@ COOMatrix COORemove(COOMatrix coo, IdArray entries);
COOMatrix COOReorder(
    COOMatrix coo, runtime::NDArray new_row_ids, runtime::NDArray new_col_ids);
/**
* @brief Randomly select a fixed number of non-zero entries along each given
* row using arXiv:2210.13339, Labor sampling.
*
* The picked indices are returned in the form of a COO matrix.
*
* The passed random_seed makes it so that for any seed vertex s and its
* neighbor t, the rolled random variate r_t is the same for any call to this
* function with the same random seed. When sampling as part of the same batch,
* identical seeds should be used so that LABOR can sample globally. For
* example, for heterogeneous graphs, a single random seed is passed for all
* edge types. This samples far fewer vertices than using a unique random seed
* for each edge type would. If this function were called individually for each
* edge type of a heterogeneous graph with different random seeds, it would run
* LABOR locally for each edge type, resulting in a larger number of vertices
* being sampled.
*
* If this function is called without a random_seed, we get the random seed by
* getting a random number from DGL.
*
* Examples:
*
* // coo.num_rows = 4;
* // coo.num_cols = 4;
* // coo.rows = [0, 0, 1, 3, 3]
* // coo.cols = [0, 1, 1, 2, 3]
* // coo.data = [2, 3, 0, 1, 4]
* COOMatrix coo = ...;
* IdArray rows = ... ; // [1, 3]
* COOMatrix sampled = COOLaborSampling(
*     coo, rows, 2, NullArray(), 0, NullArray(), NullArray());
* // possible sampled coo matrix:
* // sampled.num_rows = 4
* // sampled.num_cols = 4
* // sampled.rows = [1, 3, 3]
* // sampled.cols = [1, 2, 3]
* // sampled.data = [3, 0, 4]
*
* @param mat Input coo matrix.
* @param rows Rows to sample from.
* @param num_samples Number of samples using labor sampling
* @param prob Probability array for nonuniform sampling
* @param importance_sampling Whether to enable importance sampling
* @param random_seed The random seed for the sampler
* @param NIDs global nids if sampling from a subgraph
* @return A pair of COOMatrix storing the picked row and col indices and edge
*         weights if importance_sampling != 0 or the prob argument was passed.
*         Its data field stores the index of the picked elements in the
*         value array.
*/
std::pair<COOMatrix, FloatArray> COOLaborSampling(
COOMatrix mat,
IdArray rows,
int64_t num_samples,
FloatArray prob = NullArray(),
int importance_sampling = 0,
IdArray random_seed = NullArray(),
IdArray NIDs = NullArray());
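To make the shared-seed property concrete, here is a minimal Python sketch of the uniform (importance_sampling == 0) picking rule described above. It is illustrative only: labor_pick and its hash-based variate are hypothetical stand-ins for the pcg32-based C++ implementation; the point is that r_t depends only on (random_seed, t), so two calls with the same seed make consistent decisions for shared neighbors.

import hashlib

def labor_pick(neighbors, fanout, random_seed):
    # Keep neighbor t iff r_t <= c * pi_t with pi_t = 1 (uniform case),
    # where r_t is a pure function of (random_seed, t).
    d = len(neighbors)
    if d == 0:
        return []
    c = fanout / d  # O(1) inclusion probability scale for LABOR-0
    picked = []
    for t in neighbors:
        digest = hashlib.blake2b(f"{random_seed}:{t}".encode()).digest()
        r_t = int.from_bytes(digest[:8], "big") / 2**64  # uniform in [0, 1)
        if r_t <= c:
            picked.append(t)
    return picked

# Two seed vertices sharing neighbor 7 agree on r_7 under the same seed,
# so 7 is either kept for both of them or for neither.
print(labor_pick([3, 7, 8, 9], 2, 42), labor_pick([1, 5, 7, 11], 2, 42))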
/**
* @brief Randomly select a fixed number of non-zero entries along each given
* row independently.
...
/**
* Copyright (c) 2020-2022 by Contributors
* @file dgl/aten/csr.h
* @brief Common CSR operations required by DGL.
*/
@@ -409,7 +409,66 @@ CSRMatrix CSRRemove(CSRMatrix csr, IdArray entries);
/**
* @brief Randomly select a fixed number of non-zero entries along each given
* row using arXiv:2210.13339, Labor sampling.
*
* The picked indices are returned in the form of a COO matrix.
*
* The passed random_seed makes it so that for any seed vertex s and its
* neighbor t, the rolled random variate r_t is the same for any call to this
* function with the same random seed. When sampling as part of the same batch,
* identical seeds should be used so that LABOR can sample globally. For
* example, for heterogeneous graphs, a single random seed is passed for all
* edge types. This samples far fewer vertices than using a unique random seed
* for each edge type would. If this function were called individually for each
* edge type of a heterogeneous graph with different random seeds, it would run
* LABOR locally for each edge type, resulting in a larger number of vertices
* being sampled.
*
* If this function is called without a random_seed, we get the random seed by
* getting a random number from DGL.
*
* Examples:
*
* // csr.num_rows = 4;
* // csr.num_cols = 4;
* // csr.indptr = [0, 2, 3, 3, 5]
* // csr.indices = [0, 1, 1, 2, 3]
* // csr.data = [2, 3, 0, 1, 4]
* CSRMatrix csr = ...;
* IdArray rows = ... ; // [1, 3]
* COOMatrix sampled = CSRLaborSampling(
*     csr, rows, 2, NullArray(), 0, NullArray(), NullArray());
* // possible sampled coo matrix:
* // sampled.num_rows = 4
* // sampled.num_cols = 4
* // sampled.rows = [1, 3, 3]
* // sampled.cols = [1, 2, 3]
* // sampled.data = [3, 0, 4]
*
* @param mat Input CSR matrix.
* @param rows Rows to sample from.
* @param num_samples Number of samples using labor sampling
* @param prob Probability array for nonuniform sampling
* @param importance_sampling Whether to enable importance sampling
* @param random_seed The random seed for the sampler
* @param NIDs global nids if sampling from a subgraph
* @return A pair of COOMatrix storing the picked row and col indices and edge
*         weights if importance_sampling != 0 or the prob argument was passed.
*         Its data field stores the index of the picked elements in the value
*         array.
*/
std::pair<COOMatrix, FloatArray> CSRLaborSampling(
CSRMatrix mat,
IdArray rows,
int64_t num_samples,
FloatArray prob = NullArray(),
int importance_sampling = 0,
IdArray random_seed = NullArray(),
IdArray NIDs = NullArray());
/*!
* @brief Randomly select a fixed number of non-zero entries along each given row independently.
*
* The function performs random choices along each row independently.
* The picked indices are returned in the form of a COO matrix.
...
@@ -24,7 +24,7 @@ class SamplerOp {
* @brief Sample a graph from the seed vertices with neighbor sampling.
* The neighbors are sampled with a uniform distribution.
*
* @param graph A graph for sampling.
* @param seeds the nodes where we should start to sample.
* @param edge_type the type of edges we should sample neighbors.
* @param num_hops the number of hops to sample neighbors.
@@ -43,7 +43,7 @@ class SamplerOp {
* @brief Sample a graph from the seed vertices with layer sampling.
* The layers are sampled with a uniform distribution.
*
* @param graph A graph for sampling.
* @param seeds the nodes where we should start to sample.
* @param edge_type the type of edges we should sample neighbors.
* @param layer_sizes The size of layers.
...
@@ -4,6 +4,7 @@ from . import negative_sampler
from .base import *
from .cluster_gcn import *
from .graphsaint import *
from .labor_sampler import *
from .neighbor_sampler import *
from .shadow import *
...
#
# Copyright (c) 2022 by Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Based off of neighbor_sampler.py
#
"""Data loading components for labor sampling"""
from ..base import NID, EID
from ..transforms import to_block
from .base import BlockSampler
from ..random import choice
from .. import backend as F
class LaborSampler(BlockSampler):
"""Sampler that builds computational dependency of node representations via
labor sampling for multilayer GNN from
`(LA)yer-neigh(BOR) Sampling: Defusing Neighborhood Explosion in GNNs
<https://arxiv.org/abs/2210.13339>`
This sampler will make every node gather messages from a fixed number of
neighbors per edge type. The neighbors are picked uniformly with default
parameters. For every vertex t that will be considered to be sampled, there
will be a single random variate r_t.
Parameters
----------
fanouts : list[int] or list[dict[etype, int]]
List of neighbors to sample per edge type for each GNN layer, with the
i-th element being the fanout for the i-th GNN layer.
If only a single integer is provided, DGL assumes that every edge type
will have the same fanout.
If -1 is provided for one edge type on one layer, then all inbound edges
of that edge type will be included.
edge_dir : str, default ``'in'``
Can be either ``'in' `` where the neighbors will be sampled according to
incoming edges, or ``'out'`` otherwise, same as
:func:`dgl.sampling.sample_neighbors`.
prob : str, optional
If given, the probability of each neighbor being sampled is proportional
to the edge feature value with the given name in ``g.edata``.
The feature must be a scalar on each edge.
importance_sampling : int, default ``0``
Whether to use importance sampling or uniform sampling, use of negative
values optimizes importance sampling probabilities until convergence
while use of positive values runs optimization steps that many times.
If the value is i, then LABOR-i variant is used.
layer_dependency : bool, default ``False``
Specifies whether different layers should use same random variates.
Results into a reduction in the number of vertices sampled, but may
degrade the quality slightly.
prefetch_node_feats : list[str] or dict[ntype, list[str]], optional
The source node data to prefetch for the first MFG, corresponding to the
input node features necessary for the first GNN layer.
prefetch_labels : list[str] or dict[ntype, list[str]], optional
The destination node data to prefetch for the last MFG, corresponding to
the node labels of the minibatch.
prefetch_edge_feats : list[str] or dict[etype, list[str]], optional
The edge data names to prefetch for all the MFGs, corresponding to the
edge features necessary for all GNN layers.
output_device : device, optional
The device of the output subgraphs or MFGs. Default is the same as the
minibatch of seed nodes.
Examples
--------
**Node classification**
To train a 3-layer GNN for node classification on a set of nodes
``train_nid`` on a homogeneous graph where each node takes messages from
5, 10, 15 neighbors for the first, second, and third layer respectively
(assuming the backend is PyTorch):
>>> sampler = dgl.dataloading.LaborSampler([5, 10, 15])
>>> dataloader = dgl.dataloading.DataLoader(
... g, train_nid, sampler,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
>>> for input_nodes, output_nodes, blocks in dataloader:
... train_on(blocks)
If training on a heterogeneous graph and you want different number of
neighbors for each edge type, one should instead provide a list of dicts.
Each dict would specify the number of neighbors to pick per edge type.
>>> sampler = dgl.dataloading.LaborSampler([
... {('user', 'follows', 'user'): 5,
... ('user', 'plays', 'game'): 4,
... ('game', 'played-by', 'user'): 3}] * 3)
If you would like non-uniform labor sampling:
>>> # any non-negative 1D vector works
>>> g.edata['p'] = torch.rand(g.num_edges())
>>> sampler = dgl.dataloading.LaborSampler([5, 10, 15], prob='p')
**Edge classification and link prediction**
This class can also work for edge classification and link prediction
together with :func:`as_edge_prediction_sampler`.
>>> sampler = dgl.dataloading.LaborSampler([5, 10, 15])
>>> sampler = dgl.dataloading.as_edge_prediction_sampler(sampler)
>>> dataloader = dgl.dataloading.DataLoader(
... g, train_eid, sampler,
... batch_size=1024, shuffle=True, drop_last=False, num_workers=4)
See the documentation :func:`as_edge_prediction_sampler` for more details.
Notes
-----
For the concept of MFGs, please refer to
:ref:`User Guide Section 6 <guide-minibatch>` and
:doc:`Minibatch Training Tutorials
<tutorials/large/L0_neighbor_sampling_overview>`.
"""
    def __init__(
        self,
        fanouts,
        edge_dir="in",
        prob=None,
        importance_sampling=0,
        layer_dependency=False,
        prefetch_node_feats=None,
        prefetch_labels=None,
        prefetch_edge_feats=None,
        output_device=None,
    ):
        super().__init__(
            prefetch_node_feats=prefetch_node_feats,
            prefetch_labels=prefetch_labels,
            prefetch_edge_feats=prefetch_edge_feats,
            output_device=output_device,
        )
        self.fanouts = fanouts
        self.edge_dir = edge_dir
        self.prob = prob
        self.importance_sampling = importance_sampling
        self.layer_dependency = layer_dependency
        self.set_seed()
    def set_seed(self, random_seed=None):
        """Updates the underlying seed for the sampler.

        Calling this function enforces the sampling algorithm to use the same
        seed on every edge type. This can reduce the number of nodes being
        sampled, because the passed random_seed makes it so that for any seed
        vertex ``s`` and its neighbor ``t``, the rolled random variate ``r_t``
        is the same for any instance of this class with the same random seed.
        When sampling as part of the same batch, identical seeds should be
        used so that LABOR can sample globally. For example, for heterogeneous
        graphs, a single random seed is passed for all edge types. This
        samples far fewer vertices than using a unique random seed for each
        edge type would. If this function were called individually for each
        edge type of a heterogeneous graph with different random seeds, it
        would run LABOR locally for each edge type, resulting in a larger
        number of vertices being sampled.

        If this function is called without any parameters, we get the random
        seed by getting a random number from DGL. Call this function if
        multiple instances of LaborSampler are used to sample as part of a
        single batch.

        Parameters
        ----------
        random_seed : int, default ``None``
            The random seed to be used for the next sampling call.
        """
        if random_seed is None:
            self.random_seed = choice(1e18, 1)
        else:
            self.random_seed = F.tensor(random_seed, F.int64)
    def sample_blocks(self, g, seed_nodes, exclude_eids=None):
        output_nodes = seed_nodes
        blocks = []
        for i, fanout in enumerate(reversed(self.fanouts)):
            random_seed_i = F.zerocopy_to_dgl_ndarray(
                self.random_seed + (i if not self.layer_dependency else 0)
            )
            frontier, importances = g.sample_labors(
                seed_nodes,
                fanout,
                edge_dir=self.edge_dir,
                prob=self.prob,
                importance_sampling=self.importance_sampling,
                random_seed=random_seed_i,
                output_device=self.output_device,
                exclude_edges=exclude_eids,
            )
            eid = frontier.edata[EID]
            block = to_block(
                frontier, seed_nodes, include_dst_in_src=True, src_nodes=None
            )
            block.edata[EID] = eid
            if len(g.canonical_etypes) > 1:
                for etype, importance in zip(
                    g.canonical_etypes, importances
                ):
                    if importance.shape[0] == block.num_edges(etype):
                        block.edata["edge_weights"][etype] = importance
            elif importances[0].shape[0] == block.num_edges():
                block.edata["edge_weights"] = importances[0]
            seed_nodes = block.srcdata[NID]
            blocks.insert(0, block)
        self.set_seed()
        return seed_nodes, output_nodes, blocks
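As a usage note for the seed machinery above, here is a hypothetical snippet in the doctest style of this file: when several LaborSampler instances sample as part of one batch, giving them the same seed makes their rolled variates, and hence their picks, agree on shared neighbors.

>>> s1 = dgl.dataloading.LaborSampler([5, 10, 15])
>>> s2 = dgl.dataloading.LaborSampler([5, 10, 15])
>>> seed = 42  # any int64 value
>>> s1.set_seed(seed)
>>> s2.set_seed(seed)
>>> # both samplers now roll identical r_t for every vertex t, so their
>>> # sampled neighborhoods are consistent within this batch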
@@ -8,6 +8,7 @@ gives a holistic explanation on how different components work together.
from .randomwalks import *
from .pinsage import *
from .neighbor import *
from .labor import *
from .node2vec_randomwalk import *
from .negative import *
from . import utils
#
# Copyright (c) 2022 by Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Based off of neighbor.py
#
"""Labor sampling APIs"""
from .._ffi.function import _init_api
from .. import backend as F
from ..base import DGLError
from ..heterograph import DGLGraph
from .. import ndarray as nd
from .. import utils
from .utils import EidExcluder
from ..random import choice
__all__ = ["sample_labors"]
def sample_labors(
    g,
    nodes,
    fanout,
    edge_dir="in",
    prob=None,
    importance_sampling=0,
    random_seed=None,
    copy_ndata=True,
    copy_edata=True,
    exclude_edges=None,
    output_device=None,
):
"""Sampler that builds computational dependency of node representations via
labor sampling for multilayer GNN from
`(LA)yer-neigh(BOR) Sampling: Defusing Neighborhood Explosion in GNNs
<https://arxiv.org/abs/2210.13339>`
This sampler will make every node gather messages from a fixed number of neighbors
per edge type. The neighbors are picked uniformly with default parameters. For every vertex t
that will be considered to be sampled, there will be a single random variate r_t.
For each node, a number of inbound (or outbound when ``edge_dir == 'out'``) edges
will be randomly chosen. The graph returned will then contain all the nodes in the
original graph, but only the sampled edges.
Node/edge features are not preserved. The original IDs of
the sampled edges are stored as the `dgl.EID` feature in the returned graph.
Parameters
----------
g : DGLGraph
The graph, allowed to have multiple node or edge types. Can be either on CPU or GPU.
nodes : tensor or dict
Node IDs to sample neighbors from.
This argument can take a single ID tensor or a dictionary of node types and ID tensors.
If a single tensor is given, the graph must only have one type of nodes.
fanout : int or dict[etype, int]
The number of edges to be sampled for each node on each edge type.
This argument can take a single int or a dictionary of edge types and ints.
If a single int is given, DGL will sample this number of edges for each node for
every edge type.
If -1 is given for a single edge type, all the neighboring edges with that edge
type will be selected.
edge_dir : str, optional
Determines whether to sample inbound or outbound edges.
Can take either ``in`` for inbound edges or ``out`` for outbound edges.
prob : str, optional
Feature name used as the (unnormalized) probabilities associated with each
neighboring edge of a node. The feature must have only one element for each
edge.
The features must be non-negative floats, and the sum of the features of
inbound/outbound edges for every node must be positive (though they don't have
to sum up to one). Otherwise, the result will be undefined.
If :attr:`prob` is not None, GPU sampling is not supported.
importance_sampling : int, optional
Whether to use importance sampling or uniform sampling, use of negative values optimizes
importance sampling probabilities until convergence while use of positive values runs
optimization steps that many times. If the value is i, then LABOR-i variant is used.
random_seed : tensor
An int64 tensor with one element.
The passed random_seed makes it so that for any seed vertex ``s`` and its neighbor ``t``,
the rolled random variate ``r_t`` is the same for any call to this function with the same
random seed. When sampling as part of the same batch, one would want identical seeds so that
LABOR can globally sample. One example is that for heterogenous graphs, there is a single
random seed passed for each edge type. This will sample much fewer vertices compared to
having unique random seeds for each edge type. If one called this function individually for
each edge type for a heterogenous graph with different random seeds, then it would run LABOR
locally for each edge type, resulting into a larger number of vertices being sampled.
If this function is called without a ``random_seed``, we get the random seed by getting a
random number from DGL. Use this argument with identical random_seed if multiple calls to
this function are used to sample as part of a single batch.
copy_ndata: bool, optional
If True, the node features of the new graph are copied from
the original graph. If False, the new graph will not have any
node features.
(Default: True)
copy_edata: bool, optional
If True, the edge features of the new graph are copied from
the original graph. If False, the new graph will not have any
edge features.
(Default: True)
exclude_edges: tensor or dict
Edge IDs to exclude during sampling neighbors for the seed nodes.
This argument can take a single ID tensor or a dictionary of edge types and ID tensors.
If a single tensor is given, the graph must only have one type of nodes.
output_device : Framework-specific device context object, optional
The output device. Default is the same as the input graph.
Returns
-------
tuple(DGLGraph, list[Tensor])
A sampled subgraph containing only the sampled neighboring edges along with edge weights.
Notes
-----
If :attr:`copy_ndata` or :attr:`copy_edata` is True, same tensors are used as
the node or edge features of the original graph and the new graph.
As a result, users should avoid performing in-place operations
on the node features of the new graph to avoid feature corruption.
Examples
--------
Assume that you have the following graph
>>> g = dgl.graph(([0, 0, 1, 1, 2, 2], [1, 2, 0, 1, 2, 0]))
And the weights
>>> g.edata['prob'] = torch.FloatTensor([0., 1., 0., 1., 0., 1.])
To sample one inbound edge for node 0 and node 1:
>>> sg = dgl.sampling.sample_labors(g, [0, 1], 1)
>>> sg.edges(order='eid')
(tensor([1, 0]), tensor([0, 1]))
>>> sg.edata[dgl.EID]
tensor([2, 0])
To sample one inbound edge for node 0 and node 1 with probability in edge feature
``prob``:
>>> sg = dgl.sampling.sample_labors(g, [0, 1], 1, prob='prob')
>>> sg.edges(order='eid')
(tensor([2, 1]), tensor([0, 1]))
With ``fanout`` greater than the number of actual neighbors and without replacement,
DGL will take all neighbors instead:
>>> sg = dgl.sampling.sample_labors(g, [0, 1], 3)
>>> sg.edges(order='eid')
(tensor([1, 2, 0, 1]), tensor([0, 0, 1, 1]))
To exclude certain EID's during sampling for the seed nodes:
>>> g = dgl.graph(([0, 0, 1, 1, 2, 2], [1, 2, 0, 1, 2, 0]))
>>> g_edges = g.all_edges(form='all')``
(tensor([0, 0, 1, 1, 2, 2]), tensor([1, 2, 0, 1, 2, 0]), tensor([0, 1, 2, 3, 4, 5]))
>>> sg = dgl.sampling.sample_labors(g, [0, 1], 3, exclude_edges=[0, 1, 2])
>>> sg.all_edges(form='all')
(tensor([2, 1]), tensor([0, 1]), tensor([0, 1]))
>>> sg.has_edges_between(g_edges[0][:3],g_edges[1][:3])
tensor([False, False, False])
>>> g = dgl.heterograph({
... ('drug', 'interacts', 'drug'): ([0, 0, 1, 1, 3, 2], [1, 2, 0, 1, 2, 0]),
... ('drug', 'interacts', 'gene'): ([0, 0, 1, 1, 2, 2], [1, 2, 0, 1, 2, 0]),
... ('drug', 'treats', 'disease'): ([0, 0, 1, 1, 2, 2], [1, 2, 0, 1, 2, 0])})
>>> g_edges = g.all_edges(form='all', etype=('drug', 'interacts', 'drug'))
(tensor([0, 0, 1, 1, 3, 2]), tensor([1, 2, 0, 1, 2, 0]), tensor([0, 1, 2, 3, 4, 5]))
>>> excluded_edges = {('drug', 'interacts', 'drug'): g_edges[2][:3]}
>>> sg = dgl.sampling.sample_labors(g, {'drug':[0, 1]}, 3, exclude_edges=excluded_edges)
>>> sg.all_edges(form='all', etype=('drug', 'interacts', 'drug'))
(tensor([2, 1]), tensor([0, 1]), tensor([0, 1]))
>>> sg.has_edges_between(g_edges[0][:3],g_edges[1][:3],etype=('drug', 'interacts', 'drug'))
tensor([False, False, False])
"""
    if F.device_type(g.device) == "cpu" and not g.is_pinned():
        frontier, importances = _sample_labors(
            g,
            nodes,
            fanout,
            edge_dir=edge_dir,
            prob=prob,
            importance_sampling=importance_sampling,
            random_seed=random_seed,
            copy_ndata=copy_ndata,
            copy_edata=copy_edata,
            exclude_edges=exclude_edges,
        )
    else:
        frontier, importances = _sample_labors(
            g,
            nodes,
            fanout,
            edge_dir=edge_dir,
            prob=prob,
            importance_sampling=importance_sampling,
            random_seed=random_seed,
            copy_ndata=copy_ndata,
            copy_edata=copy_edata,
        )
        if exclude_edges is not None:
            eid_excluder = EidExcluder(exclude_edges)
            frontier, importances = eid_excluder(frontier, importances)
    if output_device is None:
        return (frontier, importances)
    else:
        return (
            frontier.to(output_device),
            list(map(lambda x: x.to(output_device), importances)),
        )
def _sample_labors(
    g,
    nodes,
    fanout,
    edge_dir="in",
    prob=None,
    importance_sampling=0,
    random_seed=None,
    copy_ndata=True,
    copy_edata=True,
    exclude_edges=None,
):
    if random_seed is None:
        random_seed = F.to_dgl_nd(choice(1e18, 1))
    if not isinstance(nodes, dict):
        if len(g.ntypes) > 1:
            raise DGLError(
                "Must specify node type when the graph is not homogeneous."
            )
        nodes = {g.ntypes[0]: nodes}
    nodes = utils.prepare_tensor_dict(g, nodes, "nodes")
    if len(nodes) == 0:
        raise ValueError(
            "Got an empty dictionary in the nodes argument. "
            "Please pass in a dictionary with empty tensors as values instead."
        )
    ctx = utils.to_dgl_context(F.context(next(iter(nodes.values()))))
    nodes_all_types = []
    # nids_all_types is needed if one wants labor to work for subgraphs whose
    # vertices have been renamed and the rolled randoms should be rolled for
    # global vertex ids. It is disabled for now below by passing empty
    # ndarrays.
    nids_all_types = [nd.array([], ctx=ctx) for _ in g.ntypes]
    for ntype in g.ntypes:
        if ntype in nodes:
            nodes_all_types.append(F.to_dgl_nd(nodes[ntype]))
        else:
            nodes_all_types.append(nd.array([], ctx=ctx))
    if isinstance(fanout, nd.NDArray):
        fanout_array = fanout
    else:
        if not isinstance(fanout, dict):
            fanout_array = [int(fanout)] * len(g.etypes)
        else:
            if len(fanout) != len(g.etypes):
                raise DGLError(
                    "Fan-out must be specified for each edge type "
                    "if a dict is provided."
                )
            fanout_array = [None] * len(g.etypes)
            for etype, value in fanout.items():
                fanout_array[g.get_etype_id(etype)] = value
        fanout_array = F.to_dgl_nd(F.tensor(fanout_array, dtype=F.int64))
    if (
        isinstance(prob, list)
        and len(prob) > 0
        and isinstance(prob[0], nd.NDArray)
    ):
        prob_arrays = prob
    elif prob is None:
        prob_arrays = [nd.array([], ctx=nd.cpu())] * len(g.etypes)
    else:
        prob_arrays = []
        for etype in g.canonical_etypes:
            if prob in g.edges[etype].data:
                prob_arrays.append(F.to_dgl_nd(g.edges[etype].data[prob]))
            else:
                prob_arrays.append(nd.array([], ctx=nd.cpu()))
    excluded_edges_all_t = []
    if exclude_edges is not None:
        if not isinstance(exclude_edges, dict):
            if len(g.etypes) > 1:
                raise DGLError(
                    "Must specify etype when the graph is not homogeneous."
                )
            exclude_edges = {g.canonical_etypes[0]: exclude_edges}
        exclude_edges = utils.prepare_tensor_dict(g, exclude_edges, "edges")
        for etype in g.canonical_etypes:
            if etype in exclude_edges:
                excluded_edges_all_t.append(F.to_dgl_nd(exclude_edges[etype]))
            else:
                excluded_edges_all_t.append(nd.array([], ctx=ctx))
    ret_val = _CAPI_DGLSampleLabors(
        g._graph,
        nodes_all_types,
        fanout_array,
        edge_dir,
        prob_arrays,
        excluded_edges_all_t,
        importance_sampling,
        random_seed,
        nids_all_types,
    )
    subgidx = ret_val[0]
    importances = [F.from_dgl_nd(importance) for importance in ret_val[1:]]
    induced_edges = subgidx.induced_edges
    ret = DGLGraph(subgidx.graph, g.ntypes, g.etypes)
    if copy_ndata:
        node_frames = utils.extract_node_subframes(g, None)
        utils.set_new_frames(ret, node_frames=node_frames)
    if copy_edata:
        edge_frames = utils.extract_edge_subframes(g, induced_edges)
        utils.set_new_frames(ret, edge_frames=edge_frames)
    return ret, importances


DGLGraph.sample_labors = utils.alias_func(sample_labors)

_init_api("dgl.sampling.labor", __name__)
@@ -375,7 +375,7 @@ def _sample_neighbors(g, nodes, fanout, edge_dir='in', prob=None,
    if exclude_edges is not None:
        if not isinstance(exclude_edges, dict):
            if len(g.etypes) > 1:
                raise DGLError("Must specify etype when the graph is not homogeneous.")
            exclude_edges = {g.canonical_etypes[0] : exclude_edges}
        exclude_edges = utils.prepare_tensor_dict(g, exclude_edges, 'edges')
        for etype in g.canonical_etypes:
...
@@ -57,7 +57,7 @@ class EidExcluder(object):
            func = lambda x, y: x.find_included_indices(y)
        return recursive_apply_pair(self._filter, parent_eids, func)

    def __call__(self, frontier, weights=None):
        parent_eids = frontier.edata[EID]
        located_eids = self._find_indices(parent_eids)
@@ -69,15 +69,19 @@ class EidExcluder(object):
            if len(located_eids) > 0:
                frontier = transforms.remove_edges(
                    frontier, located_eids, store_ids=True)
                if weights is not None and weights[0].shape[0] == frontier.num_edges():
                    weights[0] = F.gather_row(weights[0], frontier.edata[EID])
                frontier.edata[EID] = F.gather_row(parent_eids, frontier.edata[EID])
        else:
            # (BarclayII) remove_edges only accepts removing one type of edges,
            # so I need to keep track of the edge IDs left one by one.
            new_eids = parent_eids.copy()
            for i, (k, v) in enumerate(located_eids.items()):
                if len(v) > 0:
                    frontier = transforms.remove_edges(
                        frontier, v, etype=k, store_ids=True)
                    new_eids[k] = F.gather_row(parent_eids[k], frontier.edges[k].data[EID])
                    if weights is not None and weights[i].shape[0] == frontier.num_edges(k):
                        weights[i] = F.gather_row(weights[i], frontier.edges[k].data[EID])
            frontier.edata[EID] = new_eids
        return frontier if weights is None else (frontier, weights)
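A short hypothetical sketch of the new weights path: when a weights list is passed, the excluder returns a (frontier, weights) pair with each weight tensor gathered down to the surviving edges, which is how labor.py uses it above.

>>> excluder = EidExcluder(exclude_eids)  # hypothetical variables
>>> frontier = excluder(frontier)  # without weights: graph only, as before
>>> frontier, importances = excluder(frontier, importances)  # with weights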
/**
* Copyright (c) 2019-2022 by Contributors
* @file array/array.cc
* @brief DGL array utilities implementation
*/
@@ -25,6 +25,10 @@ IdArray NewIdArray(int64_t length, DGLContext ctx, uint8_t nbits) {
  return IdArray::Empty({length}, DGLDataType{kDGLInt, nbits, 1}, ctx);
}
FloatArray NewFloatArray(int64_t length, DGLContext ctx, uint8_t nbits) {
  return FloatArray::Empty({length}, DGLDataType{kDGLFloat, nbits, 1}, ctx);
}
IdArray Clone(IdArray arr) {
  IdArray ret = NewIdArray(arr->shape[0], arr->ctx, arr->dtype.bits);
  ret.CopyFrom(arr);
@@ -536,6 +540,22 @@ CSRMatrix CSRRemove(CSRMatrix csr, IdArray entries) {
  return ret;
}
std::pair<COOMatrix, FloatArray> CSRLaborSampling(
    CSRMatrix mat, IdArray rows, int64_t num_samples, FloatArray prob,
    int importance_sampling, IdArray random_seed, IdArray NIDs) {
  std::pair<COOMatrix, FloatArray> ret;
  ATEN_CSR_SWITCH_CUDA_UVA(mat, rows, XPU, IdType, "CSRLaborSampling", {
    const auto dtype = IsNullArray(prob)
        ? DGLDataTypeTraits<float>::dtype
        : prob->dtype;
    ATEN_FLOAT_TYPE_SWITCH(dtype, FloatType, "probability", {
      ret = impl::CSRLaborSampling<XPU, IdType, FloatType>(
          mat, rows, num_samples, prob, importance_sampling, random_seed,
          NIDs);
    });
  });
  return ret;
}
COOMatrix CSRRowWiseSampling(
    CSRMatrix mat, IdArray rows, int64_t num_samples, NDArray prob_or_mask,
    bool replace) {
@@ -794,6 +814,22 @@ COOMatrix COORemove(COOMatrix coo, IdArray entries) {
  return ret;
}
std::pair<COOMatrix, FloatArray> COOLaborSampling(
    COOMatrix mat, IdArray rows, int64_t num_samples, FloatArray prob,
    int importance_sampling, IdArray random_seed, IdArray NIDs) {
  std::pair<COOMatrix, FloatArray> ret;
  ATEN_COO_SWITCH(mat, XPU, IdType, "COOLaborSampling", {
    const auto dtype = IsNullArray(prob)
        ? DGLDataTypeTraits<float>::dtype
        : prob->dtype;
    ATEN_FLOAT_TYPE_SWITCH(dtype, FloatType, "probability", {
      ret = impl::COOLaborSampling<XPU, IdType, FloatType>(
          mat, rows, num_samples, prob, importance_sampling, random_seed,
          NIDs);
    });
  });
  return ret;
}
COOMatrix COORowWiseSampling(
    COOMatrix mat, IdArray rows, int64_t num_samples, NDArray prob_or_mask,
    bool replace) {
...
/**
* Copyright (c) 2019-2022 by Contributors
* @file array/array_op.h
* @brief Array operator templates
*/
@@ -166,6 +166,16 @@ COOMatrix COOReorder(
template <DGLDeviceType XPU, typename IdType>
CSRMatrix CSRRemove(CSRMatrix csr, IdArray entries);
template <DGLDeviceType XPU, typename IdType, typename FloatType>
std::pair<COOMatrix, FloatArray> CSRLaborSampling(
    CSRMatrix mat,
    IdArray rows,
    int64_t num_samples,
    FloatArray prob,
    int importance_sampling,
    IdArray random_seed,
    IdArray NIDs);
// FloatType is the type of probability data.
template <DGLDeviceType XPU, typename IdType, typename DType>
COOMatrix CSRRowWiseSampling(
@@ -273,6 +283,16 @@ std::pair<bool, bool> COOIsSorted(COOMatrix coo);
template <DGLDeviceType XPU, typename IdType>
COOMatrix COORemove(COOMatrix coo, IdArray entries);
template <DGLDeviceType XPU, typename IdType, typename FloatType>
std::pair<COOMatrix, FloatArray> COOLaborSampling(
    COOMatrix mat,
    IdArray rows,
    int64_t num_samples,
    FloatArray prob,
    int importance_sampling,
    IdArray random_seed,
    IdArray NIDs);
// FloatType is the type of probability data.
template <DGLDeviceType XPU, typename IdType, typename DType>
COOMatrix COORowWiseSampling(
...
/*!
* Copyright (c) 2022, NVIDIA Corporation
* Copyright (c) 2022, GT-TDAlab (Muhammed Fatih Balin & Umit V. Catalyurek)
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* \file array/cpu/labor_pick.h
* \brief Template implementation for layerwise pick operators.
*/
#ifndef DGL_ARRAY_CPU_LABOR_PICK_H_
#define DGL_ARRAY_CPU_LABOR_PICK_H_
#include <dgl/array.h>
#include <dgl/random.h>
#include <dgl/runtime/parallel_for.h>
#include <dmlc/omp.h>
#include <parallel_hashmap/phmap.h>
#include <algorithm>
#include <cmath>
#include <functional>
#include <iostream>
#include <memory>
#include <numeric>
#include <string>
#include <utility>
#include <vector>
#include <pcg_random.hpp>
namespace dgl {
namespace aten {
namespace impl {
constexpr double eps = 0.0001;
template <typename IdxType, typename FloatType>
auto compute_importance_sampling_probabilities(
DGLContext ctx, DGLDataType dtype, const IdxType max_degree,
const IdxType num_rows, const int importance_sampling, const bool weighted,
const IdxType* rows_data, const IdxType* indptr, const FloatType* A,
const IdxType* indices, const IdxType num_picks, const FloatType* ds,
FloatType* cs) {
constexpr FloatType ONE = 1;
// ps stands for \pi in arXiv:2210.13339
FloatArray ps_array = NDArray::Empty({max_degree + 1}, dtype, ctx);
FloatType* ps = ps_array.Ptr<FloatType>();
double prev_ex_nodes = max_degree * num_rows;
phmap::flat_hash_map<IdxType, FloatType> hop_map, hop_map2;
for (int iters = 0; iters < importance_sampling || importance_sampling < 0;
iters++) {
// NOTE(mfbalin) When the graph is unweighted, the first c values in
// the first iteration can be computed in O(1) as k / d where k is fanout
// and d is the degree.
// If the graph is weighted, the first c values are computed in the inner
// for loop instead. Therefore the importance_sampling argument should be
// increased by one in the caller.
// The later iterations will have correct c values so the if block will be
// executed.
if (!weighted || iters) {
hop_map2.clear();
for (int64_t i = 0; i < num_rows; ++i) {
const FloatType c = cs[i];
const IdxType rid = rows_data[i];
for (auto j = indptr[rid]; j < indptr[rid + 1]; j++) {
const auto ct = c * (weighted && iters == 1 ? A[j] : 1);
auto itb = hop_map2.emplace(indices[j], ct);
if (!itb.second) itb.first->second = std::max(ct, itb.first->second);
}
}
if (hop_map.empty())
hop_map = std::move(hop_map2);
else
// Update the pi array according to Eq 18.
for (auto it : hop_map2) hop_map[it.first] *= it.second;
}
// Compute c_s according to Equation (15); (17) is slower because sorting
// is required.
for (int64_t i = 0; i < num_rows; ++i) {
const IdxType rid = rows_data[i];
const auto d = indptr[rid + 1] - indptr[rid];
if (d == 0) continue;
const auto k = std::min(num_picks, d);
if (hop_map.empty()) { // weighted first iter, pi = A
for (auto j = indptr[rid]; j < indptr[rid + 1]; j++)
ps[j - indptr[rid]] = A[j];
} else {
for (auto j = indptr[rid]; j < indptr[rid + 1]; j++)
ps[j - indptr[rid]] = hop_map[indices[j]];
}
// stands for RHS of Equation (22) in arXiv:2210.13339 after moving the
// other terms without c_s to RHS.
double var_target = ds[i] * ds[i] / k;
if (weighted) {
var_target -= ds[i] * ds[i] / d;
for (auto j = indptr[rid]; j < indptr[rid + 1]; j++)
var_target += A[j] * A[j];
}
FloatType c = cs[i];
// stands for the left-hand side of Equation (22) in arXiv:2210.13339 after
// moving the other terms without c_s to the RHS.
double var_1;
// Compute c_s in Equation (22) via fixed-point iteration.
do {
var_1 = 0;
if (weighted) {
for (auto j = indptr[rid]; j < indptr[rid + 1]; j++)
var_1 += A[j] * A[j] / std::min(ONE, c * ps[j - indptr[rid]]);
} else {
for (auto j = indptr[rid]; j < indptr[rid + 1]; j++)
var_1 += ONE / std::min(ONE, c * ps[j - indptr[rid]]);
}
c *= var_1 / var_target;
} while (std::min(var_1, var_target) / std::max(var_1, var_target) <
1 - eps);
cs[i] = c;
}
// Check convergence
if (!weighted || iters) {
double cur_ex_nodes = 0;
for (auto it : hop_map) cur_ex_nodes += std::min((FloatType)1, it.second);
if (cur_ex_nodes / prev_ex_nodes >= 1 - eps) break;
prev_ex_nodes = cur_ex_nodes;
}
}
return hop_map;
}
// Template for picking non-zero values row-wise.
template <typename IdxType, typename FloatType>
std::pair<COOMatrix, FloatArray> CSRLaborPick(
CSRMatrix mat, IdArray rows, int64_t num_picks, FloatArray prob,
int importance_sampling, IdArray random_seed_arr, IdArray NIDs) {
using namespace aten;
const IdxType* indptr = mat.indptr.Ptr<IdxType>();
const IdxType* indices = mat.indices.Ptr<IdxType>();
const IdxType* data = CSRHasData(mat) ? mat.data.Ptr<IdxType>() : nullptr;
const IdxType* rows_data = rows.Ptr<IdxType>();
const IdxType* nids = IsNullArray(NIDs) ? nullptr : NIDs.Ptr<IdxType>();
const auto num_rows = rows->shape[0];
const auto& ctx = mat.indptr->ctx;
const bool weighted = !IsNullArray(prob);
// O(1) c computation not possible, so one more iteration is needed.
if (importance_sampling >= 0) importance_sampling += weighted;
// A stands for the same notation in arXiv:2210.13339, i.e. the edge weights.
auto A_arr = prob;
FloatType* A = A_arr.Ptr<FloatType>();
constexpr FloatType ONE = 1;
constexpr auto dtype = DGLDataTypeTraits<FloatType>::dtype;
// cs stands for c_s in arXiv:2210.13339
FloatArray cs_array = NDArray::Empty({num_rows}, dtype, ctx);
FloatType* cs = cs_array.Ptr<FloatType>();
// ds stands for A_{*s} in arXiv:2210.13339
FloatArray ds_array = NDArray::Empty({num_rows}, dtype, ctx);
FloatType* ds = ds_array.Ptr<FloatType>();
IdxType max_degree = 1;
IdxType hop_size = 0;
for (int64_t i = 0; i < num_rows; ++i) {
const IdxType rid = rows_data[i];
const auto act_degree = indptr[rid + 1] - indptr[rid];
max_degree = std::max(act_degree, max_degree);
double d = weighted
? std::accumulate(A + indptr[rid], A + indptr[rid + 1], 0.0)
: act_degree;
// O(1) c computation, samples more than needed for weighted case, mentioned
// in the sentence between (10) and (11) in arXiv:2210.13339
cs[i] = num_picks / d;
ds[i] = d;
hop_size += act_degree;
}
phmap::flat_hash_map<IdxType, FloatType> hop_map;
if (importance_sampling)
hop_map = compute_importance_sampling_probabilities<IdxType, FloatType>(
ctx, dtype, max_degree, num_rows, importance_sampling, weighted,
rows_data, indptr, A, indices, (IdxType)num_picks, ds, cs);
constexpr auto vidtype = DGLDataTypeTraits<IdxType>::dtype;
IdArray picked_row = NDArray::Empty({hop_size}, vidtype, ctx);
IdArray picked_col = NDArray::Empty({hop_size}, vidtype, ctx);
IdArray picked_idx = NDArray::Empty({hop_size}, vidtype, ctx);
FloatArray picked_imp =
NDArray::Empty({importance_sampling ? hop_size : 0}, dtype, ctx);
IdxType* picked_rdata = picked_row.Ptr<IdxType>();
IdxType* picked_cdata = picked_col.Ptr<IdxType>();
IdxType* picked_idata = picked_idx.Ptr<IdxType>();
FloatType* picked_imp_data = picked_imp.Ptr<FloatType>();
const uint64_t random_seed =
IsNullArray(random_seed_arr)
? RandomEngine::ThreadLocal()->RandInt(1000000000)
: random_seed_arr.Ptr<int64_t>()[0];
// compute number of edges first and do sampling
IdxType num_edges = 0;
for (int64_t i = 0; i < num_rows; i++) {
const IdxType rid = rows_data[i];
const auto c = cs[i];
FloatType norm_inv_p = 0;
const auto off = num_edges;
for (auto j = indptr[rid]; j < indptr[rid + 1]; j++) {
const auto v = indices[j];
const auto t = nids ? nids[v] : v; // t in the paper
pcg32 ng(random_seed, t);
std::uniform_real_distribution<FloatType> uni;
// rolled random number r_t is a function of the random_seed and t
const auto rnd = uni(ng);
const auto w = (weighted ? A[j] : 1);
// if hop_map is initialized, get ps from there, otherwise get it from the
// alternative.
const auto ps = std::min(
ONE, importance_sampling - weighted ? c * hop_map[v] : c * w);
if (rnd <= ps) {
picked_rdata[num_edges] = rid;
picked_cdata[num_edges] = v;
picked_idata[num_edges] = data ? data[j] : j;
if (importance_sampling) {
const auto edge_weight = w / ps;
norm_inv_p += edge_weight;
picked_imp_data[num_edges] = edge_weight;
}
num_edges++;
}
}
if (importance_sampling) {
const auto norm_factor = (num_edges - off) / norm_inv_p;
for (auto i = off; i < num_edges; i++)
// so that fn.mean can be used
picked_imp_data[i] *= norm_factor;
}
}
picked_row = picked_row.CreateView({num_edges}, picked_row->dtype);
picked_col = picked_col.CreateView({num_edges}, picked_col->dtype);
picked_idx = picked_idx.CreateView({num_edges}, picked_idx->dtype);
if (importance_sampling)
picked_imp = picked_imp.CreateView({num_edges}, picked_imp->dtype);
return std::make_pair(
COOMatrix(mat.num_rows, mat.num_cols, picked_row, picked_col, picked_idx),
picked_imp);
}
// Template for picking non-zero values row-wise. The implementation first
// slices out the corresponding rows and then converts it to CSR format. It then
// performs row-wise pick on the CSR matrix and rectifies the returned results.
template <typename IdxType, typename FloatType>
std::pair<COOMatrix, FloatArray> COOLaborPick(
COOMatrix mat, IdArray rows, int64_t num_picks, FloatArray prob,
int importance_sampling, IdArray random_seed, IdArray NIDs) {
using namespace aten;
const auto& csr = COOToCSR(COOSliceRows(mat, rows));
const IdArray new_rows =
Range(0, rows->shape[0], rows->dtype.bits, rows->ctx);
const auto&& picked_importances = CSRLaborPick<IdxType, FloatType>(
csr, new_rows, num_picks, prob, importance_sampling, random_seed, NIDs);
const auto& picked = picked_importances.first;
const auto& importances = picked_importances.second;
return std::make_pair(
COOMatrix(
mat.num_rows, mat.num_cols,
IndexSelect(
rows, picked.row), // map the row index to the correct one
picked.col, picked.data),
importances);
}
} // namespace impl
} // namespace aten
} // namespace dgl
#endif // DGL_ARRAY_CPU_LABOR_PICK_H_
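To clarify the c_s computation in compute_importance_sampling_probabilities above, here is a minimal NumPy sketch (an illustrative rewrite under the unweighted assumption, not DGL code) of the fixed-point iteration for Equation (22): find c so that the left-hand side sum matches the variance target d^2 / k.

import numpy as np

def fixed_point_c(pi, k, eps=1e-4):
    # Solve sum_t 1 / min(1, c * pi_t) == d^2 / k for c, where pi_t are the
    # current inclusion probabilities of the d neighbors and k is the fanout.
    d = len(pi)
    var_target = d * d / k  # RHS of Eq. (22), unweighted case
    c = k / d  # O(1) initial estimate, as in CSRLaborPick
    while True:
        var_1 = float(np.sum(1.0 / np.minimum(1.0, c * pi)))
        c *= var_1 / var_target  # fixed-point update
        if min(var_1, var_target) / max(var_1, var_target) >= 1 - eps:
            return c

# With uniform pi, the converged c * pi gives each neighbor's final
# inclusion probability; expected number of picks is k.
print(fixed_point_c(np.full(10, 0.5), 4))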
/*!
* Copyright (c) 2022, NVIDIA Corporation
* Copyright (c) 2022, GT-TDAlab (Muhammed Fatih Balin & Umit V. Catalyurek)
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* \file array/cpu/labor_sampling.cc
* \brief labor sampling
*/
#include "./labor_pick.h"
namespace dgl {
namespace aten {
namespace impl {
/////////////////////////////// CSR ///////////////////////////////
template <DGLDeviceType XPU, typename IdxType, typename FloatType>
std::pair<COOMatrix, FloatArray> CSRLaborSampling(
CSRMatrix mat, IdArray rows, int64_t num_samples, FloatArray prob,
int importance_sampling, IdArray random_seed, IdArray NIDs) {
return CSRLaborPick<IdxType, FloatType>(
mat, rows, num_samples, prob, importance_sampling, random_seed, NIDs);
}
template std::pair<COOMatrix, FloatArray>
CSRLaborSampling<kDGLCPU, int32_t, float>(
CSRMatrix, IdArray, int64_t, FloatArray, int, IdArray, IdArray);
template std::pair<COOMatrix, FloatArray>
CSRLaborSampling<kDGLCPU, int64_t, float>(
CSRMatrix, IdArray, int64_t, FloatArray, int, IdArray, IdArray);
template std::pair<COOMatrix, FloatArray>
CSRLaborSampling<kDGLCPU, int32_t, double>(
CSRMatrix, IdArray, int64_t, FloatArray, int, IdArray, IdArray);
template std::pair<COOMatrix, FloatArray>
CSRLaborSampling<kDGLCPU, int64_t, double>(
CSRMatrix, IdArray, int64_t, FloatArray, int, IdArray, IdArray);
/////////////////////////////// COO ///////////////////////////////
template <DGLDeviceType XPU, typename IdxType, typename FloatType>
std::pair<COOMatrix, FloatArray> COOLaborSampling(
COOMatrix mat, IdArray rows, int64_t num_samples, FloatArray prob,
int importance_sampling, IdArray random_seed, IdArray NIDs) {
return COOLaborPick<IdxType, FloatType>(
mat, rows, num_samples, prob, importance_sampling, random_seed, NIDs);
}
template std::pair<COOMatrix, FloatArray>
COOLaborSampling<kDGLCPU, int32_t, float>(
COOMatrix, IdArray, int64_t, FloatArray, int, IdArray, IdArray);
template std::pair<COOMatrix, FloatArray>
COOLaborSampling<kDGLCPU, int64_t, float>(
COOMatrix, IdArray, int64_t, FloatArray, int, IdArray, IdArray);
template std::pair<COOMatrix, FloatArray>
COOLaborSampling<kDGLCPU, int32_t, double>(
COOMatrix, IdArray, int64_t, FloatArray, int, IdArray, IdArray);
template std::pair<COOMatrix, FloatArray>
COOLaborSampling<kDGLCPU, int64_t, double>(
COOMatrix, IdArray, int64_t, FloatArray, int, IdArray, IdArray);
} // namespace impl
} // namespace aten
} // namespace dgl
/*!
* Copyright (c) 2022, NVIDIA Corporation
* Copyright (c) 2022, GT-TDAlab (Muhammed Fatih Balin & Umit V. Catalyurek)
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* \file array/cuda/labor_sampling.cu
* \brief labor sampling
*/
#include <curand_kernel.h>
#include <dgl/aten/coo.h>
#include <dgl/random.h>
#include <dgl/runtime/device_api.h>
#include <thrust/binary_search.h>
#include <thrust/copy.h>
#include <thrust/execution_policy.h>
#include <thrust/gather.h>
#include <thrust/iterator/constant_iterator.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/zip_iterator.h>
#include <thrust/shuffle.h>
#include <thrust/transform.h>
#include <thrust/zip_function.h>
#include <algorithm>
#include <limits>
#include <numeric>
#include <type_traits>
#include <utility>
#include "../../array/cuda/atomic.cuh"
#include "../../array/cuda/utils.h"
#include "../../graph/transform/cuda/cuda_map_edges.cuh"
#include "../../runtime/cuda/cuda_common.h"
#include "./dgl_cub.cuh"
#include "./functor.cuh"
#include "./spmm.cuh"
namespace dgl {
namespace aten {
namespace impl {
constexpr int BLOCK_SIZE = 128;
constexpr int CTA_SIZE = 128;
constexpr double eps = 0.0001;
namespace {
template <typename IdType>
struct TransformOp {
const IdType* idx_coo;
const IdType* rows;
const IdType* indptr;
const IdType* subindptr;
const IdType* indices;
const IdType* data_arr;
__host__ __device__ auto operator()(IdType idx) {
const auto in_row = idx_coo[idx];
const auto row = rows[in_row];
const auto in_idx = indptr[row] + idx - subindptr[in_row];
const auto u = indices[in_idx];
const auto data = data_arr ? data_arr[in_idx] : in_idx;
return thrust::make_tuple(row, u, data);
}
};
template <
typename IdType, typename FloatType, typename probs_t, typename A_t,
typename B_t>
struct TransformOpImp {
probs_t probs;
A_t A;
B_t B;
const IdType* idx_coo;
const IdType* rows;
const FloatType* cs;
const IdType* indptr;
const IdType* subindptr;
const IdType* indices;
const IdType* data_arr;
__host__ __device__ auto operator()(IdType idx) {
const auto ps = probs[idx];
const auto in_row = idx_coo[idx];
const auto c = cs[in_row];
const auto row = rows[in_row];
const auto in_idx = indptr[row] + idx - subindptr[in_row];
const auto u = indices[in_idx];
const auto w = A[in_idx];
const auto w2 = B[in_idx];
const auto data = data_arr ? data_arr[in_idx] : in_idx;
return thrust::make_tuple(
in_row, row, u, data, w / min((FloatType)1, c * w2 * ps));
}
};
template <typename FloatType>
struct StencilOp {
const FloatType* cs;
template <typename IdType>
__host__ __device__ auto operator()(
IdType in_row, FloatType ps, FloatType rnd) {
return rnd <= cs[in_row] * ps;
}
};
template <typename IdType, typename FloatType, typename ps_t, typename A_t>
struct StencilOpFused {
const uint64_t rand_seed;
const IdType* idx_coo;
const FloatType* cs;
const ps_t probs;
const A_t A;
const IdType* subindptr;
const IdType* rows;
const IdType* indptr;
const IdType* indices;
const IdType* nids;
__device__ auto operator()(IdType idx) {
const auto in_row = idx_coo[idx];
const auto ps = probs[idx];
IdType rofs = idx - subindptr[in_row];
const IdType row = rows[in_row];
const auto in_idx = indptr[row] + rofs;
const auto u = indices[in_idx];
const auto t = nids ? nids[u] : u; // t in the paper
curandStatePhilox4_32_10_t rng;
// rolled random number r_t is a function of the random_seed and t
curand_init(123123, rand_seed, t, &rng);
const float rnd = curand_uniform(&rng);
return rnd <= cs[in_row] * A[in_idx] * ps;
}
};
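// Illustrative host-side sketch (not part of the original file) of the
// acceptance rule StencilOpFused implements above: each neighbor t draws a
// single uniform r_t that depends only on (random_seed, t), so every
// destination row in a batch sees the same r_t and the sampled
// neighborhoods overlap as much as possible. splitmix64 below is a
// hypothetical stand-in for the Philox generator used on the device.
inline double _SketchUniformForVertex(uint64_t seed, uint64_t t) {
  uint64_t z = seed + 0x9e3779b97f4a7c15ULL * (t + 1);
  z = (z ^ (z >> 30)) * 0xbf58476d1ce4e5b9ULL;
  z = (z ^ (z >> 27)) * 0x94d049bb133111ebULL;
  z ^= z >> 31;
  return (z >> 11) * (1.0 / (1ULL << 53));  // uniform in [0, 1)
}
// An edge (s, t) then survives iff
// _SketchUniformForVertex(seed, t) <= c_s * pi_t, mirroring the
// rnd <= cs[in_row] * A[in_idx] * ps test above.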
template <typename IdType, typename FloatType>
struct TransformOpMean {
const IdType* ds;
const FloatType* ws;
__host__ __device__ auto operator()(IdType idx, FloatType ps) {
return ps * ds[idx] / ws[idx];
}
};
struct TransformOpMinWith1 {
template <typename FloatType>
__host__ __device__ auto operator()(FloatType x) {
return min((FloatType)1, x);
}
};
template <typename IdType>
struct IndptrFunc {
const IdType* indptr;
__host__ __device__ auto operator()(IdType row) { return indptr[row]; }
};
template <typename FloatType>
struct SquareFunc {
__host__ __device__ auto operator()(FloatType x) {
return thrust::make_tuple(x, x * x);
}
};
struct TupleSum {
template <typename T>
__host__ __device__ T operator()(const T& a, const T& b) const {
return thrust::make_tuple(
thrust::get<0>(a) + thrust::get<0>(b),
thrust::get<1>(a) + thrust::get<1>(b));
}
};
template <typename IdType, typename FloatType>
struct DegreeFunc {
const IdType num_picks;
const IdType* rows;
const IdType* indptr;
const FloatType* ds;
IdType* in_deg;
FloatType* cs;
__host__ __device__ auto operator()(IdType tIdx) {
const auto out_row = rows[tIdx];
const auto d = indptr[out_row + 1] - indptr[out_row];
in_deg[tIdx] = d;
cs[tIdx] = num_picks / (ds ? ds[tIdx] : (FloatType)d);
}
};
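// Minimal host-side sketch (not part of the original file) of DegreeFunc's
// initialization: c_s starts at fanout over the (weighted) degree, i.e. the
// uniform neighbor-sampling inclusion rate; the kernels below only refine
// it further when weights or importance sampling are enabled.
inline double _SketchInitialC(int64_t num_picks, double degree) {
  // e.g. num_picks = 10 and degree = 20 give c_s = 0.5, so each neighbor
  // is initially kept with probability min(1, c_s * pi_t) = 0.5 at pi_t = 1.
  return num_picks / degree;
}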
template <typename IdType, typename FloatType>
__global__ void _CSRRowWiseOneHopExtractorKernel(
const uint64_t rand_seed, const IdType hop_size, const IdType* const rows,
const IdType* const indptr, const IdType* const subindptr,
const IdType* const indices, const IdType* const idx_coo,
const IdType* const nids, const FloatType* const A, FloatType* const rands,
IdType* const hop, FloatType* const A_l) {
IdType tx = static_cast<IdType>(blockIdx.x) * blockDim.x + threadIdx.x;
const int stride_x = gridDim.x * blockDim.x;
curandStatePhilox4_32_10_t rng;
while (tx < hop_size) {
IdType rpos = idx_coo[tx];
IdType rofs = tx - subindptr[rpos];
const IdType row = rows[rpos];
const auto in_idx = indptr[row] + rofs;
const auto u = indices[in_idx];
hop[tx] = u;
const auto v = nids ? nids[u] : u;
// 123123 is just a number with no significance.
curand_init(123123, rand_seed, v, &rng);
const float rnd = curand_uniform(&rng);
if (A) A_l[tx] = A[in_idx];
rands[tx] = (FloatType)rnd;
tx += stride_x;
}
}
template <typename IdType, typename FloatType, int BLOCK_CTAS, int TILE_SIZE>
__global__ void _CSRRowWiseLayerSampleDegreeKernel(
const IdType num_picks, const IdType num_rows, const IdType* const rows,
FloatType* const cs, const FloatType* const ds, const FloatType* const d2s,
const IdType* const indptr, const FloatType* const probs,
const FloatType* const A, const IdType* const subindptr) {
typedef cub::BlockReduce<FloatType, BLOCK_SIZE> BlockReduce;
__shared__ typename BlockReduce::TempStorage temp_storage;
__shared__ FloatType var_1_bcast[BLOCK_CTAS];
  // we assign one group of CTA_SIZE threads per row
assert(blockDim.x == CTA_SIZE);
assert(blockDim.y == BLOCK_CTAS);
IdType out_row = blockIdx.x * TILE_SIZE + threadIdx.y;
const auto last_row =
min(static_cast<IdType>(blockIdx.x + 1) * TILE_SIZE, num_rows);
constexpr FloatType ONE = 1;
while (out_row < last_row) {
const auto row = rows[out_row];
const auto in_row_start = indptr[row];
const auto out_row_start = subindptr[out_row];
const IdType degree = indptr[row + 1] - in_row_start;
if (degree > 0) {
      // stands for k in arXiv:2210.13339, i.e. the fanout
const auto k = min(num_picks, degree);
      // slightly better than neighbor sampling (NS)
const FloatType d_ = ds ? ds[row] : degree;
      // stands for the right-hand side of Equation (22) in arXiv:2210.13339
FloatType var_target =
d_ * d_ / k + (ds ? d2s[row] - d_ * d_ / degree : 0);
auto c = cs[out_row];
const int num_valid = min(degree, (IdType)CTA_SIZE);
      // stands for the left-hand side of Equation (22) in arXiv:2210.13339
FloatType var_1;
do {
var_1 = 0;
if (A) {
for (int idx = threadIdx.x; idx < degree; idx += CTA_SIZE) {
const auto w = A[in_row_start + idx];
const auto ps = probs ? probs[out_row_start + idx] : w;
var_1 += w * w / min(ONE, c * ps);
}
} else {
for (int idx = threadIdx.x; idx < degree; idx += CTA_SIZE) {
const auto ps = probs[out_row_start + idx];
var_1 += 1 / min(ONE, c * ps);
}
}
var_1 = BlockReduce(temp_storage).Sum(var_1, num_valid);
if (threadIdx.x == 0) var_1_bcast[threadIdx.y] = var_1;
__syncthreads();
var_1 = var_1_bcast[threadIdx.y];
c *= var_1 / var_target;
} while (min(var_1, var_target) / max(var_1, var_target) < 1 - eps);
if (threadIdx.x == 0) cs[out_row] = c;
}
out_row += BLOCK_CTAS;
}
}
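// Host-side sketch (not part of the original file) of the fixed-point
// iteration _CSRRowWiseLayerSampleDegreeKernel runs per row: rescale c
// until var_1(c) = sum_t 1 / min(1, c * pi_t) matches the variance target
// d^2 / k from Equation (22) of arXiv:2210.13339 (unweighted case shown).
inline double _SketchSolveC(const double* pi, int degree, int num_picks) {
  const double d = degree;
  const double k = std::min<double>(num_picks, d);
  const double var_target = d * d / k;
  double c = num_picks / d;  // DegreeFunc's starting point
  double var_1;
  do {
    var_1 = 0;
    for (int i = 0; i < degree; ++i) var_1 += 1 / std::min(1.0, c * pi[i]);
    c *= var_1 / var_target;  // multiplicative update, as in the kernel
  } while (std::min(var_1, var_target) / std::max(var_1, var_target) <
           1 - eps);
  return c;
}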
} // namespace
template <typename IdType, typename FloatType, typename exec_policy_t>
void compute_importance_sampling_probabilities(
CSRMatrix mat, const IdType hop_size, cudaStream_t stream,
const uint64_t random_seed, const IdType num_rows, const IdType* rows,
const IdType* indptr, const IdType* subindptr, const IdType* indices,
IdArray idx_coo_arr, const IdType* nids,
FloatArray cs_arr, // holds the computed cs values, has size num_rows
const bool weighted, const FloatType* A, const FloatType* ds,
const FloatType* d2s, const IdType num_picks, DGLContext ctx,
const runtime::CUDAWorkspaceAllocator& allocator,
const exec_policy_t& exec_policy, const int importance_sampling,
IdType* hop_1, // holds the contiguous one-hop neighborhood, has size |E|
FloatType* rands, // holds the rolled random numbers r_t for each edge, has
// size |E|
FloatType* probs_found) { // holds the computed pi_t values for each edge,
// has size |E|
auto device = runtime::DeviceAPI::Get(ctx);
auto idx_coo = idx_coo_arr.Ptr<IdType>();
auto cs = cs_arr.Ptr<FloatType>();
FloatArray A_l_arr = weighted
? NewFloatArray(hop_size, ctx, sizeof(FloatType) * 8)
: NullArray();
auto A_l = A_l_arr.Ptr<FloatType>();
  const int max_log_num_vertices = [&]() -> int {
for (int i = 0; i < static_cast<int>(sizeof(IdType)) * 8; i++)
if (mat.num_cols <= ((IdType)1) << i) return i;
return sizeof(IdType) * 8;
}();
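  // Worked example: with mat.num_cols = 1000, the smallest i satisfying
  // 1000 <= (1 << i) is 10, so the radix sort below only needs to process
  // bits [0, 10) of each column id rather than all 8 * sizeof(IdType) bits.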
  {  // extracts the one-hop neighborhood columns into the contiguous hop_1
const dim3 block(BLOCK_SIZE);
const dim3 grid((hop_size + BLOCK_SIZE - 1) / BLOCK_SIZE);
CUDA_KERNEL_CALL(
(_CSRRowWiseOneHopExtractorKernel<IdType, FloatType>), grid, block, 0,
stream, random_seed, hop_size, rows, indptr, subindptr, indices,
idx_coo, nids, weighted ? A : nullptr, rands, hop_1, A_l);
}
int64_t hop_uniq_size = 0;
IdArray hop_new_arr = NewIdArray(hop_size, ctx, sizeof(IdType) * 8);
auto hop_new = hop_new_arr.Ptr<IdType>();
auto hop_unique = allocator.alloc_unique<IdType>(hop_size);
  // After this block, hop_unique holds the unique set of the one-hop
  // neighborhood and hop_new holds the relabeled hop_1; idx_coo already
  // holds the relabeled destinations, so hop_unique[hop_new] == hop_1 holds.
{
auto hop_2 = allocator.alloc_unique<IdType>(hop_size);
auto hop_3 = allocator.alloc_unique<IdType>(hop_size);
device->CopyDataFromTo(
hop_1, 0, hop_2.get(), 0, sizeof(IdType) * hop_size, ctx, ctx,
mat.indptr->dtype);
cub::DoubleBuffer<IdType> hop_b(hop_2.get(), hop_3.get());
{
std::size_t temp_storage_bytes = 0;
CUDA_CALL(cub::DeviceRadixSort::SortKeys(
nullptr, temp_storage_bytes, hop_b, hop_size, 0, max_log_num_vertices,
stream));
auto temp = allocator.alloc_unique<char>(temp_storage_bytes);
CUDA_CALL(cub::DeviceRadixSort::SortKeys(
temp.get(), temp_storage_bytes, hop_b, hop_size, 0,
max_log_num_vertices, stream));
}
auto hop_counts = allocator.alloc_unique<IdType>(hop_size + 1);
auto hop_unique_size = allocator.alloc_unique<int64_t>(1);
{
std::size_t temp_storage_bytes = 0;
CUDA_CALL(cub::DeviceRunLengthEncode::Encode(
nullptr, temp_storage_bytes, hop_b.Current(), hop_unique.get(),
hop_counts.get(), hop_unique_size.get(), hop_size, stream));
auto temp = allocator.alloc_unique<char>(temp_storage_bytes);
CUDA_CALL(cub::DeviceRunLengthEncode::Encode(
temp.get(), temp_storage_bytes, hop_b.Current(), hop_unique.get(),
hop_counts.get(), hop_unique_size.get(), hop_size, stream));
device->CopyDataFromTo(
hop_unique_size.get(), 0, &hop_uniq_size, 0, sizeof(hop_uniq_size),
ctx, DGLContext{kDGLCPU, 0}, mat.indptr->dtype);
}
thrust::lower_bound(
exec_policy, hop_unique.get(), hop_unique.get() + hop_uniq_size, hop_1,
hop_1 + hop_size, hop_new);
}
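  // Worked example of the invariant above (hypothetical values): for
  // hop_1 = {7, 3, 7, 9}, sorting and run-length encoding yield
  // hop_unique = {3, 7, 9}, and the lower_bound pass produces
  // hop_new = {1, 0, 1, 2}, so hop_unique[hop_new[i]] == hop_1[i].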
// @todo Consider creating a CSC because the SpMV will be done multiple times.
COOMatrix rmat(
num_rows, hop_uniq_size, idx_coo_arr, hop_new_arr, NullArray(), true,
mat.sorted);
BcastOff bcast_off;
bcast_off.use_bcast = false;
bcast_off.out_len = 1;
bcast_off.lhs_len = 1;
bcast_off.rhs_len = 1;
FloatArray probs_arr =
NewFloatArray(hop_uniq_size, ctx, sizeof(FloatType) * 8);
auto probs_1 = probs_arr.Ptr<FloatType>();
FloatArray probs_arr_2 =
NewFloatArray(hop_uniq_size, ctx, sizeof(FloatType) * 8);
auto probs = probs_arr_2.Ptr<FloatType>();
auto arg_u = NewIdArray(hop_uniq_size, ctx, sizeof(IdType) * 8);
auto arg_e = NewIdArray(hop_size, ctx, sizeof(IdType) * 8);
double prev_ex_nodes = hop_uniq_size;
for (int iters = 0; iters < importance_sampling || importance_sampling < 0;
iters++) {
if (weighted && iters == 0) {
cuda::SpMMCoo<
IdType, FloatType, cuda::binary::Mul<FloatType>,
cuda::reduce::Max<IdType, FloatType, true>>(
bcast_off, rmat, cs_arr, A_l_arr, probs_arr_2, arg_u, arg_e);
} else {
cuda::SpMMCoo<
IdType, FloatType, cuda::binary::CopyLhs<FloatType>,
cuda::reduce::Max<IdType, FloatType, true>>(
bcast_off, rmat, cs_arr, NullArray(), iters ? probs_arr : probs_arr_2,
arg_u, arg_e);
}
if (iters)
thrust::transform(
exec_policy, probs_1, probs_1 + hop_uniq_size, probs, probs,
thrust::multiplies<FloatType>{});
thrust::gather(
exec_policy, hop_new, hop_new + hop_size, probs, probs_found);
{
constexpr int BLOCK_CTAS = BLOCK_SIZE / CTA_SIZE;
// the number of rows each thread block will cover
constexpr int TILE_SIZE = BLOCK_CTAS;
const dim3 block(CTA_SIZE, BLOCK_CTAS);
const dim3 grid((num_rows + TILE_SIZE - 1) / TILE_SIZE);
CUDA_KERNEL_CALL(
(_CSRRowWiseLayerSampleDegreeKernel<
IdType, FloatType, BLOCK_CTAS, TILE_SIZE>),
grid, block, 0, stream, (IdType)num_picks, num_rows, rows, cs,
weighted ? ds : nullptr, weighted ? d2s : nullptr, indptr,
probs_found, A, subindptr);
}
{
auto probs_min_1 =
thrust::make_transform_iterator(probs, TransformOpMinWith1{});
const double cur_ex_nodes = thrust::reduce(
exec_policy, probs_min_1, probs_min_1 + hop_uniq_size, 0.0);
if (cur_ex_nodes / prev_ex_nodes >= 1 - eps) break;
prev_ex_nodes = cur_ex_nodes;
}
}
}
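// Dense host-side sketch (hypothetical helper, not part of the original
// file) of one round of the loop above over a COO edge list
// (row[e], col[e]): pi_t is multiplied by the maximum c_s over the rows
// incident to t, which is what the SpMMCoo call with the Max reducer
// computes on the device. scratch is a caller-provided buffer of size
// num_cols.
inline void _SketchImportanceRound(
    const int64_t* row, const int64_t* col, int64_t num_edges,
    const double* cs, int64_t num_cols, double* scratch, double* pi) {
  for (int64_t t = 0; t < num_cols; ++t) scratch[t] = 0;
  for (int64_t e = 0; e < num_edges; ++e)
    scratch[col[e]] = std::max(scratch[col[e]], cs[row[e]]);
  for (int64_t t = 0; t < num_cols; ++t) pi[t] *= scratch[t];
}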
/////////////////////////////// CSR ///////////////////////////////
template <DGLDeviceType XPU, typename IdType, typename FloatType>
std::pair<COOMatrix, FloatArray> CSRLaborSampling(
CSRMatrix mat, IdArray rows_arr, const int64_t num_picks,
FloatArray prob_arr, const int importance_sampling, IdArray random_seed_arr,
IdArray NIDs) {
const bool weighted = !IsNullArray(prob_arr);
const auto& ctx = rows_arr->ctx;
runtime::CUDAWorkspaceAllocator allocator(ctx);
const auto stream = runtime::getCurrentCUDAStream();
const auto exec_policy = thrust::cuda::par_nosync(allocator).on(stream);
auto device = runtime::DeviceAPI::Get(ctx);
const IdType num_rows = rows_arr->shape[0];
IdType* const rows = rows_arr.Ptr<IdType>();
IdType* const nids = IsNullArray(NIDs) ? nullptr : NIDs.Ptr<IdType>();
FloatType* const A = prob_arr.Ptr<FloatType>();
IdType* const indptr = mat.indptr.Ptr<IdType>();
IdType* const indices = mat.indices.Ptr<IdType>();
IdType* const data = CSRHasData(mat) ? mat.data.Ptr<IdType>() : nullptr;
// compute in-degrees
auto in_deg = allocator.alloc_unique<IdType>(num_rows + 1);
// cs stands for c_s in arXiv:2210.13339
FloatArray cs_arr = NewFloatArray(num_rows, ctx, sizeof(FloatType) * 8);
auto cs = cs_arr.Ptr<FloatType>();
// ds stands for A_{*s} in arXiv:2210.13339
FloatArray ds_arr = weighted
? NewFloatArray(num_rows, ctx, sizeof(FloatType) * 8)
: NullArray();
auto ds = ds_arr.Ptr<FloatType>();
// d2s stands for (A^2)_{*s} in arXiv:2210.13339, ^2 is elementwise.
FloatArray d2s_arr = weighted
? NewFloatArray(num_rows, ctx, sizeof(FloatType) * 8)
: NullArray();
auto d2s = d2s_arr.Ptr<FloatType>();
if (weighted) {
auto b_offsets =
thrust::make_transform_iterator(rows, IndptrFunc<IdType>{indptr});
auto e_offsets =
thrust::make_transform_iterator(rows, IndptrFunc<IdType>{indptr + 1});
auto A_A2 = thrust::make_transform_iterator(A, SquareFunc<FloatType>{});
auto ds_d2s = thrust::make_zip_iterator(ds, d2s);
size_t prefix_temp_size = 0;
CUDA_CALL(cub::DeviceSegmentedReduce::Reduce(
nullptr, prefix_temp_size, A_A2, ds_d2s, num_rows, b_offsets, e_offsets,
TupleSum{}, thrust::make_tuple((FloatType)0, (FloatType)0), stream));
auto temp = allocator.alloc_unique<char>(prefix_temp_size);
CUDA_CALL(cub::DeviceSegmentedReduce::Reduce(
temp.get(), prefix_temp_size, A_A2, ds_d2s, num_rows, b_offsets,
e_offsets, TupleSum{}, thrust::make_tuple((FloatType)0, (FloatType)0),
stream));
}
thrust::counting_iterator<IdType> iota(0);
thrust::for_each(
exec_policy, iota, iota + num_rows,
DegreeFunc<IdType, FloatType>{
(IdType)num_picks, rows, indptr, weighted ? ds : nullptr,
in_deg.get(), cs});
// fill subindptr
IdArray subindptr_arr = NewIdArray(num_rows + 1, ctx, sizeof(IdType) * 8);
auto subindptr = subindptr_arr.Ptr<IdType>();
IdType hop_size;
{
size_t prefix_temp_size = 0;
CUDA_CALL(cub::DeviceScan::ExclusiveSum(
nullptr, prefix_temp_size, in_deg.get(), subindptr, num_rows + 1,
stream));
auto temp = allocator.alloc_unique<char>(prefix_temp_size);
CUDA_CALL(cub::DeviceScan::ExclusiveSum(
temp.get(), prefix_temp_size, in_deg.get(), subindptr, num_rows + 1,
stream));
device->CopyDataFromTo(
subindptr, num_rows * sizeof(hop_size), &hop_size, 0, sizeof(hop_size),
ctx, DGLContext{kDGLCPU, 0}, mat.indptr->dtype);
}
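  // Worked example (hypothetical degrees): in_deg = {2, 0, 3} scans to
  // subindptr = {0, 2, 2, 5}; the final entry, copied back to the host
  // above, is hop_size = 5, the length of the flat one-hop buffer.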
IdArray hop_arr = NewIdArray(hop_size, ctx, sizeof(IdType) * 8);
CSRMatrix smat(
num_rows, mat.num_cols, subindptr_arr, hop_arr, NullArray(), mat.sorted);
// @todo Consider fusing CSRToCOO into StencilOpFused kernel
auto smatcoo = CSRToCOO(smat, false);
auto idx_coo_arr = smatcoo.row;
auto idx_coo = idx_coo_arr.Ptr<IdType>();
auto hop_1 = hop_arr.Ptr<IdType>();
auto rands =
allocator.alloc_unique<FloatType>(importance_sampling ? hop_size : 1);
auto probs_found =
allocator.alloc_unique<FloatType>(importance_sampling ? hop_size : 1);
if (weighted) {
// Recompute c for weighted graphs.
constexpr int BLOCK_CTAS = BLOCK_SIZE / CTA_SIZE;
// the number of rows each thread block will cover
constexpr int TILE_SIZE = BLOCK_CTAS;
const dim3 block(CTA_SIZE, BLOCK_CTAS);
const dim3 grid((num_rows + TILE_SIZE - 1) / TILE_SIZE);
CUDA_KERNEL_CALL(
(_CSRRowWiseLayerSampleDegreeKernel<
IdType, FloatType, BLOCK_CTAS, TILE_SIZE>),
grid, block, 0, stream, (IdType)num_picks, num_rows, rows, cs, ds, d2s,
indptr, nullptr, A, subindptr);
}
const uint64_t random_seed =
IsNullArray(random_seed_arr)
? RandomEngine::ThreadLocal()->RandInt(1000000000)
: random_seed_arr.Ptr<int64_t>()[0];
if (importance_sampling)
compute_importance_sampling_probabilities<
IdType, FloatType, decltype(exec_policy)>(
mat, hop_size, stream, random_seed, num_rows, rows, indptr, subindptr,
indices, idx_coo_arr, nids, cs_arr, weighted, A, ds, d2s,
(IdType)num_picks, ctx, allocator, exec_policy, importance_sampling,
hop_1, rands.get(), probs_found.get());
IdArray picked_row = NewIdArray(hop_size, ctx, sizeof(IdType) * 8);
IdArray picked_col = NewIdArray(hop_size, ctx, sizeof(IdType) * 8);
IdArray picked_idx = NewIdArray(hop_size, ctx, sizeof(IdType) * 8);
FloatArray picked_imp =
importance_sampling || weighted
? NewFloatArray(hop_size, ctx, sizeof(FloatType) * 8)
: NullArray();
IdType* const picked_row_data = picked_row.Ptr<IdType>();
IdType* const picked_col_data = picked_col.Ptr<IdType>();
IdType* const picked_idx_data = picked_idx.Ptr<IdType>();
FloatType* const picked_imp_data = picked_imp.Ptr<FloatType>();
auto picked_inrow = allocator.alloc_unique<IdType>(
importance_sampling || weighted ? hop_size : 1);
// Sample edges here
IdType num_edges;
{
thrust::constant_iterator<FloatType> one(1);
if (importance_sampling) {
auto output = thrust::make_zip_iterator(
picked_inrow.get(), picked_row_data, picked_col_data, picked_idx_data,
picked_imp_data);
if (weighted) {
auto transformed_output = thrust::make_transform_output_iterator(
output,
TransformOpImp<
IdType, FloatType, FloatType*, FloatType*, decltype(one)>{
probs_found.get(), A, one, idx_coo, rows, cs, indptr, subindptr,
indices, data});
auto stencil =
thrust::make_zip_iterator(idx_coo, probs_found.get(), rands.get());
num_edges =
thrust::copy_if(
exec_policy, iota, iota + hop_size, stencil, transformed_output,
thrust::make_zip_function(StencilOp<FloatType>{cs})) -
transformed_output;
} else {
auto transformed_output = thrust::make_transform_output_iterator(
output,
TransformOpImp<
IdType, FloatType, FloatType*, decltype(one), decltype(one)>{
probs_found.get(), one, one, idx_coo, rows, cs, indptr,
subindptr, indices, data});
auto stencil =
thrust::make_zip_iterator(idx_coo, probs_found.get(), rands.get());
num_edges =
thrust::copy_if(
exec_policy, iota, iota + hop_size, stencil, transformed_output,
thrust::make_zip_function(StencilOp<FloatType>{cs})) -
transformed_output;
}
} else {
if (weighted) {
auto output = thrust::make_zip_iterator(
picked_inrow.get(), picked_row_data, picked_col_data,
picked_idx_data, picked_imp_data);
auto transformed_output = thrust::make_transform_output_iterator(
output,
TransformOpImp<
IdType, FloatType, decltype(one), FloatType*, FloatType*>{
one, A, A, idx_coo, rows, cs, indptr, subindptr, indices,
data});
const auto pred =
StencilOpFused<IdType, FloatType, decltype(one), FloatType*>{
random_seed, idx_coo, cs, one, A,
subindptr, rows, indptr, indices, nids};
num_edges = thrust::copy_if(
exec_policy, iota, iota + hop_size, iota,
transformed_output, pred) -
transformed_output;
} else {
auto output = thrust::make_zip_iterator(
picked_row_data, picked_col_data, picked_idx_data);
auto transformed_output = thrust::make_transform_output_iterator(
output, TransformOp<IdType>{
idx_coo, rows, indptr, subindptr, indices, data});
const auto pred =
StencilOpFused<IdType, FloatType, decltype(one), decltype(one)>{
random_seed, idx_coo, cs, one, one,
subindptr, rows, indptr, indices, nids};
num_edges = thrust::copy_if(
exec_policy, iota, iota + hop_size, iota,
transformed_output, pred) -
transformed_output;
}
}
}
// Normalize edge weights here
if (importance_sampling || weighted) {
thrust::constant_iterator<IdType> one(1);
// contains degree information
auto ds = allocator.alloc_unique<IdType>(num_rows);
// contains sum of edge weights
auto ws = allocator.alloc_unique<FloatType>(num_rows);
// contains degree information only for vertices with nonzero degree
auto ds_2 = allocator.alloc_unique<IdType>(num_rows);
// contains sum of edge weights only for vertices with nonzero degree
auto ws_2 = allocator.alloc_unique<FloatType>(num_rows);
auto output_ = thrust::make_zip_iterator(ds.get(), ws.get());
// contains row ids only for vertices with nonzero degree
auto keys = allocator.alloc_unique<IdType>(num_rows);
auto input = thrust::make_zip_iterator(one, picked_imp_data);
auto new_end = thrust::reduce_by_key(
exec_policy, picked_inrow.get(), picked_inrow.get() + num_edges, input,
keys.get(), output_, thrust::equal_to<IdType>{}, TupleSum{});
{
thrust::constant_iterator<IdType> zero_int(0);
thrust::constant_iterator<FloatType> zero_float(0);
auto input = thrust::make_zip_iterator(zero_int, zero_float);
auto output = thrust::make_zip_iterator(ds_2.get(), ws_2.get());
thrust::copy(exec_policy, input, input + num_rows, output);
{
const auto num_rows_2 = new_end.first - keys.get();
thrust::scatter(
exec_policy, output_, output_ + num_rows_2, keys.get(), output);
}
}
{
auto input =
thrust::make_zip_iterator(picked_inrow.get(), picked_imp_data);
auto transformed_input = thrust::make_transform_iterator(
input, thrust::make_zip_function(TransformOpMean<IdType, FloatType>{
ds_2.get(), ws_2.get()}));
thrust::copy(
exec_policy, transformed_input, transformed_input + num_edges,
picked_imp_data);
}
}
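  // Worked example of the normalization above (hypothetical row): two
  // sampled edges with weights {0.2, 0.6} give ds_2 = 2 and ws_2 = 0.8,
  // so TransformOpMean rescales each weight by 2 / 0.8 = 2.5, producing
  // {0.5, 1.5}; the weights of every row now average to one.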
picked_row = picked_row.CreateView({num_edges}, picked_row->dtype);
picked_col = picked_col.CreateView({num_edges}, picked_col->dtype);
picked_idx = picked_idx.CreateView({num_edges}, picked_idx->dtype);
if (importance_sampling || weighted)
picked_imp = picked_imp.CreateView({num_edges}, picked_imp->dtype);
return std::make_pair(
COOMatrix(mat.num_rows, mat.num_cols, picked_row, picked_col, picked_idx),
picked_imp);
}
template std::pair<COOMatrix, FloatArray>
CSRLaborSampling<kDGLCUDA, int32_t, float>(
CSRMatrix, IdArray, int64_t, FloatArray, int, IdArray, IdArray);
template std::pair<COOMatrix, FloatArray>
CSRLaborSampling<kDGLCUDA, int64_t, float>(
CSRMatrix, IdArray, int64_t, FloatArray, int, IdArray, IdArray);
template std::pair<COOMatrix, FloatArray>
CSRLaborSampling<kDGLCUDA, int32_t, double>(
CSRMatrix, IdArray, int64_t, FloatArray, int, IdArray, IdArray);
template std::pair<COOMatrix, FloatArray>
CSRLaborSampling<kDGLCUDA, int64_t, double>(
CSRMatrix, IdArray, int64_t, FloatArray, int, IdArray, IdArray);
} // namespace impl
} // namespace aten
} // namespace dgl
/**
 * Copyright (c) 2020-2022 by Contributors
 * @file graph/sampling/neighbor.cc
 * @brief Definition of neighborhood-based sampler APIs.
 */
@@ -10,6 +10,9 @@
#include <dgl/runtime/container.h>
#include <dgl/sampling/neighbor.h>

#include <tuple>
#include <utility>

#include "../../../c_api_common.h"
#include "../../unit_graph.h"
@@ -19,11 +22,13 @@ using namespace dgl::aten;
namespace dgl {
namespace sampling {

std::pair<HeteroSubgraph, std::vector<FloatArray>> ExcludeCertainEdges(
    const HeteroSubgraph& sg, const std::vector<IdArray>& exclude_edges,
    const std::vector<FloatArray>* weights = nullptr) {
  HeteroGraphPtr hg_view = HeteroGraphRef(sg.graph).sptr();
  std::vector<IdArray> remain_induced_edges(hg_view->NumEdgeTypes());
  std::vector<IdArray> remain_edges(hg_view->NumEdgeTypes());
  std::vector<FloatArray> remain_weights(hg_view->NumEdgeTypes());
  for (dgl_type_t etype = 0; etype < hg_view->NumEdgeTypes(); ++etype) {
    IdArray edge_ids = Range(
@@ -32,34 +37,149 @@ HeteroSubgraph ExcludeCertainEdges(
    if (exclude_edges[etype].GetSize() == 0 || edge_ids.GetSize() == 0) {
      remain_edges[etype] = edge_ids;
      remain_induced_edges[etype] = sg.induced_edges[etype];
      if (weights) remain_weights[etype] = (*weights)[etype];
      continue;
    }
    ATEN_ID_TYPE_SWITCH(hg_view->DataType(), IdType, {
      const auto dtype = weights && (*weights)[etype]->shape[0]
                             ? (*weights)[etype]->dtype
                             : DGLDataType{kDGLFloat, 8 * sizeof(float), 1};
      ATEN_FLOAT_TYPE_SWITCH(dtype, FloatType, "weights", {
        IdType* idx_data = edge_ids.Ptr<IdType>();
        IdType* induced_edges_data = sg.induced_edges[etype].Ptr<IdType>();
        FloatType* weights_data = weights && (*weights)[etype]->shape[0]
                                      ? (*weights)[etype].Ptr<FloatType>()
                                      : nullptr;
        const IdType exclude_edges_len = exclude_edges[etype]->shape[0];
        std::sort(
            exclude_edges[etype].Ptr<IdType>(),
            exclude_edges[etype].Ptr<IdType>() + exclude_edges_len);
        const IdType* exclude_edges_data = exclude_edges[etype].Ptr<IdType>();
        IdType outId = 0;
        for (IdType i = 0; i != sg.induced_edges[etype]->shape[0]; ++i) {
          // the following binary search is the bottleneck; excluding weights
          // together with edges should almost be free.
          if (!std::binary_search(
                  exclude_edges_data, exclude_edges_data + exclude_edges_len,
                  induced_edges_data[i])) {
            induced_edges_data[outId] = induced_edges_data[i];
            idx_data[outId] = idx_data[i];
            if (weights_data) weights_data[outId] = weights_data[i];
            ++outId;
          }
        }
        remain_edges[etype] = aten::IndexSelect(edge_ids, 0, outId);
        remain_induced_edges[etype] =
            aten::IndexSelect(sg.induced_edges[etype], 0, outId);
        remain_weights[etype] =
            weights_data ? aten::IndexSelect((*weights)[etype], 0, outId)
                         : NullArray();
      });
    });
  }
  HeteroSubgraph subg = hg_view->EdgeSubgraph(remain_edges, true);
  subg.induced_edges = std::move(remain_induced_edges);
  return std::make_pair(subg, remain_weights);
}
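// Worked example of the exclusion above (hypothetical ids): with induced
// edge ids {3, 4, 5, 9} and exclude_edges = {4, 9} (sorted in place first),
// the binary search keeps positions 0 and 2, outId ends at 2, and the
// IndexSelect calls shrink the edges, induced ids, and weights to length 2.
// The cost is O(|E_sub| * log |E_excl|) per edge type.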
std::pair<HeteroSubgraph, std::vector<FloatArray>> SampleLabors(
const HeteroGraphPtr hg, const std::vector<IdArray>& nodes,
const std::vector<int64_t>& fanouts, EdgeDir dir,
const std::vector<FloatArray>& prob,
const std::vector<IdArray>& exclude_edges, const int importance_sampling,
const IdArray random_seed, const std::vector<IdArray>& NIDs) {
// sanity check
CHECK_EQ(nodes.size(), hg->NumVertexTypes())
<< "Number of node ID tensors must match the number of node types.";
CHECK_EQ(fanouts.size(), hg->NumEdgeTypes())
<< "Number of fanout values must match the number of edge types.";
DGLContext ctx = aten::GetContextOf(nodes);
std::vector<HeteroGraphPtr> subrels(hg->NumEdgeTypes());
std::vector<FloatArray> subimportances(hg->NumEdgeTypes());
std::vector<IdArray> induced_edges(hg->NumEdgeTypes());
for (dgl_type_t etype = 0; etype < hg->NumEdgeTypes(); ++etype) {
auto pair = hg->meta_graph()->FindEdge(etype);
const dgl_type_t src_vtype = pair.first;
const dgl_type_t dst_vtype = pair.second;
const IdArray nodes_ntype =
nodes[(dir == EdgeDir::kOut) ? src_vtype : dst_vtype];
const IdArray NIDs_ntype =
NIDs[(dir == EdgeDir::kIn) ? src_vtype : dst_vtype];
const int64_t num_nodes = nodes_ntype->shape[0];
if (num_nodes == 0 || fanouts[etype] == 0) {
      // Nothing to sample for this etype; create a placeholder relation graph
subrels[etype] = UnitGraph::Empty(
hg->GetRelationGraph(etype)->NumVertexTypes(),
hg->NumVertices(src_vtype), hg->NumVertices(dst_vtype),
hg->DataType(), ctx);
induced_edges[etype] = aten::NullArray(hg->DataType(), ctx);
} else if (fanouts[etype] == -1) {
const auto& earr = (dir == EdgeDir::kOut)
? hg->OutEdges(etype, nodes_ntype)
: hg->InEdges(etype, nodes_ntype);
subrels[etype] = UnitGraph::CreateFromCOO(
hg->GetRelationGraph(etype)->NumVertexTypes(),
hg->NumVertices(src_vtype), hg->NumVertices(dst_vtype), earr.src,
earr.dst);
induced_edges[etype] = earr.id;
} else {
// sample from one relation graph
auto req_fmt = (dir == EdgeDir::kOut) ? CSR_CODE : CSC_CODE;
auto avail_fmt = hg->SelectFormat(etype, req_fmt);
COOMatrix sampled_coo;
FloatArray importances;
switch (avail_fmt) {
case SparseFormat::kCOO:
if (dir == EdgeDir::kIn) {
auto fs = aten::COOLaborSampling(
aten::COOTranspose(hg->GetCOOMatrix(etype)), nodes_ntype,
fanouts[etype], prob[etype], importance_sampling, random_seed,
NIDs_ntype);
sampled_coo = aten::COOTranspose(fs.first);
importances = fs.second;
} else {
std::tie(sampled_coo, importances) = aten::COOLaborSampling(
hg->GetCOOMatrix(etype), nodes_ntype, fanouts[etype],
prob[etype], importance_sampling, random_seed, NIDs_ntype);
}
break;
case SparseFormat::kCSR:
CHECK(dir == EdgeDir::kOut)
<< "Cannot sample out edges on CSC matrix.";
std::tie(sampled_coo, importances) = aten::CSRLaborSampling(
hg->GetCSRMatrix(etype), nodes_ntype, fanouts[etype], prob[etype],
importance_sampling, random_seed, NIDs_ntype);
break;
case SparseFormat::kCSC:
          CHECK(dir == EdgeDir::kIn)
              << "Cannot sample out edges on CSC matrix.";
std::tie(sampled_coo, importances) = aten::CSRLaborSampling(
hg->GetCSCMatrix(etype), nodes_ntype, fanouts[etype], prob[etype],
importance_sampling, random_seed, NIDs_ntype);
sampled_coo = aten::COOTranspose(sampled_coo);
break;
default:
LOG(FATAL) << "Unsupported sparse format.";
}
subrels[etype] = UnitGraph::CreateFromCOO(
hg->GetRelationGraph(etype)->NumVertexTypes(), sampled_coo.num_rows,
sampled_coo.num_cols, sampled_coo.row, sampled_coo.col);
subimportances[etype] = importances;
induced_edges[etype] = sampled_coo.data;
}
}
HeteroSubgraph ret;
ret.graph =
CreateHeteroGraph(hg->meta_graph(), subrels, hg->NumVerticesPerType());
ret.induced_vertices.resize(hg->NumVertexTypes());
ret.induced_edges = std::move(induced_edges);
if (!exclude_edges.empty())
return ExcludeCertainEdges(ret, exclude_edges, &subimportances);
return std::make_pair(ret, std::move(subimportances));
} }
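// A short trace of the format dispatch in SampleLabors above (hypothetical
// cases): the seed nodes must index the rows of the matrix handed to the
// sampler, hence the transposes.
//   dir == kOut, avail == kCSR: sample the CSR directly.
//   dir == kIn,  avail == kCSC: sample the CSC (its rows are destinations),
//                               then transpose the sampled COO.
//   dir == kIn,  avail == kCOO: transpose the COO first, sample, and
//                               transpose the result back.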
HeteroSubgraph SampleNeighbors(
@@ -142,7 +262,7 @@ HeteroSubgraph SampleNeighbors(
  ret.induced_vertices.resize(hg->NumVertexTypes());
  ret.induced_edges = std::move(induced_edges);
  if (!exclude_edges.empty()) {
    return ExcludeCertainEdges(ret, exclude_edges).first;
  }
  return ret;
}
@@ -395,6 +515,37 @@ DGL_REGISTER_GLOBAL("sampling.neighbor._CAPI_DGLSampleNeighborsEType")
      *rv = HeteroSubgraphRef(subg);
    });
DGL_REGISTER_GLOBAL("sampling.labor._CAPI_DGLSampleLabors")
.set_body([](DGLArgs args, DGLRetValue* rv) {
HeteroGraphRef hg = args[0];
const auto& nodes = ListValueToVector<IdArray>(args[1]);
IdArray fanouts_array = args[2];
const auto& fanouts = fanouts_array.ToVector<int64_t>();
const std::string dir_str = args[3];
const auto& prob = ListValueToVector<FloatArray>(args[4]);
const auto& exclude_edges = ListValueToVector<IdArray>(args[5]);
const int importance_sampling = args[6];
const IdArray random_seed = args[7];
const auto& NIDs = ListValueToVector<IdArray>(args[8]);
CHECK(dir_str == "in" || dir_str == "out")
<< "Invalid edge direction. Must be \"in\" or \"out\".";
EdgeDir dir = (dir_str == "in") ? EdgeDir::kIn : EdgeDir::kOut;
std::shared_ptr<HeteroSubgraph> subg_ptr(new HeteroSubgraph);
auto&& subg_importances = sampling::SampleLabors(
hg.sptr(), nodes, fanouts, dir, prob, exclude_edges,
importance_sampling, random_seed, NIDs);
*subg_ptr = subg_importances.first;
List<Value> ret_val;
ret_val.push_back(Value(subg_ptr));
for (auto& imp : subg_importances.second)
ret_val.push_back(Value(MakeValue(imp)));
*rv = ret_val;
});
DGL_REGISTER_GLOBAL("sampling.neighbor._CAPI_DGLSampleNeighbors") DGL_REGISTER_GLOBAL("sampling.neighbor._CAPI_DGLSampleNeighbors")
.set_body([](DGLArgs args, DGLRetValue* rv) { .set_body([](DGLArgs args, DGLRetValue* rv) {
HeteroGraphRef hg = args[0]; HeteroGraphRef hg = args[0];
......
/**
 * Copyright 2020-2022 Contributors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -20,6 +20,8 @@
#ifndef DGL_GRAPH_TRANSFORM_CUDA_CUDA_MAP_EDGES_CUH_
#define DGL_GRAPH_TRANSFORM_CUDA_CUDA_MAP_EDGES_CUH_

#include <cuda_runtime.h>
#include <dgl/base_heterograph.h>
#include <dgl/runtime/c_runtime_api.h>
...