Unverified Commit f5183820 authored by Tomasz Patejko's avatar Tomasz Patejko Committed by GitHub
Browse files

[Performance, CPU] Rewriting OpenMP pragmas into parallel_for (#3171)

* [CPU, Parallel] Rewriting omp pragmas with parallel_for

* [CPU, Parallel] Decrease number of calls to task function

* [CPU, Parallel] Modify calls to new interface of parallel_for
parent 21a40279
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
#include <dgl/runtime/device_api.h> #include <dgl/runtime/device_api.h>
#include <dgl/random.h> #include <dgl/random.h>
#include <dgl/runtime/parallel_for.h>
#include <dmlc/omp.h> #include <dmlc/omp.h>
#include <vector> #include <vector>
#include <tuple> #include <tuple>
...@@ -234,21 +235,23 @@ void KdTreeKNN(const NDArray& data_points, const IdArray& data_offsets, ...@@ -234,21 +235,23 @@ void KdTreeKNN(const NDArray& data_points, const IdArray& data_offsets,
KDTreeNDArrayAdapter<FloatType, IdType> kdtree(feature_size, current_data_points); KDTreeNDArrayAdapter<FloatType, IdType> kdtree(feature_size, current_data_points);
// query // query
std::vector<IdType> out_buffer(k); parallel_for(0, q_length, [&](IdType b, IdType e) {
std::vector<FloatType> out_dist_buffer(k); for (auto q = b; q < e; ++q) {
#pragma omp parallel for firstprivate(out_buffer) firstprivate(out_dist_buffer) std::vector<IdType> out_buffer(k);
for (IdType q = 0; q < q_length; ++q) { std::vector<FloatType> out_dist_buffer(k);
auto curr_out_offset = k * q + out_offset;
const FloatType* q_point = current_query_pts_data + q * feature_size; auto curr_out_offset = k * q + out_offset;
size_t num_matches = kdtree.GetIndex()->knnSearch( const FloatType* q_point = current_query_pts_data + q * feature_size;
q_point, k, out_buffer.data(), out_dist_buffer.data()); size_t num_matches = kdtree.GetIndex()->knnSearch(
q_point, k, out_buffer.data(), out_dist_buffer.data());
for (size_t i = 0; i < num_matches; ++i) {
query_out[curr_out_offset] = q + q_offset; for (size_t i = 0; i < num_matches; ++i) {
data_out[curr_out_offset] = out_buffer[i] + d_offset; query_out[curr_out_offset] = q + q_offset;
curr_out_offset++; data_out[curr_out_offset] = out_buffer[i] + d_offset;
curr_out_offset++;
}
} }
} });
} }
} }
...@@ -271,30 +274,32 @@ void BruteForceKNN(const NDArray& data_points, const IdArray& data_offsets, ...@@ -271,30 +274,32 @@ void BruteForceKNN(const NDArray& data_points, const IdArray& data_offsets,
std::vector<FloatType> dist_buffer(k); std::vector<FloatType> dist_buffer(k);
#pragma omp parallel for firstprivate(dist_buffer) parallel_for(q_start, q_end, [&](IdType b, IdType e) {
for (IdType q_idx = q_start; q_idx < q_end; ++q_idx) { for (auto q_idx = b; q_idx < e; ++q_idx) {
for (IdType k_idx = 0; k_idx < k; ++k_idx) { std::vector<FloatType> dist_buffer(k);
query_out[q_idx * k + k_idx] = q_idx; for (IdType k_idx = 0; k_idx < k; ++k_idx) {
dist_buffer[k_idx] = std::numeric_limits<FloatType>::max(); query_out[q_idx * k + k_idx] = q_idx;
} dist_buffer[k_idx] = std::numeric_limits<FloatType>::max();
FloatType worst_dist = std::numeric_limits<FloatType>::max(); }
FloatType worst_dist = std::numeric_limits<FloatType>::max();
for (IdType d_idx = d_start; d_idx < d_end; ++d_idx) { for (IdType d_idx = d_start; d_idx < d_end; ++d_idx) {
FloatType tmp_dist = EuclideanDistWithCheck<FloatType, IdType>( FloatType tmp_dist = EuclideanDistWithCheck<FloatType, IdType>(
query_points_data + q_idx * feature_size, query_points_data + q_idx * feature_size,
data_points_data + d_idx * feature_size, data_points_data + d_idx * feature_size,
feature_size, worst_dist); feature_size, worst_dist);
if (tmp_dist == std::numeric_limits<FloatType>::max()) { if (tmp_dist == std::numeric_limits<FloatType>::max()) {
continue; continue;
} }
IdType out_offset = q_idx * k; IdType out_offset = q_idx * k;
HeapInsert<FloatType, IdType>( HeapInsert<FloatType, IdType>(
data_out + out_offset, dist_buffer.data(), d_idx, tmp_dist, k); data_out + out_offset, dist_buffer.data(), d_idx, tmp_dist, k);
worst_dist = dist_buffer[0]; worst_dist = dist_buffer[0];
}
} }
} });
} }
} }
} // namespace impl } // namespace impl
...@@ -356,103 +361,105 @@ void NNDescent(const NDArray& points, const IdArray& offsets, ...@@ -356,103 +361,105 @@ void NNDescent(const NDArray& points, const IdArray& offsets,
IdType segment_size = point_idx_end - point_idx_start; IdType segment_size = point_idx_end - point_idx_start;
// random initialization // random initialization
#pragma omp parallel for runtime::parallel_for(point_idx_start, point_idx_end, [&](size_t b, size_t e) {
for (IdType i = point_idx_start; i < point_idx_end; ++i) { for (auto i = b; i < e; ++i) {
IdType local_idx = i - point_idx_start; IdType local_idx = i - point_idx_start;
dgl::RandomEngine::ThreadLocal()->UniformChoice<IdType>( dgl::RandomEngine::ThreadLocal()->UniformChoice<IdType>(
k, segment_size, neighbors + i * k, false); k, segment_size, neighbors + i * k, false);
for (IdType n = 0; n < k; ++n) { for (IdType n = 0; n < k; ++n) {
central_nodes[i * k + n] = i; central_nodes[i * k + n] = i;
neighbors[i * k + n] += point_idx_start; neighbors[i * k + n] += point_idx_start;
flags[local_idx * k + n] = true; flags[local_idx * k + n] = true;
neighbors_dists[local_idx * k + n] = impl::EuclideanDist<FloatType, IdType>( neighbors_dists[local_idx * k + n] = impl::EuclideanDist<FloatType, IdType>(
points_data + i * feature_size, points_data + i * feature_size,
points_data + neighbors[i * k + n] * feature_size, points_data + neighbors[i * k + n] * feature_size,
feature_size); feature_size);
}
impl::BuildHeap<FloatType, IdType>(neighbors + i * k, neighbors_dists + local_idx * k, k);
} }
impl::BuildHeap<FloatType, IdType>(neighbors + i * k, neighbors_dists + local_idx * k, k); });
}
size_t num_updates = 0; size_t num_updates = 0;
for (int iter = 0; iter < num_iters; ++iter) { for (int iter = 0; iter < num_iters; ++iter) {
num_updates = 0; num_updates = 0;
// initialize candidates array as empty value // initialize candidates array as empty value
#pragma omp parallel for runtime::parallel_for(point_idx_start, point_idx_end, [&](size_t b, size_t e) {
for (IdType i = point_idx_start; i < point_idx_end; ++i) { for (auto i = b; i < e; ++i) {
IdType local_idx = i - point_idx_start; IdType local_idx = i - point_idx_start;
for (IdType c = 0; c < num_candidates; ++c) { for (IdType c = 0; c < num_candidates; ++c) {
new_candidates[local_idx * num_candidates + c] = num_nodes; new_candidates[local_idx * num_candidates + c] = num_nodes;
old_candidates[local_idx * num_candidates + c] = num_nodes; old_candidates[local_idx * num_candidates + c] = num_nodes;
new_candidates_dists[local_idx * num_candidates + c] = new_candidates_dists[local_idx * num_candidates + c] =
std::numeric_limits<FloatType>::max(); std::numeric_limits<FloatType>::max();
old_candidates_dists[local_idx * num_candidates + c] = old_candidates_dists[local_idx * num_candidates + c] =
std::numeric_limits<FloatType>::max(); std::numeric_limits<FloatType>::max();
}
} }
} });
// randomly select neighbors as candidates // randomly select neighbors as candidates
int tid, num_threads; int num_threads = omp_get_max_threads();
#pragma omp parallel private(tid, num_threads) runtime::parallel_for(0, num_threads, [&](size_t b, size_t e) {
{ for (auto tid = b; tid < e; ++tid) {
tid = omp_get_thread_num(); for (IdType i = point_idx_start; i < point_idx_end; ++i) {
num_threads = omp_get_num_threads(); IdType local_idx = i - point_idx_start;
for (IdType i = point_idx_start; i < point_idx_end; ++i) { for (IdType n = 0; n < k; ++n) {
IdType local_idx = i - point_idx_start; IdType neighbor_idx = neighbors[i * k + n];
for (IdType n = 0; n < k; ++n) { bool is_new = flags[local_idx * k + n];
IdType neighbor_idx = neighbors[i * k + n]; IdType local_neighbor_idx = neighbor_idx - point_idx_start;
bool is_new = flags[local_idx * k + n]; FloatType random_dist = dgl::RandomEngine::ThreadLocal()->Uniform<FloatType>();
IdType local_neighbor_idx = neighbor_idx - point_idx_start;
FloatType random_dist = dgl::RandomEngine::ThreadLocal()->Uniform<FloatType>(); if (is_new) {
if (local_idx % num_threads == tid) {
if (is_new) { impl::HeapInsert<FloatType, IdType>(
if (local_idx % num_threads == tid) { new_candidates + local_idx * num_candidates,
impl::HeapInsert<FloatType, IdType>( new_candidates_dists + local_idx * num_candidates,
new_candidates + local_idx * num_candidates, neighbor_idx, random_dist, num_candidates, true);
new_candidates_dists + local_idx * num_candidates, }
neighbor_idx, random_dist, num_candidates, true); if (local_neighbor_idx % num_threads == tid) {
} impl::HeapInsert<FloatType, IdType>(
if (local_neighbor_idx % num_threads == tid) { new_candidates + local_neighbor_idx * num_candidates,
impl::HeapInsert<FloatType, IdType>( new_candidates_dists + local_neighbor_idx * num_candidates,
new_candidates + local_neighbor_idx * num_candidates, i, random_dist, num_candidates, true);
new_candidates_dists + local_neighbor_idx * num_candidates, }
i, random_dist, num_candidates, true); } else {
} if (local_idx % num_threads == tid) {
} else { impl::HeapInsert<FloatType, IdType>(
if (local_idx % num_threads == tid) { old_candidates + local_idx * num_candidates,
impl::HeapInsert<FloatType, IdType>( old_candidates_dists + local_idx * num_candidates,
old_candidates + local_idx * num_candidates, neighbor_idx, random_dist, num_candidates, true);
old_candidates_dists + local_idx * num_candidates, }
neighbor_idx, random_dist, num_candidates, true); if (local_neighbor_idx % num_threads == tid) {
} impl::HeapInsert<FloatType, IdType>(
if (local_neighbor_idx % num_threads == tid) { old_candidates + local_neighbor_idx * num_candidates,
impl::HeapInsert<FloatType, IdType>( old_candidates_dists + local_neighbor_idx * num_candidates,
old_candidates + local_neighbor_idx * num_candidates, i, random_dist, num_candidates, true);
old_candidates_dists + local_neighbor_idx * num_candidates, }
i, random_dist, num_candidates, true);
} }
} }
} }
} }
} });
// mark all elements in new_candidates as false // mark all elements in new_candidates as false
#pragma omp parallel for runtime::parallel_for(point_idx_start, point_idx_end, [&](size_t b, size_t e) {
for (IdType i = point_idx_start; i < point_idx_end; ++i) { for (auto i = b; i < e; ++i) {
IdType local_idx = i - point_idx_start; IdType local_idx = i - point_idx_start;
for (IdType n = 0; n < k; ++n) { for (IdType n = 0; n < k; ++n) {
IdType n_idx = neighbors[i * k + n]; IdType n_idx = neighbors[i * k + n];
for (IdType c = 0; c < num_candidates; ++c) { for (IdType c = 0; c < num_candidates; ++c) {
if (new_candidates[local_idx * num_candidates + c] == n_idx) { if (new_candidates[local_idx * num_candidates + c] == n_idx) {
flags[local_idx * k + n] = false; flags[local_idx * k + n] = false;
break; break;
}
} }
} }
} }
} });
// update neighbors block by block // update neighbors block by block
for (IdType block_start = point_idx_start; for (IdType block_start = point_idx_start;
...@@ -463,55 +470,57 @@ void NNDescent(const NDArray& points, const IdArray& offsets, ...@@ -463,55 +470,57 @@ void NNDescent(const NDArray& points, const IdArray& offsets,
nnd_updates_t updates(block_size); nnd_updates_t updates(block_size);
// generate updates // generate updates
#pragma omp parallel for runtime::parallel_for(block_start, block_end, [&](size_t b, size_t e) {
for (IdType i = block_start; i < block_end; ++i) { for (auto i = b; i < e; ++i) {
IdType local_idx = i - point_idx_start; IdType local_idx = i - point_idx_start;
for (IdType c1 = 0; c1 < num_candidates; ++c1) { for (IdType c1 = 0; c1 < num_candidates; ++c1) {
IdType new_c1 = new_candidates[local_idx * num_candidates + c1]; IdType new_c1 = new_candidates[local_idx * num_candidates + c1];
if (new_c1 == num_nodes) continue; if (new_c1 == num_nodes) continue;
IdType c1_local = new_c1 - point_idx_start; IdType c1_local = new_c1 - point_idx_start;
// new-new // new-new
for (IdType c2 = c1; c2 < num_candidates; ++c2) { for (IdType c2 = c1; c2 < num_candidates; ++c2) {
IdType new_c2 = new_candidates[local_idx * num_candidates + c2]; IdType new_c2 = new_candidates[local_idx * num_candidates + c2];
if (new_c2 == num_nodes) continue; if (new_c2 == num_nodes) continue;
IdType c2_local = new_c2 - point_idx_start; IdType c2_local = new_c2 - point_idx_start;
FloatType worst_c1_dist = neighbors_dists[c1_local * k]; FloatType worst_c1_dist = neighbors_dists[c1_local * k];
FloatType worst_c2_dist = neighbors_dists[c2_local * k]; FloatType worst_c2_dist = neighbors_dists[c2_local * k];
FloatType new_dist = impl::EuclideanDistWithCheck<FloatType, IdType>( FloatType new_dist = impl::EuclideanDistWithCheck<FloatType, IdType>(
points_data + new_c1 * feature_size, points_data + new_c1 * feature_size,
points_data + new_c2 * feature_size, points_data + new_c2 * feature_size,
feature_size, feature_size,
std::max(worst_c1_dist, worst_c2_dist)); std::max(worst_c1_dist, worst_c2_dist));
if (new_dist < worst_c1_dist || new_dist < worst_c2_dist) { if (new_dist < worst_c1_dist || new_dist < worst_c2_dist) {
updates[i - block_start].push_back(std::make_tuple(new_c1, new_c2, new_dist)); updates[i - block_start].push_back(std::make_tuple(new_c1, new_c2, new_dist));
}
} }
}
// new-old // new-old
for (IdType c2 = 0; c2 < num_candidates; ++c2) { for (IdType c2 = 0; c2 < num_candidates; ++c2) {
IdType old_c2 = old_candidates[local_idx * num_candidates + c2]; IdType old_c2 = old_candidates[local_idx * num_candidates + c2];
if (old_c2 == num_nodes) continue; if (old_c2 == num_nodes) continue;
IdType c2_local = old_c2 - point_idx_start; IdType c2_local = old_c2 - point_idx_start;
FloatType worst_c1_dist = neighbors_dists[c1_local * k]; FloatType worst_c1_dist = neighbors_dists[c1_local * k];
FloatType worst_c2_dist = neighbors_dists[c2_local * k]; FloatType worst_c2_dist = neighbors_dists[c2_local * k];
FloatType new_dist = impl::EuclideanDistWithCheck<FloatType, IdType>( FloatType new_dist = impl::EuclideanDistWithCheck<FloatType, IdType>(
points_data + new_c1 * feature_size, points_data + new_c1 * feature_size,
points_data + old_c2 * feature_size, points_data + old_c2 * feature_size,
feature_size, feature_size,
std::max(worst_c1_dist, worst_c2_dist)); std::max(worst_c1_dist, worst_c2_dist));
if (new_dist < worst_c1_dist || new_dist < worst_c2_dist) { if (new_dist < worst_c1_dist || new_dist < worst_c2_dist) {
updates[i - block_start].push_back(std::make_tuple(new_c1, old_c2, new_dist)); updates[i - block_start].push_back(std::make_tuple(new_c1, old_c2, new_dist));
}
} }
} }
} }
} });
int tid;
#pragma omp parallel private(tid, num_threads) reduction(+:num_updates) #pragma omp parallel private(tid, num_threads) reduction(+:num_updates)
{ {
tid = omp_get_thread_num(); tid = omp_get_thread_num();
......
...@@ -4,16 +4,17 @@ ...@@ -4,16 +4,17 @@
* \brief Call Metis partitioning * \brief Call Metis partitioning
*/ */
#if !defined(_WIN32)
#include <GKlib.h>
#endif // !defined(_WIN32)
#include <dgl/base_heterograph.h> #include <dgl/base_heterograph.h>
#include <dgl/packed_func_ext.h> #include <dgl/packed_func_ext.h>
#include <dgl/runtime/parallel_for.h>
#include "../heterograph.h" #include "../heterograph.h"
#include "../unit_graph.h" #include "../unit_graph.h"
#if !defined(_WIN32)
#include <GKlib.h>
#endif // !defined(_WIN32)
using namespace dgl::runtime; using namespace dgl::runtime;
namespace dgl { namespace dgl {
...@@ -252,15 +253,16 @@ DGL_REGISTER_GLOBAL("partition._CAPI_DGLPartitionWithHalo_Hetero") ...@@ -252,15 +253,16 @@ DGL_REGISTER_GLOBAL("partition._CAPI_DGLPartitionWithHalo_Hetero")
ugptr->GetOutCSR(); ugptr->GetOutCSR();
std::vector<std::shared_ptr<HaloHeteroSubgraph>> subgs(max_part_id + 1); std::vector<std::shared_ptr<HaloHeteroSubgraph>> subgs(max_part_id + 1);
int num_partitions = part_nodes.size(); int num_partitions = part_nodes.size();
#pragma omp parallel for runtime::parallel_for(0, num_partitions, [&](int b, int e) {
for (int i = 0; i < num_partitions; i++) { for (auto i = b; i < e; i++) {
auto nodes = aten::VecToIdArray(part_nodes[i]); auto nodes = aten::VecToIdArray(part_nodes[i]);
HaloHeteroSubgraph subg = GetSubgraphWithHalo(hgptr, nodes, num_hops); HaloHeteroSubgraph subg = GetSubgraphWithHalo(hgptr, nodes, num_hops);
std::shared_ptr<HaloHeteroSubgraph> subg_ptr( std::shared_ptr<HaloHeteroSubgraph> subg_ptr(
new HaloHeteroSubgraph(subg)); new HaloHeteroSubgraph(subg));
int part_id = part_ids[i]; int part_id = part_ids[i];
subgs[part_id] = subg_ptr; subgs[part_id] = subg_ptr;
} }
});
List<HeteroSubgraphRef> ret_list; List<HeteroSubgraphRef> ret_list;
for (size_t i = 0; i < subgs.size(); i++) { for (size_t i = 0; i < subgs.size(); i++) {
ret_list.push_back(HeteroSubgraphRef(subgs[i])); ret_list.push_back(HeteroSubgraphRef(subgs[i]));
......
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
#include <dmlc/omp.h> #include <dmlc/omp.h>
#include <dgl/runtime/registry.h> #include <dgl/runtime/registry.h>
#include <dgl/runtime/packed_func.h> #include <dgl/runtime/packed_func.h>
#include <dgl/runtime/parallel_for.h>
#include <dgl/random.h> #include <dgl/random.h>
#include <dgl/array.h> #include <dgl/array.h>
...@@ -21,10 +22,12 @@ namespace dgl { ...@@ -21,10 +22,12 @@ namespace dgl {
DGL_REGISTER_GLOBAL("rng._CAPI_SetSeed") DGL_REGISTER_GLOBAL("rng._CAPI_SetSeed")
.set_body([] (DGLArgs args, DGLRetValue *rv) { .set_body([] (DGLArgs args, DGLRetValue *rv) {
const int seed = args[0]; const int seed = args[0];
#pragma omp parallel for
for (int i = 0; i < omp_get_max_threads(); ++i) { runtime::parallel_for(0, omp_get_max_threads(), [&](size_t b, size_t e) {
RandomEngine::ThreadLocal()->SetSeed(seed); for (auto i = b; i < e; ++i) {
} RandomEngine::ThreadLocal()->SetSeed(seed);
}
});
#ifdef DGL_USE_CUDA #ifdef DGL_USE_CUDA
auto* thr_entry = CUDAThreadEntry::ThreadLocal(); auto* thr_entry = CUDAThreadEntry::ThreadLocal();
if (!thr_entry->curand_gen) { if (!thr_entry->curand_gen) {
......
...@@ -11,6 +11,7 @@ ...@@ -11,6 +11,7 @@
#endif #endif
#include <dgl/runtime/container.h> #include <dgl/runtime/container.h>
#include <dgl/runtime/parallel_for.h>
#include <dgl/packed_func_ext.h> #include <dgl/packed_func_ext.h>
#include <dgl/array.h> #include <dgl/array.h>
#include <dgl/random.h> #include <dgl/random.h>
...@@ -454,15 +455,16 @@ DGL_REGISTER_GLOBAL("distributed.rpc._CAPI_DGLRPCFastPull") ...@@ -454,15 +455,16 @@ DGL_REGISTER_GLOBAL("distributed.rpc._CAPI_DGLRPCFastPull")
DLContext{kDLCPU, 0}); DLContext{kDLCPU, 0});
char* return_data = static_cast<char*>(res_tensor->data); char* return_data = static_cast<char*>(res_tensor->data);
// Copy local data // Copy local data
#pragma omp parallel for parallel_for(0, local_ids.size(), [&](size_t b, size_t e) {
for (int64_t i = 0; i < local_ids.size(); ++i) { for (auto i = b; i < e; ++i) {
CHECK_GE(ID_size*row_size, local_ids_orginal[i]*row_size+row_size); CHECK_GE(ID_size*row_size, local_ids_orginal[i]*row_size+row_size);
CHECK_GE(data_size, local_ids[i] * row_size + row_size); CHECK_GE(data_size, local_ids[i] * row_size + row_size);
CHECK_GE(local_ids[i], 0); CHECK_GE(local_ids[i], 0);
memcpy(return_data + local_ids_orginal[i] * row_size, memcpy(return_data + local_ids_orginal[i] * row_size,
local_data_char + local_ids[i] * row_size, local_data_char + local_ids[i] * row_size,
row_size); row_size);
} }
});
// Recv remote message // Recv remote message
for (int i = 0; i < msg_count; ++i) { for (int i = 0; i < msg_count; ++i) {
RPCMessage msg; RPCMessage msg;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment