Unverified Commit 90f10b31 authored by Quan (Andy) Gan's avatar Quan (Andy) Gan Committed by GitHub

[Feature] Negative sampling (#3599)

* first commit

* a bunch of fixes

* add unique

* lint

* lint

* lint

* address comments

* Update negative_sampler.py

* fix

* description

* address comments and fix

* fix

* replace unique with replace

* test pylint

* Update negative_sampler.py
parent 01bec4a3
...@@ -87,6 +87,9 @@ to generate negative edges.
.. autoclass:: Uniform
:members: __call__
.. autoclass:: GlobalUniform
:members: __call__
Async Copying to/from GPUs
--------------------------
.. currentmodule:: dgl.dataloading
......
...@@ -25,3 +25,11 @@ Neighbor sampling
sample_neighbors_biased
select_topk
PinSAGESampler
Negative sampling
-----------------
.. autosummary::
:toctree: ../../generated/
global_uniform_negative_sampling
/*!
* Copyright (c) 2020 by Contributors
* \file dgl/array_iterator.h
* \brief Various iterators.
*/
#ifndef DGL_ARRAY_ITERATOR_H_
#define DGL_ARRAY_ITERATOR_H_
#ifdef __CUDA_ARCH__
#define CUB_INLINE __host__ __device__ __forceinline__
#else
#define CUB_INLINE inline
#endif // __CUDA_ARCH__
#include <iterator>
#include <algorithm>
#include <utility>
namespace dgl {
namespace aten {
using std::swap;
// Make std::pair work on both host and device
template <typename DType>
struct Pair {
Pair() = default;
Pair(const Pair& other) = default;
Pair(Pair&& other) = default;
CUB_INLINE Pair(DType a, DType b) : first(a), second(b) {}
CUB_INLINE Pair& operator=(const Pair& other) {
first = other.first;
second = other.second;
return *this;
}
CUB_INLINE operator std::pair<DType, DType>() const {
return std::make_pair(first, second);
}
CUB_INLINE bool operator==(const Pair& other) const {
return (first == other.first) && (second == other.second);
}
CUB_INLINE void swap(const Pair& other) const {
std::swap(first, other.first);
std::swap(second, other.second);
}
DType first, second;
};
template <typename DType>
CUB_INLINE void swap(const Pair<DType>& r1, const Pair<DType>& r2) {
r1.swap(r2);
}
// PairRef and PairIterator that serves as an iterator over a pair of arrays in a
// zipped fashion like zip(a, b).
template <typename DType>
struct PairRef {
PairRef() = delete;
PairRef(const PairRef& other) = default;
PairRef(PairRef&& other) = default;
CUB_INLINE PairRef(DType* const r, DType* const c) : a(r), b(c) {}
CUB_INLINE PairRef& operator=(const PairRef& other) {
*a = *other.a;
*b = *other.b;
return *this;
}
CUB_INLINE PairRef& operator=(const Pair<DType>& val) {
*a = val.first;
*b = val.second;
return *this;
}
CUB_INLINE operator Pair<DType>() const {
return Pair<DType>(*a, *b);
}
CUB_INLINE operator std::pair<DType, DType>() const {
return std::make_pair(*a, *b);
}
CUB_INLINE bool operator==(const PairRef& other) const {
return (*a == *(other.a)) && (*b == *(other.b));
}
CUB_INLINE void swap(const PairRef& other) const {
std::swap(*a, *other.a);
std::swap(*b, *other.b);
}
DType *a, *b;
};
template <typename DType>
CUB_INLINE void swap(const PairRef<DType>& r1, const PairRef<DType>& r2) {
r1.swap(r2);
}
template <typename DType>
struct PairIterator : public std::iterator<std::random_access_iterator_tag,
Pair<DType>,
std::ptrdiff_t,
Pair<DType*>,
PairRef<DType>> {
PairIterator() = default;
PairIterator(const PairIterator& other) = default;
PairIterator(PairIterator&& other) = default;
CUB_INLINE PairIterator(DType* x, DType* y) : a(x), b(y) {}
PairIterator& operator=(const PairIterator& other) = default;
PairIterator& operator=(PairIterator&& other) = default;
~PairIterator() = default;
CUB_INLINE bool operator==(const PairIterator& other) const { return a == other.a; }
CUB_INLINE bool operator!=(const PairIterator& other) const { return a != other.a; }
CUB_INLINE bool operator<(const PairIterator& other) const { return a < other.a; }
CUB_INLINE bool operator>(const PairIterator& other) const { return a > other.a; }
CUB_INLINE bool operator<=(const PairIterator& other) const { return a <= other.a; }
CUB_INLINE bool operator>=(const PairIterator& other) const { return a >= other.a; }
CUB_INLINE PairIterator& operator+=(const std::ptrdiff_t& movement) {
a += movement;
b += movement;
return *this;
}
CUB_INLINE PairIterator& operator-=(const std::ptrdiff_t& movement) {
a -= movement;
b -= movement;
return *this;
}
CUB_INLINE PairIterator& operator++() {
++a;
++b;
return *this;
}
CUB_INLINE PairIterator& operator--() {
--a;
--b;
return *this;
}
CUB_INLINE PairIterator operator++(int) {
PairIterator ret(*this);
operator++();
return ret;
}
CUB_INLINE PairIterator operator--(int) {
PairIterator ret(*this);
operator--();
return ret;
}
CUB_INLINE PairIterator operator+(const std::ptrdiff_t& movement) const {
return PairIterator(a + movement, b + movement);
}
CUB_INLINE PairIterator operator-(const std::ptrdiff_t& movement) const {
return PairIterator(a - movement, b - movement);
}
CUB_INLINE std::ptrdiff_t operator-(const PairIterator& other) const {
return a - other.a;
}
CUB_INLINE PairRef<DType> operator*() const {
return PairRef<DType>(a, b);
}
CUB_INLINE PairRef<DType> operator*() {
return PairRef<DType>(a, b);
}
CUB_INLINE PairRef<DType> operator[](size_t offset) const {
return PairRef<DType>(a + offset, b + offset);
}
CUB_INLINE PairRef<DType> operator[](size_t offset) {
return PairRef<DType>(a + offset, b + offset);
}
DType *a, *b;
};
}; // namespace aten
}; // namespace dgl
#endif // DGL_ARRAY_ITERATOR_H_
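(Not part of the commit: a minimal host-side sketch of how PairIterator zips two parallel
ID arrays so that standard algorithms can treat them as (first, second) pairs, mirroring
the std::remove_if usage in array/cpu/negative_sampling.cc further below. The array
contents and the -1 sentinel are illustrative only.)

#include <dgl/array_iterator.h>
#include <algorithm>
#include <cstdint>
#include <iostream>

int main() {
  int64_t src[] = {3, -1, 2};
  int64_t dst[] = {30, -1, 20};
  dgl::aten::PairIterator<int64_t> begin(src, dst);
  // Compact away the slot marked with the -1 sentinel; PairRef converts to std::pair,
  // so the lambda can inspect both components at once.
  auto end = std::remove_if(begin, begin + 3,
      [](const std::pair<int64_t, int64_t>& v) { return v.first == -1; });
  for (auto it = begin; it != end; ++it) {
    std::pair<int64_t, int64_t> p = *it;
    std::cout << p.first << " " << p.second << std::endl;
  }
  return 0;
}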
...@@ -547,6 +547,29 @@ COOMatrix CSRRowWiseSamplingBiased(
bool replace = true
);
/*!
* \brief Uniformly sample row-column pairs whose entries do not exist in the given
* sparse matrix using rejection sampling.
*
* \note The number of samples returned may not necessarily be the number of samples
* given.
*
* \param csr The CSR matrix.
* \param num_samples The number of samples.
* \param num_trials The number of trials.
* \param exclude_self_loops Do not include the examples where the row equals the column.
* \param replace Whether to sample with replacement.
* \param redundancy How many redundant negative examples to sample to compensate for duplicate examples.
* \return A pair of row and column tensors.
*/
std::pair<IdArray, IdArray> CSRGlobalUniformNegativeSampling(
const CSRMatrix& csr,
int64_t num_samples,
int num_trials,
bool exclude_self_loops,
bool replace,
double redundancy);
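(Not part of the commit: a hypothetical call sketch based only on the declaration above;
the CSR matrix csr and the argument values are made up.)

// Sample up to 1000 (row, col) pairs that are not entries of csr, using at most
// 3 rejection-sampling trials per slot, excluding self-loops, without replacement,
// and a redundancy factor of 1.25 to leave room for duplicates that get removed.
std::pair<IdArray, IdArray> neg =
    CSRGlobalUniformNegativeSampling(csr, 1000, 3, true, false, 1.25);
IdArray neg_rows = neg.first;
IdArray neg_cols = neg.second;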
/*!
* \brief Sort the column index according to the tag of each column.
*
......
...@@ -62,6 +62,13 @@ class RandomEngine {
rng_.seed(seed + GetThreadId());
}
/*!
* \brief Generate an arbitrary random 32-bit integer.
*/
int32_t RandInt32() {
return static_cast<int32_t>(rng_());
}
/*!
* \brief Generate a uniform random integer in [0, upper)
*/
......
...@@ -38,6 +38,8 @@ struct DLDataTypeTraits {
struct DLDataTypeTraits<T> { \
static constexpr DLDataType dtype{code, bits, 1}; \
}
GEN_DLDATATYPETRAITS_FOR(int8_t, kDLInt, 8);
GEN_DLDATATYPETRAITS_FOR(int16_t, kDLInt, 16);
GEN_DLDATATYPETRAITS_FOR(int32_t, kDLInt, 32);
GEN_DLDATATYPETRAITS_FOR(int64_t, kDLInt, 64);
// XXX(BarclayII) most DL frameworks do not support unsigned int and long arrays, so I'm just
......
...@@ -11,6 +11,7 @@
#include <string>
#include <cstdlib>
#include <exception>
#include <vector>
#include <atomic>
namespace {
...@@ -54,8 +55,8 @@ static DefaultGrainSizeT default_grain_size;
* \brief OpenMP-based parallel for loop.
*
* It requires each thread's workload to have at least \a grain_size elements.
* The loop body will be a function that takes in two arguments \a begin and \a end, which
* stand for the starting (inclusive) and ending (exclusive) indices of the workload.
*/
template <typename F>
void parallel_for(
...@@ -110,6 +111,78 @@ void parallel_for(
F&& f) {
parallel_for(begin, end, default_grain_size(), std::forward<F>(f));
}
/*!
* \brief OpenMP-based two-stage parallel reduction.
*
* The first-stage reduction function \a f works in parallel. Each thread's workload has
* at least \a grain_size elements. The loop body will be a function that takes in
* the starting index (inclusive), the ending index (exclusive), and the reduction identity.
*
* The second-stage reduction function \a sf is a binary function working in the main
* thread. It aggregates the partially reduced result computed from each thread.
*
* Example to compute a parallelized max reduction of an array \c a:
*
* parallel_reduce(
* 0, // starting index
* 100, // ending index
* 1, // grain size
* -std::numeric_limits<float>::infinity(), // identity
* [&a] (int begin, int end, float ident) { // first-stage partial reducer
* float result = ident;
* for (int i = begin; i < end; ++i)
* result = std::max(result, a[i]);
* return result;
* },
* [] (float result, float partial_result) {
* return std::max(result, partial_result);
* });
*/
template <typename DType, typename F, typename SF>
DType parallel_reduce(
const size_t begin,
const size_t end,
const size_t grain_size,
const DType ident,
const F& f,
const SF& sf) {
if (begin >= end) {
return ident;
}
int num_threads = compute_num_threads(begin, end, grain_size);
if (num_threads == 1) {
return f(begin, end, ident);
}
std::vector<DType> results(num_threads, ident);
std::atomic_flag err_flag = ATOMIC_FLAG_INIT;
std::exception_ptr eptr;
#pragma omp parallel num_threads(num_threads)
{
auto tid = omp_get_thread_num();
auto chunk_size = divup((end - begin), num_threads);
auto begin_tid = begin + tid * chunk_size;
if (begin_tid < end) {
auto end_tid = std::min(end, chunk_size + begin_tid);
try {
results[tid] = f(begin_tid, end_tid, ident);
} catch (...) {
if (!err_flag.test_and_set())
eptr = std::current_exception();
}
}
}
if (eptr)
std::rethrow_exception(eptr);
DType out = ident;
for (int64_t i = 0; i < num_threads; ++i)
out = sf(out, results[i]);
return out;
}
} // namespace runtime
} // namespace dgl
......
/*!
* Copyright (c) 2020 by Contributors
* \file dgl/sampling/negative.h
* \brief Negative sampling.
*/
#ifndef DGL_SAMPLING_NEGATIVE_H_
#define DGL_SAMPLING_NEGATIVE_H_
#include <dgl/base_heterograph.h>
#include <dgl/array.h>
#include <utility>
namespace dgl {
namespace sampling {
/*!
* \brief Given an edge type, uniformly sample source-destination pairs that do not have
* an edge in between using rejection sampling.
*
* \note This function may not return the same number of elements as the given number
* of samples.
* \note This function requires sorting the CSR or CSC matrix of the graph in-place. It
* prefers CSC over CSR.
*
* \param hg The graph.
* \param etype The edge type.
* \param num_samples The number of negative examples to sample.
* \param num_trials The number of rejection sampling trials.
* \param exclude_self_loops Do not include the examples where the source equals the
* destination.
* \param replace Whether to sample with replacement.
* \param redundancy How many redundant negative examples to sample to compensate for duplicate examples.
* \return The pair of source and destination tensors.
*/
std::pair<IdArray, IdArray> GlobalUniformNegativeSampling(
HeteroGraphPtr hg,
dgl_type_t etype,
int64_t num_samples,
int num_trials,
bool exclude_self_loops,
bool replace,
double redundancy);
}; // namespace sampling
}; // namespace dgl
#endif // DGL_SAMPLING_NEGATIVE_H_
"""Negative samplers""" """Negative samplers"""
from collections.abc import Mapping from collections.abc import Mapping
from .. import backend as F from .. import backend as F
from ..sampling import global_uniform_negative_sampling
class _BaseNegativeSampler(object):
def _generate(self, g, eids, canonical_etype):
raise NotImplementedError
def __call__(self, g, eids):
"""Returns negative samples.
Parameters
----------
...@@ -19,7 +20,7 @@ class _BaseNegativeSampler(object):
Returns
-------
tuple[Tensor, Tensor] or dict[etype, tuple[Tensor, Tensor]]
The returned source-destination pairs as negative samples.
"""
if isinstance(eids, Mapping):
eids = {g.to_canonical_etype(k): v for k, v in eids.items()}
...@@ -31,7 +32,7 @@ class _BaseNegativeSampler(object):
return neg_pair
class PerSourceUniform(_BaseNegativeSampler):
"""Negative sampler that randomly chooses negative destination nodes
for each source node according to a uniform distribution.
...@@ -43,12 +44,12 @@ class Uniform(_BaseNegativeSampler):
Parameters
----------
k : int
The number of negative samples per edge.
Examples
--------
>>> g = dgl.graph(([0, 1, 2], [1, 2, 3]))
>>> neg_sampler = dgl.dataloading.negative_sampler.PerSourceUniform(2)
>>> neg_sampler(g, torch.tensor([0, 1]))
(tensor([0, 0, 1, 1]), tensor([1, 0, 2, 3]))
"""
...@@ -65,3 +66,60 @@ class Uniform(_BaseNegativeSampler):
src = F.repeat(src, self.k, 0)
dst = F.randint(shape, dtype, ctx, 0, g.number_of_nodes(vtype))
return src, dst
# Alias
Uniform = PerSourceUniform
class GlobalUniform(_BaseNegativeSampler):
"""Negative sampler that randomly chooses negative source-destination pairs according
to a uniform distribution.
For each edge ``(u, v)`` of type ``(srctype, etype, dsttype)``, DGL generates at most
:attr:`k` pairs of negative edges ``(u', v')``, where ``u'`` is chosen uniformly from
all the nodes of type ``srctype`` and ``v'`` is chosen uniformly from all the nodes
of type ``dsttype``. The resulting edges will also have type
``(srctype, etype, dsttype)``. DGL guarantees that the sampled pairs will not have
edges in between.
Parameters
----------
k : int
The desired number of negative samples to generate per edge.
exclude_self_loops : bool, optional
Whether to exclude self-loops from negative samples. (Default: True)
replace : bool, optional
Whether to sample with replacement. Setting it to True will make sampling
faster. (Default: False)
redundancy : float, optional
Indicates how many more negative samples to generate during rejection sampling
before finding the unique pairs.
Increasing it will increase the likelihood of getting :attr:`k` negative samples
per edge, but will also take more time and memory.
(Default: automatically determined by the density of graph)
Notes
-----
This negative sampler will try to generate as many negative samples as possible, but
in rare cases it may return fewer than :attr:`k` negative samples per edge.
This is more likely to happen if a graph is so small or dense that not many unique
negative samples exist.
Examples
--------
>>> g = dgl.graph(([0, 1, 2], [1, 2, 3]))
>>> neg_sampler = dgl.dataloading.negative_sampler.GlobalUniform(2, True)
>>> neg_sampler(g, torch.LongTensor([0, 1]))
(tensor([0, 1, 3, 2]), tensor([2, 0, 2, 1]))
"""
def __init__(self, k, exclude_self_loops=True, replace=False, redundancy=None):
self.k = k
self.exclude_self_loops = exclude_self_loops
self.replace = replace
self.redundancy = redundancy
def _generate(self, g, eids, canonical_etype):
return global_uniform_negative_sampling(
g, len(eids) * self.k, self.exclude_self_loops, self.replace,
canonical_etype, self.redundancy)
...@@ -9,3 +9,4 @@ from .randomwalks import * ...@@ -9,3 +9,4 @@ from .randomwalks import *
from .pinsage import * from .pinsage import *
from .neighbor import * from .neighbor import *
from .node2vec_randomwalk import * from .node2vec_randomwalk import *
from .negative import *
"""Negative sampling APIs"""
from numpy.polynomial import polynomial
from .._ffi.function import _init_api
from .. import backend as F
__all__ = [
'global_uniform_negative_sampling']
def _calc_redundancy(k_hat, num_edges, num_pairs, r=3): # pylint: disable=invalid-name
# pylint: disable=invalid-name
# Calculates the number of samples required based on a lower-bound
# of the expected number of negative samples, based on N draws from
# a binomial distribution. Solves the following equation for N:
#
# k_hat = N*p_k - r * np.sqrt(N*p_k*(1-p_k))
#
# where p_k is the probability that a node pairing is a negative edge
# and r is the number of standard deviations to construct the lower bound
#
# Credits to @zjost
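#
# Squaring that equation gives a quadratic in N (with p_m = 1 - p_k, as in the code below):
#
#   p_k^2 * N^2 - p_k * (2*k_hat + r^2 * p_m) * N + k_hat^2 = 0
#
# whose coefficients are the a, b, c below; the larger of its two roots is the N we want,
# which the code takes as poly.roots()[-1].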
p_m = num_edges / num_pairs
p_k = 1 - p_m
a = p_k ** 2
b = -p_k * (2 * k_hat + r ** 2 * p_m)
c = k_hat ** 2
poly = polynomial.Polynomial([c, b, a])
N = poly.roots()[-1]
redundancy = N / k_hat - 1.
return redundancy
def global_uniform_negative_sampling(
g, num_samples, exclude_self_loops=True, replace=False, etype=None,
redundancy=None):
"""Performs negative sampling, which generate source-destination pairs such that
edges with the given type do not exist.
Specifically, this function takes in an edge type and a number of samples. It
returns two tensors ``src`` and ``dst``, the former in the range of ``[0, num_src)``
and the latter in the range of ``[0, num_dst)``, where ``num_src`` and ``num_dst``
represent the number of nodes of the source and destination node types respectively.
It guarantees that no edge will exist between the corresponding pairs of ``src``
with the source node type and ``dst`` with the destination node type.
.. note::
This negative sampler will try to generate as many negative samples as possible, but
in rare cases it may return fewer than :attr:`num_samples` negative samples.
This is more likely to happen when a graph is so small or dense that not many
unique negative samples exist.
Parameters
----------
g : DGLGraph
The graph.
num_samples : int
The number of desired negative samples to generate.
exclude_self_loops : bool, optional
Whether to exclude self-loops from the negative samples. Only impacts the
edge types whose source and destination node types are the same.
Default: True.
replace : bool, optional
Whether to sample with replacement. Setting it to True will make things
faster. (Default: False)
etype : str or tuple of str, optional
The edge type. Can be omitted if the graph only has one edge type.
redundancy : float, optional
Indicates how many more negative samples to generate during rejection sampling
before finding the unique pairs.
Increasing it will increase the likelihood of getting :attr:`num_samples` negative
samples, but will also take more time and memory.
(Default: automatically determined by the density of graph)
Returns
-------
tuple[Tensor, Tensor]
The source and destination pairs.
Examples
--------
>>> g = dgl.graph(([0, 1, 2], [1, 2, 3]))
>>> dgl.sampling.global_uniform_negative_sampling(g, 3)
(tensor([0, 1, 3]), tensor([2, 0, 2]))
"""
if etype is None:
etype = g.etypes[0]
utype, _, vtype = g.to_canonical_etype(etype)
exclude_self_loops = exclude_self_loops and (utype == vtype)
# Only estimate the oversampling factor when the caller did not provide one.
if redundancy is None:
    redundancy = _calc_redundancy(
        num_samples, g.num_edges(etype), g.num_nodes(utype) * g.num_nodes(vtype))
etype_id = g.get_etype_id(etype)
src, dst = _CAPI_DGLGlobalUniformNegativeSampling(
g._graph, etype_id, num_samples, 3, exclude_self_loops, replace, redundancy)
return F.from_dgl_nd(src), F.from_dgl_nd(dst)
_init_api('dgl.sampling.negative', __name__)
...@@ -617,6 +617,23 @@ COOMatrix CSRRowWiseSamplingBiased(
return ret;
}
std::pair<IdArray, IdArray> CSRGlobalUniformNegativeSampling(
const CSRMatrix& csr,
int64_t num_samples,
int num_trials,
bool exclude_self_loops,
bool replace,
double redundancy) {
CHECK_GT(num_samples, 0) << "Number of samples must be positive";
CHECK_GT(num_trials, 0) << "Number of sampling trials must be positive";
std::pair<IdArray, IdArray> result;
ATEN_CSR_SWITCH_CUDA(csr, XPU, IdType, "CSRGlobalUniformNegativeSampling", {
result = impl::CSRGlobalUniformNegativeSampling<XPU, IdType>(
csr, num_samples, num_trials, exclude_self_loops, replace, redundancy);
});
return result;
}
CSRMatrix UnionCsr(const std::vector<CSRMatrix>& csrs) {
CSRMatrix ret;
......
...@@ -192,8 +192,16 @@ COOMatrix CSRRowWiseSamplingBiased(
int64_t num_samples,
NDArray tag_offset,
FloatArray bias,
bool replace);
template <DLDeviceType XPU, typename IdType>
std::pair<IdArray, IdArray> CSRGlobalUniformNegativeSampling(
const CSRMatrix& csr,
int64_t num_samples,
int num_trials,
bool exclude_self_loops,
bool replace,
double redundancy);
// Union CSRMatrixes
template <DLDeviceType XPU, typename IdType>
......
...@@ -53,7 +53,7 @@ IdArray BinaryElewise(IdArray lhs, IdArray rhs) {
IdType* ret_data = static_cast<IdType*>(ret->data);
// TODO(BarclayII): this usually incurs lots of overhead in thread spawning, scheduling,
// etc., especially since the workload is very light. Need to replace with parallel_for.
for (int64_t i = 0; i < lhs->shape[0]; i++) {
ret_data[i] = Op::Call(lhs_data[i], rhs_data[i]);
}
return ret;
...@@ -89,7 +89,7 @@ IdArray BinaryElewise(IdArray lhs, IdType rhs) {
IdType* ret_data = static_cast<IdType*>(ret->data);
// TODO(BarclayII): this usually incurs lots of overhead in thread spawning, scheduling,
// etc., especially since the workload is very light. Need to replace with parallel_for.
for (int64_t i = 0; i < lhs->shape[0]; i++) {
ret_data[i] = Op::Call(lhs_data[i], rhs);
}
return ret;
...@@ -125,7 +125,7 @@ IdArray BinaryElewise(IdType lhs, IdArray rhs) {
IdType* ret_data = static_cast<IdType*>(ret->data);
// TODO(BarclayII): this usually incurs lots of overhead in thread spawning, scheduling,
// etc., especially since the workload is very light. Need to replace with parallel_for.
for (int64_t i = 0; i < rhs->shape[0]; i++) {
ret_data[i] = Op::Call(lhs, rhs_data[i]);
}
return ret;
...@@ -161,7 +161,7 @@ IdArray UnaryElewise(IdArray lhs) {
IdType* ret_data = static_cast<IdType*>(ret->data);
// TODO(BarclayII): this usually incurs lots of overhead in thread spawning, scheduling,
// etc., especially since the workload is very light. Need to replace with parallel_for.
for (int64_t i = 0; i < lhs->shape[0]; i++) {
ret_data[i] = Op::Call(lhs_data[i]);
}
return ret;
......
...@@ -18,19 +18,17 @@ template <DLDeviceType XPU, typename IdType>
bool CSRIsSorted(CSRMatrix csr) {
const IdType* indptr = csr.indptr.Ptr<IdType>();
const IdType* indices = csr.indices.Ptr<IdType>();
return runtime::parallel_reduce(0, csr.num_rows, 1, 1,
[indptr, indices](size_t b, size_t e, bool ident) {
for (size_t row = b; row < e; ++row) {
for (IdType i = indptr[row] + 1; i < indptr[row + 1]; ++i) {
if (indices[i - 1] > indices[i])
return false;
}
}
return ident;
},
[](bool a, bool b) { return a && b; });
}
template bool CSRIsSorted<kDLCPU, int64_t>(CSRMatrix csr);
...@@ -45,6 +43,12 @@ void CSRSort_(CSRMatrix* csr) {
const int64_t nnz = csr->indices->shape[0];
const IdType* indptr_data = static_cast<IdType*>(csr->indptr->data);
IdType* indices_data = static_cast<IdType*>(csr->indices->data);
if (CSRIsSorted(*csr)) {
csr->sorted = true;
return;
}
if (!CSRHasData(*csr)) {
csr->data = aten::Range(0, nnz, csr->indptr->dtype.bits, csr->indptr->ctx);
}
......
/*!
* Copyright (c) 2021 by Contributors
* \file array/cpu/negative_sampling.cc
* \brief Uniform negative sampling on CSR.
*/
#include <dgl/array.h>
#include <dgl/array_iterator.h>
#include <dgl/runtime/parallel_for.h>
#include <dgl/random.h>
#include <utility>
#include <algorithm>
using namespace dgl::runtime;
namespace dgl {
namespace aten {
namespace impl {
template <DLDeviceType XPU, typename IdType>
std::pair<IdArray, IdArray> CSRGlobalUniformNegativeSampling(
const CSRMatrix &csr,
int64_t num_samples,
int num_trials,
bool exclude_self_loops,
bool replace,
double redundancy) {
const int64_t num_row = csr.num_rows;
const int64_t num_col = csr.num_cols;
const int64_t num_actual_samples = static_cast<int64_t>(num_samples * redundancy);
IdArray row = Full<IdType>(-1, num_actual_samples, csr.indptr->ctx);
IdArray col = Full<IdType>(-1, num_actual_samples, csr.indptr->ctx);
IdType* row_data = row.Ptr<IdType>();
IdType* col_data = col.Ptr<IdType>();
parallel_for(0, num_actual_samples, 1, [&](int64_t b, int64_t e) {
for (int64_t i = b; i < e; ++i) {
for (int trial = 0; trial < num_trials; ++trial) {
IdType u = RandomEngine::ThreadLocal()->RandInt(num_row);
IdType v = RandomEngine::ThreadLocal()->RandInt(num_col);
if (!(exclude_self_loops && (u == v)) && !CSRIsNonZero(csr, u, v)) {
row_data[i] = u;
col_data[i] = v;
break;
}
}
}
});
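// Slots whose trials all hit existing edges keep the -1 sentinel filled in by Full()
// above; compact the successful samples to the front before the optional dedup below.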
PairIterator<IdType> begin(row_data, col_data);
PairIterator<IdType> end = std::remove_if(begin, begin + num_actual_samples,
[](const std::pair<IdType, IdType>& val) { return val.first == -1; });
if (!replace) {
std::sort(begin, end,
[](const std::pair<IdType, IdType>& a, const std::pair<IdType, IdType>& b) {
return a.first < b.first || (a.first == b.first && a.second < b.second);
});
end = std::unique(begin, end);
}
int64_t num_sampled = std::min(end - begin, num_samples);
return {row.CreateView({num_sampled}, row->dtype), col.CreateView({num_sampled}, col->dtype)};
}
template std::pair<IdArray, IdArray> CSRGlobalUniformNegativeSampling<kDLCPU, int32_t>(
const CSRMatrix&, int64_t, int, bool, bool, double);
template std::pair<IdArray, IdArray> CSRGlobalUniformNegativeSampling<kDLCPU, int64_t>(
const CSRMatrix&, int64_t, int, bool, bool, double);
}; // namespace impl
}; // namespace aten
}; // namespace dgl
...@@ -52,9 +52,12 @@ NDArray CSRIsNonZero(CSRMatrix csr, NDArray row, NDArray col) {
const IdType* col_data = static_cast<IdType*>(col->data);
const int64_t row_stride = (rowlen == 1 && collen != 1) ? 0 : 1;
const int64_t col_stride = (collen == 1 && rowlen != 1) ? 0 : 1;
runtime::parallel_for(0, std::max(rowlen, collen), 1, [=](int64_t b, int64_t e) {
int64_t i = (row_stride == 0) ? 0 : b;
int64_t j = (col_stride == 0) ? 0 : b;
for (int64_t k = b; i < e && j < e; i += row_stride, j += col_stride, ++k)
rst_data[k] = CSRIsNonZero<XPU, IdType>(csr, row_data[i], col_data[j]) ? 1 : 0;
});
return rst;
}
......
...@@ -55,15 +55,7 @@ IdArray NonZero(IdArray array) {
device->FreeWorkspace(ctx, temp);
// copy number of selected elements from GPU to CPU
int64_t num_nonzeros = cuda::GetCUDAScalar(device, ctx, d_num_nonzeros, stream);
device->FreeWorkspace(ctx, d_num_nonzeros);
device->StreamSync(ctx, stream);
......
/*!
* Copyright (c) 2021 by Contributors
* \file array/cuda/negative_sampling.cu
* \brief Uniform negative sampling on CSR (GPU).
*/
#include <dgl/random.h>
#include <dgl/array.h>
#include <dgl/array_iterator.h>
#include <curand_kernel.h>
#include "./dgl_cub.cuh"
#include "./utils.h"
#include "../../runtime/cuda/cuda_common.h"
using namespace dgl::runtime;
namespace dgl {
namespace aten {
namespace impl {
namespace {
template <typename IdType>
__global__ void _GlobalUniformNegativeSamplingKernel(
const IdType* __restrict__ indptr,
const IdType* __restrict__ indices,
IdType* __restrict__ row,
IdType* __restrict__ col,
int64_t num_row,
int64_t num_col,
int64_t num_samples,
int num_trials,
bool exclude_self_loops,
int32_t random_seed) {
int64_t tx = blockIdx.x * blockDim.x + threadIdx.x;
const int stride_x = gridDim.x * blockDim.x;
curandStatePhilox4_32_10_t rng; // this allows generating 4 32-bit ints at a time
curand_init(random_seed * gridDim.x + blockIdx.x, threadIdx.x, 0, &rng);
while (tx < num_samples) {
for (int i = 0; i < num_trials; ++i) {
uint4 result = curand4(&rng);
IdType u = ((result.x << 32) | result.y) % num_row;
IdType v = ((result.z << 32) | result.w) % num_col;
if (exclude_self_loops && (u == v))
continue;
// binary search for v among indices[indptr[u]:indptr[u + 1]]
int64_t b = indptr[u], e = indptr[u + 1] - 1;
bool found = false;
while (b <= e) {
int64_t m = (b + e) / 2;
if (indices[m] == v) {
found = true;
break;
} else if (indices[m] < v) {
b = m + 1;
} else {
e = m - 1;
}
}
if (!found) {
row[tx] = u;
col[tx] = v;
break;
}
}
tx += stride_x;
}
}
template <typename DType>
struct IsNotMinusOne {
__device__ __forceinline__ bool operator() (const std::pair<DType, DType>& a) {
return a.first != -1;
}
};
/*!
* \brief Sort ordered pairs in ascending order, using \a tmp_major and \a tmp_minor as
* temporary buffers, each with \a n elements.
*/
template <typename IdType>
void SortOrderedPairs(
runtime::DeviceAPI* device,
DLContext ctx,
IdType* major,
IdType* minor,
IdType* tmp_major,
IdType* tmp_minor,
int64_t n,
cudaStream_t stream) {
// Sort ordered pairs in lexicographical order by two radix sorts since
// cub's radix sorts are stable.
// We need a 2*n auxiliary buffer to store the results from the first radix sort.
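// Because the second (major-key) pass is stable, pairs with equal major keys retain the
// minor-key order established by the first pass, which yields full lexicographic order.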
size_t s1 = 0, s2 = 0;
void* tmp1 = nullptr;
void* tmp2 = nullptr;
// Radix sort by the minor key first, reordering the major key in the process.
CUDA_CALL(cub::DeviceRadixSort::SortPairs(
tmp1, s1, minor, tmp_minor, major, tmp_major, n, 0, sizeof(IdType) * 8, stream));
tmp1 = device->AllocWorkspace(ctx, s1);
CUDA_CALL(cub::DeviceRadixSort::SortPairs(
tmp1, s1, minor, tmp_minor, major, tmp_major, n, 0, sizeof(IdType) * 8, stream));
// Radix sort by major key next.
CUDA_CALL(cub::DeviceRadixSort::SortPairs(
tmp2, s2, tmp_major, major, tmp_minor, minor, n, 0, sizeof(IdType) * 8, stream));
tmp2 = (s2 > s1) ? device->AllocWorkspace(ctx, s2) : tmp1; // reuse buffer if s2 <= s1
CUDA_CALL(cub::DeviceRadixSort::SortPairs(
tmp2, s2, tmp_major, major, tmp_minor, minor, n, 0, sizeof(IdType) * 8, stream));
if (tmp1 != tmp2)
device->FreeWorkspace(ctx, tmp2);
device->FreeWorkspace(ctx, tmp1);
}
}; // namespace
template <DLDeviceType XPU, typename IdType>
std::pair<IdArray, IdArray> CSRGlobalUniformNegativeSampling(
const CSRMatrix& csr,
int64_t num_samples,
int num_trials,
bool exclude_self_loops,
bool replace,
double redundancy) {
auto ctx = csr.indptr->ctx;
auto dtype = csr.indptr->dtype;
const int64_t num_row = csr.num_rows;
const int64_t num_col = csr.num_cols;
const int64_t num_actual_samples = static_cast<int64_t>(num_samples * redundancy);
IdArray row = Full<IdType>(-1, num_actual_samples, ctx);
IdArray col = Full<IdType>(-1, num_actual_samples, ctx);
IdArray out_row = IdArray::Empty({num_actual_samples}, dtype, ctx);
IdArray out_col = IdArray::Empty({num_actual_samples}, dtype, ctx);
IdType* row_data = row.Ptr<IdType>();
IdType* col_data = col.Ptr<IdType>();
IdType* out_row_data = out_row.Ptr<IdType>();
IdType* out_col_data = out_col.Ptr<IdType>();
auto device = runtime::DeviceAPI::Get(ctx);
auto* thr_entry = runtime::CUDAThreadEntry::ThreadLocal();
const int nt = cuda::FindNumThreads(num_actual_samples);
const int nb = (num_actual_samples + nt - 1) / nt;
std::pair<IdArray, IdArray> result;
int64_t num_out;
CUDA_KERNEL_CALL(_GlobalUniformNegativeSamplingKernel,
nb, nt, 0, thr_entry->stream,
csr.indptr.Ptr<IdType>(), csr.indices.Ptr<IdType>(),
row_data, col_data, num_row, num_col, num_actual_samples, num_trials,
exclude_self_loops, RandomEngine::ThreadLocal()->RandInt32());
size_t tmp_size = 0;
int64_t* num_out_cuda = static_cast<int64_t*>(device->AllocWorkspace(ctx, sizeof(int64_t)));
IsNotMinusOne<IdType> op;
PairIterator<IdType> begin(row_data, col_data);
PairIterator<IdType> out_begin(out_row_data, out_col_data);
CUDA_CALL(cub::DeviceSelect::If(
nullptr, tmp_size, begin, out_begin, num_out_cuda, num_actual_samples, op,
thr_entry->stream));
void* tmp = device->AllocWorkspace(ctx, tmp_size);
CUDA_CALL(cub::DeviceSelect::If(
tmp, tmp_size, begin, out_begin, num_out_cuda, num_actual_samples, op,
thr_entry->stream));
num_out = cuda::GetCUDAScalar(device, ctx, num_out_cuda, static_cast<cudaStream_t>(0));
if (!replace) {
IdArray unique_row = IdArray::Empty({num_out}, dtype, ctx);
IdArray unique_col = IdArray::Empty({num_out}, dtype, ctx);
IdType* unique_row_data = unique_row.Ptr<IdType>();
IdType* unique_col_data = unique_col.Ptr<IdType>();
PairIterator<IdType> unique_begin(unique_row_data, unique_col_data);
SortOrderedPairs(
device, ctx, out_row_data, out_col_data, unique_row_data, unique_col_data,
num_out, thr_entry->stream);
size_t tmp_size_unique = 0;
void* tmp_unique = nullptr;
CUDA_CALL(cub::DeviceSelect::Unique(
nullptr, tmp_size_unique, out_begin, unique_begin, num_out_cuda, num_out,
thr_entry->stream));
tmp_unique = (tmp_size_unique > tmp_size) ?
device->AllocWorkspace(ctx, tmp_size_unique) :
tmp; // reuse buffer
CUDA_CALL(cub::DeviceSelect::Unique(
tmp_unique, tmp_size_unique, out_begin, unique_begin, num_out_cuda, num_out,
thr_entry->stream));
num_out = cuda::GetCUDAScalar(device, ctx, num_out_cuda, static_cast<cudaStream_t>(0));
num_out = std::min(num_samples, num_out);
result = {unique_row.CreateView({num_out}, dtype), unique_col.CreateView({num_out}, dtype)};
if (tmp_unique != tmp)
device->FreeWorkspace(ctx, tmp_unique);
} else {
num_out = std::min(num_samples, num_out);
result = {out_row.CreateView({num_out}, dtype), out_col.CreateView({num_out}, dtype)};
}
device->FreeWorkspace(ctx, tmp);
device->FreeWorkspace(ctx, num_out_cuda);
return result;
}
template std::pair<IdArray, IdArray> CSRGlobalUniformNegativeSampling<kDLGPU, int32_t>(
const CSRMatrix&, int64_t, int, bool, bool, double);
template std::pair<IdArray, IdArray> CSRGlobalUniformNegativeSampling<kDLGPU, int64_t>(
const CSRMatrix&, int64_t, int, bool, bool, double);
}; // namespace impl
}; // namespace aten
}; // namespace dgl
...@@ -19,8 +19,7 @@ bool AllTrue(int8_t* flags, int64_t length, const DLContext& ctx) {
CUDA_CALL(cub::DeviceReduce::Min(nullptr, workspace_size, flags, rst, length));
void* workspace = device->AllocWorkspace(ctx, workspace_size);
CUDA_CALL(cub::DeviceReduce::Min(workspace, workspace_size, flags, rst, length));
int8_t cpu_rst = GetCUDAScalar(device, ctx, rst, static_cast<cudaStream_t>(0));
device->FreeWorkspace(ctx, workspace);
device->FreeWorkspace(ctx, rst);
return cpu_rst == 1;
......