"include/LightGBM/vscode:/vscode.git/clone" did not exist on "6a1ec44437bbc830e5c352aaea395994b28a79f5"
Unverified commit 8ed371ce, authored by James Lamb, committed by GitHub
Browse files

set explicit number of threads in every OpenMP `parallel` region (#6135)

parent 992f5056
......@@ -18,3 +18,28 @@ cmakelint \
${cmake_files} \
|| exit -1
echo "done running cmakelint"
echo "checking that all OpenMP pragmas specify num_threads()"

# Find every '#pragma omp parallel' in C/C++ sources that does not carry an
# explicit num_threads() clause. openmp_wrapper.h is excluded because it is
# where the OMP_NUM_THREADS() helper itself lives.
get_omp_pragmas_without_num_threads() {
    grep \
        -n \
        -R \
        --include='*.c' \
        --include='*.cc' \
        --include='*.cpp' \
        --include='*.h' \
        --include='*.hpp' \
        'pragma omp parallel' \
    | grep -v ' num_threads' \
    | grep -v 'openmp_wrapper.h'
}
PROBLEMATIC_LINES=$(
    get_omp_pragmas_without_num_threads
)
if test "${PROBLEMATIC_LINES}" != ""; then
    # Print the captured matches instead of re-running the recursive grep;
    # re-running duplicates the work and could report different results if
    # files changed in between.
    echo "${PROBLEMATIC_LINES}"
    echo "Found '#pragma omp parallel' not using explicit num_threads() configuration. Fix those."
    echo "For details, see https://www.openmp.org/spec-html/5.0/openmpse14.html#x54-800002.6"
    # POSIX exit status must be 0-255; 'exit -1' is invalid in some shells
    # (bash maps it to 255). Use 1 to signal failure portably.
    exit 1
fi
echo "done checking OpenMP pragmas"
......@@ -226,7 +226,7 @@ SEXP LGBM_DatasetGetSubset_R(SEXP handle,
int32_t len = static_cast<int32_t>(Rf_asInteger(len_used_row_indices));
std::vector<int32_t> idxvec(len);
// convert from one-based to zero-based index
#pragma omp parallel for schedule(static, 512) if (len >= 1024)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static, 512) if (len >= 1024)
for (int32_t i = 0; i < len; ++i) {
idxvec[i] = static_cast<int32_t>(INTEGER(used_row_indices)[i] - 1);
}
......@@ -339,7 +339,7 @@ SEXP LGBM_DatasetSetField_R(SEXP handle,
const char* name = CHAR(PROTECT(Rf_asChar(field_name)));
if (!strcmp("group", name) || !strcmp("query", name)) {
std::vector<int32_t> vec(len);
#pragma omp parallel for schedule(static, 512) if (len >= 1024)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static, 512) if (len >= 1024)
for (int i = 0; i < len; ++i) {
vec[i] = static_cast<int32_t>(INTEGER(field_data)[i]);
}
......@@ -348,7 +348,7 @@ SEXP LGBM_DatasetSetField_R(SEXP handle,
CHECK_CALL(LGBM_DatasetSetField(R_ExternalPtrAddr(handle), name, REAL(field_data), len, C_API_DTYPE_FLOAT64));
} else {
std::vector<float> vec(len);
#pragma omp parallel for schedule(static, 512) if (len >= 1024)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static, 512) if (len >= 1024)
for (int i = 0; i < len; ++i) {
vec[i] = static_cast<float>(REAL(field_data)[i]);
}
......@@ -372,19 +372,19 @@ SEXP LGBM_DatasetGetField_R(SEXP handle,
if (!strcmp("group", name) || !strcmp("query", name)) {
auto p_data = reinterpret_cast<const int32_t*>(res);
// convert from boundaries to size
#pragma omp parallel for schedule(static, 512) if (out_len >= 1024)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static, 512) if (out_len >= 1024)
for (int i = 0; i < out_len - 1; ++i) {
INTEGER(field_data)[i] = p_data[i + 1] - p_data[i];
}
} else if (!strcmp("init_score", name)) {
auto p_data = reinterpret_cast<const double*>(res);
#pragma omp parallel for schedule(static, 512) if (out_len >= 1024)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static, 512) if (out_len >= 1024)
for (int i = 0; i < out_len; ++i) {
REAL(field_data)[i] = p_data[i];
}
} else {
auto p_data = reinterpret_cast<const float*>(res);
#pragma omp parallel for schedule(static, 512) if (out_len >= 1024)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static, 512) if (out_len >= 1024)
for (int i = 0; i < out_len; ++i) {
REAL(field_data)[i] = p_data[i];
}
......@@ -611,7 +611,7 @@ SEXP LGBM_BoosterUpdateOneIterCustom_R(SEXP handle,
int is_finished = 0;
int int_len = Rf_asInteger(len);
std::vector<float> tgrad(int_len), thess(int_len);
#pragma omp parallel for schedule(static, 512) if (int_len >= 1024)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static, 512) if (int_len >= 1024)
for (int j = 0; j < int_len; ++j) {
tgrad[j] = static_cast<float>(REAL(grad)[j]);
thess[j] = static_cast<float>(REAL(hess)[j]);
......
......@@ -361,7 +361,7 @@ class FeatureGroup {
inline void FinishLoad() {
if (is_multi_val_) {
OMP_INIT_EX();
#pragma omp parallel for schedule(guided)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(guided)
for (int i = 0; i < num_feature_; ++i) {
OMP_LOOP_EX_BEGIN();
multi_bin_data_[i]->FinishLoad();
......
......@@ -185,7 +185,7 @@ class Tree {
* \param rate The factor of shrinkage
*/
virtual inline void Shrinkage(double rate) {
#pragma omp parallel for schedule(static, 1024) if (num_leaves_ >= 2048)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static, 1024) if (num_leaves_ >= 2048)
for (int i = 0; i < num_leaves_ - 1; ++i) {
leaf_value_[i] = MaybeRoundToZero(leaf_value_[i] * rate);
internal_value_[i] = MaybeRoundToZero(internal_value_[i] * rate);
......@@ -210,7 +210,7 @@ class Tree {
inline double shrinkage() const { return shrinkage_; }
virtual inline void AddBias(double val) {
#pragma omp parallel for schedule(static, 1024) if (num_leaves_ >= 2048)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static, 1024) if (num_leaves_ >= 2048)
for (int i = 0; i < num_leaves_ - 1; ++i) {
leaf_value_[i] = MaybeRoundToZero(leaf_value_[i] + val);
internal_value_[i] = MaybeRoundToZero(internal_value_[i] + val);
......@@ -218,7 +218,7 @@ class Tree {
leaf_value_[num_leaves_ - 1] =
MaybeRoundToZero(leaf_value_[num_leaves_ - 1] + val);
if (is_linear_) {
#pragma omp parallel for schedule(static, 1024) if (num_leaves_ >= 2048)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static, 1024) if (num_leaves_ >= 2048)
for (int i = 0; i < num_leaves_ - 1; ++i) {
leaf_const_[i] = MaybeRoundToZero(leaf_const_[i] + val);
}
......
......@@ -691,7 +691,7 @@ static void ParallelSort(_RanIt _First, _RanIt _Last, _Pr _Pred, _VTRanIt*) {
size_t inner_size = (len + num_threads - 1) / num_threads;
inner_size = std::max(inner_size, kMinInnerLen);
num_threads = static_cast<int>((len + inner_size - 1) / inner_size);
#pragma omp parallel for schedule(static, 1)
#pragma omp parallel for num_threads(num_threads) schedule(static, 1)
for (int i = 0; i < num_threads; ++i) {
size_t left = inner_size*i;
size_t right = left + inner_size;
......@@ -707,7 +707,7 @@ static void ParallelSort(_RanIt _First, _RanIt _Last, _Pr _Pred, _VTRanIt*) {
// Recursive merge
while (s < len) {
int loop_size = static_cast<int>((len + s * 2 - 1) / (s * 2));
#pragma omp parallel for schedule(static, 1)
#pragma omp parallel for num_threads(num_threads) schedule(static, 1)
for (int i = 0; i < loop_size; ++i) {
size_t left = i * 2 * s;
size_t mid = left + s;
......
......@@ -73,7 +73,7 @@ class Threading {
INDEX_T num_inner = end - start;
BlockInfo<INDEX_T>(num_inner, min_block_size, &n_block, &num_inner);
OMP_INIT_EX();
#pragma omp parallel for schedule(static, 1)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static, 1)
for (int i = 0; i < n_block; ++i) {
OMP_LOOP_EX_BEGIN();
INDEX_T inner_start = start + num_inner * i;
......
......@@ -227,7 +227,7 @@ void Application::Predict() {
TextReader<int> result_reader(config_.output_result.c_str(), false);
result_reader.ReadAllLines();
std::vector<std::vector<int>> pred_leaf(result_reader.Lines().size());
#pragma omp parallel for schedule(static)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static)
for (int i = 0; i < static_cast<int>(result_reader.Lines().size()); ++i) {
pred_leaf[i] = Common::StringToArray<int>(result_reader.Lines()[i], '\t');
// Free memory
......
......@@ -233,7 +233,7 @@ class Predictor {
std::vector<std::pair<int, double>> oneline_features;
std::vector<std::string> result_to_write(lines.size());
OMP_INIT_EX();
#pragma omp parallel for schedule(static) firstprivate(oneline_features)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static) firstprivate(oneline_features)
for (data_size_t i = 0; i < static_cast<data_size_t>(lines.size()); ++i) {
OMP_LOOP_EX_BEGIN();
oneline_features.clear();
......
......@@ -255,7 +255,7 @@ void GBDT::RefitTree(const std::vector<std::vector<int>>& tree_leaf_prediction)
std::vector<int> leaf_pred(num_data_);
if (linear_tree_) {
std::vector<int> max_leaves_by_thread = std::vector<int>(OMP_NUM_THREADS(), 0);
#pragma omp parallel for schedule(static)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static)
for (int i = 0; i < static_cast<int>(tree_leaf_prediction.size()); ++i) {
int tid = omp_get_thread_num();
for (size_t j = 0; j < tree_leaf_prediction[i].size(); ++j) {
......@@ -270,7 +270,7 @@ void GBDT::RefitTree(const std::vector<std::vector<int>>& tree_leaf_prediction)
Boosting();
for (int tree_id = 0; tree_id < num_tree_per_iteration_; ++tree_id) {
int model_index = iter * num_tree_per_iteration_ + tree_id;
#pragma omp parallel for schedule(static)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static)
for (int i = 0; i < num_data_; ++i) {
leaf_pred[i] = tree_leaf_prediction[i][model_index];
CHECK_LT(leaf_pred[i], models_[model_index]->num_leaves());
......@@ -348,7 +348,7 @@ bool GBDT::TrainOneIter(const score_t* gradients, const score_t* hessians) {
if (data_sample_strategy_->IsHessianChange()) {
// need to copy customized gradients when using GOSS
int64_t total_size = static_cast<int64_t>(num_data_) * num_tree_per_iteration_;
#pragma omp parallel for schedule(static)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static)
for (int64_t i = 0; i < total_size; ++i) {
gradients_[i] = gradients[i];
hessians_[i] = hessians[i];
......@@ -669,7 +669,7 @@ void GBDT::GetPredictAt(int data_idx, double* out_result, int64_t* out_len) {
}
#endif // USE_CUDA
if (objective_function_ != nullptr) {
#pragma omp parallel for schedule(static)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static)
for (data_size_t i = 0; i < num_data; ++i) {
std::vector<double> tree_pred(num_tree_per_iteration_);
for (int j = 0; j < num_tree_per_iteration_; ++j) {
......@@ -682,7 +682,7 @@ void GBDT::GetPredictAt(int data_idx, double* out_result, int64_t* out_len) {
}
}
} else {
#pragma omp parallel for schedule(static)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static)
for (data_size_t i = 0; i < num_data; ++i) {
for (int j = 0; j < num_tree_per_iteration_; ++j) {
out_result[j * num_data + i] = static_cast<double>(raw_scores[j * num_data + i]);
......
......@@ -434,7 +434,7 @@ class GBDT : public GBDTBase {
}
start_iteration_for_pred_ = start_iteration;
if (is_pred_contrib) {
#pragma omp parallel for schedule(static)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static)
for (int i = 0; i < static_cast<int>(models_.size()); ++i) {
models_[i]->RecomputeMaxDepth();
}
......
......@@ -354,7 +354,7 @@ std::string GBDT::SaveModelToString(int start_iteration, int num_iteration, int
std::vector<std::string> tree_strs(num_used_model - start_model);
std::vector<size_t> tree_sizes(num_used_model - start_model);
// output tree models
#pragma omp parallel for schedule(static)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static)
for (int i = start_model; i < num_used_model; ++i) {
const int idx = i - start_model;
tree_strs[idx] = "Tree=" + std::to_string(idx) + '\n';
......@@ -552,7 +552,7 @@ bool GBDT::LoadModelFromString(const char* buffer, size_t len) {
models_.emplace_back(nullptr);
}
OMP_INIT_EX();
#pragma omp parallel for schedule(static)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static)
for (int i = 0; i < num_trees; ++i) {
OMP_LOOP_EX_BEGIN();
auto cur_p = p + tree_boundries[i];
......
......@@ -97,7 +97,7 @@ class RF : public GBDT {
}
size_t total_size = static_cast<size_t>(num_data_) * num_tree_per_iteration_;
std::vector<double> tmp_scores(total_size, 0.0f);
#pragma omp parallel for schedule(static)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static)
for (int j = 0; j < num_tree_per_iteration_; ++j) {
size_t offset = static_cast<size_t>(j)* num_data_;
for (data_size_t i = 0; i < num_data_; ++i) {
......
......@@ -39,7 +39,7 @@ class ScoreUpdater {
Log::Fatal("Number of class for initial score error");
}
has_init_score_ = true;
#pragma omp parallel for schedule(static, 512) if (total_size >= 1024)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static, 512) if (total_size >= 1024)
for (int64_t i = 0; i < total_size; ++i) {
score_[i] = init_score[i];
}
......@@ -54,7 +54,7 @@ class ScoreUpdater {
virtual inline void AddScore(double val, int cur_tree_id) {
Common::FunctionTimer fun_timer("ScoreUpdater::AddScore", global_timer);
const size_t offset = static_cast<size_t>(num_data_) * cur_tree_id;
#pragma omp parallel for schedule(static, 512) if (num_data_ >= 1024)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static, 512) if (num_data_ >= 1024)
for (int i = 0; i < num_data_; ++i) {
score_[offset + i] += val;
}
......@@ -62,7 +62,7 @@ class ScoreUpdater {
virtual inline void MultiplyScore(double val, int cur_tree_id) {
const size_t offset = static_cast<size_t>(num_data_) * cur_tree_id;
#pragma omp parallel for schedule(static, 512) if (num_data_ >= 1024)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static, 512) if (num_data_ >= 1024)
for (int i = 0; i < num_data_; ++i) {
score_[offset + i] *= val;
}
......
......@@ -437,7 +437,7 @@ class Booster {
int64_t num_pred_in_one_row = boosting_->NumPredictOneRow(start_iteration, num_iteration, is_predict_leaf, predict_contrib);
auto pred_fun = predictor.GetPredictFunction();
OMP_INIT_EX();
#pragma omp parallel for schedule(static)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static)
for (int i = 0; i < nrow; ++i) {
OMP_LOOP_EX_BEGIN();
auto one_row = get_row_fun(i);
......@@ -459,7 +459,7 @@ class Booster {
auto pred_sparse_fun = predictor.GetPredictSparseFunction();
std::vector<std::vector<std::unordered_map<int, double>>>& agg = *agg_ptr;
OMP_INIT_EX();
#pragma omp parallel for schedule(static)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static)
for (int64_t i = 0; i < nrow; ++i) {
OMP_LOOP_EX_BEGIN();
auto one_row = get_row_fun(i);
......@@ -551,7 +551,7 @@ class Booster {
indptr_index++;
int64_t matrix_start_index = m * static_cast<int64_t>(agg.size());
OMP_INIT_EX();
#pragma omp parallel for schedule(static)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static)
for (int64_t i = 0; i < static_cast<int64_t>(agg.size()); ++i) {
OMP_LOOP_EX_BEGIN();
auto row_vector = agg[i];
......@@ -663,7 +663,7 @@ class Booster {
}
// Note: we parallelize across matrices instead of rows because of the column_counts[m][col_idx] increment inside the loop
OMP_INIT_EX();
#pragma omp parallel for schedule(static)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static)
for (int m = 0; m < num_matrices; ++m) {
OMP_LOOP_EX_BEGIN();
for (int64_t i = 0; i < static_cast<int64_t>(agg.size()); ++i) {
......@@ -1074,7 +1074,7 @@ int LGBM_DatasetPushRows(DatasetHandle dataset,
p_dataset->ResizeRaw(p_dataset->num_numeric_features() + nrow);
}
OMP_INIT_EX();
#pragma omp parallel for schedule(static)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static)
for (int i = 0; i < nrow; ++i) {
OMP_LOOP_EX_BEGIN();
const int tid = omp_get_thread_num();
......@@ -1116,7 +1116,7 @@ int LGBM_DatasetPushRowsWithMetadata(DatasetHandle dataset,
const int max_omp_threads = p_dataset->omp_max_threads() > 0 ? p_dataset->omp_max_threads() : OMP_NUM_THREADS();
OMP_INIT_EX();
#pragma omp parallel for schedule(static)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static)
for (int i = 0; i < nrow; ++i) {
OMP_LOOP_EX_BEGIN();
// convert internal thread id to be unique based on external thread id
......@@ -1153,7 +1153,7 @@ int LGBM_DatasetPushRowsByCSR(DatasetHandle dataset,
p_dataset->ResizeRaw(p_dataset->num_numeric_features() + nrow);
}
OMP_INIT_EX();
#pragma omp parallel for schedule(static)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static)
for (int i = 0; i < nrow; ++i) {
OMP_LOOP_EX_BEGIN();
const int tid = omp_get_thread_num();
......@@ -1199,7 +1199,7 @@ int LGBM_DatasetPushRowsByCSRWithMetadata(DatasetHandle dataset,
const int max_omp_threads = p_dataset->omp_max_threads() > 0 ? p_dataset->omp_max_threads() : OMP_NUM_THREADS();
OMP_INIT_EX();
#pragma omp parallel for schedule(static)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static)
for (int i = 0; i < nrow; ++i) {
OMP_LOOP_EX_BEGIN();
// convert internal thread id to be unique based on external thread id
......@@ -1319,7 +1319,7 @@ int LGBM_DatasetCreateFromMats(int32_t nmat,
int32_t start_row = 0;
for (int j = 0; j < nmat; ++j) {
OMP_INIT_EX();
#pragma omp parallel for schedule(static)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static)
for (int i = 0; i < nrow[j]; ++i) {
OMP_LOOP_EX_BEGIN();
const int tid = omp_get_thread_num();
......@@ -1394,7 +1394,7 @@ int LGBM_DatasetCreateFromCSR(const void* indptr,
}
}
OMP_INIT_EX();
#pragma omp parallel for schedule(static)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static)
for (int i = 0; i < nindptr - 1; ++i) {
OMP_LOOP_EX_BEGIN();
const int tid = omp_get_thread_num();
......@@ -1465,7 +1465,7 @@ int LGBM_DatasetCreateFromCSRFunc(void* get_row_funptr,
OMP_INIT_EX();
std::vector<std::pair<int, double>> thread_buffer;
#pragma omp parallel for schedule(static) private(thread_buffer)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static) private(thread_buffer)
for (int i = 0; i < num_rows; ++i) {
OMP_LOOP_EX_BEGIN();
{
......@@ -1506,7 +1506,7 @@ int LGBM_DatasetCreateFromCSC(const void* col_ptr,
std::vector<std::vector<double>> sample_values(ncol_ptr - 1);
std::vector<std::vector<int>> sample_idx(ncol_ptr - 1);
OMP_INIT_EX();
#pragma omp parallel for schedule(static)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static)
for (int i = 0; i < static_cast<int>(sample_values.size()); ++i) {
OMP_LOOP_EX_BEGIN();
CSC_RowIterator col_it(col_ptr, col_ptr_type, indices, data, data_type, ncol_ptr, nelem, i);
......@@ -1534,7 +1534,7 @@ int LGBM_DatasetCreateFromCSC(const void* col_ptr,
reinterpret_cast<const Dataset*>(reference));
}
OMP_INIT_EX();
#pragma omp parallel for schedule(static)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static)
for (int i = 0; i < ncol_ptr - 1; ++i) {
OMP_LOOP_EX_BEGIN();
const int tid = omp_get_thread_num();
......
......@@ -536,7 +536,7 @@ MultiValBin* Dataset::GetMultiBinFromSparseFeatures(const std::vector<uint32_t>&
std::vector<uint32_t> most_freq_bins;
double sum_sparse_rate = 0;
for (int i = 0; i < num_feature; ++i) {
#pragma omp parallel for schedule(static, 1)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static, 1)
for (int tid = 0; tid < num_threads; ++tid) {
iters[tid].emplace_back(
feature_groups_[multi_group_id]->SubFeatureIterator(i));
......@@ -584,7 +584,7 @@ MultiValBin* Dataset::GetMultiBinFromAllFeatures(const std::vector<uint32_t>& of
for (int fid = 0; fid < feature_groups_[gid]->num_feature_; ++fid) {
const auto& bin_mapper = feature_groups_[gid]->bin_mappers_[fid];
most_freq_bins.push_back(bin_mapper->GetMostFreqBin());
#pragma omp parallel for schedule(static, 1)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static, 1)
for (int tid = 0; tid < num_threads; ++tid) {
iters[tid].emplace_back(
feature_groups_[gid]->SubFeatureIterator(fid));
......@@ -823,7 +823,7 @@ void Dataset::ReSize(data_size_t num_data) {
if (num_data_ != num_data) {
num_data_ = num_data;
OMP_INIT_EX();
#pragma omp parallel for schedule(static)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static)
for (int group = 0; group < num_groups_; ++group) {
OMP_LOOP_EX_BEGIN();
feature_groups_[group]->ReSize(num_data_);
......@@ -856,7 +856,7 @@ void Dataset::CopySubrow(const Dataset* fullset,
int num_copy_tasks = static_cast<int>(group_ids.size());
OMP_INIT_EX();
#pragma omp parallel for schedule(dynamic)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(dynamic)
for (int task_id = 0; task_id < num_copy_tasks; ++task_id) {
OMP_LOOP_EX_BEGIN();
int group = group_ids[task_id];
......@@ -875,7 +875,7 @@ void Dataset::CopySubrow(const Dataset* fullset,
num_numeric_features_ = fullset->num_numeric_features_;
if (has_raw_) {
ResizeRaw(num_used_indices);
#pragma omp parallel for schedule(static)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static)
for (int i = 0; i < num_used_indices; ++i) {
for (int j = 0; j < num_numeric_features_; ++j) {
raw_data_[j][i] = fullset->raw_data_[j][used_indices[i]];
......@@ -1282,7 +1282,7 @@ void Dataset::ConstructHistogramsInner(
int16_t* ordered_gradients_and_hessians = reinterpret_cast<int16_t*>(ordered_gradients);
const int16_t* gradients_and_hessians = reinterpret_cast<const int16_t*>(gradients);
if (USE_INDICES) {
#pragma omp parallel for schedule(static, 512) if (num_data >= 1024)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static, 512) if (num_data >= 1024)
for (data_size_t i = 0; i < num_data; ++i) {
ordered_gradients_and_hessians[i] = gradients_and_hessians[data_indices[i]];
}
......@@ -1292,7 +1292,7 @@ void Dataset::ConstructHistogramsInner(
} else {
if (USE_INDICES) {
if (USE_HESSIAN) {
#pragma omp parallel for schedule(static, 512) if (num_data >= 1024)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static, 512) if (num_data >= 1024)
for (data_size_t i = 0; i < num_data; ++i) {
ordered_gradients[i] = gradients[data_indices[i]];
ordered_hessians[i] = hessians[data_indices[i]];
......@@ -1300,7 +1300,7 @@ void Dataset::ConstructHistogramsInner(
ptr_ordered_grad = ordered_gradients;
ptr_ordered_hess = ordered_hessians;
} else {
#pragma omp parallel for schedule(static, 512) if (num_data >= 1024)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static, 512) if (num_data >= 1024)
for (data_size_t i = 0; i < num_data; ++i) {
ordered_gradients[i] = gradients[data_indices[i]];
}
......
......@@ -625,7 +625,7 @@ Dataset* DatasetLoader::ConstructFromSampleData(double** sample_values,
if (Network::num_machines() == 1) {
// if only one machine, find bin locally
OMP_INIT_EX();
#pragma omp parallel for schedule(guided)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(guided)
for (int i = 0; i < num_col; ++i) {
OMP_LOOP_EX_BEGIN();
if (ignore_features_.count(i) > 0) {
......@@ -674,7 +674,7 @@ Dataset* DatasetLoader::ConstructFromSampleData(double** sample_values,
}
len[num_machines - 1] = num_total_features - start[num_machines - 1];
OMP_INIT_EX();
#pragma omp parallel for schedule(guided)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(guided)
for (int i = 0; i < len[rank]; ++i) {
OMP_LOOP_EX_BEGIN();
if (ignore_features_.count(start[rank] + i) > 0) {
......@@ -1136,7 +1136,7 @@ void DatasetLoader::ConstructBinMappersFromTextData(int rank, int num_machines,
if (num_machines == 1) {
// if only one machine, find bin locally
OMP_INIT_EX();
#pragma omp parallel for schedule(guided)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(guided)
for (int i = 0; i < static_cast<int>(sample_values.size()); ++i) {
OMP_LOOP_EX_BEGIN();
if (ignore_features_.count(i) > 0) {
......@@ -1177,7 +1177,7 @@ void DatasetLoader::ConstructBinMappersFromTextData(int rank, int num_machines,
}
len[num_machines - 1] = dataset->num_total_features_ - start[num_machines - 1];
OMP_INIT_EX();
#pragma omp parallel for schedule(guided)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(guided)
for (int i = 0; i < len[rank]; ++i) {
OMP_LOOP_EX_BEGIN();
if (ignore_features_.count(start[rank] + i) > 0) {
......@@ -1268,7 +1268,7 @@ void DatasetLoader::ExtractFeaturesFromMemory(std::vector<std::string>* text_dat
if (!predict_fun_) {
OMP_INIT_EX();
// if doesn't need to prediction with initial model
#pragma omp parallel for schedule(static) private(oneline_features) firstprivate(tmp_label, feature_row)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static) private(oneline_features) firstprivate(tmp_label, feature_row)
for (data_size_t i = 0; i < dataset->num_data_; ++i) {
OMP_LOOP_EX_BEGIN();
const int tid = omp_get_thread_num();
......@@ -1319,7 +1319,7 @@ void DatasetLoader::ExtractFeaturesFromMemory(std::vector<std::string>* text_dat
OMP_INIT_EX();
// if need to prediction with initial model
std::vector<double> init_score(static_cast<size_t>(dataset->num_data_) * num_class_);
#pragma omp parallel for schedule(static) private(oneline_features) firstprivate(tmp_label, feature_row)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static) private(oneline_features) firstprivate(tmp_label, feature_row)
for (data_size_t i = 0; i < dataset->num_data_; ++i) {
OMP_LOOP_EX_BEGIN();
const int tid = omp_get_thread_num();
......@@ -1394,7 +1394,7 @@ void DatasetLoader::ExtractFeaturesFromFile(const char* filename, const Parser*
double tmp_label = 0.0f;
std::vector<float> feature_row(dataset->num_features_);
OMP_INIT_EX();
#pragma omp parallel for schedule(static) private(oneline_features) firstprivate(tmp_label, feature_row)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static) private(oneline_features) firstprivate(tmp_label, feature_row)
for (data_size_t i = 0; i < static_cast<data_size_t>(lines.size()); ++i) {
OMP_LOOP_EX_BEGIN();
const int tid = omp_get_thread_num();
......
......@@ -101,7 +101,7 @@ void Metadata::Init(const Metadata& fullset, const data_size_t* used_indices, da
num_data_ = num_used_indices;
label_ = std::vector<label_t>(num_used_indices);
#pragma omp parallel for schedule(static, 512) if (num_used_indices >= 1024)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static, 512) if (num_used_indices >= 1024)
for (data_size_t i = 0; i < num_used_indices; ++i) {
label_[i] = fullset.label_[used_indices[i]];
}
......@@ -109,7 +109,7 @@ void Metadata::Init(const Metadata& fullset, const data_size_t* used_indices, da
if (!fullset.weights_.empty()) {
weights_ = std::vector<label_t>(num_used_indices);
num_weights_ = num_used_indices;
#pragma omp parallel for schedule(static, 512) if (num_used_indices >= 1024)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static, 512) if (num_used_indices >= 1024)
for (data_size_t i = 0; i < num_used_indices; ++i) {
weights_[i] = fullset.weights_[used_indices[i]];
}
......@@ -121,7 +121,7 @@ void Metadata::Init(const Metadata& fullset, const data_size_t* used_indices, da
int num_class = static_cast<int>(fullset.num_init_score_ / fullset.num_data_);
init_score_ = std::vector<double>(static_cast<size_t>(num_used_indices) * num_class);
num_init_score_ = static_cast<int64_t>(num_used_indices) * num_class;
#pragma omp parallel for schedule(static)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static)
for (int k = 0; k < num_class; ++k) {
const size_t offset_dest = static_cast<size_t>(k) * num_data_;
const size_t offset_src = static_cast<size_t>(k) * fullset.num_data_;
......@@ -173,7 +173,7 @@ void Metadata::PartitionLabel(const std::vector<data_size_t>& used_indices) {
auto old_label = label_;
num_data_ = static_cast<data_size_t>(used_indices.size());
label_ = std::vector<label_t>(num_data_);
#pragma omp parallel for schedule(static, 512) if (num_data_ >= 1024)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static, 512) if (num_data_ >= 1024)
for (data_size_t i = 0; i < num_data_; ++i) {
label_[i] = old_label[used_indices[i]];
}
......@@ -255,7 +255,7 @@ void Metadata::CheckOrPartition(data_size_t num_all_data, const std::vector<data
auto old_weights = weights_;
num_weights_ = num_data_;
weights_ = std::vector<label_t>(num_data_);
#pragma omp parallel for schedule(static, 512)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static, 512)
for (int i = 0; i < static_cast<int>(used_data_indices.size()); ++i) {
weights_[i] = old_weights[used_data_indices[i]];
}
......@@ -274,7 +274,7 @@ void Metadata::CheckOrPartition(data_size_t num_all_data, const std::vector<data
auto old_positions = positions_;
num_positions_ = num_data_;
positions_ = std::vector<data_size_t>(num_data_);
#pragma omp parallel for schedule(static, 512)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static, 512)
for (int i = 0; i < static_cast<int>(used_data_indices.size()); ++i) {
positions_[i] = old_positions[used_data_indices[i]];
}
......@@ -335,7 +335,7 @@ void Metadata::CheckOrPartition(data_size_t num_all_data, const std::vector<data
int num_class = static_cast<int>(num_init_score_ / num_all_data);
num_init_score_ = static_cast<int64_t>(num_data_) * num_class;
init_score_ = std::vector<double>(num_init_score_);
#pragma omp parallel for schedule(static)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static)
for (int k = 0; k < num_class; ++k) {
const size_t offset_dest = static_cast<size_t>(k) * num_data_;
const size_t offset_src = static_cast<size_t>(k) * num_all_data;
......@@ -369,7 +369,7 @@ void Metadata::SetInitScore(const double* init_score, data_size_t len) {
if (init_score_.empty()) { init_score_.resize(len); }
num_init_score_ = len;
#pragma omp parallel for schedule(static, 512) if (num_init_score_ >= 1024)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static, 512) if (num_init_score_ >= 1024)
for (int64_t i = 0; i < num_init_score_; ++i) {
init_score_[i] = Common::AvoidInf(init_score[i]);
}
......@@ -413,7 +413,7 @@ void Metadata::SetLabel(const label_t* label, data_size_t len) {
}
if (label_.empty()) { label_.resize(num_data_); }
#pragma omp parallel for schedule(static, 512) if (num_data_ >= 1024)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static, 512) if (num_data_ >= 1024)
for (data_size_t i = 0; i < num_data_; ++i) {
label_[i] = Common::AvoidInf(label[i]);
}
......@@ -452,7 +452,7 @@ void Metadata::SetWeights(const label_t* weights, data_size_t len) {
if (weights_.empty()) { weights_.resize(num_data_); }
num_weights_ = num_data_;
#pragma omp parallel for schedule(static, 512) if (num_weights_ >= 1024)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static, 512) if (num_weights_ >= 1024)
for (data_size_t i = 0; i < num_weights_; ++i) {
weights_[i] = Common::AvoidInf(weights[i]);
}
......@@ -492,7 +492,7 @@ void Metadata::SetQuery(const data_size_t* query, data_size_t len) {
return;
}
data_size_t sum = 0;
#pragma omp parallel for schedule(static) reduction(+:sum)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static) reduction(+:sum)
for (data_size_t i = 0; i < len; ++i) {
sum += query[i];
}
......@@ -554,7 +554,7 @@ void Metadata::SetPosition(const data_size_t* positions, data_size_t len) {
Log::Debug("number of unique positions found = %ld", position_ids_.size());
#pragma omp parallel for schedule(static, 512) if (num_positions_ >= 1024)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static, 512) if (num_positions_ >= 1024)
for (data_size_t i = 0; i < num_positions_; ++i) {
positions_[i] = map_id2pos.at(positions[i]);
}
......@@ -590,7 +590,7 @@ void Metadata::LoadWeights() {
Log::Info("Loading weights...");
num_weights_ = static_cast<data_size_t>(reader.Lines().size());
weights_ = std::vector<label_t>(num_weights_);
#pragma omp parallel for schedule(static)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static)
for (data_size_t i = 0; i < num_weights_; ++i) {
double tmp_weight = 0.0f;
Common::Atof(reader.Lines()[i].c_str(), &tmp_weight);
......@@ -645,7 +645,7 @@ void Metadata::LoadInitialScore(const std::string& data_filename) {
init_score_ = std::vector<double>(num_init_score_);
if (num_class == 1) {
#pragma omp parallel for schedule(static)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static)
for (data_size_t i = 0; i < num_line; ++i) {
double tmp = 0.0f;
Common::Atof(reader.Lines()[i].c_str(), &tmp);
......@@ -653,7 +653,7 @@ void Metadata::LoadInitialScore(const std::string& data_filename) {
}
} else {
std::vector<std::string> oneline_init_score;
#pragma omp parallel for schedule(static)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static)
for (data_size_t i = 0; i < num_line; ++i) {
double tmp = 0.0f;
oneline_init_score = Common::Split(reader.Lines()[i].c_str(), '\t');
......
......@@ -271,7 +271,7 @@ class MultiValDenseBin : public MultiValBin {
data_size_t block_size = num_data_;
Threading::BlockInfo<data_size_t>(num_data_, 1024, &n_block,
&block_size);
#pragma omp parallel for schedule(static, 1)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static, 1)
for (int tid = 0; tid < n_block; ++tid) {
data_size_t start = tid * block_size;
data_size_t end = std::min(num_data_, start + block_size);
......
......@@ -85,7 +85,7 @@ class MultiValSparseBin : public MultiValBin {
offsets[tid + 1] = offsets[tid] + sizes[tid + 1];
}
data_.resize(row_ptr_[num_data_]);
#pragma omp parallel for schedule(static, 1)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static, 1)
for (int tid = 0; tid < static_cast<int>(t_data_.size()); ++tid) {
std::copy_n(t_data_[tid].data(), sizes[tid + 1],
data_.data() + offsets[tid]);
......@@ -344,7 +344,7 @@ class MultiValSparseBin : public MultiValBin {
num_data_, 1024, &n_block, &block_size);
std::vector<INDEX_T> sizes(t_data_.size() + 1, 0);
const int pre_alloc_size = 50;
#pragma omp parallel for schedule(static, 1)
#pragma omp parallel for num_threads(OMP_NUM_THREADS()) schedule(static, 1)
for (int tid = 0; tid < n_block; ++tid) {
data_size_t start = tid * block_size;
data_size_t end = std::min(num_data_, start + block_size);
......
......@@ -56,7 +56,7 @@ void MultiValBinWrapper::HistMove(const std::vector<hist_t,
if (HIST_BITS == 32) {
const int64_t* src = reinterpret_cast<const int64_t*>(hist_buf.data()) + hist_buf.size() / 2 -
static_cast<size_t>(num_bin_aligned_);
#pragma omp parallel for schedule(static)
#pragma omp parallel for schedule(static) num_threads(num_threads_)
for (int i = 0; i < static_cast<int>(hist_move_src_.size()); ++i) {
std::copy_n(src + hist_move_src_[i] / 2, hist_move_size_[i] / 2,
reinterpret_cast<int64_t*>(origin_hist_data_) + hist_move_dest_[i] / 2);
......@@ -65,14 +65,14 @@ void MultiValBinWrapper::HistMove(const std::vector<hist_t,
const int32_t* src = reinterpret_cast<const int32_t*>(hist_buf.data()) + hist_buf.size() / 2 -
static_cast<size_t>(num_bin_aligned_);
if (is_use_subcol_) {
#pragma omp parallel for schedule(static)
#pragma omp parallel for schedule(static) num_threads(num_threads_)
for (int i = 0; i < static_cast<int>(hist_move_src_.size()); ++i) {
std::copy_n(src + hist_move_src_[i] / 2, hist_move_size_[i] / 2,
reinterpret_cast<int32_t*>(origin_hist_data_) + hist_move_dest_[i] / 2);
}
} else {
int32_t* orig_ptr = reinterpret_cast<int32_t*>(origin_hist_data_);
#pragma omp parallel for schedule(static)
#pragma omp parallel for schedule(static) num_threads(num_threads_)
for (int i = 0; i < num_bin_; ++i) {
orig_ptr[i] = src[i];
}
......@@ -81,7 +81,7 @@ void MultiValBinWrapper::HistMove(const std::vector<hist_t,
} else {
const hist_t* src = hist_buf.data() + hist_buf.size() -
2 * static_cast<size_t>(num_bin_aligned_);
#pragma omp parallel for schedule(static)
#pragma omp parallel for schedule(static) num_threads(num_threads_)
for (int i = 0; i < static_cast<int>(hist_move_src_.size()); ++i) {
std::copy_n(src + hist_move_src_[i], hist_move_size_[i],
origin_hist_data_ + hist_move_dest_[i]);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment