Unverified Commit 16061925 authored by Quan (Andy) Gan's avatar Quan (Andy) Gan Committed by GitHub
Browse files

[NodeFlow] Non-uniform neighbor sampling (#711)

* nonuniform sampler

* unit test

* test on out neighbors

* error checks

* lint

* fix

* clarification

* use macro switcher

* use empty array for uniform sampling

* oops

* Revert "oops"

This reverts commit a11f9ae707aaeb67fb5921c887a17d3711d5b04a.

* Revert "use empty array for uniform sampling"

This reverts commit 8526ce4cade89f2c1b09a08aca8830375ebafb31.

* re-reverting

* use a method
parent 742d79a7
......@@ -38,13 +38,16 @@ class SamplerOp {
* \param num_hops the number of hops to sample neighbors.
* \param expand_factor the max number of neighbors to sample.
* \param add_self_loop whether to add self loop to the sampled subgraph
* \param probability per-edge unnormalized transition probabilities (float/double),
*        indexed by edge ID; nullptr selects uniform sampling.
* \return a NodeFlow graph.
*/
static NodeFlow NeighborUniformSample(const ImmutableGraph *graph,
const std::vector<dgl_id_t>& seeds,
const std::string &edge_type,
int num_hops, int expand_factor,
const bool add_self_loop);
template<typename ValueType>
static NodeFlow NeighborSample(const ImmutableGraph *graph,
const std::vector<dgl_id_t>& seeds,
const std::string &edge_type,
int num_hops, int expand_factor,
const bool add_self_loop,
const ValueType *probability);
/*!
* \brief Sample a graph from the seed vertices with layer sampling.
......
......@@ -205,7 +205,7 @@ class NodeFlowSampler(object):
return self._batch_size
class NeighborSampler(NodeFlowSampler):
'''Create a sampler that samples neighborhood.
r'''Create a sampler that samples neighborhood.
It returns a generator of :class:`~dgl.NodeFlow`. This can be viewed as
an analogy of *mini-batch training* on graph data -- the given graph represents
......@@ -258,10 +258,30 @@ class NeighborSampler(NodeFlowSampler):
* "out": the neighbors on the out-edges.
Default: "in"
node_prob : Tensor, optional
A 1D tensor for the probability that a neighbor node is sampled.
None means uniform sampling. Otherwise, the number of elements
should be equal to the number of vertices in the graph.
transition_prob : str or Tensor, optional
A 1D tensor containing the (unnormalized) transition probability.
The probability of a node v being sampled from a neighbor u is proportional to
the edge weight, normalized by the sum over edge weights grouping by the
destination node.
In other words, given a node v, the probability of node u and edge (u, v)
included in the NodeFlow layer preceding that of v is given by:
.. math::
p(u, v) = \frac{w_{u, v}}{\sum_{u', (u', v) \in E} w_{u', v}}
If neighbor type is "out", then the probability is instead normalized by the sum
grouping by source node:
.. math::
p(v, u) = \frac{w_{v, u}}{\sum_{u', (v, u') \in E} w_{v, u'}}
If a str is given, the edge weight will be loaded from the edge feature column with
the same name. The feature column must be a scalar column in this case.
Default: None
seed_nodes : Tensor, optional
A 1D tensor list of nodes where we sample NodeFlows from.
......@@ -287,7 +307,7 @@ class NeighborSampler(NodeFlowSampler):
expand_factor=None,
num_hops=1,
neighbor_type='in',
node_prob=None,
transition_prob=None,
seed_nodes=None,
shuffle=False,
num_workers=1,
......@@ -299,7 +319,6 @@ class NeighborSampler(NodeFlowSampler):
assert g.is_readonly, "NeighborSampler doesn't support mutable graphs. " + \
"Please turn it into an immutable graph with DGLGraph.readonly"
assert node_prob is None, 'non-uniform node probability not supported'
assert isinstance(expand_factor, Integral), 'non-int expand_factor not supported'
self._expand_factor = int(expand_factor)
......@@ -307,9 +326,17 @@ class NeighborSampler(NodeFlowSampler):
self._add_self_loop = add_self_loop
self._num_workers = int(num_workers)
self._neighbor_type = neighbor_type
self._transition_prob = transition_prob
def fetch(self, current_nodeflow_index):
nfobjs = _CAPI_UniformSampling(
if self._transition_prob is None:
prob = F.tensor([], F.float32)
elif isinstance(self._transition_prob, str):
prob = self.g.edata[self._transition_prob]
else:
prob = self._transition_prob
nfobjs = _CAPI_NeighborSampling(
self.g._graph,
self.seed_nodes.todgltensor(),
current_nodeflow_index, # start batch id
......@@ -318,7 +345,9 @@ class NeighborSampler(NodeFlowSampler):
self._expand_factor,
self._num_hops,
self._neighbor_type,
self._add_self_loop)
self._add_self_loop,
F.zerocopy_to_dgl_ndarray(prob))
nflows = [NodeFlow(self.g, obj) for obj in nfobjs]
return nflows
......
......@@ -9,15 +9,16 @@
namespace dgl {
namespace aten {
#define ATEN_XPU_SWITCH(val, XPU, ...) \
// Dispatch on device type: binds `XPU` to the compile-time device constant
// and executes the statements in __VA_ARGS__ with it in scope.
// Wrapped in do { ... } while (0) so the macro expands to a single statement
// and is safe inside un-braced if/else.
// Only CPU (kDLCPU) is supported; any other device aborts via LOG(FATAL).
#define ATEN_XPU_SWITCH(val, XPU, ...) do {                         \
  if ((val) == kDLCPU) {                                            \
    constexpr auto XPU = kDLCPU;                                    \
    {__VA_ARGS__}                                                   \
  } else {                                                          \
    LOG(FATAL) << "Device type: " << (val) << " is not supported."; \
  }                                                                 \
} while (0)
#define ATEN_ID_TYPE_SWITCH(val, IdType, ...) \
#define ATEN_ID_TYPE_SWITCH(val, IdType, ...) do { \
CHECK_EQ((val).code, kDLInt) << "ID must be integer type"; \
if ((val).bits == 32) { \
typedef int32_t IdType; \
......@@ -26,10 +27,25 @@ namespace aten {
typedef int64_t IdType; \
{__VA_ARGS__} \
} else { \
LOG(FATAL) << "ID can Only be int32 or int64"; \
}
LOG(FATAL) << "ID can only be int32 or int64"; \
} \
} while (0)
#define ATEN_CSR_DTYPE_SWITCH(val, DType, ...) \
// Dispatch on floating-point dtype: typedefs `FloatType` to float (32-bit)
// or double (64-bit) according to (val).bits and runs __VA_ARGS__ with it in
// scope. `val_name` is a human-readable name used in the error messages.
// Aborts via CHECK/LOG(FATAL) when the dtype is not a float, or is a float
// width other than 32/64 bits. Wrapped in do { ... } while (0) so the macro
// behaves as a single statement.
#define ATEN_FLOAT_TYPE_SWITCH(val, FloatType, val_name, ...) do { \
CHECK_EQ((val).code, kDLFloat) \
<< (val_name) << " must be float type"; \
if ((val).bits == 32) { \
typedef float FloatType; \
{__VA_ARGS__} \
} else if ((val).bits == 64) { \
typedef double FloatType; \
{__VA_ARGS__} \
} else { \
LOG(FATAL) << (val_name) << " can only be float32 or float64"; \
} \
} while (0)
#define ATEN_CSR_DTYPE_SWITCH(val, DType, ...) do { \
if ((val).code == kDLInt && (val).bits == 32) { \
typedef int32_t DType; \
{__VA_ARGS__} \
......@@ -38,7 +54,8 @@ namespace aten {
{__VA_ARGS__} \
} else { \
LOG(FATAL) << "CSR matrix data can only be int32 or int64"; \
}
} \
} while (0)
// Macro to dispatch according to device context, index type and data type
// TODO(minjie): In our current use cases, data type and id type are the
......
......@@ -14,6 +14,7 @@
#include <cmath>
#include <numeric>
#include "../c_api_common.h"
#include "../array/common.h" // for ATEN_FLOAT_TYPE_SWITCH
using namespace dgl::runtime;
......@@ -23,9 +24,10 @@ namespace {
/*
* ArrayHeap is used to sample elements from vector
*/
template<typename ValueType>
class ArrayHeap {
public:
explicit ArrayHeap(const std::vector<float>& prob) {
explicit ArrayHeap(const std::vector<ValueType>& prob) {
vec_size_ = prob.size();
bit_len_ = ceil(log2(vec_size_));
limit_ = 1 << bit_len_;
......@@ -49,7 +51,7 @@ class ArrayHeap {
*/
void Delete(size_t index) {
size_t i = index + limit_;
float w = heap_[i];
ValueType w = heap_[i];
for (int j = bit_len_; j >= 0; --j) {
heap_[i] -= w;
i = i >> 1;
......@@ -59,7 +61,7 @@ class ArrayHeap {
/*
* Add value w to index (this costs O(log m) steps)
*/
void Add(size_t index, float w) {
void Add(size_t index, ValueType w) {
size_t i = index + limit_;
for (int j = bit_len_; j >= 0; --j) {
heap_[i] += w;
......@@ -71,7 +73,7 @@ class ArrayHeap {
* Sample from arrayHeap
*/
size_t Sample() {
float xi = heap_[1] * RandomEngine::ThreadLocal()->Uniform<float>();
ValueType xi = heap_[1] * RandomEngine::ThreadLocal()->Uniform<float>();
int i = 1;
while (i < limit_) {
i = i << 1;
......@@ -98,7 +100,7 @@ class ArrayHeap {
int vec_size_; // sample size
int bit_len_; // bit size
int limit_;
std::vector<float> heap_;
std::vector<ValueType> heap_;
};
/*
......@@ -177,8 +179,11 @@ void GetUniformSample(const dgl_id_t* edge_id_list,
/*
* Non-uniform sample via ArrayHeap
*
* \param probability Transition probability on the entire graph, indexed by edge ID
*/
void GetNonUniformSample(const float* probability,
template<typename ValueType>
void GetNonUniformSample(const ValueType* probability,
const dgl_id_t* edge_id_list,
const dgl_id_t* vid_list,
const size_t ver_len,
......@@ -193,11 +198,11 @@ void GetNonUniformSample(const float* probability,
}
// Make sample
std::vector<size_t> sp_index(max_num_neighbor);
std::vector<float> sp_prob(ver_len);
std::vector<ValueType> sp_prob(ver_len);
for (size_t i = 0; i < ver_len; ++i) {
sp_prob[i] = probability[vid_list[i]];
sp_prob[i] = probability[edge_id_list[i]];
}
ArrayHeap arrayHeap(sp_prob);
ArrayHeap<ValueType> arrayHeap(sp_prob);
arrayHeap.SampleWithoutReplacement(max_num_neighbor, &sp_index);
out_ver->resize(max_num_neighbor);
out_edge->resize(max_num_neighbor);
......@@ -366,9 +371,10 @@ NodeFlow ConstructNodeFlow(std::vector<dgl_id_t> neighbor_list,
return nf;
}
template<typename ValueType>
NodeFlow SampleSubgraph(const ImmutableGraph *graph,
const std::vector<dgl_id_t>& seeds,
const float* probability,
const ValueType* probability,
const std::string &edge_type,
int num_hops,
size_t num_neighbor,
......@@ -510,14 +516,16 @@ DGL_REGISTER_GLOBAL("nodeflow._CAPI_NodeFlowGetBlockOffsets")
*rv = nflow->flow_offsets;
});
NodeFlow SamplerOp::NeighborUniformSample(const ImmutableGraph *graph,
const std::vector<dgl_id_t>& seeds,
const std::string &edge_type,
int num_hops, int expand_factor,
const bool add_self_loop) {
template<typename ValueType>
NodeFlow SamplerOp::NeighborSample(const ImmutableGraph *graph,
const std::vector<dgl_id_t>& seeds,
const std::string &edge_type,
int num_hops, int expand_factor,
const bool add_self_loop,
const ValueType *probability) {
return SampleSubgraph(graph,
seeds, // seed vector
nullptr, // sample_id_probability
seeds,
probability,
edge_type,
num_hops + 1,
expand_factor,
......@@ -711,21 +719,18 @@ void BuildCsr(const ImmutableGraph &g, const std::string neigh_type) {
}
}
DGL_REGISTER_GLOBAL("sampling._CAPI_UniformSampling")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
// arguments
GraphRef g = args[0];
const IdArray seed_nodes = args[1];
const int64_t batch_start_id = args[2];
const int64_t batch_size = args[3];
const int64_t max_num_workers = args[4];
const int64_t expand_factor = args[5];
const int64_t num_hops = args[6];
const std::string neigh_type = args[7];
const bool add_self_loop = args[8];
template<typename ValueType>
std::vector<NodeFlow> NeighborSamplingImpl(const ImmutableGraphPtr gptr,
const IdArray seed_nodes,
const int64_t batch_start_id,
const int64_t batch_size,
const int64_t max_num_workers,
const int64_t expand_factor,
const int64_t num_hops,
const std::string neigh_type,
const bool add_self_loop,
const ValueType *probability) {
// process args
auto gptr = std::dynamic_pointer_cast<ImmutableGraph>(g.sptr());
CHECK(gptr) << "sampling isn't implemented in mutable graph";
CHECK(IsValidIdArray(seed_nodes));
const dgl_id_t* seed_nodes_data = static_cast<dgl_id_t*>(seed_nodes->data);
const int64_t num_seeds = seed_nodes->shape[0];
......@@ -744,9 +749,82 @@ DGL_REGISTER_GLOBAL("sampling._CAPI_UniformSampling")
std::vector<dgl_id_t> worker_seeds(end - start);
std::copy(seed_nodes_data + start, seed_nodes_data + end,
worker_seeds.begin());
nflows[i] = SamplerOp::NeighborUniformSample(
gptr.get(), worker_seeds, neigh_type, num_hops, expand_factor, add_self_loop);
nflows[i] = SamplerOp::NeighborSample(
gptr.get(), worker_seeds, neigh_type, num_hops, expand_factor,
add_self_loop, probability);
}
return nflows;
}
// Packed-func entry point for uniform neighbor sampling.
// Thin wrapper over NeighborSamplingImpl that passes a null probability
// pointer, which selects the uniform-sampling path. The <float> template
// argument is arbitrary here since the probability array is never read.
DGL_REGISTER_GLOBAL("sampling._CAPI_UniformSampling")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
// arguments
const GraphRef g = args[0];
const IdArray seed_nodes = args[1];
const int64_t batch_start_id = args[2];
const int64_t batch_size = args[3];
const int64_t max_num_workers = args[4];
const int64_t expand_factor = args[5];
const int64_t num_hops = args[6];
const std::string neigh_type = args[7];
const bool add_self_loop = args[8];
// Sampling requires a read-only (immutable) graph.
auto gptr = std::dynamic_pointer_cast<ImmutableGraph>(g.sptr());
CHECK(gptr) << "sampling isn't implemented in mutable graph";
// nullptr probability => uniform sampling.
std::vector<NodeFlow> nflows = NeighborSamplingImpl<float>(
gptr, seed_nodes, batch_start_id, batch_size, max_num_workers,
expand_factor, num_hops, neigh_type, add_self_loop, nullptr);
*rv = List<NodeFlow>(nflows);
});
// Packed-func entry point for non-uniform (weighted) neighbor sampling.
// `probability` (args[9]) is a 1-D float32/float64 array of unnormalized
// per-edge transition probabilities, indexed by edge ID; an empty array
// selects the uniform-sampling path (null pointer passed downstream).
DGL_REGISTER_GLOBAL("sampling._CAPI_NeighborSampling")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
    // arguments
    const GraphRef g = args[0];
    const IdArray seed_nodes = args[1];
    const int64_t batch_start_id = args[2];
    const int64_t batch_size = args[3];
    const int64_t max_num_workers = args[4];
    const int64_t expand_factor = args[5];
    const int64_t num_hops = args[6];
    const std::string neigh_type = args[7];
    const bool add_self_loop = args[8];
    const NDArray probability = args[9];

    // Sampling requires a read-only (immutable) graph.
    auto gptr = std::dynamic_pointer_cast<ImmutableGraph>(g.sptr());
    CHECK(gptr) << "sampling isn't implemented in mutable graph";

    CHECK(probability->ndim == 1)
      << "transition probability must be a 1-dimensional vector";

    std::vector<NodeFlow> nflows;
    // ATEN_FLOAT_TYPE_SWITCH already CHECKs dtype.code == kDLFloat, so no
    // separate dtype check is needed before the dispatch.
    ATEN_FLOAT_TYPE_SWITCH(
      probability->dtype,
      FloatType,
      "transition probability",
      {
        const FloatType *prob;
        // ndim was verified above; only the length needs inspecting here.
        if (probability->shape[0] == 0) {
          // Empty array: caller requested uniform sampling.
          prob = nullptr;
        } else {
          CHECK(probability->shape[0] == gptr->NumEdges())
            << "transition probability must have same number of elements as edges";
          CHECK(probability.IsContiguous())
            << "transition probability must be contiguous tensor";
          prob = static_cast<const FloatType *>(probability->data);
        }
        nflows = NeighborSamplingImpl(
          gptr, seed_nodes, batch_start_id, batch_size, max_num_workers,
          expand_factor, num_hops, neigh_type, add_self_loop, prob);
      });

    *rv = List<NodeFlow>(nflows);
  });
......
......@@ -155,6 +155,48 @@ def test_layer_sampler():
_test_layer_sampler()
_test_layer_sampler(prefetch=True)
def test_nonuniform_neighbor_sampler():
    """NeighborSampler with transition_prob must only follow nonzero-weight edges.

    Builds a 100-node graph containing the directed path 0 -> 1 -> ... -> 99
    with edge weight 1, plus ~1000 random distractor edges with weight 0,
    then checks that a 1-neighbor NodeFlow walks exactly along the path.
    """
    # Path edges (weight 1) first, then unique random edges (weight 0).
    edge_list = [(u, u + 1) for u in range(99)]
    for _ in range(1000):
        cand = (np.random.randint(100), np.random.randint(100))
        if cand not in edge_list:
            edge_list.append(cand)
    src, dst = zip(*edge_list)

    g = dgl.DGLGraph()
    g.add_nodes(100)
    g.add_edges(src, dst)
    g.readonly()
    g.edata['w'] = F.cat([
        F.ones((99,), F.float64, F.cpu()),
        F.zeros((len(edge_list) - 99,), F.float64, F.cpu())], 0)

    # In-neighbor sampling from node 99 must walk back along the path:
    # layer i of the generated NodeFlow contains exactly node i.
    sampler = dgl.contrib.sampling.NeighborSampler(
        g, 1, 1, 99, 'in', transition_prob='w', seed_nodes=[99])
    nf = next(iter(sampler))
    assert nf.num_layers == 100
    for layer in range(nf.num_layers):
        assert nf.layer_size(layer) == 1
        assert nf.layer_parent_nid(layer)[0] == layer

    # Out-neighbor sampling from node 0 walks forward along the path:
    # layer i contains exactly node 99 - i.
    sampler = dgl.contrib.sampling.NeighborSampler(
        g, 1, 1, 99, 'out', transition_prob='w', seed_nodes=[0])
    nf = next(iter(sampler))
    assert nf.num_layers == 100
    for layer in range(nf.num_layers):
        assert nf.layer_size(layer) == 1
        assert nf.layer_parent_nid(layer)[0] == 99 - layer
def test_setseed():
g = generate_rand_graph(100)
......@@ -184,4 +226,5 @@ if __name__ == '__main__':
test_1neighbor_sampler()
test_10neighbor_sampler()
test_layer_sampler()
test_nonuniform_neighbor_sampler()
test_setseed()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment