Unverified Commit 5fcd7f29 authored by Xin Yao, committed by GitHub

[Feature] Enable UVA for GPU PinSAGE and RandomWalk (#3857)



* enable uva for pinsage sampler

* unit test

* modify some checks on the python side

* remove legacy random walk code

* update unit test

* update unit test

* fix unit test

* adjust checks

* move some checks to c++

* move max_nodes check to cuda kernel

* fix ci for tf
Co-authored-by: Quan (Andy) Gan <coin2028@hotmail.com>
Co-authored-by: nv-dlasalle <63612878+nv-dlasalle@users.noreply.github.com>
parent 23a5e674
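In short, this change lets the GPU random-walk and PinSAGE samplers read a graph that stays in pinned (page-locked) host memory through UVA, so only the seed nodes need to live on the GPU. A minimal sketch of the intended usage, pieced together from the unit tests in this commit (the PyTorch backend and a CUDA device are assumptions, not part of the diff):

    g = dgl.graph((src, dst))   # graph data stays on the CPU
    g.create_formats_()         # materialize CSR/CSC before pinning
    g.pin_memory_()             # expose the graph to the GPU via UVA
    seeds = torch.tensor([0, 1, 2], device='cuda')
    traces, types = dgl.sampling.random_walk(g, seeds, length=4)
    g.unpin_memory_()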
from .sampler import NeighborSampler, LayerSampler, EdgeSampler
-from .randomwalk import *
from .dis_sampler import SamplerSender, SamplerReceiver
from .dis_sampler import SamplerPool
import numpy as np
from ... import utils
from ... import backend as F
from ..._ffi.function import _init_api
from ..._ffi.object import register_object, ObjectBase
from ... import ndarray
from ...base import dgl_warning
__all__ = ['random_walk',
'random_walk_with_restart',
'bipartite_single_sided_random_walk_with_restart',
'metapath_random_walk',
]
@register_object('sampler.RandomWalkTraces')
class RandomWalkTraces(ObjectBase):
pass
def random_walk(g, seeds, num_traces, num_hops):
"""**DEPRECATED**: please use :func:`dgl.sampling.random_walk` instead.
Batch-generate random walk traces of the same length on the given graph.
Parameters
----------
g : DGLGraphStale
The graph.
seeds : Tensor
The node ID tensor from which the random walk traces start.
num_traces : int
Number of traces to generate for each seed.
num_hops : int
Number of hops for each trace.
Returns
-------
traces : Tensor
A 3-dimensional node ID tensor with shape
(num_seeds, num_traces, num_hops + 1)
traces[i, j, 0] is always the starting node (i.e. seeds[i]).
"""
dgl_warning(
"This function is deprecated; please use dgl.sampling.random_walk instead",
DeprecationWarning)
if len(seeds) == 0:
return utils.toindex([]).tousertensor()
seeds = utils.toindex(seeds).todgltensor()
traces = _CAPI_DGLRandomWalk(g._graph,
seeds, int(num_traces), int(num_hops))
return F.zerocopy_from_dlpack(traces.to_dlpack())
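# Illustrative note (not part of this module): the replacement API returns one
# trace per seed as a (num_seeds, length + 1) node ID tensor plus the node type
# of each position, e.g.
#   traces, types = dgl.sampling.random_walk(g, [0, 1], length=4)
# rather than the (num_seeds, num_traces, num_hops + 1) layout above; to emulate
# ``num_traces`` traces per seed, repeat the seed IDs before calling it.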
def _split_traces(traces):
"""Splits the flattened RandomWalkTraces structure into list of list
of tensors.
Parameters
----------
traces : RandomWalkTraces
Returns
-------
traces : list[list[Tensor]]
traces[i][j] is the j-th trace generated for i-th seed.
"""
trace_counts = traces.trace_counts.asnumpy().tolist()
trace_vertices = F.zerocopy_from_dgl_ndarray(traces.vertices)
trace_vertices = F.split(
trace_vertices, traces.trace_lengths.asnumpy().tolist(), 0)
results = []
s = 0
for c in trace_counts:
results.append(trace_vertices[s:s+c])
s += c
return results
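# Worked example of the flattened layout (illustrative numbers): with two seeds,
# trace_counts = [2, 1] means seed 0 produced two traces and seed 1 produced one;
# trace_lengths = [3, 1, 2] gives the length of each trace in order, and vertices
# holds the 3 + 1 + 2 = 6 visited node IDs concatenated.  F.split cuts ``vertices``
# by trace_lengths, and the loop above groups the resulting pieces per seed.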
def random_walk_with_restart(
g, seeds, restart_prob, max_nodes_per_seed,
max_visit_counts=0, max_frequent_visited_nodes=0):
"""**DEPRECATED**: please use :func:`dgl.sampling.random_walk` instead.
Batch-generate random walk traces on the given graph with restart probability.
Parameters
----------
g : DGLGraphStale
The graph.
seeds : Tensor
The node ID tensor from which the random walk traces start.
restart_prob : float
Probability to stop a random walk after each step.
max_nodes_per_seed : int
Stop generating traces for a seed if the total number of nodes
visited exceeds this number. [1]
max_visit_counts : int, optional
max_frequent_visited_nodes : int, optional
Alternatively, stop generating traces for a seed once at least
``max_frequent_visited_nodes`` nodes have been visited at least
``max_visit_counts`` times each. [1]
Returns
-------
traces : list[list[Tensor]]
traces[i][j] is the j-th trace generated for i-th seed.
Notes
-----
The traces do **not** include the seed nodes themselves.
References
----------
[1] Eksombatchai et al., 2017 https://arxiv.org/abs/1711.07601
"""
dgl_warning(
"This function is deprecated; please use dgl.sampling.random_walk instead",
DeprecationWarning)
if len(seeds) == 0:
return []
seeds = utils.toindex(seeds).todgltensor()
traces = _CAPI_DGLRandomWalkWithRestart(
g._graph, seeds, restart_prob, int(max_nodes_per_seed),
int(max_visit_counts), int(max_frequent_visited_nodes))
return _split_traces(traces)
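# Illustrative note (not part of this module): the replacement API expresses
# restart as a per-step probability and pads early-terminated walks with -1
# instead of returning variable-length traces, e.g.
#   traces, types = dgl.sampling.random_walk(g, seeds, length=10, restart_prob=0.2)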
def bipartite_single_sided_random_walk_with_restart(
g, seeds, restart_prob, max_nodes_per_seed,
max_visit_counts=0, max_frequent_visited_nodes=0):
"""**DEPRECATED**: please use :func:`dgl.sampling.random_walk` instead.
Batch-generate random walk traces on the given graph with restart probability.
The graph must be a bipartite graph.
A single random walk step involves two normal steps, so that the "visited"
nodes always stay on the same side. [1]
Parameters
----------
g : DGLGraphStale
The graph.
seeds : Tensor
The node ID tensor from which the random walk traces start.
restart_prob : float
Probability to stop a random walk after each step.
max_nodes_per_seed : int
Stop generating traces for a seed if the total number of nodes
visited exceeds this number. [1]
max_visit_counts : int, optional
max_frequent_visited_nodes : int, optional
Alternatively, stop generating traces for a seed once at least
``max_frequent_visited_nodes`` nodes have been visited at least
``max_visit_counts`` times each. [1]
Returns
-------
traces : list[list[Tensor]]
traces[i][j] is the j-th trace generated for i-th seed.
Notes
-----
The current implementation does not ensure that the graph is a bipartite
graph.
The traces do **not** include the seed nodes themselves.
References
----------
[1] Eksombatchai et al., 2017 https://arxiv.org/abs/1711.07601
"""
dgl_warning(
"This function is deprecated; please use dgl.sampling.random_walk instead",
DeprecationWarning)
if len(seeds) == 0:
return []
seeds = utils.toindex(seeds).todgltensor()
traces = _CAPI_DGLBipartiteSingleSidedRandomWalkWithRestart(
g._graph, seeds, restart_prob, int(max_nodes_per_seed),
int(max_visit_counts), int(max_frequent_visited_nodes))
return _split_traces(traces)
def metapath_random_walk(hg, etypes, seeds, num_traces):
"""**DEPRECATED**: please use :func:`dgl.sampling.random_walk` instead.
For a single seed node, ``num_traces`` traces are generated. Each trace is generated as follows:
1. Start from the given seed and set ``t`` to 0.
2. Pick and traverse along edge type ``etypes[t % len(etypes)]`` from the current node.
3. If no edge can be found, halt. Otherwise, increment ``t`` and go to step 2.
Parameters
----------
hg : DGLHeteroGraph
The heterogeneous graph.
etypes : list[str or tuple of str]
Metapath, specified as a list of edge types.
The beginning and ending node type must be the same.
seeds : Tensor
The seed nodes. Node type is the same as the beginning node type of metapath.
num_traces : int
The number of traces to generate for each seed.
Returns
-------
traces : list[list[Tensor]]
traces[i][j] is the j-th trace generated for i-th seed.
traces[i][j][k] has the same node type as the destination node type of edge
type ``etypes[k % len(etypes)]``.
Notes
-----
The traces do **not** include the seed nodes themselves.
"""
dgl_warning(
"This function is deprecated; please use dgl.sampling.random_walk instead",
DeprecationWarning)
if len(etypes) == 0:
raise ValueError('empty metapath')
if hg.to_canonical_etype(etypes[0])[0] != hg.to_canonical_etype(etypes[-1])[2]:
raise ValueError('beginning and ending node type mismatch')
if len(seeds) == 0:
return []
etype_array = ndarray.array(np.asarray([hg.get_etype_id(et) for et in etypes], dtype="int64"))
seed_array = utils.toindex(seeds, hg._idtype_str).todgltensor()
traces = _CAPI_DGLMetapathRandomWalk(hg._graph, etype_array, seed_array, num_traces)
return _split_traces(traces)
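# Illustrative note (not part of this module): the replacement API takes the
# metapath directly and returns one trace per seed, e.g.
#   traces, types = dgl.sampling.random_walk(
#       hg, seeds, metapath=['follow', 'view', 'viewed-by'] * 2)
# Repeat ``seeds`` to emulate ``num_traces`` traces per seed.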
_init_api('dgl.sampler.randomwalk', __name__)
@@ -113,6 +113,7 @@ class RandomWalkNeighborSampler(object):
         to the algorithm above. The returned graph is on CPU.
         """
         seed_nodes = utils.prepare_tensor(self.G, seed_nodes, 'seed_nodes')
+        self.restart_prob = F.copy_to(self.restart_prob, F.context(seed_nodes))
         seed_nodes = F.repeat(seed_nodes, self.num_random_walks, 0)
         paths, _ = random_walk(
...
@@ -169,8 +169,11 @@ def random_walk(g, nodes, *, metapath=None, length=None, prob=None, restart_prob
             metapath = [g.get_etype_id(etype) for etype in metapath]
     gidx = g._graph
-    nodes = F.to_dgl_nd(utils.prepare_tensor(g, nodes, 'nodes'))
-    metapath = F.to_dgl_nd(utils.prepare_tensor(g, metapath, 'metapath'))
+    nodes = utils.prepare_tensor(g, nodes, 'nodes')
+    nodes = F.to_dgl_nd(nodes)
+    # (Xin) Since metapath array is created by us, safe to skip the check
+    # and keep it on CPU to make max_nodes sanity check easier.
+    metapath = F.to_dgl_nd(F.astype(F.tensor(metapath), g.idtype))
     # Load the probability tensor from the edge frames
     if prob is None:
...
@@ -30,10 +30,12 @@ def prepare_tensor(g, data, name):
         Data in tensor object.
     """
     if F.is_tensor(data):
-        if (F.dtype(data) != g.idtype or F.context(data) != g.device) and not g.is_pinned():
-            raise DGLError('Expect argument "{}" to have data type {} and device '
-                           'context {}. But got {} and {}.'.format(
-                               name, g.idtype, g.device, F.dtype(data), F.context(data)))
+        if F.dtype(data) != g.idtype:
+            raise DGLError(f'Expect argument "{name}" to have data type {g.idtype}. '
+                           f'But got {F.dtype(data)}.')
+        if F.context(data) != g.device and not g.is_pinned():
+            raise DGLError(f'Expect argument "{name}" to have device {g.device}. '
+                           f'But got {F.context(data)}.')
         ret = data
     else:
         data = F.tensor(data)
...
/*!
* Copyright (c) 2019 by Contributors
* \file graph/sampler/metapath.cc
* \brief Metapath sampling
*/
#include <dgl/array.h>
#include <dgl/random.h>
#include <dgl/packed_func_ext.h>
#include "../../c_api_common.h"
#include "../unit_graph.h"
#include "randomwalk.h"
using namespace dgl::runtime;
using namespace dgl::aten;
namespace dgl {
namespace sampling {
namespace {
/*!
* \brief Random walk based on the given metapath.
*
* \tparam IdType Index dtype of graph
* \param hg The heterograph
* \param etypes The metapath as an array of edge type IDs
* \param seeds The array of starting vertices for random walks
* \param num_traces Number of traces to generate for each starting vertex
* \note The metapath should have the same starting and ending node type.
*/
template <typename IdType>
RandomWalkTracesPtr MetapathRandomWalk(
const HeteroGraphPtr hg,
const IdArray etypes,
const IdArray seeds,
int num_traces);
template <>
RandomWalkTracesPtr MetapathRandomWalk<int64_t>(
const HeteroGraphPtr hg,
const IdArray etypes,
const IdArray seeds,
int num_traces) {
const auto metagraph = hg->meta_graph();
uint64_t num_etypes = etypes->shape[0];
uint64_t num_seeds = seeds->shape[0];
const dgl_type_t *etype_data = static_cast<dgl_type_t *>(etypes->data);
const dgl_id_t *seed_data = static_cast<dgl_id_t *>(seeds->data);
std::vector<dgl_id_t> vertices;
std::vector<size_t> trace_lengths, trace_counts;
// TODO(quan): use omp to parallelize this loop
for (uint64_t seed_id = 0; seed_id < num_seeds; ++seed_id) {
int curr_num_traces = 0;
for (; curr_num_traces < num_traces; ++curr_num_traces) {
dgl_id_t curr = seed_data[seed_id];
size_t trace_length = 0;
for (size_t i = 0; i < num_etypes; ++i) {
const auto &succ = hg->SuccVec(etype_data[i], curr);
if (succ.size() == 0)
break;
curr = succ[RandomEngine::ThreadLocal()->RandInt(succ.size())];
vertices.push_back(curr);
++trace_length;
}
trace_lengths.push_back(trace_length);
}
trace_counts.push_back(curr_num_traces);
}
RandomWalkTraces *tl = new RandomWalkTraces;
tl->vertices = VecToIdArray(vertices);
tl->trace_lengths = VecToIdArray(trace_lengths);
tl->trace_counts = VecToIdArray(trace_counts);
return RandomWalkTracesPtr(tl);
}
/*!
* \brief This is a patch function for int32 HeteroGraph
* TODO: Refactor this with CSR and COO operations
*/
template <>
RandomWalkTracesPtr MetapathRandomWalk<int32_t>(
const HeteroGraphPtr hg,
const IdArray etypes,
const IdArray seeds,
int num_traces) {
const auto metagraph = hg->meta_graph();
uint64_t num_etypes = etypes->shape[0];
uint64_t num_seeds = seeds->shape[0];
const dgl_type_t *etype_data = static_cast<dgl_type_t *>(etypes->data);
const int32_t *seed_data = static_cast<int32_t *>(seeds->data);
std::vector<int32_t> vertices;
std::vector<size_t> trace_lengths, trace_counts;
// TODO(quan): use omp to parallelize this loop
for (uint64_t seed_id = 0; seed_id < num_seeds; ++seed_id) {
int curr_num_traces = 0;
for (; curr_num_traces < num_traces; ++curr_num_traces) {
int32_t curr = seed_data[seed_id];
size_t trace_length = 0;
for (size_t i = 0; i < num_etypes; ++i) {
auto ug = std::dynamic_pointer_cast<UnitGraph>(hg->GetRelationGraph(etype_data[i]));
CHECK_NOTNULL(ug);
const auto &succ = ug->SuccVec32(etype_data[i], curr);
if (succ.size() == 0)
break;
curr = succ[RandomEngine::ThreadLocal()->RandInt(succ.size())];
vertices.push_back(curr);
++trace_length;
}
trace_lengths.push_back(trace_length);
}
trace_counts.push_back(curr_num_traces);
}
RandomWalkTraces *tl = new RandomWalkTraces;
tl->vertices = VecToIdArray(vertices);
tl->trace_lengths = VecToIdArray(trace_lengths);
tl->trace_counts = VecToIdArray(trace_counts);
return RandomWalkTracesPtr(tl);
}
}; // namespace
DGL_REGISTER_GLOBAL("sampler.randomwalk._CAPI_DGLMetapathRandomWalk")
.set_body([](DGLArgs args, DGLRetValue *rv) {
const HeteroGraphRef hg = args[0];
const IdArray etypes = args[1];
const IdArray seeds = args[2];
int num_traces = args[3];
CHECK(aten::IsValidIdArray(etypes));
CHECK_EQ(etypes->ctx.device_type, kDLCPU)
<< "MetapathRandomWalk only support CPU sampling";
CHECK(aten::IsValidIdArray(seeds));
CHECK_EQ(seeds->ctx.device_type, kDLCPU)
<< "MetapathRandomWalk only support CPU sampling";
const int64_t bits = hg->NumBits();
RandomWalkTracesPtr tl;
ATEN_ID_BITS_SWITCH(bits, IdType, {
tl = MetapathRandomWalk<IdType>(hg.sptr(), etypes, seeds, num_traces);
});
*rv = RandomWalkTracesRef(tl);
});
}; // namespace sampling
}; // namespace dgl
/*!
* Copyright (c) 2018 by Contributors
* \file graph/sampler.cc
* \brief DGL sampler implementation
*/
#include <dmlc/omp.h>
#include <dgl/immutable_graph.h>
#include <dgl/packed_func_ext.h>
#include <dgl/random.h>
#include <algorithm>
#include <cstdlib>
#include <cmath>
#include <numeric>
#include <functional>
#include <vector>
#include "randomwalk.h"
#include "../../c_api_common.h"
using namespace dgl::runtime;
namespace dgl {
namespace sampling {
using Walker = std::function<dgl_id_t(const GraphInterface *, dgl_id_t)>;
namespace {
/*!
* \brief Randomly select a single direct successor given the current vertex
* \return The selected successor ID, or DGL_INVALID_ID if the vertex has no successor
*/
dgl_id_t WalkOneHop(
const GraphInterface *gptr,
dgl_id_t cur) {
const auto succ = gptr->SuccVec(cur);
const size_t size = succ.size();
if (size == 0)
return DGL_INVALID_ID;
return succ[RandomEngine::ThreadLocal()->RandInt(size)];
}
/*!
* \brief Randomly select a single direct successor after \c hops hops given the current vertex
* \return The vertex reached after \c hops hops, or DGL_INVALID_ID if the walk hits a vertex with no successor
*/
template<int hops>
dgl_id_t WalkMultipleHops(
const GraphInterface *gptr,
dgl_id_t cur) {
dgl_id_t next;
for (int i = 0; i < hops; ++i) {
if ((next = WalkOneHop(gptr, cur)) == DGL_INVALID_ID)
return DGL_INVALID_ID;
cur = next;
}
return cur;
}
IdArray GenericRandomWalk(
const GraphInterface *gptr,
IdArray seeds,
int num_traces,
int num_hops,
Walker walker) {
const int64_t num_nodes = seeds->shape[0];
const dgl_id_t *seed_ids = static_cast<dgl_id_t *>(seeds->data);
IdArray traces = IdArray::Empty(
{num_nodes, num_traces, num_hops + 1},
DLDataType{kDLInt, 64, 1},
DLContext{kDLCPU, 0});
dgl_id_t *trace_data = static_cast<dgl_id_t *>(traces->data);
// FIXME: does OpenMP work with exceptions? Especially without throwing SIGABRT?
dgl_id_t next;
for (int64_t i = 0; i < num_nodes; ++i) {
const dgl_id_t seed_id = seed_ids[i];
for (int j = 0; j < num_traces; ++j) {
dgl_id_t cur = seed_id;
const int kmax = num_hops + 1;
for (int k = 0; k < kmax; ++k) {
const int64_t offset = (i * num_traces + j) * kmax + k;
trace_data[offset] = cur;
if ((next = walker(gptr, cur)) == DGL_INVALID_ID)
LOG(FATAL) << "no successors from vertex " << cur;
cur = next;
}
}
}
return traces;
}
RandomWalkTracesPtr GenericRandomWalkWithRestart(
const GraphInterface *gptr,
IdArray seeds,
double restart_prob,
uint64_t visit_threshold_per_seed,
uint64_t max_visit_counts,
uint64_t max_frequent_visited_nodes,
Walker walker) {
std::vector<dgl_id_t> vertices;
std::vector<size_t> trace_lengths, trace_counts, visit_counts;
const dgl_id_t *seed_ids = static_cast<dgl_id_t *>(seeds->data);
const uint64_t num_nodes = seeds->shape[0];
visit_counts.resize(gptr->NumVertices());
for (uint64_t i = 0; i < num_nodes; ++i) {
int stop = 0;
size_t total_trace_length = 0;
size_t num_traces = 0;
uint64_t num_frequent_visited_nodes = 0;
std::fill(visit_counts.begin(), visit_counts.end(), 0);
while (1) {
dgl_id_t cur = seed_ids[i], next;
size_t trace_length = 0;
for (; ; ++trace_length) {
if ((trace_length > 0) &&
(++visit_counts[cur] == max_visit_counts) &&
(++num_frequent_visited_nodes == max_frequent_visited_nodes))
stop = 1;
if ((trace_length > 0) &&
(RandomEngine::ThreadLocal()->Uniform<double>() < restart_prob))
break;
if ((next = walker(gptr, cur)) == DGL_INVALID_ID)
LOG(FATAL) << "no successors from vertex " << cur;
cur = next;
vertices.push_back(cur);
}
total_trace_length += trace_length;
++num_traces;
trace_lengths.push_back(trace_length);
if ((total_trace_length >= visit_threshold_per_seed) || stop)
break;
}
trace_counts.push_back(num_traces);
}
RandomWalkTraces *traces = new RandomWalkTraces;
traces->trace_counts = IdArray::Empty(
{static_cast<int64_t>(trace_counts.size())},
DLDataType{kDLInt, 64, 1},
DLContext{kDLCPU, 0});
traces->trace_lengths = IdArray::Empty(
{static_cast<int64_t>(trace_lengths.size())},
DLDataType{kDLInt, 64, 1},
DLContext{kDLCPU, 0});
traces->vertices = IdArray::Empty(
{static_cast<int64_t>(vertices.size())},
DLDataType{kDLInt, 64, 1},
DLContext{kDLCPU, 0});
dgl_id_t *trace_counts_data = static_cast<dgl_id_t *>(traces->trace_counts->data);
dgl_id_t *trace_lengths_data = static_cast<dgl_id_t *>(traces->trace_lengths->data);
dgl_id_t *vertices_data = static_cast<dgl_id_t *>(traces->vertices->data);
std::copy(trace_counts.begin(), trace_counts.end(), trace_counts_data);
std::copy(trace_lengths.begin(), trace_lengths.end(), trace_lengths_data);
std::copy(vertices.begin(), vertices.end(), vertices_data);
return RandomWalkTracesPtr(traces);
}
}; // namespace
IdArray RandomWalk(
const GraphInterface *gptr,
IdArray seeds,
int num_traces,
int num_hops) {
return GenericRandomWalk(gptr, seeds, num_traces, num_hops, WalkMultipleHops<1>);
}
RandomWalkTracesPtr RandomWalkWithRestart(
const GraphInterface *gptr,
IdArray seeds,
double restart_prob,
uint64_t visit_threshold_per_seed,
uint64_t max_visit_counts,
uint64_t max_frequent_visited_nodes) {
return GenericRandomWalkWithRestart(
gptr, seeds, restart_prob, visit_threshold_per_seed, max_visit_counts,
max_frequent_visited_nodes, WalkMultipleHops<1>);
}
RandomWalkTracesPtr BipartiteSingleSidedRandomWalkWithRestart(
const GraphInterface *gptr,
IdArray seeds,
double restart_prob,
uint64_t visit_threshold_per_seed,
uint64_t max_visit_counts,
uint64_t max_frequent_visited_nodes) {
return GenericRandomWalkWithRestart(
gptr, seeds, restart_prob, visit_threshold_per_seed, max_visit_counts,
max_frequent_visited_nodes, WalkMultipleHops<2>);
}
DGL_REGISTER_GLOBAL("sampler.randomwalk._CAPI_DGLRandomWalk")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
GraphRef g = args[0];
const IdArray seeds = args[1];
const int num_traces = args[2];
const int num_hops = args[3];
CHECK(aten::IsValidIdArray(seeds));
CHECK_EQ(seeds->ctx.device_type, kDLCPU)
<< "RandomWalk only support CPU sampling";
*rv = RandomWalk(g.sptr().get(), seeds, num_traces, num_hops);
});
DGL_REGISTER_GLOBAL("sampler.randomwalk._CAPI_DGLRandomWalkWithRestart")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
GraphRef g = args[0];
const IdArray seeds = args[1];
const double restart_prob = args[2];
const uint64_t visit_threshold_per_seed = args[3];
const uint64_t max_visit_counts = args[4];
const uint64_t max_frequent_visited_nodes = args[5];
CHECK(aten::IsValidIdArray(seeds));
CHECK_EQ(seeds->ctx.device_type, kDLCPU)
<< "RandomWalkWithRestart only support CPU sampling";
*rv = RandomWalkTracesRef(
RandomWalkWithRestart(g.sptr().get(), seeds, restart_prob, visit_threshold_per_seed,
max_visit_counts, max_frequent_visited_nodes));
});
DGL_REGISTER_GLOBAL("sampler.randomwalk._CAPI_DGLBipartiteSingleSidedRandomWalkWithRestart")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
GraphRef g = args[0];
const IdArray seeds = args[1];
const double restart_prob = args[2];
const uint64_t visit_threshold_per_seed = args[3];
const uint64_t max_visit_counts = args[4];
const uint64_t max_frequent_visited_nodes = args[5];
CHECK(aten::IsValidIdArray(seeds));
CHECK_EQ(seeds->ctx.device_type, kDLCPU)
<< "BipartiteSingleSidedRandomWalkWithRestart only support CPU sampling";
*rv = RandomWalkTracesRef(
BipartiteSingleSidedRandomWalkWithRestart(
g.sptr().get(), seeds, restart_prob, visit_threshold_per_seed,
max_visit_counts, max_frequent_visited_nodes));
});
}; // namespace sampling
}; // namespace dgl
/*!
* Copyright (c) 2018 by Contributors
* \file dgl/sampler.h
* \brief DGL sampler header.
*/
#ifndef DGL_GRAPH_SAMPLER_RANDOMWALK_H_
#define DGL_GRAPH_SAMPLER_RANDOMWALK_H_
#include <dgl/runtime/object.h>
#include <dgl/array.h>
#include <dgl/base_heterograph.h>
#include <memory>
namespace dgl {
namespace sampling {
/*! \brief Structure of multiple random walk traces */
struct RandomWalkTraces : public runtime::Object {
/*! \brief number of traces generated for each seed */
IdArray trace_counts;
/*! \brief length of each trace, concatenated */
IdArray trace_lengths;
/*! \brief the vertices, concatenated */
IdArray vertices;
void VisitAttrs(runtime::AttrVisitor *v) final {
v->Visit("vertices", &vertices);
v->Visit("trace_lengths", &trace_lengths);
v->Visit("trace_counts", &trace_counts);
}
static constexpr const char *_type_key = "sampler.RandomWalkTraces";
DGL_DECLARE_OBJECT_TYPE_INFO(RandomWalkTraces, runtime::Object);
};
typedef std::shared_ptr<RandomWalkTraces> RandomWalkTracesPtr;
DGL_DEFINE_OBJECT_REF(RandomWalkTracesRef, RandomWalkTraces);
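/*!
 * Consistency of the three arrays (illustrative): trace_counts has one entry per
 * seed and its entries sum to the total number of traces, i.e. to
 * trace_lengths->shape[0]; the entries of trace_lengths in turn sum to
 * vertices->shape[0], since all traces are concatenated back to back in \c vertices.
 */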
/*!
* \brief Batch-generate random walk traces
* \param seeds The array of starting vertex IDs
* \param num_traces The number of traces to generate for each seed
* \param num_hops The number of hops for each trace
* \return a flat ID array with shape (num_seeds, num_traces, num_hops + 1)
*/
IdArray RandomWalk(const GraphInterface *gptr,
IdArray seeds,
int num_traces,
int num_hops);
/*!
* \brief Batch-generate random walk traces with restart
*
* Stop generating traces if max_frequent_visited_nodes nodes are visited more than
* max_visit_counts times.
*
* \param seeds The array of starting vertex IDs
* \param restart_prob The restart probability
* \param visit_threshold_per_seed Stop generating more traces once the number of nodes
* visited for a seed exceeds this number. (Algorithm 1 in [1])
* \param max_visit_counts Alternatively, stop generating traces for a seed if no less
* than \c max_frequent_visited_nodes are visited no less than \c max_visit_counts
* times. (Algorithm 2 in [1])
* \param max_frequent_visited_nodes See \c max_visit_counts
* \return A RandomWalkTraces pointer.
*
* \sa [1] Eksombatchai et al., 2017 https://arxiv.org/abs/1711.07601
*/
RandomWalkTracesPtr RandomWalkWithRestart(
const GraphInterface *gptr,
IdArray seeds,
double restart_prob,
uint64_t visit_threshold_per_seed,
uint64_t max_visit_counts,
uint64_t max_frequent_visited_nodes);
/*
* \brief Batch-generate random walk traces with restart on a bipartite graph, walking two
* hops at a time.
*
* Since it is walking on a bipartite graph, the vertices of a trace will always stay on the
* same side.
*
* Stop generating traces if max_frequent_visited_nodes nodes are visited more than
* max_visit_counts times.
*
* \param seeds The array of starting vertex IDs
* \param restart_prob The restart probability
* \param visit_threshold_per_seed Stop generating more traces once the number of nodes
* visited for a seed exceeds this number. (Algorithm 1 in [1])
* \param max_visit_counts Alternatively, stop generating traces for a seed if no less
* than \c max_frequent_visited_nodes are visited no less than \c max_visit_counts
* times. (Algorithm 2 in [1])
* \param max_frequent_visited_nodes See \c max_visit_counts
* \return A RandomWalkTraces instance.
*
* \note Doesn't verify whether the graph is indeed a bipartite graph
*
* \sa [1] Eksombatchai et al., 2017 https://arxiv.org/abs/1711.07601
*/
RandomWalkTracesPtr BipartiteSingleSidedRandomWalkWithRestart(
const GraphInterface *gptr,
IdArray seeds,
double restart_prob,
uint64_t visit_threshold_per_seed,
uint64_t max_visit_counts,
uint64_t max_frequent_visited_nodes);
}; // namespace sampling
}; // namespace dgl
#endif // DGL_GRAPH_SAMPLER_RANDOMWALK_H_
@@ -13,6 +13,7 @@
 #include <utility>
 #include <tuple>
+#include "../../../runtime/cuda/cuda_common.h"
 #include "frequency_hashmap.cuh"

 namespace dgl {
@@ -40,6 +41,7 @@ __global__ void _RandomWalkKernel(
     const GraphKernelData<IdType>* graphs,
     const FloatType* restart_prob_data,
     const int64_t restart_prob_size,
+    const int64_t max_nodes,
     IdType *out_traces_data,
     IdType *out_eids_data) {
   assert(BLOCK_SIZE == blockDim.x);
@@ -53,6 +55,7 @@ __global__ void _RandomWalkKernel(
   while (idx < last_idx) {
     IdType curr = seed_data[idx];
+    assert(curr < max_nodes);
     IdType *traces_data_ptr = &out_traces_data[idx * trace_length];
     IdType *eids_data_ptr = &out_eids_data[idx * max_num_steps];
     *(traces_data_ptr++) = curr;
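// The assert above is the "max_nodes check" that the commit message moves into the
// CUDA kernel: with UVA the Python-side device checks are relaxed, so an
// out-of-range seed ID now trips this device assert instead of silently reading
// past the CSR arrays.  As with any device assert, it is compiled out when NDEBUG
// is defined.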
@@ -97,25 +100,23 @@ std::pair<IdArray, IdArray> RandomWalkUniform(
     FloatArray restart_prob) {
   const int64_t max_num_steps = metapath->shape[0];
   const IdType *metapath_data = static_cast<IdType *>(metapath->data);
+  const int64_t begin_ntype = hg->meta_graph()->FindEdge(metapath_data[0]).first;
+  const int64_t max_nodes = hg->NumVertices(begin_ntype);
   int64_t num_etypes = hg->NumEdgeTypes();
+  auto ctx = seeds->ctx;
-  CHECK(seeds->ctx.device_type == kDLGPU) << "seeds should be in GPU.";
-  CHECK(metapath->ctx.device_type == kDLGPU) << "metapath should be in GPU.";
   const IdType *seed_data = static_cast<const IdType*>(seeds->data);
   CHECK(seeds->ndim == 1) << "seeds shape is not one dimension.";
   const int64_t num_seeds = seeds->shape[0];
   int64_t trace_length = max_num_steps + 1;
-  IdArray traces = IdArray::Empty({num_seeds, trace_length}, seeds->dtype, seeds->ctx);
+  IdArray traces = IdArray::Empty({num_seeds, trace_length}, seeds->dtype, ctx);
-  IdArray eids = IdArray::Empty({num_seeds, max_num_steps}, seeds->dtype, seeds->ctx);
+  IdArray eids = IdArray::Empty({num_seeds, max_num_steps}, seeds->dtype, ctx);
   IdType *traces_data = traces.Ptr<IdType>();
   IdType *eids_data = eids.Ptr<IdType>();
   std::vector<GraphKernelData<IdType>> h_graphs(num_etypes);
-  DGLContext ctx;
   for (int64_t etype = 0; etype < num_etypes; ++etype) {
     const CSRMatrix &csr = hg->GetCSRMatrix(etype);
-    ctx = csr.indptr->ctx;
-    CHECK(ctx.device_type == kDLGPU) << "graph should be in GPU.";
     h_graphs[etype].in_ptr = static_cast<const IdType*>(csr.indptr->data);
     h_graphs[etype].in_cols = static_cast<const IdType*>(csr.indices->data);
     h_graphs[etype].data = (CSRHasData(csr) ? static_cast<const IdType*>(csr.data->data) : nullptr);
@@ -125,7 +126,6 @@ std::pair<IdArray, IdArray> RandomWalkUniform(
   auto device = DeviceAPI::Get(ctx);
   auto d_graphs = static_cast<GraphKernelData<IdType>*>(
     device->AllocWorkspace(ctx, (num_etypes) * sizeof(GraphKernelData<IdType>)));
-  auto d_metapath_data = metapath_data;
   // copy graph metadata pointers to GPU
   device->CopyDataFromTo(h_graphs.data(), 0, d_graphs, 0,
     (num_etypes) * sizeof(GraphKernelData<IdType>),
@@ -133,6 +133,9 @@ std::pair<IdArray, IdArray> RandomWalkUniform(
     ctx,
     hg->GetCSRMatrix(0).indptr->dtype,
     stream);
+  // copy metapath to GPU
+  auto d_metapath = metapath.CopyTo(ctx);
+  const IdType *d_metapath_data = static_cast<IdType *>(d_metapath->data);
   constexpr int BLOCK_SIZE = 256;
   constexpr int TILE_SIZE = BLOCK_SIZE * 4;
@@ -144,7 +147,9 @@ std::pair<IdArray, IdArray> RandomWalkUniform(
     CHECK(restart_prob->ndim == 1) << "restart prob dimension should be 1.";
     const FloatType *restart_prob_data = restart_prob.Ptr<FloatType>();
     const int64_t restart_prob_size = restart_prob->shape[0];
-    _RandomWalkKernel<IdType, FloatType, BLOCK_SIZE, TILE_SIZE> <<<grid, block, 0, stream>>>(
+    CUDA_KERNEL_CALL(
+      (_RandomWalkKernel<IdType, FloatType, BLOCK_SIZE, TILE_SIZE>),
+      grid, block, 0, stream,
       random_seed,
       seed_data,
       num_seeds,
@@ -153,6 +158,7 @@ std::pair<IdArray, IdArray> RandomWalkUniform(
       d_graphs,
       restart_prob_data,
       restart_prob_size,
+      max_nodes,
       traces_data,
       eids_data);
   });
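// Note on the launch change above: CUDA_KERNEL_CALL is DGL's kernel-launch macro;
// besides expanding to the <<<grid, block, shmem, stream>>> launch it also checks
// the CUDA error state after the launch, which the raw launch syntax it replaces
// did not.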
@@ -194,9 +200,9 @@ std::pair<IdArray, IdArray> RandomWalkWithRestart(
       LOG(FATAL) << "Non-uniform choice is not supported in GPU.";
     }
   }
+  auto device_ctx = seeds->ctx;
   auto restart_prob_array = NDArray::Empty(
-    {1}, DLDataType{kDLFloat, 64, 1}, seeds->ctx);
+    {1}, DLDataType{kDLFloat, 64, 1}, device_ctx);
-  auto device_ctx = restart_prob_array->ctx;
   auto device = dgl::runtime::DeviceAPI::Get(device_ctx);
   // use default stream
...
@@ -32,6 +32,16 @@ void CheckRandomWalkInputs(
   CHECK_INT(metapath, "metapath");
   CHECK_NDIM(seeds, 1, "seeds");
   CHECK_NDIM(metapath, 1, "metapath");
+  // (Xin): metapath is copied to GPU in CUDA random walk code
+  // CHECK_SAME_CONTEXT(seeds, metapath);
+  if (hg->IsPinned()) {
+    CHECK_EQ(seeds->ctx.device_type, kDLGPU) << "Expected seeds (" << seeds->ctx << ")" \
+      << " to be on the GPU when the graph is pinned.";
+  } else if (hg->Context() != seeds->ctx) {
+    LOG(FATAL) << "Expected seeds (" << seeds->ctx << ")" << " to have the same " \
+      << "context as graph (" << hg->Context() << ").";
+  }
   for (uint64_t i = 0; i < prob.size(); ++i) {
     FloatArray p = prob[i];
     CHECK_FLOAT(p, "probability");
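// Taken together with the dispatch changes below, this is the crux of UVA support:
// when the graph is pinned the seeds decide the device, so ATEN_XPU_SWITCH_CUDA is
// now driven by seeds->ctx rather than hg->Context(), routing a pinned CPU graph
// with GPU seeds to the CUDA random-walk implementation.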
@@ -51,7 +61,7 @@ std::tuple<IdArray, IdArray, TypeArray> RandomWalk(
   TypeArray vtypes;
   std::pair<IdArray, IdArray> result;
-  ATEN_XPU_SWITCH_CUDA(hg->Context().device_type, XPU, "RandomWalk", {
+  ATEN_XPU_SWITCH_CUDA(seeds->ctx.device_type, XPU, "RandomWalk", {
     ATEN_ID_TYPE_SWITCH(seeds->dtype, IdxType, {
       vtypes = impl::GetNodeTypesFromMetapath<XPU, IdxType>(hg, metapath);
       result = impl::RandomWalk<XPU, IdxType>(hg, seeds, metapath, prob);
@@ -72,7 +82,7 @@ std::tuple<IdArray, IdArray, TypeArray> RandomWalkWithRestart(
   TypeArray vtypes;
   std::pair<IdArray, IdArray> result;
-  ATEN_XPU_SWITCH_CUDA(hg->Context().device_type, XPU, "RandomWalkWithRestart", {
+  ATEN_XPU_SWITCH_CUDA(seeds->ctx.device_type, XPU, "RandomWalkWithRestart", {
     ATEN_ID_TYPE_SWITCH(seeds->dtype, IdxType, {
       vtypes = impl::GetNodeTypesFromMetapath<XPU, IdxType>(hg, metapath);
       result = impl::RandomWalkWithRestart<XPU, IdxType>(hg, seeds, metapath, prob, restart_prob);
@@ -93,7 +103,7 @@ std::tuple<IdArray, IdArray, TypeArray> RandomWalkWithStepwiseRestart(
   TypeArray vtypes;
   std::pair<IdArray, IdArray> result;
-  ATEN_XPU_SWITCH_CUDA(hg->Context().device_type, XPU, "RandomWalkWithStepwiseRestart", {
+  ATEN_XPU_SWITCH_CUDA(seeds->ctx.device_type, XPU, "RandomWalkWithStepwiseRestart", {
     ATEN_ID_TYPE_SWITCH(seeds->dtype, IdxType, {
       vtypes = impl::GetNodeTypesFromMetapath<XPU, IdxType>(hg, metapath);
       result = impl::RandomWalkWithStepwiseRestart<XPU, IdxType>(
...
import dgl
from dgl import utils
import backend as F
import numpy as np
from utils import parametrize_dtype
import pytest
def test_random_walk():
edge_list = [(0, 1), (1, 2), (2, 3), (3, 4),
(4, 3), (3, 2), (2, 1), (1, 0)]
seeds = [0, 1]
n_traces = 3
n_hops = 4
g = dgl.DGLGraphStale(edge_list, readonly=True)
traces = dgl.contrib.sampling.random_walk(g, seeds, n_traces, n_hops)
traces = F.zerocopy_to_numpy(traces)
assert traces.shape == (len(seeds), n_traces, n_hops + 1)
for i, seed in enumerate(seeds):
assert (traces[i, :, 0] == seeds[i]).all()
trace_diff = np.diff(traces, axis=-1)
# only nodes with adjacent IDs are connected
assert (np.abs(trace_diff) == 1).all()
def test_random_walk_with_restart():
edge_list = [(0, 1), (1, 2), (2, 3), (3, 4),
(4, 3), (3, 2), (2, 1), (1, 0)]
seeds = [0, 1]
max_nodes = 10
g = dgl.DGLGraphStale(edge_list)
# test normal RWR
traces = dgl.contrib.sampling.random_walk_with_restart(g, seeds, 0.2, max_nodes)
assert len(traces) == len(seeds)
for traces_per_seed in traces:
total_nodes = 0
for t in traces_per_seed:
total_nodes += len(t)
trace_diff = np.diff(F.zerocopy_to_numpy(t), axis=-1)
assert (np.abs(trace_diff) == 1).all()
assert total_nodes >= max_nodes
# test RWR with early stopping
traces = dgl.contrib.sampling.random_walk_with_restart(
g, seeds, 1, 100, max_nodes, 1)
assert len(traces) == len(seeds)
for traces_per_seed in traces:
assert sum(len(t) for t in traces_per_seed) < 100
# test bipartite RWR
traces = dgl.contrib.sampling.bipartite_single_sided_random_walk_with_restart(
g, seeds, 0.2, max_nodes)
assert len(traces) == len(seeds)
for traces_per_seed in traces:
for t in traces_per_seed:
trace_diff = np.diff(F.zerocopy_to_numpy(t), axis=-1)
assert (trace_diff % 2 == 0).all()
@@ -24,52 +24,22 @@ def check_random_walk(g, metapath, traces, ntypes, prob=None, trace_eids=None):
            u, v = g.find_edges(trace_eids[i, j], etype=metapath[j])
            assert (u == traces[i, j]) and (v == traces[i, j + 1])

-@unittest.skipIf(F._default_context_str == 'gpu', reason="GPU random walk not implemented")
-def test_random_walk():
-    g1 = dgl.heterograph({
-        ('user', 'follow', 'user'): ([0, 1, 2], [1, 2, 0])
-    })
+@unittest.skipIf(F._default_context_str == 'gpu', reason="Random walk with non-uniform prob is not supported in GPU.")
+def test_non_uniform_random_walk():
     g2 = dgl.heterograph({
-        ('user', 'follow', 'user'): ([0, 1, 1, 2, 3], [1, 2, 3, 0, 0])})
-    g3 = dgl.heterograph({
-        ('user', 'follow', 'user'): ([0, 1, 2], [1, 2, 0]),
-        ('user', 'view', 'item'): ([0, 1, 2], [0, 1, 2]),
-        ('item', 'viewed-by', 'user'): ([0, 1, 2], [0, 1, 2])})
+        ('user', 'follow', 'user'): ([0, 1, 1, 2, 3], [1, 2, 3, 0, 0])}).to(F.ctx())
     g4 = dgl.heterograph({
         ('user', 'follow', 'user'): ([0, 1, 1, 2, 3], [1, 2, 3, 0, 0]),
         ('user', 'view', 'item'): ([0, 0, 1, 2, 3, 3], [0, 1, 1, 2, 2, 1]),
-        ('item', 'viewed-by', 'user'): ([0, 1, 1, 2, 2, 1], [0, 0, 1, 2, 3, 3])})
+        ('item', 'viewed-by', 'user'): ([0, 1, 1, 2, 2, 1], [0, 0, 1, 2, 3, 3])
+    }).to(F.ctx())
     g2.edata['p'] = F.tensor([3, 0, 3, 3, 3], dtype=F.float32)
     g2.edata['p2'] = F.tensor([[3], [0], [3], [3], [3]], dtype=F.float32)
     g4.edges['follow'].data['p'] = F.tensor([3, 0, 3, 3, 3], dtype=F.float32)
     g4.edges['viewed-by'].data['p'] = F.tensor([1, 1, 1, 1, 1, 1], dtype=F.float32)
-    traces, eids, ntypes = dgl.sampling.random_walk(g1, [0, 1, 2, 0, 1, 2], length=4, return_eids=True)
-    check_random_walk(g1, ['follow'] * 4, traces, ntypes, trace_eids=eids)
-    try:
-        dgl.sampling.random_walk(g1, [0, 1, 2, 10], length=4, return_eids=True)
-        fail = False  # shouldn't abort
-    except:
-        fail = True
-    assert fail
-    traces, eids, ntypes = dgl.sampling.random_walk(g1, [0, 1, 2, 0, 1, 2], length=4, restart_prob=0., return_eids=True)
-    check_random_walk(g1, ['follow'] * 4, traces, ntypes, trace_eids=eids)
-    traces, ntypes = dgl.sampling.random_walk(
-        g1, [0, 1, 2, 0, 1, 2], length=4, restart_prob=F.zeros((4,), F.float32, F.cpu()))
-    check_random_walk(g1, ['follow'] * 4, traces, ntypes)
-    traces, ntypes = dgl.sampling.random_walk(
-        g1, [0, 1, 2, 0, 1, 2], length=5,
-        restart_prob=F.tensor([0, 0, 0, 0, 1], dtype=F.float32))
-    check_random_walk(
-        g1, ['follow'] * 4, F.slice_axis(traces, 1, 0, 5), F.slice_axis(ntypes, 0, 0, 5))
-    assert (F.asnumpy(traces)[:, 5] == -1).all()
-    traces, eids, ntypes = dgl.sampling.random_walk(
-        g2, [0, 1, 2, 3, 0, 1, 2, 3], length=4, return_eids=True)
-    check_random_walk(g2, ['follow'] * 4, traces, ntypes, trace_eids=eids)
     traces, eids, ntypes = dgl.sampling.random_walk(
         g2, [0, 1, 2, 3, 0, 1, 2, 3], length=4, prob='p', return_eids=True)
     check_random_walk(g2, ['follow'] * 4, traces, ntypes, 'p', trace_eids=eids)
@@ -82,20 +52,6 @@ def test_random_walk():
         fail = True
     assert fail
-    metapath = ['follow', 'view', 'viewed-by'] * 2
-    traces, eids, ntypes = dgl.sampling.random_walk(
-        g3, [0, 1, 2, 0, 1, 2], metapath=metapath, return_eids=True)
-    check_random_walk(g3, metapath, traces, ntypes, trace_eids=eids)
-    metapath = ['follow', 'view', 'viewed-by'] * 2
-    traces, eids, ntypes = dgl.sampling.random_walk(
-        g4, [0, 1, 2, 3, 0, 1, 2, 3], metapath=metapath, return_eids=True)
-    check_random_walk(g4, metapath, traces, ntypes, trace_eids=eids)
-    traces, eids, ntypes = dgl.sampling.random_walk(
-        g4, [0, 1, 2, 0, 1, 2], metapath=metapath, return_eids=True)
-    check_random_walk(g4, metapath, traces, ntypes, trace_eids=eids)
     metapath = ['follow', 'view', 'viewed-by'] * 2
     traces, eids, ntypes = dgl.sampling.random_walk(
         g4, [0, 1, 2, 3, 0, 1, 2, 3], metapath=metapath, prob='p', return_eids=True)
@@ -113,6 +69,83 @@ def test_random_walk():
     check_random_walk(g4, metapath, traces[:, :7], ntypes[:7], 'p', trace_eids=eids)
     assert (F.asnumpy(traces[:, 7]) == -1).all()
def _use_uva():
if F._default_context_str == 'cpu':
return [False]
else:
return [True, False]
@pytest.mark.parametrize('use_uva', _use_uva())
def test_uniform_random_walk(use_uva):
g1 = dgl.heterograph({
('user', 'follow', 'user'): ([0, 1, 2], [1, 2, 0])
})
g2 = dgl.heterograph({
('user', 'follow', 'user'): ([0, 1, 1, 2, 3], [1, 2, 3, 0, 0])
})
g3 = dgl.heterograph({
('user', 'follow', 'user'): ([0, 1, 2], [1, 2, 0]),
('user', 'view', 'item'): ([0, 1, 2], [0, 1, 2]),
('item', 'viewed-by', 'user'): ([0, 1, 2], [0, 1, 2])
})
g4 = dgl.heterograph({
('user', 'follow', 'user'): ([0, 1, 1, 2, 3], [1, 2, 3, 0, 0]),
('user', 'view', 'item'): ([0, 0, 1, 2, 3, 3], [0, 1, 1, 2, 2, 1]),
('item', 'viewed-by', 'user'): ([0, 1, 1, 2, 2, 1], [0, 0, 1, 2, 3, 3])
})
if use_uva:
for g in (g1, g2, g3, g4):
g.create_formats_()
g.pin_memory_()
elif F._default_context_str == 'gpu':
g1 = g1.to(F.ctx())
g2 = g2.to(F.ctx())
g3 = g3.to(F.ctx())
g4 = g4.to(F.ctx())
try:
traces, eids, ntypes = dgl.sampling.random_walk(
g1, F.tensor([0, 1, 2, 0, 1, 2], dtype=g1.idtype), length=4, return_eids=True)
check_random_walk(g1, ['follow'] * 4, traces, ntypes, trace_eids=eids)
if F._default_context_str == 'cpu':
with pytest.raises(dgl.DGLError):
dgl.sampling.random_walk(g1, F.tensor([0, 1, 2, 10], dtype=g1.idtype), length=4, return_eids=True)
traces, eids, ntypes = dgl.sampling.random_walk(
g1, F.tensor([0, 1, 2, 0, 1, 2], dtype=g1.idtype), length=4, restart_prob=0., return_eids=True)
check_random_walk(g1, ['follow'] * 4, traces, ntypes, trace_eids=eids)
traces, ntypes = dgl.sampling.random_walk(
g1, F.tensor([0, 1, 2, 0, 1, 2], dtype=g1.idtype), length=4, restart_prob=F.zeros((4,), F.float32))
check_random_walk(g1, ['follow'] * 4, traces, ntypes)
traces, ntypes = dgl.sampling.random_walk(
g1, F.tensor([0, 1, 2, 0, 1, 2], dtype=g1.idtype), length=5,
restart_prob=F.tensor([0, 0, 0, 0, 1], dtype=F.float32))
check_random_walk(
g1, ['follow'] * 4, F.slice_axis(traces, 1, 0, 5), F.slice_axis(ntypes, 0, 0, 5))
assert (F.asnumpy(traces)[:, 5] == -1).all()
traces, eids, ntypes = dgl.sampling.random_walk(
g2, F.tensor([0, 1, 2, 3, 0, 1, 2, 3], dtype=g2.idtype), length=4, return_eids=True)
check_random_walk(g2, ['follow'] * 4, traces, ntypes, trace_eids=eids)
metapath = ['follow', 'view', 'viewed-by'] * 2
traces, eids, ntypes = dgl.sampling.random_walk(
g3, F.tensor([0, 1, 2, 0, 1, 2], dtype=g3.idtype), metapath=metapath, return_eids=True)
check_random_walk(g3, metapath, traces, ntypes, trace_eids=eids)
metapath = ['follow', 'view', 'viewed-by'] * 2
traces, eids, ntypes = dgl.sampling.random_walk(
g4, F.tensor([0, 1, 2, 3, 0, 1, 2, 3], dtype=g4.idtype), metapath=metapath, return_eids=True)
check_random_walk(g4, metapath, traces, ntypes, trace_eids=eids)
traces, eids, ntypes = dgl.sampling.random_walk(
g4, F.tensor([0, 1, 2, 0, 1, 2], dtype=g4.idtype), metapath=metapath, return_eids=True)
check_random_walk(g4, metapath, traces, ntypes, trace_eids=eids)
finally: # make sure to unpin the graphs even if some test fails
for g in (g1, g2, g3, g4):
if g.is_pinned():
g.unpin_memory_()
@unittest.skipIf(F._default_context_str == 'gpu', reason="GPU random walk not implemented")
def test_node2vec():
    g1 = dgl.heterograph({
@@ -146,9 +179,10 @@ def test_pack_traces():
    assert F.array_equal(result[2], F.tensor([2, 7], dtype=F.int64))
    assert F.array_equal(result[3], F.tensor([0, 2], dtype=F.int64))

-def test_pinsage_sampling():
+@pytest.mark.parametrize('use_uva', _use_uva())
+def test_pinsage_sampling(use_uva):
     def _test_sampler(g, sampler, ntype):
-        seeds = F.copy_to(F.tensor([0, 2], dtype=F.int64), F.ctx())
+        seeds = F.copy_to(F.tensor([0, 2], dtype=g.idtype), F.ctx())
         neighbor_g = sampler(seeds)
         assert neighbor_g.ntypes == [ntype]
         u, v = neighbor_g.all_edges(form='uv', order='eid')
@@ -159,7 +193,12 @@ def test_pinsage_sampling():
     g = dgl.heterograph({
         ('item', 'bought-by', 'user'): ([0, 0, 1, 1, 2, 2, 3, 3], [0, 1, 0, 1, 2, 3, 2, 3]),
         ('user', 'bought', 'item'): ([0, 1, 0, 1, 2, 3, 2, 3], [0, 0, 1, 1, 2, 2, 3, 3])})
+    if use_uva:
+        g.create_formats_()
+        g.pin_memory_()
+    elif F._default_context_str == 'gpu':
         g = g.to(F.ctx())
+    try:
         sampler = dgl.sampling.PinSAGESampler(g, 'item', 'user', 4, 0.5, 3, 2)
         _test_sampler(g, sampler, 'item')
         sampler = dgl.sampling.RandomWalkNeighborSampler(g, 4, 0.5, 3, 2, ['bought-by', 'bought'])
@@ -167,18 +206,39 @@ def test_pinsage_sampling():
         sampler = dgl.sampling.RandomWalkNeighborSampler(g, 4, 0.5, 3, 2,
             [('item', 'bought-by', 'user'), ('user', 'bought', 'item')])
         _test_sampler(g, sampler, 'item')
+    finally:
+        if g.is_pinned():
+            g.unpin_memory_()
     g = dgl.graph(([0, 0, 1, 1, 2, 2, 3, 3],
                    [0, 1, 0, 1, 2, 3, 2, 3]))
+    if use_uva:
+        g.create_formats_()
+        g.pin_memory_()
+    elif F._default_context_str == 'gpu':
         g = g.to(F.ctx())
+    try:
         sampler = dgl.sampling.RandomWalkNeighborSampler(g, 4, 0.5, 3, 2)
         _test_sampler(g, sampler, g.ntypes[0])
+    finally:
+        if g.is_pinned():
+            g.unpin_memory_()
     g = dgl.heterograph({
         ('A', 'AB', 'B'): ([0, 2], [1, 3]),
         ('B', 'BC', 'C'): ([1, 3], [2, 1]),
         ('C', 'CA', 'A'): ([2, 1], [0, 2])})
+    if use_uva:
+        g.create_formats_()
+        g.pin_memory_()
+    elif F._default_context_str == 'gpu':
         g = g.to(F.ctx())
+    try:
         sampler = dgl.sampling.RandomWalkNeighborSampler(g, 4, 0.5, 3, 2, ['AB', 'BC', 'CA'])
         _test_sampler(g, sampler, 'A')
+    finally:
+        if g.is_pinned():
+            g.unpin_memory_()

def _gen_neighbor_sampling_test_graph(hypersparse, reverse):
    if hypersparse:
@@ -930,7 +990,8 @@ if __name__ == '__main__':
    from itertools import product
    for args in product(['coo', 'csr', 'csc'], ['in', 'out'], [False, True]):
        test_sample_neighbors_etype_homogeneous(*args)
-    test_random_walk()
+    test_non_uniform_random_walk()
+    test_uniform_random_walk(False)
    test_pack_traces()
    test_pinsage_sampling()
    test_sample_neighbors_outedge()
...