Unverified Commit a04a8d06 authored by Quan (Andy) Gan's avatar Quan (Andy) Gan Committed by GitHub
Browse files

[Feature] Graceful handling of exceptions thrown within OpenMP blocks (#3353)



* graceful c++ exception in OpenMP

* credits

* add test
Co-authored-by: default avatarJinjing Zhou <VoVAllen@users.noreply.github.com>
parent bc14829f
...@@ -10,6 +10,8 @@ ...@@ -10,6 +10,8 @@
#include <algorithm> #include <algorithm>
#include <string> #include <string>
#include <cstdlib> #include <cstdlib>
#include <exception>
#include <atomic>
namespace { namespace {
int64_t divup(int64_t x, int64_t y) { int64_t divup(int64_t x, int64_t y) {
...@@ -67,6 +69,9 @@ void parallel_for( ...@@ -67,6 +69,9 @@ void parallel_for(
#ifdef _OPENMP #ifdef _OPENMP
auto num_threads = compute_num_threads(begin, end, grain_size); auto num_threads = compute_num_threads(begin, end, grain_size);
// (BarclayII) the exception code is borrowed from PyTorch.
std::atomic_flag err_flag = ATOMIC_FLAG_INIT;
std::exception_ptr eptr;
#pragma omp parallel num_threads(num_threads) #pragma omp parallel num_threads(num_threads)
{ {
...@@ -75,9 +80,16 @@ void parallel_for( ...@@ -75,9 +80,16 @@ void parallel_for(
auto begin_tid = begin + tid * chunk_size; auto begin_tid = begin + tid * chunk_size;
if (begin_tid < end) { if (begin_tid < end) {
auto end_tid = std::min(end, chunk_size + begin_tid); auto end_tid = std::min(end, chunk_size + begin_tid);
f(begin_tid, end_tid); try {
f(begin_tid, end_tid);
} catch (...) {
if (!err_flag.test_and_set())
eptr = std::current_exception();
}
} }
} }
if (eptr)
std::rethrow_exception(eptr);
#else #else
f(begin, end); f(begin, end);
#endif #endif
......
...@@ -176,6 +176,8 @@ std::pair<IdArray, IdArray> MetapathBasedRandomWalk( ...@@ -176,6 +176,8 @@ std::pair<IdArray, IdArray> MetapathBasedRandomWalk(
TerminatePredicate<IdxType> terminate) { TerminatePredicate<IdxType> terminate) {
int64_t max_num_steps = metapath->shape[0]; int64_t max_num_steps = metapath->shape[0];
const IdxType *metapath_data = static_cast<IdxType *>(metapath->data); const IdxType *metapath_data = static_cast<IdxType *>(metapath->data);
const int64_t begin_ntype = hg->meta_graph()->FindEdge(metapath_data[0]).first;
const int64_t max_nodes = hg->NumVertices(begin_ntype);
// Prefetch all edges. // Prefetch all edges.
// This forces the heterograph to materialize all OutCSR's before the OpenMP loop; // This forces the heterograph to materialize all OutCSR's before the OpenMP loop;
...@@ -206,7 +208,7 @@ std::pair<IdArray, IdArray> MetapathBasedRandomWalk( ...@@ -206,7 +208,7 @@ std::pair<IdArray, IdArray> MetapathBasedRandomWalk(
return MetapathRandomWalkStep<XPU, IdxType>( return MetapathRandomWalkStep<XPU, IdxType>(
data, curr, len, edges_by_type, csr_has_data, metapath_data, prob, terminate); data, curr, len, edges_by_type, csr_has_data, metapath_data, prob, terminate);
}; };
return GenericRandomWalk<XPU, IdxType>(seeds, max_num_steps, step); return GenericRandomWalk<XPU, IdxType>(seeds, max_num_steps, step, max_nodes);
} else { } else {
StepFunc<IdxType> step = StepFunc<IdxType> step =
[&edges_by_type, &csr_has_data, metapath_data, &prob, terminate] [&edges_by_type, &csr_has_data, metapath_data, &prob, terminate]
...@@ -214,7 +216,7 @@ std::pair<IdArray, IdArray> MetapathBasedRandomWalk( ...@@ -214,7 +216,7 @@ std::pair<IdArray, IdArray> MetapathBasedRandomWalk(
return MetapathRandomWalkStepUniform<XPU, IdxType>( return MetapathRandomWalkStepUniform<XPU, IdxType>(
data, curr, len, edges_by_type, csr_has_data, metapath_data, prob, terminate); data, curr, len, edges_by_type, csr_has_data, metapath_data, prob, terminate);
}; };
return GenericRandomWalk<XPU, IdxType>(seeds, max_num_steps, step); return GenericRandomWalk<XPU, IdxType>(seeds, max_num_steps, step, max_nodes);
} }
} }
......
...@@ -163,7 +163,7 @@ std::pair<IdArray, IdArray> Node2vecRandomWalk( ...@@ -163,7 +163,7 @@ std::pair<IdArray, IdArray> Node2vecRandomWalk(
edges, csr_has_data, prob, terminate); edges, csr_has_data, prob, terminate);
}; };
return GenericRandomWalk<XPU, IdxType>(seeds, max_num_steps, step); return GenericRandomWalk<XPU, IdxType>(seeds, max_num_steps, step, g->NumVertices(0));
} }
}; // namespace }; // namespace
......
...@@ -31,6 +31,7 @@ namespace { ...@@ -31,6 +31,7 @@ namespace {
* edge type in the metapath. * edge type in the metapath.
* \param max_num_steps The maximum number of steps of a random walk path. * \param max_num_steps The maximum number of steps of a random walk path.
* \param step The random walk step function with type \c StepFunc. * \param step The random walk step function with type \c StepFunc.
* \param max_nodes Throws an error if one of the values in \c seeds exceeds this argument.
* \return A 2D array of shape (len(seeds), max_num_steps + 1) with node IDs. * \return A 2D array of shape (len(seeds), max_num_steps + 1) with node IDs.
* \note The graph itself should be bounded in the closure of \c step. * \note The graph itself should be bounded in the closure of \c step.
*/ */
...@@ -38,7 +39,8 @@ template<DLDeviceType XPU, typename IdxType> ...@@ -38,7 +39,8 @@ template<DLDeviceType XPU, typename IdxType>
std::pair<IdArray, IdArray> GenericRandomWalk( std::pair<IdArray, IdArray> GenericRandomWalk(
const IdArray seeds, const IdArray seeds,
int64_t max_num_steps, int64_t max_num_steps,
StepFunc<IdxType> step) { StepFunc<IdxType> step,
int64_t max_nodes) {
int64_t num_seeds = seeds->shape[0]; int64_t num_seeds = seeds->shape[0];
int64_t trace_length = max_num_steps + 1; int64_t trace_length = max_num_steps + 1;
IdArray traces = IdArray::Empty({num_seeds, trace_length}, seeds->dtype, seeds->ctx); IdArray traces = IdArray::Empty({num_seeds, trace_length}, seeds->dtype, seeds->ctx);
...@@ -54,6 +56,8 @@ std::pair<IdArray, IdArray> GenericRandomWalk( ...@@ -54,6 +56,8 @@ std::pair<IdArray, IdArray> GenericRandomWalk(
dgl_id_t curr = seed_data[seed_id]; dgl_id_t curr = seed_data[seed_id];
traces_data[seed_id * trace_length] = curr; traces_data[seed_id * trace_length] = curr;
CHECK_LT(curr, max_nodes) << "Seed node ID exceeds the maximum number of nodes.";
for (i = 0; i < max_num_steps; ++i) { for (i = 0; i < max_num_steps; ++i) {
const auto &succ = step(traces_data + seed_id * max_num_steps, curr, i); const auto &succ = step(traces_data + seed_id * max_num_steps, curr, i);
traces_data[seed_id * trace_length + i + 1] = curr = std::get<0>(succ); traces_data[seed_id * trace_length + i + 1] = curr = std::get<0>(succ);
......
...@@ -48,6 +48,12 @@ def test_random_walk(): ...@@ -48,6 +48,12 @@ def test_random_walk():
traces, eids, ntypes = dgl.sampling.random_walk(g1, [0, 1, 2, 0, 1, 2], length=4, return_eids=True) traces, eids, ntypes = dgl.sampling.random_walk(g1, [0, 1, 2, 0, 1, 2], length=4, return_eids=True)
check_random_walk(g1, ['follow'] * 4, traces, ntypes, trace_eids=eids) check_random_walk(g1, ['follow'] * 4, traces, ntypes, trace_eids=eids)
try:
dgl.sampling.random_walk(g1, [0, 1, 2, 10], length=4, return_eids=True)
fail = False # shouldn't abort
except:
fail = True
assert fail
traces, eids, ntypes = dgl.sampling.random_walk(g1, [0, 1, 2, 0, 1, 2], length=4, restart_prob=0., return_eids=True) traces, eids, ntypes = dgl.sampling.random_walk(g1, [0, 1, 2, 0, 1, 2], length=4, restart_prob=0., return_eids=True)
check_random_walk(g1, ['follow'] * 4, traces, ntypes, trace_eids=eids) check_random_walk(g1, ['follow'] * 4, traces, ntypes, trace_eids=eids)
traces, ntypes = dgl.sampling.random_walk( traces, ntypes = dgl.sampling.random_walk(
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment