Unverified Commit 3e80df7e authored by Guolin Ke's avatar Guolin Ke Committed by GitHub
Browse files

some code refactoring (#2769)

* some refines

* more omp refactoring

* format define

* fix merge bug

* some fixes

* fix some warnings

* Apply suggestions from code review

* Apply suggestions from code review

* remove dup codes
parent fa2e0b35
......@@ -36,8 +36,9 @@ class FeatureGroup {
// use bin at zero to store most_freq_bin
num_total_bin_ = 1;
bin_offsets_.emplace_back(num_total_bin_);
auto& ref_bin_mappers = *bin_mappers;
for (int i = 0; i < num_feature_; ++i) {
bin_mappers_.emplace_back(bin_mappers->at(i).release());
bin_mappers_.emplace_back(ref_bin_mappers[i].release());
auto num_bin = bin_mappers_[i]->num_bin();
if (bin_mappers_[i]->GetMostFreqBin() == 0) {
num_bin -= 1;
......@@ -68,8 +69,9 @@ class FeatureGroup {
// use bin at zero to store default_bin
num_total_bin_ = 1;
bin_offsets_.emplace_back(num_total_bin_);
auto& ref_bin_mappers = *bin_mappers;
for (int i = 0; i < num_feature_; ++i) {
bin_mappers_.emplace_back(bin_mappers->at(i).release());
bin_mappers_.emplace_back(ref_bin_mappers[i].release());
auto num_bin = bin_mappers_[i]->num_bin();
if (bin_mappers_[i]->GetMostFreqBin() == 0) {
num_bin -= 1;
......
......@@ -6,6 +6,7 @@
#define LIGHTGBM_UTILS_ARRAY_AGRS_H_
#include <LightGBM/utils/openmp_wrapper.h>
#include <LightGBM/utils/threading.h>
#include <algorithm>
#include <utility>
......@@ -26,13 +27,10 @@ class ArrayArgs {
{
num_threads = omp_get_num_threads();
}
int step = std::max(1, (static_cast<int>(array.size()) + num_threads - 1) / num_threads);
std::vector<size_t> arg_maxs(num_threads, 0);
#pragma omp parallel for schedule(static, 1)
for (int i = 0; i < num_threads; ++i) {
size_t start = step * i;
if (start >= array.size()) { continue; }
size_t end = std::min(array.size(), start + step);
int n_blocks = Threading::For<size_t>(
0, array.size(), 1024,
[&array, &arg_maxs](int i, size_t start, size_t end) {
size_t arg_max = start;
for (size_t j = start + 1; j < end; ++j) {
if (array[j] > array[arg_max]) {
......@@ -40,9 +38,9 @@ class ArrayArgs {
}
}
arg_maxs[i] = arg_max;
}
});
size_t ret = arg_maxs[0];
for (int i = 1; i < num_threads; ++i) {
for (int i = 1; i < n_blocks; ++i) {
if (array[arg_maxs[i]] > array[ret]) {
ret = arg_maxs[i];
}
......
......@@ -44,9 +44,15 @@ class ThreadExceptionHelper {
#define OMP_INIT_EX() ThreadExceptionHelper omp_except_helper
#define OMP_LOOP_EX_BEGIN() try {
#define OMP_LOOP_EX_END() } \
catch(std::exception& ex) { Log::Warning(ex.what()); omp_except_helper.CaptureException(); } \
catch(...) { omp_except_helper.CaptureException(); }
#define OMP_LOOP_EX_END() \
} \
catch (std::exception & ex) { \
Log::Warning(ex.what()); \
omp_except_helper.CaptureException(); \
} \
catch (...) { \
omp_except_helper.CaptureException(); \
}
#define OMP_THROW_EX() omp_except_helper.ReThrow()
#else
......
......@@ -51,7 +51,7 @@ class PipelineReader {
while (read_cnt > 0) {
// start read thread
std::thread read_worker = std::thread(
[&] {
[=, &last_read_cnt, &reader, &buffer_read] {
last_read_cnt = reader->Read(buffer_read.data(), buffer_size);
});
// start process
......
......@@ -90,7 +90,7 @@ class TextReader {
INDEX_T total_cnt = 0;
size_t bytes_read = 0;
PipelineReader::Read(filename_, skip_bytes_,
[&]
[&process_fun, &bytes_read, &total_cnt, this]
(const char* buffer_process, size_t read_cnt) {
size_t cnt = 0;
size_t i = 0;
......@@ -172,8 +172,8 @@ class TextReader {
INDEX_T SampleFromFile(Random* random, INDEX_T sample_cnt, std::vector<std::string>* out_sampled_data) {
INDEX_T cur_sample_cnt = 0;
return ReadAllAndProcess(
[&]
return ReadAllAndProcess([=, &random, &cur_sample_cnt,
&out_sampled_data]
(INDEX_T line_idx, const char* buffer, size_t size) {
if (cur_sample_cnt < sample_cnt) {
out_sampled_data->emplace_back(buffer, size);
......@@ -195,7 +195,7 @@ class TextReader {
INDEX_T ReadAndFilterLines(const std::function<bool(INDEX_T)>& filter_fun, std::vector<INDEX_T>* out_used_data_indices) {
out_used_data_indices->clear();
INDEX_T total_cnt = ReadAllAndProcess(
[&]
[&filter_fun, &out_used_data_indices, this]
(INDEX_T line_idx , const char* buffer, size_t size) {
bool is_used = filter_fun(line_idx);
if (is_used) { out_used_data_indices->push_back(line_idx); }
......@@ -209,7 +209,8 @@ class TextReader {
INDEX_T cur_sample_cnt = 0;
out_used_data_indices->clear();
INDEX_T total_cnt = ReadAllAndProcess(
[&]
[=, &filter_fun, &out_used_data_indices, &random, &cur_sample_cnt,
&out_sampled_data]
(INDEX_T line_idx, const char* buffer, size_t size) {
bool is_used = filter_fun(line_idx);
if (is_used) { out_used_data_indices->push_back(line_idx); }
......@@ -240,7 +241,7 @@ class TextReader {
size_t bytes_read = 0;
INDEX_T used_cnt = 0;
PipelineReader::Read(filename_, skip_bytes_,
[&]
[&process_fun, &filter_fun, &total_cnt, &bytes_read, &used_cnt, this]
(const char* buffer_process, size_t read_cnt) {
size_t cnt = 0;
size_t i = 0;
......
......@@ -14,29 +14,47 @@ namespace LightGBM {
class Threading {
public:
template<typename INDEX_T>
static inline void For(INDEX_T start, INDEX_T end, const std::function<void(int, INDEX_T, INDEX_T)>& inner_fun) {
template <typename INDEX_T>
static inline void BlockInfo(INDEX_T cnt, INDEX_T min_cnt_per_block,
int* out_nblock, INDEX_T* block_size) {
int num_threads = 1;
#pragma omp parallel
#pragma omp master
{
num_threads = omp_get_num_threads();
#pragma omp parallel
#pragma omp master
{ num_threads = omp_get_num_threads(); }
BlockInfo<INDEX_T>(num_threads, cnt, min_cnt_per_block, out_nblock,
block_size);
}
INDEX_T num_inner = (end - start + num_threads - 1) / num_threads;
if (num_inner <= 0) { num_inner = 1; }
template <typename INDEX_T>
static inline void BlockInfo(int num_threads, INDEX_T cnt,
INDEX_T min_cnt_per_block, int* out_nblock,
INDEX_T* block_size) {
*out_nblock = std::min<int>(
num_threads,
static_cast<int>((cnt + min_cnt_per_block - 1) / min_cnt_per_block));
if (*out_nblock > 1) {
*block_size = SIZE_ALIGNED((cnt + (*out_nblock) - 1) / (*out_nblock));
} else {
*block_size = cnt;
}
}
template <typename INDEX_T>
static inline int For(
INDEX_T start, INDEX_T end, INDEX_T min_block_size,
const std::function<void(int, INDEX_T, INDEX_T)>& inner_fun) {
int n_block = 1;
INDEX_T num_inner = end - start;
BlockInfo<INDEX_T>(end - start, min_block_size, &n_block, &num_inner);
OMP_INIT_EX();
#pragma omp parallel for schedule(static, 1)
for (int i = 0; i < num_threads; ++i) {
#pragma omp parallel for schedule(static, 1)
for (int i = 0; i < n_block; ++i) {
OMP_LOOP_EX_BEGIN();
INDEX_T inner_start = start + num_inner * i;
INDEX_T inner_end = inner_start + num_inner;
if (inner_end > end) { inner_end = end; }
if (inner_start < end) {
INDEX_T inner_end = std::min(end, inner_start + num_inner);
inner_fun(i, inner_start, inner_end);
}
OMP_LOOP_EX_END();
}
OMP_THROW_EX();
return n_block;
}
};
......
......@@ -36,10 +36,11 @@ class Predictor {
* \param predict_leaf_index True to output leaf index instead of prediction score
* \param predict_contrib True to output feature contributions instead of prediction score
*/
Predictor(Boosting* boosting, int num_iteration,
bool is_raw_score, bool predict_leaf_index, bool predict_contrib,
bool early_stop, int early_stop_freq, double early_stop_margin) {
early_stop_ = CreatePredictionEarlyStopInstance("none", LightGBM::PredictionEarlyStopConfig());
Predictor(Boosting* boosting, int num_iteration, bool is_raw_score,
bool predict_leaf_index, bool predict_contrib, bool early_stop,
int early_stop_freq, double early_stop_margin) {
early_stop_ = CreatePredictionEarlyStopInstance(
"none", LightGBM::PredictionEarlyStopConfig());
if (early_stop && !boosting->NeedAccuratePrediction()) {
PredictionEarlyStopConfig pred_early_stop_config;
CHECK(early_stop_freq > 0);
......@@ -47,68 +48,85 @@ class Predictor {
pred_early_stop_config.margin_threshold = early_stop_margin;
pred_early_stop_config.round_period = early_stop_freq;
if (boosting->NumberOfClasses() == 1) {
early_stop_ = CreatePredictionEarlyStopInstance("binary", pred_early_stop_config);
early_stop_ =
CreatePredictionEarlyStopInstance("binary", pred_early_stop_config);
} else {
early_stop_ = CreatePredictionEarlyStopInstance("multiclass", pred_early_stop_config);
early_stop_ = CreatePredictionEarlyStopInstance("multiclass",
pred_early_stop_config);
}
}
#pragma omp parallel
#pragma omp master
{
num_threads_ = omp_get_num_threads();
}
#pragma omp parallel
#pragma omp master
{ num_threads_ = omp_get_num_threads(); }
boosting->InitPredict(num_iteration, predict_contrib);
boosting_ = boosting;
num_pred_one_row_ = boosting_->NumPredictOneRow(num_iteration, predict_leaf_index, predict_contrib);
num_pred_one_row_ = boosting_->NumPredictOneRow(
num_iteration, predict_leaf_index, predict_contrib);
num_feature_ = boosting_->MaxFeatureIdx() + 1;
predict_buf_.resize(num_threads_, std::vector<double, Common::AlignmentAllocator<double, kAlignedSize>>(num_feature_, 0.0f));
predict_buf_.resize(
num_threads_,
std::vector<double, Common::AlignmentAllocator<double, kAlignedSize>>(
num_feature_, 0.0f));
const int kFeatureThreshold = 100000;
const size_t KSparseThreshold = static_cast<size_t>(0.01 * num_feature_);
if (predict_leaf_index) {
predict_fun_ = [=](const std::vector<std::pair<int, double>>& features, double* output) {
predict_fun_ = [=](const std::vector<std::pair<int, double>>& features,
double* output) {
int tid = omp_get_thread_num();
if (num_feature_ > kFeatureThreshold && features.size() < KSparseThreshold) {
if (num_feature_ > kFeatureThreshold &&
features.size() < KSparseThreshold) {
auto buf = CopyToPredictMap(features);
boosting_->PredictLeafIndexByMap(buf, output);
} else {
CopyToPredictBuffer(predict_buf_[tid].data(), features);
// get result for leaf index
boosting_->PredictLeafIndex(predict_buf_[tid].data(), output);
ClearPredictBuffer(predict_buf_[tid].data(), predict_buf_[tid].size(), features);
ClearPredictBuffer(predict_buf_[tid].data(), predict_buf_[tid].size(),
features);
}
};
} else if (predict_contrib) {
predict_fun_ = [=](const std::vector<std::pair<int, double>>& features, double* output) {
predict_fun_ = [=](const std::vector<std::pair<int, double>>& features,
double* output) {
int tid = omp_get_thread_num();
CopyToPredictBuffer(predict_buf_[tid].data(), features);
// get result for leaf index
boosting_->PredictContrib(predict_buf_[tid].data(), output, &early_stop_);
ClearPredictBuffer(predict_buf_[tid].data(), predict_buf_[tid].size(), features);
boosting_->PredictContrib(predict_buf_[tid].data(), output,
&early_stop_);
ClearPredictBuffer(predict_buf_[tid].data(), predict_buf_[tid].size(),
features);
};
} else {
if (is_raw_score) {
predict_fun_ = [=](const std::vector<std::pair<int, double>>& features, double* output) {
predict_fun_ = [=](const std::vector<std::pair<int, double>>& features,
double* output) {
int tid = omp_get_thread_num();
if (num_feature_ > kFeatureThreshold && features.size() < KSparseThreshold) {
if (num_feature_ > kFeatureThreshold &&
features.size() < KSparseThreshold) {
auto buf = CopyToPredictMap(features);
boosting_->PredictRawByMap(buf, output, &early_stop_);
} else {
CopyToPredictBuffer(predict_buf_[tid].data(), features);
boosting_->PredictRaw(predict_buf_[tid].data(), output, &early_stop_);
ClearPredictBuffer(predict_buf_[tid].data(), predict_buf_[tid].size(), features);
boosting_->PredictRaw(predict_buf_[tid].data(), output,
&early_stop_);
ClearPredictBuffer(predict_buf_[tid].data(),
predict_buf_[tid].size(), features);
}
};
} else {
predict_fun_ = [=](const std::vector<std::pair<int, double>>& features, double* output) {
predict_fun_ = [=](const std::vector<std::pair<int, double>>& features,
double* output) {
int tid = omp_get_thread_num();
if (num_feature_ > kFeatureThreshold && features.size() < KSparseThreshold) {
if (num_feature_ > kFeatureThreshold &&
features.size() < KSparseThreshold) {
auto buf = CopyToPredictMap(features);
boosting_->PredictByMap(buf, output, &early_stop_);
} else {
CopyToPredictBuffer(predict_buf_[tid].data(), features);
boosting_->Predict(predict_buf_[tid].data(), output, &early_stop_);
ClearPredictBuffer(predict_buf_[tid].data(), predict_buf_[tid].size(), features);
ClearPredictBuffer(predict_buf_[tid].data(),
predict_buf_[tid].size(), features);
}
};
}
......@@ -176,7 +194,7 @@ class Predictor {
// function for parse data
std::function<void(const char*, std::vector<std::pair<int, double>>*)> parser_fun;
double tmp_label;
parser_fun = [&]
parser_fun = [&parser, &feature_remapper, &tmp_label, need_adjust]
(const char* buffer, std::vector<std::pair<int, double>>* feature) {
parser->ParseOneLine(buffer, feature, &tmp_label);
if (need_adjust) {
......@@ -194,8 +212,9 @@ class Predictor {
}
};
std::function<void(data_size_t, const std::vector<std::string>&)> process_fun = [&]
(data_size_t, const std::vector<std::string>& lines) {
std::function<void(data_size_t, const std::vector<std::string>&)>
process_fun = [&parser_fun, &writer, this](
data_size_t, const std::vector<std::string>& lines) {
std::vector<std::pair<int, double>> oneline_features;
std::vector<std::string> result_to_write(lines.size());
OMP_INIT_EX();
......
......@@ -10,6 +10,7 @@
#include <LightGBM/prediction_early_stop.h>
#include <LightGBM/utils/common.h>
#include <LightGBM/utils/openmp_wrapper.h>
#include <LightGBM/utils/threading.h>
#include <chrono>
#include <ctime>
......@@ -216,55 +217,55 @@ data_size_t GBDT::BalancedBaggingHelper(Random* cur_rand, data_size_t start, dat
void GBDT::Bagging(int iter) {
Common::FunctionTimer fun_timer("GBDT::Bagging", global_timer);
// if need bagging
if ((bag_data_cnt_ < num_data_ && iter % config_->bagging_freq == 0)
|| need_re_bagging_) {
if ((bag_data_cnt_ < num_data_ && iter % config_->bagging_freq == 0) ||
need_re_bagging_) {
need_re_bagging_ = false;
const data_size_t min_inner_size = 1024;
const int n_block = std::min(
num_threads_, (num_data_ + min_inner_size - 1) / min_inner_size);
data_size_t inner_size = SIZE_ALIGNED((num_data_ + n_block - 1) / n_block);
OMP_INIT_EX();
#pragma omp parallel for schedule(static, 1)
for (int i = 0; i < n_block; ++i) {
OMP_LOOP_EX_BEGIN();
data_size_t cur_start = i * inner_size;
data_size_t cur_cnt = std::min(inner_size, num_data_ - cur_start);
int n_block = Threading::For<data_size_t>(
0, num_data_, 1024,
[this, iter](int i, data_size_t cur_start, data_size_t cur_end) {
data_size_t cur_cnt = cur_end - cur_start;
if (cur_cnt <= 0) {
left_cnts_buf_[i] = 0;
right_cnts_buf_[i] = 0;
continue;
}
} else {
Random cur_rand(config_->bagging_seed + iter * num_threads_ + i);
data_size_t cur_left_count = 0;
if (balanced_bagging_) {
cur_left_count = BalancedBaggingHelper(&cur_rand, cur_start, cur_cnt, tmp_indices_.data() + cur_start);
cur_left_count =
BalancedBaggingHelper(&cur_rand, cur_start, cur_cnt,
tmp_indices_.data() + cur_start);
} else {
cur_left_count = BaggingHelper(&cur_rand, cur_start, cur_cnt, tmp_indices_.data() + cur_start);
cur_left_count = BaggingHelper(&cur_rand, cur_start, cur_cnt,
tmp_indices_.data() + cur_start);
}
offsets_buf_[i] = cur_start;
left_cnts_buf_[i] = cur_left_count;
right_cnts_buf_[i] = cur_cnt - cur_left_count;
OMP_LOOP_EX_END();
}
OMP_THROW_EX();
});
data_size_t left_cnt = 0;
left_write_pos_buf_[0] = 0;
right_write_pos_buf_[0] = 0;
for (int i = 1; i < n_block; ++i) {
left_write_pos_buf_[i] = left_write_pos_buf_[i - 1] + left_cnts_buf_[i - 1];
right_write_pos_buf_[i] = right_write_pos_buf_[i - 1] + right_cnts_buf_[i - 1];
left_write_pos_buf_[i] =
left_write_pos_buf_[i - 1] + left_cnts_buf_[i - 1];
right_write_pos_buf_[i] =
right_write_pos_buf_[i - 1] + right_cnts_buf_[i - 1];
}
left_cnt = left_write_pos_buf_[n_block - 1] + left_cnts_buf_[n_block - 1];
#pragma omp parallel for schedule(static, 1)
#pragma omp parallel for schedule(static, 1)
for (int i = 0; i < n_block; ++i) {
if (left_cnts_buf_[i] > 0) {
std::memcpy(bag_data_indices_.data() + left_write_pos_buf_[i],
tmp_indices_.data() + offsets_buf_[i], left_cnts_buf_[i] * sizeof(data_size_t));
tmp_indices_.data() + offsets_buf_[i],
left_cnts_buf_[i] * sizeof(data_size_t));
}
if (right_cnts_buf_[i] > 0) {
std::memcpy(bag_data_indices_.data() + left_cnt + right_write_pos_buf_[i],
tmp_indices_.data() + offsets_buf_[i] + left_cnts_buf_[i], right_cnts_buf_[i] * sizeof(data_size_t));
std::memcpy(
bag_data_indices_.data() + left_cnt + right_write_pos_buf_[i],
tmp_indices_.data() + offsets_buf_[i] + left_cnts_buf_[i],
right_cnts_buf_[i] * sizeof(data_size_t));
}
}
bag_data_cnt_ = left_cnt;
......@@ -275,7 +276,8 @@ void GBDT::Bagging(int iter) {
} else {
// get subset
tmp_subset_->ReSize(bag_data_cnt_);
tmp_subset_->CopySubset(train_data_, bag_data_indices_.data(), bag_data_cnt_, false);
tmp_subset_->CopySubset(train_data_, bag_data_indices_.data(),
bag_data_cnt_, false);
tree_learner_->ResetTrainingData(tmp_subset_.get());
}
}
......
......@@ -133,63 +133,7 @@ class GOSS: public GBDT {
bag_data_cnt_ = num_data_;
// not subsample for first iterations
if (iter < static_cast<int>(1.0f / config_->learning_rate)) { return; }
const data_size_t min_inner_size = 128;
const int n_block = std::min(
num_threads_, (num_data_ + min_inner_size - 1) / min_inner_size);
data_size_t inner_size = SIZE_ALIGNED((num_data_ + n_block - 1) / n_block);
OMP_INIT_EX();
#pragma omp parallel for schedule(static, 1)
for (int i = 0; i < n_block; ++i) {
OMP_LOOP_EX_BEGIN();
data_size_t cur_start = i * inner_size;
data_size_t cur_cnt = std::min(inner_size, num_data_ - cur_start);
if (cur_cnt <= 0) {
left_cnts_buf_[i] = 0;
right_cnts_buf_[i] = 0;
continue;
}
Random cur_rand(config_->bagging_seed + iter * num_threads_ + i);
data_size_t cur_left_count = BaggingHelper(&cur_rand, cur_start, cur_cnt,
tmp_indices_.data() + cur_start, tmp_indice_right_.data() + cur_start);
offsets_buf_[i] = cur_start;
left_cnts_buf_[i] = cur_left_count;
right_cnts_buf_[i] = cur_cnt - cur_left_count;
OMP_LOOP_EX_END();
}
OMP_THROW_EX();
data_size_t left_cnt = 0;
left_write_pos_buf_[0] = 0;
right_write_pos_buf_[0] = 0;
for (int i = 1; i < n_block; ++i) {
left_write_pos_buf_[i] = left_write_pos_buf_[i - 1] + left_cnts_buf_[i - 1];
right_write_pos_buf_[i] = right_write_pos_buf_[i - 1] + right_cnts_buf_[i - 1];
}
left_cnt = left_write_pos_buf_[n_block - 1] + left_cnts_buf_[n_block - 1];
#pragma omp parallel for schedule(static, 1)
for (int i = 0; i < n_block; ++i) {
OMP_LOOP_EX_BEGIN();
if (left_cnts_buf_[i] > 0) {
std::memcpy(bag_data_indices_.data() + left_write_pos_buf_[i],
tmp_indices_.data() + offsets_buf_[i], left_cnts_buf_[i] * sizeof(data_size_t));
}
if (right_cnts_buf_[i] > 0) {
std::memcpy(bag_data_indices_.data() + left_cnt + right_write_pos_buf_[i],
tmp_indice_right_.data() + offsets_buf_[i], right_cnts_buf_[i] * sizeof(data_size_t));
}
OMP_LOOP_EX_END();
}
OMP_THROW_EX();
bag_data_cnt_ = left_cnt;
// set bagging data to tree learner
if (!is_use_subset_) {
tree_learner_->SetBaggingData(bag_data_indices_.data(), bag_data_cnt_);
} else {
tmp_subset_->ReSize(bag_data_cnt_);
tmp_subset_->CopySubset(train_data_, bag_data_indices_.data(), bag_data_cnt_, false);
tree_learner_->ResetTrainingData(tmp_subset_.get());
}
GBDT::Bagging(iter);
}
private:
......
......@@ -29,10 +29,7 @@ class ScoreUpdater {
int64_t total_size = static_cast<int64_t>(num_data_) * num_tree_per_iteration;
score_.resize(total_size);
// default start score is zero
#pragma omp parallel for schedule(static)
for (int64_t i = 0; i < total_size; ++i) {
score_[i] = 0.0f;
}
std::memset(score_.data(), 0, total_size * sizeof(double));
has_init_score_ = false;
const double* init_score = data->metadata().init_score();
// if exists initial score, will start from it
......@@ -42,7 +39,7 @@ class ScoreUpdater {
Log::Fatal("Number of class for initial score error");
}
has_init_score_ = true;
#pragma omp parallel for schedule(static)
#pragma omp parallel for schedule(static, 512) if (total_size >= 1024)
for (int64_t i = 0; i < total_size; ++i) {
score_[i] = init_score[i];
}
......@@ -57,7 +54,7 @@ class ScoreUpdater {
inline void AddScore(double val, int cur_tree_id) {
Common::FunctionTimer fun_timer("ScoreUpdater::AddScore", global_timer);
const size_t offset = static_cast<size_t>(num_data_) * cur_tree_id;
#pragma omp parallel for schedule(static)
#pragma omp parallel for schedule(static, 512) if (num_data_ >= 1024)
for (int i = 0; i < num_data_; ++i) {
score_[offset + i] += val;
}
......@@ -65,7 +62,7 @@ class ScoreUpdater {
inline void MultiplyScore(double val, int cur_tree_id) {
const size_t offset = static_cast<size_t>(num_data_) * cur_tree_id;
#pragma omp parallel for schedule(static)
#pragma omp parallel for schedule(static, 512) if (num_data_ >= 1024)
for (int i = 0; i < num_data_; ++i) {
score_[offset + i] *= val;
}
......
......@@ -929,14 +929,14 @@ int LGBM_DatasetCreateFromCSRFunc(void* get_row_funptr,
}
OMP_INIT_EX();
std::vector<std::pair<int, double>> threadBuffer;
#pragma omp parallel for schedule(static) private(threadBuffer)
std::vector<std::pair<int, double>> thread_buffer;
#pragma omp parallel for schedule(static) private(thread_buffer)
for (int i = 0; i < num_rows; ++i) {
OMP_LOOP_EX_BEGIN();
{
const int tid = omp_get_thread_num();
get_row_fun(i, threadBuffer);
ret->PushOneRow(tid, i, threadBuffer);
get_row_fun(i, thread_buffer);
ret->PushOneRow(tid, i, thread_buffer);
}
OMP_LOOP_EX_END();
}
......@@ -1541,8 +1541,9 @@ int LGBM_BoosterPredictForCSC(BoosterHandle handle,
}
}
std::function<std::vector<std::pair<int, double>>(int row_idx)> get_row_fun =
[&iterators, ncol] (int i) {
[&iterators, ncol](int i) {
std::vector<std::pair<int, double>> one_row;
one_row.reserve(ncol);
const int tid = omp_get_thread_num();
for (int j = 0; j < ncol; ++j) {
auto val = iterators[tid][j].Get(i);
......@@ -1809,6 +1810,7 @@ RowPairFunctionFromDenseMatric(const void* data, int num_row, int num_col, int d
return [inner_function] (int row_idx) {
auto raw_values = inner_function(row_idx);
std::vector<std::pair<int, double>> ret;
ret.reserve(raw_values.size());
for (int i = 0; i < static_cast<int>(raw_values.size()); ++i) {
if (std::fabs(raw_values[i]) > kZeroThreshold || std::isnan(raw_values[i])) {
ret.emplace_back(i, raw_values[i]);
......@@ -1827,6 +1829,7 @@ RowPairFunctionFromDenseRows(const void** data, int num_col, int data_type) {
auto inner_function = RowFunctionFromDenseMatric(data[row_idx], 1, num_col, data_type, /* is_row_major */ true);
auto raw_values = inner_function(0);
std::vector<std::pair<int, double>> ret;
ret.reserve(raw_values.size());
for (int i = 0; i < static_cast<int>(raw_values.size()); ++i) {
if (std::fabs(raw_values[i]) > kZeroThreshold || std::isnan(raw_values[i])) {
ret.emplace_back(i, raw_values[i]);
......
This diff is collapsed.
......@@ -631,6 +631,7 @@ Dataset* DatasetLoader::CostructFromSampleData(double** sample_values,
}
OMP_LOOP_EX_END();
}
OMP_THROW_EX();
comm_size_t self_buf_size = 0;
for (int i = 0; i < len[rank]; ++i) {
if (ignore_features_.count(start[rank] + i) > 0) {
......
/*!
* Copyright (c) 2018 Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE file in the project root for license information.
* Licensed under the MIT License. See LICENSE file in the project root for
* license information.
*/
#include <LightGBM/utils/file_io.h>
......@@ -17,7 +18,8 @@
namespace LightGBM {
struct LocalFile : VirtualFileReader, VirtualFileWriter {
LocalFile(const std::string& filename, const std::string& mode) : filename_(filename), mode_(mode) {}
LocalFile(const std::string& filename, const std::string& mode)
: filename_(filename), mode_(mode) {}
virtual ~LocalFile() {
if (file_ != NULL) {
fclose(file_);
......@@ -60,7 +62,8 @@ const char* kHdfsProto = "hdfs://";
const size_t kHdfsProtoLength = static_cast<size_t>(strlen(kHdfsProto));
struct HDFSFile : VirtualFileReader, VirtualFileWriter {
HDFSFile(const std::string& filename, int flags) : filename_(filename), flags_(flags) {}
HDFSFile(const std::string& filename, int flags)
: filename_(filename), flags_(flags) {}
~HDFSFile() {
if (file_ != NULL) {
hdfsCloseFile(fs_, file_);
......@@ -72,7 +75,8 @@ struct HDFSFile : VirtualFileReader, VirtualFileWriter {
if (fs_ == NULL) {
fs_ = GetHDFSFileSystem(filename_);
}
if (fs_ != NULL && (flags_ == O_WRONLY || 0 == hdfsExists(fs_, filename_.c_str()))) {
if (fs_ != NULL &&
(flags_ == O_WRONLY || 0 == hdfsExists(fs_, filename_.c_str()))) {
file_ = hdfsOpenFile(fs_, filename_.c_str(), flags_, 0, 0, 0);
}
}
......@@ -96,10 +100,11 @@ struct HDFSFile : VirtualFileReader, VirtualFileWriter {
private:
template <typename BufferType>
using fileOp = tSize(*)(hdfsFS, hdfsFile, BufferType, tSize);
using fileOp = tSize (*)(hdfsFS, hdfsFile, BufferType, tSize);
template <typename BufferType>
inline size_t FileOperation(BufferType data, size_t bytes, fileOp<BufferType> op) const {
inline size_t FileOperation(BufferType data, size_t bytes,
fileOp<BufferType> op) const {
char* buffer = const_cast<char*>(static_cast<const char*>(data));
size_t remain = bytes;
while (remain != 0) {
......@@ -151,33 +156,47 @@ struct HDFSFile : VirtualFileReader, VirtualFileWriter {
static std::unordered_map<std::string, hdfsFS> fs_cache_;
};
std::unordered_map<std::string, hdfsFS> HDFSFile::fs_cache_ = std::unordered_map<std::string, hdfsFS>();
std::unordered_map<std::string, hdfsFS> HDFSFile::fs_cache_ =
std::unordered_map<std::string, hdfsFS>();
#define WITH_HDFS(x) x
#else
#define WITH_HDFS(x) Log::Fatal("HDFS support is not enabled")
#endif // USE_HDFS
std::unique_ptr<VirtualFileReader> VirtualFileReader::Make(const std::string& filename) {
std::unique_ptr<VirtualFileReader> VirtualFileReader::Make(
const std::string& filename) {
#ifdef USE_HDFS
if (0 == filename.find(kHdfsProto)) {
WITH_HDFS(return std::unique_ptr<VirtualFileReader>(new HDFSFile(filename, O_RDONLY)));
} else {
WITH_HDFS(return std::unique_ptr<VirtualFileReader>(
new HDFSFile(filename, O_RDONLY)));
} else
#endif
{
return std::unique_ptr<VirtualFileReader>(new LocalFile(filename, "rb"));
}
}
std::unique_ptr<VirtualFileWriter> VirtualFileWriter::Make(const std::string& filename) {
std::unique_ptr<VirtualFileWriter> VirtualFileWriter::Make(
const std::string& filename) {
#ifdef USE_HDFS
if (0 == filename.find(kHdfsProto)) {
WITH_HDFS(return std::unique_ptr<VirtualFileWriter>(new HDFSFile(filename, O_WRONLY)));
} else {
WITH_HDFS(return std::unique_ptr<VirtualFileWriter>(
new HDFSFile(filename, O_WRONLY)));
} else
#endif
{
return std::unique_ptr<VirtualFileWriter>(new LocalFile(filename, "wb"));
}
}
bool VirtualFileWriter::Exists(const std::string& filename) {
#ifdef USE_HDFS
if (0 == filename.find(kHdfsProto)) {
WITH_HDFS(HDFSFile file(filename, O_RDONLY); return file.Exists());
} else {
} else
#endif
{
LocalFile file(filename, "rb");
return file.Exists();
}
......
......@@ -59,7 +59,7 @@ void Metadata::Init(const Metadata& fullset, const data_size_t* used_indices, da
num_data_ = num_used_indices;
label_ = std::vector<label_t>(num_used_indices);
#pragma omp parallel for schedule(static)
#pragma omp parallel for schedule(static, 512) if (num_used_indices >= 1024)
for (data_size_t i = 0; i < num_used_indices; ++i) {
label_[i] = fullset.label_[used_indices[i]];
}
......@@ -67,7 +67,7 @@ void Metadata::Init(const Metadata& fullset, const data_size_t* used_indices, da
if (!fullset.weights_.empty()) {
weights_ = std::vector<label_t>(num_used_indices);
num_weights_ = num_used_indices;
#pragma omp parallel for schedule(static)
#pragma omp parallel for schedule(static, 512) if (num_used_indices >= 1024)
for (data_size_t i = 0; i < num_used_indices; ++i) {
weights_[i] = fullset.weights_[used_indices[i]];
}
......@@ -131,7 +131,7 @@ void Metadata::PartitionLabel(const std::vector<data_size_t>& used_indices) {
auto old_label = label_;
num_data_ = static_cast<data_size_t>(used_indices.size());
label_ = std::vector<label_t>(num_data_);
#pragma omp parallel for schedule(static)
#pragma omp parallel for schedule(static, 512) if (num_data_ >= 1024)
for (data_size_t i = 0; i < num_data_; ++i) {
label_[i] = old_label[used_indices[i]];
}
......@@ -202,7 +202,7 @@ void Metadata::CheckOrPartition(data_size_t num_all_data, const std::vector<data
auto old_weights = weights_;
num_weights_ = num_data_;
weights_ = std::vector<label_t>(num_data_);
#pragma omp parallel for schedule(static)
#pragma omp parallel for schedule(static, 512)
for (int i = 0; i < static_cast<int>(used_data_indices.size()); ++i) {
weights_[i] = old_weights[used_data_indices[i]];
}
......@@ -263,7 +263,7 @@ void Metadata::CheckOrPartition(data_size_t num_all_data, const std::vector<data
int num_class = static_cast<int>(num_init_score_ / num_all_data);
num_init_score_ = static_cast<int64_t>(num_data_) * num_class;
init_score_ = std::vector<double>(num_init_score_);
#pragma omp parallel for schedule(static)
#pragma omp parallel for schedule(static)
for (int k = 0; k < num_class; ++k) {
const size_t offset_dest = static_cast<size_t>(k) * num_data_;
const size_t offset_src = static_cast<size_t>(k) * num_all_data;
......@@ -293,7 +293,7 @@ void Metadata::SetInitScore(const double* init_score, data_size_t len) {
if (init_score_.empty()) { init_score_.resize(len); }
num_init_score_ = len;
#pragma omp parallel for schedule(static)
#pragma omp parallel for schedule(static, 512) if (num_init_score_ >= 1024)
for (int64_t i = 0; i < num_init_score_; ++i) {
init_score_[i] = Common::AvoidInf(init_score[i]);
}
......@@ -310,7 +310,7 @@ void Metadata::SetLabel(const label_t* label, data_size_t len) {
}
if (label_.empty()) { label_.resize(num_data_); }
#pragma omp parallel for schedule(static)
#pragma omp parallel for schedule(static, 512) if (num_data_ >= 1024)
for (data_size_t i = 0; i < num_data_; ++i) {
label_[i] = Common::AvoidInf(label[i]);
}
......@@ -330,7 +330,7 @@ void Metadata::SetWeights(const label_t* weights, data_size_t len) {
if (weights_.empty()) { weights_.resize(num_data_); }
num_weights_ = num_data_;
#pragma omp parallel for schedule(static)
#pragma omp parallel for schedule(static, 512) if (num_weights_ >= 1024)
for (data_size_t i = 0; i < num_weights_; ++i) {
weights_[i] = Common::AvoidInf(weights[i]);
}
......
......@@ -156,16 +156,11 @@ class MultiValDenseBin : public MultiValBin {
const std::vector<uint32_t>&,
const std::vector<uint32_t>&,
const std::vector<uint32_t>& delta) override {
const auto other = reinterpret_cast<const MultiValDenseBin<VAL_T>*>(full_bin);
int num_threads = 1;
#pragma omp parallel
#pragma omp master
{ num_threads = omp_get_num_threads(); }
const int min_block_size = 1024;
const int n_block = std::min(
num_threads, (num_data_ + min_block_size - 1) / min_block_size);
const data_size_t block_size = (num_data_ + n_block - 1) / n_block;
const auto other =
reinterpret_cast<const MultiValDenseBin<VAL_T>*>(full_bin);
int n_block = 1;
data_size_t block_size = num_data_;
Threading::BlockInfo<data_size_t>(num_data_, 1024, &n_block, &block_size);
#pragma omp parallel for schedule(static, 1)
for (int tid = 0; tid < n_block; ++tid) {
data_size_t start = tid * block_size;
......
......@@ -240,15 +240,10 @@ class MultiValSparseBin : public MultiValBin {
const std::vector<uint32_t>& delta) override {
const auto other =
reinterpret_cast<const MultiValSparseBin<VAL_T>*>(full_bin);
int num_threads = 1;
#pragma omp parallel
#pragma omp master
{ num_threads = omp_get_num_threads(); }
const int min_block_size = 1024;
const int n_block = std::min(
num_threads, (num_data_ + min_block_size - 1) / min_block_size);
const data_size_t block_size = (num_data_ + n_block - 1) / n_block;
int n_block = 1;
data_size_t block_size = num_data_;
Threading::BlockInfo<data_size_t>(static_cast<int>(t_data_.size() + 1),
num_data_, 1024, &n_block, &block_size);
std::vector<data_size_t> sizes(t_data_.size() + 1, 0);
const int pre_alloc_size = 50;
#pragma omp parallel for schedule(static, 1)
......
......@@ -331,6 +331,8 @@ class SparseBin: public Bin {
void LoadFromPair(const std::vector<std::pair<data_size_t, VAL_T>>& idx_val_pairs) {
deltas_.clear();
vals_.clear();
deltas_.reserve(idx_val_pairs.size());
vals_.reserve(idx_val_pairs.size());
// transform to delta array
data_size_t last_idx = 0;
for (size_t i = 0; i < idx_val_pairs.size(); ++i) {
......
......@@ -98,24 +98,26 @@ int Tree::SplitCategorical(int leaf, int feature, int real_feature, const uint32
return num_leaves_ - 1;
}
#define PredictionFun(niter, fidx_in_iter, start_pos, decision_fun, iter_idx, data_idx) \
std::vector<std::unique_ptr<BinIterator>> iter((niter)); \
for (int i = 0; i < (niter); ++i) { \
#define PredictionFun(niter, fidx_in_iter, start_pos, decision_fun, iter_idx, \
data_idx) \
std::vector<std::unique_ptr<BinIterator>> iter((niter)); \
for (int i = 0; i < (niter); ++i) { \
iter[i].reset(data->FeatureIterator((fidx_in_iter))); \
iter[i]->Reset((start_pos)); \
}\
for (data_size_t i = start; i < end; ++i) {\
int node = 0;\
while (node >= 0) {\
node = decision_fun(iter[(iter_idx)]->Get((data_idx)), node, default_bins[node], max_bins[node]);\
} \
for (data_size_t i = start; i < end; ++i) { \
int node = 0; \
while (node >= 0) { \
node = decision_fun(iter[(iter_idx)]->Get((data_idx)), node, \
default_bins[node], max_bins[node]); \
} \
score[(data_idx)] += static_cast<double>(leaf_value_[~node]); \
}\
score[(data_idx)] += static_cast<double>(leaf_value_[~node]);\
}\
void Tree::AddPredictionToScore(const Dataset* data, data_size_t num_data, double* score) const {
if (num_leaves_ <= 1) {
if (leaf_value_[0] != 0.0f) {
#pragma omp parallel for schedule(static)
#pragma omp parallel for schedule(static, 512) if (num_data >= 1024)
for (data_size_t i = 0; i < num_data; ++i) {
score[i] += leaf_value_[0];
}
......@@ -132,24 +134,24 @@ void Tree::AddPredictionToScore(const Dataset* data, data_size_t num_data, doubl
}
if (num_cat_ > 0) {
if (data->num_features() > num_leaves_ - 1) {
Threading::For<data_size_t>(0, num_data, [this, &data, score, &default_bins, &max_bins]
Threading::For<data_size_t>(0, num_data, 512, [this, &data, score, &default_bins, &max_bins]
(int, data_size_t start, data_size_t end) {
PredictionFun(num_leaves_ - 1, split_feature_inner_[i], start, DecisionInner, node, i);
});
} else {
Threading::For<data_size_t>(0, num_data, [this, &data, score, &default_bins, &max_bins]
Threading::For<data_size_t>(0, num_data, 512, [this, &data, score, &default_bins, &max_bins]
(int, data_size_t start, data_size_t end) {
PredictionFun(data->num_features(), i, start, DecisionInner, split_feature_inner_[node], i);
});
}
} else {
if (data->num_features() > num_leaves_ - 1) {
Threading::For<data_size_t>(0, num_data, [this, &data, score, &default_bins, &max_bins]
Threading::For<data_size_t>(0, num_data, 512, [this, &data, score, &default_bins, &max_bins]
(int, data_size_t start, data_size_t end) {
PredictionFun(num_leaves_ - 1, split_feature_inner_[i], start, NumericalDecisionInner, node, i);
});
} else {
Threading::For<data_size_t>(0, num_data, [this, &data, score, &default_bins, &max_bins]
Threading::For<data_size_t>(0, num_data, 512, [this, &data, score, &default_bins, &max_bins]
(int, data_size_t start, data_size_t end) {
PredictionFun(data->num_features(), i, start, NumericalDecisionInner, split_feature_inner_[node], i);
});
......@@ -162,7 +164,7 @@ void Tree::AddPredictionToScore(const Dataset* data,
data_size_t num_data, double* score) const {
if (num_leaves_ <= 1) {
if (leaf_value_[0] != 0.0f) {
#pragma omp parallel for schedule(static)
#pragma omp parallel for schedule(static, 512) if (num_data >= 1024)
for (data_size_t i = 0; i < num_data; ++i) {
score[used_data_indices[i]] += leaf_value_[0];
}
......@@ -179,24 +181,24 @@ void Tree::AddPredictionToScore(const Dataset* data,
}
if (num_cat_ > 0) {
if (data->num_features() > num_leaves_ - 1) {
Threading::For<data_size_t>(0, num_data, [this, &data, score, used_data_indices, &default_bins, &max_bins]
Threading::For<data_size_t>(0, num_data, 512, [this, &data, score, used_data_indices, &default_bins, &max_bins]
(int, data_size_t start, data_size_t end) {
PredictionFun(num_leaves_ - 1, split_feature_inner_[i], used_data_indices[start], DecisionInner, node, used_data_indices[i]);
});
} else {
Threading::For<data_size_t>(0, num_data, [this, &data, score, used_data_indices, &default_bins, &max_bins]
Threading::For<data_size_t>(0, num_data, 512, [this, &data, score, used_data_indices, &default_bins, &max_bins]
(int, data_size_t start, data_size_t end) {
PredictionFun(data->num_features(), i, used_data_indices[start], DecisionInner, split_feature_inner_[node], used_data_indices[i]);
});
}
} else {
if (data->num_features() > num_leaves_ - 1) {
Threading::For<data_size_t>(0, num_data, [this, &data, score, used_data_indices, &default_bins, &max_bins]
Threading::For<data_size_t>(0, num_data, 512, [this, &data, score, used_data_indices, &default_bins, &max_bins]
(int, data_size_t start, data_size_t end) {
PredictionFun(num_leaves_ - 1, split_feature_inner_[i], used_data_indices[start], NumericalDecisionInner, node, used_data_indices[i]);
});
} else {
Threading::For<data_size_t>(0, num_data, [this, &data, score, used_data_indices, &default_bins, &max_bins]
Threading::For<data_size_t>(0, num_data, 512, [this, &data, score, used_data_indices, &default_bins, &max_bins]
(int, data_size_t start, data_size_t end) {
PredictionFun(data->num_features(), i, used_data_indices[start], NumericalDecisionInner, split_feature_inner_[node], used_data_indices[i]);
});
......
......@@ -117,7 +117,7 @@ LGBM_SE LGBM_DatasetGetSubset_R(LGBM_SE handle,
int len = R_AS_INT(len_used_row_indices);
std::vector<int> idxvec(len);
// convert from one-based to zero-based index
#pragma omp parallel for schedule(static)
#pragma omp parallel for schedule(static, 512) if (len >= 1024)
for (int i = 0; i < len; ++i) {
idxvec[i] = R_INT_PTR(used_row_indices)[i] - 1;
}
......@@ -196,7 +196,7 @@ LGBM_SE LGBM_DatasetSetField_R(LGBM_SE handle,
const char* name = R_CHAR_PTR(field_name);
if (!strcmp("group", name) || !strcmp("query", name)) {
std::vector<int32_t> vec(len);
#pragma omp parallel for schedule(static)
#pragma omp parallel for schedule(static, 512) if (len >= 1024)
for (int i = 0; i < len; ++i) {
vec[i] = static_cast<int32_t>(R_INT_PTR(field_data)[i]);
}
......@@ -205,7 +205,7 @@ LGBM_SE LGBM_DatasetSetField_R(LGBM_SE handle,
CHECK_CALL(LGBM_DatasetSetField(R_GET_PTR(handle), name, R_REAL_PTR(field_data), len, C_API_DTYPE_FLOAT64));
} else {
std::vector<float> vec(len);
#pragma omp parallel for schedule(static)
#pragma omp parallel for schedule(static, 512) if (len >= 1024)
for (int i = 0; i < len; ++i) {
vec[i] = static_cast<float>(R_REAL_PTR(field_data)[i]);
}
......@@ -228,19 +228,19 @@ LGBM_SE LGBM_DatasetGetField_R(LGBM_SE handle,
if (!strcmp("group", name) || !strcmp("query", name)) {
auto p_data = reinterpret_cast<const int32_t*>(res);
// convert from boundaries to size
#pragma omp parallel for schedule(static)
#pragma omp parallel for schedule(static, 512) if (out_len >= 1024)
for (int i = 0; i < out_len - 1; ++i) {
R_INT_PTR(field_data)[i] = p_data[i + 1] - p_data[i];
}
} else if (!strcmp("init_score", name)) {
auto p_data = reinterpret_cast<const double*>(res);
#pragma omp parallel for schedule(static)
#pragma omp parallel for schedule(static, 512) if (out_len >= 1024)
for (int i = 0; i < out_len; ++i) {
R_REAL_PTR(field_data)[i] = p_data[i];
}
} else {
auto p_data = reinterpret_cast<const float*>(res);
#pragma omp parallel for schedule(static)
#pragma omp parallel for schedule(static, 512) if (out_len >= 1024)
for (int i = 0; i < out_len; ++i) {
R_REAL_PTR(field_data)[i] = p_data[i];
}
......@@ -396,7 +396,7 @@ LGBM_SE LGBM_BoosterUpdateOneIterCustom_R(LGBM_SE handle,
R_API_BEGIN();
int int_len = R_AS_INT(len);
std::vector<float> tgrad(int_len), thess(int_len);
#pragma omp parallel for schedule(static)
#pragma omp parallel for schedule(static, 512) if (int_len >= 1024)
for (int j = 0; j < int_len; ++j) {
tgrad[j] = static_cast<float>(R_REAL_PTR(grad)[j]);
thess[j] = static_cast<float>(R_REAL_PTR(hess)[j]);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment