Commit 14195876 authored by Guolin Ke
Browse files

support multi-threading exceptions.

parent 6ed335df
...@@ -733,7 +733,7 @@ inline int LGBM_APIHandleException(const std::string& ex) { ...@@ -733,7 +733,7 @@ inline int LGBM_APIHandleException(const std::string& ex) {
return -1; return -1;
} }
#define API_BEGIN() Log::ResetUseException(true); try { #define API_BEGIN() try {
#define API_END() } \ #define API_END() } \
catch(std::exception& ex) { return LGBM_APIHandleException(ex); } \ catch(std::exception& ex) { return LGBM_APIHandleException(ex); } \
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
#include <LightGBM/utils/random.h> #include <LightGBM/utils/random.h>
#include <LightGBM/utils/text_reader.h> #include <LightGBM/utils/text_reader.h>
#include <LightGBM/utils/openmp_wrapper.h>
#include <LightGBM/meta.h> #include <LightGBM/meta.h>
#include <LightGBM/config.h> #include <LightGBM/config.h>
...@@ -34,7 +35,7 @@ class DatasetLoader; ...@@ -34,7 +35,7 @@ class DatasetLoader;
*/ */
class Metadata { class Metadata {
public: public:
/*! /*!
* \brief Null costructor * \brief Null costructor
*/ */
Metadata(); Metadata();
...@@ -47,7 +48,7 @@ public: ...@@ -47,7 +48,7 @@ public:
/*! /*!
* \brief init as subset * \brief init as subset
* \param metadata Filename of data * \param metadata Filename of data
* \param used_indices * \param used_indices
* \param num_used_indices * \param num_used_indices
*/ */
void Init(const Metadata& metadata, const data_size_t* used_indices, data_size_t num_used_indices); void Init(const Metadata& metadata, const data_size_t* used_indices, data_size_t num_used_indices);
...@@ -79,7 +80,7 @@ public: ...@@ -79,7 +80,7 @@ public:
* \param used_data_indices Indices of local used training data * \param used_data_indices Indices of local used training data
*/ */
void CheckOrPartition(data_size_t num_all_data, void CheckOrPartition(data_size_t num_all_data,
const std::vector<data_size_t>& used_data_indices); const std::vector<data_size_t>& used_data_indices);
void SetLabel(const float* label, data_size_t len); void SetLabel(const float* label, data_size_t len);
...@@ -155,12 +156,12 @@ public: ...@@ -155,12 +156,12 @@ public:
/*! /*!
* \brief Get data boundaries on queries, if not exists, will return nullptr * \brief Get data boundaries on queries, if not exists, will return nullptr
* we assume data will order by query, * we assume data will order by query,
* the interval of [query_boundaris[i], query_boundaris[i+1]) * the interval of [query_boundaris[i], query_boundaris[i+1])
* is the data indices for query i. * is the data indices for query i.
* \return Pointer of data boundaries on queries * \return Pointer of data boundaries on queries
*/ */
inline const data_size_t* query_boundaries() const { inline const data_size_t* query_boundaries() const {
if (!query_boundaries_.empty()) { if (!query_boundaries_.empty()) {
return query_boundaries_.data(); return query_boundaries_.data();
} else { } else {
...@@ -178,7 +179,7 @@ public: ...@@ -178,7 +179,7 @@ public:
* \brief Get weights for queries, if not exists, will return nullptr * \brief Get weights for queries, if not exists, will return nullptr
* \return Pointer of weights for queries * \return Pointer of weights for queries
*/ */
inline const float* query_weights() const { inline const float* query_weights() const {
if (!query_weights_.empty()) { if (!query_weights_.empty()) {
return query_weights_.data(); return query_weights_.data();
} else { } else {
...@@ -190,7 +191,7 @@ public: ...@@ -190,7 +191,7 @@ public:
* \brief Get initial scores, if not exists, will return nullptr * \brief Get initial scores, if not exists, will return nullptr
* \return Pointer of initial scores * \return Pointer of initial scores
*/ */
inline const double* init_score() const { inline const double* init_score() const {
if (!init_score_.empty()) { if (!init_score_.empty()) {
return init_score_.data(); return init_score_.data();
} else { } else {
...@@ -261,7 +262,7 @@ public: ...@@ -261,7 +262,7 @@ public:
* \param out_label Label will store to this if exists * \param out_label Label will store to this if exists
*/ */
virtual void ParseOneLine(const char* str, virtual void ParseOneLine(const char* str,
std::vector<std::pair<int, double>>* out_features, double* out_label) const = 0; std::vector<std::pair<int, double>>* out_features, double* out_label) const = 0;
/*! /*!
* \brief Create a object of parser, will auto choose the format depend on file * \brief Create a object of parser, will auto choose the format depend on file
...@@ -395,7 +396,7 @@ public: ...@@ -395,7 +396,7 @@ public:
HistogramBinEntry* histogram_data) const; HistogramBinEntry* histogram_data) const;
void FixHistogram(int feature_idx, double sum_gradient, double sum_hessian, data_size_t num_data, void FixHistogram(int feature_idx, double sum_gradient, double sum_hessian, data_size_t num_data,
HistogramBinEntry* data) const; HistogramBinEntry* data) const;
inline data_size_t Split( inline data_size_t Split(
int feature, int feature,
...@@ -419,9 +420,9 @@ public: ...@@ -419,9 +420,9 @@ public:
inline int FeatureNumBin(int i) const { inline int FeatureNumBin(int i) const {
const int group = feature2group_[i]; const int group = feature2group_[i];
const int sub_feature = feature2subfeature_[i]; const int sub_feature = feature2subfeature_[i];
return feature_groups_[group]->bin_mappers_[sub_feature]->num_bin(); return feature_groups_[group]->bin_mappers_[sub_feature]->num_bin();
} }
inline const BinMapper* FeatureBinMapper(int i) const { inline const BinMapper* FeatureBinMapper(int i) const {
const int group = feature2group_[i]; const int group = feature2group_[i];
const int sub_feature = feature2subfeature_[i]; const int sub_feature = feature2subfeature_[i];
...@@ -442,10 +443,14 @@ public: ...@@ -442,10 +443,14 @@ public:
inline void CreateOrderedBins(std::vector<std::unique_ptr<OrderedBin>>* ordered_bins) const { inline void CreateOrderedBins(std::vector<std::unique_ptr<OrderedBin>>* ordered_bins) const {
ordered_bins->resize(num_groups_); ordered_bins->resize(num_groups_);
#pragma omp parallel for schedule(guided) OMP_INIT_EX();
#pragma omp parallel for schedule(guided)
for (int i = 0; i < num_groups_; ++i) { for (int i = 0; i < num_groups_; ++i) {
ordered_bins->at(i).reset(feature_groups_[i]->bin_data_->CreateOrderedBin()); OMP_LOOP_EX_BEGIN();
ordered_bins->at(i).reset(feature_groups_[i]->bin_data_->CreateOrderedBin());
OMP_LOOP_EX_END();
} }
OMP_THROW_EX();
} }
/*! /*!
......
...@@ -45,10 +45,6 @@ public: ...@@ -45,10 +45,6 @@ public:
GetLevel() = level; GetLevel() = level;
} }
// Select Log::Fatal's failure mode: when true, Fatal throws
// std::runtime_error; when false it calls std::exit(-1).
// (Removed by this commit: Fatal now always throws.)
static void ResetUseException(bool use_ex) {
UseException() = use_ex;
}
static void Debug(const char *format, ...) { static void Debug(const char *format, ...) {
va_list val; va_list val;
va_start(val, format); va_start(val, format);
...@@ -79,11 +75,7 @@ public: ...@@ -79,11 +75,7 @@ public:
va_end(val); va_end(val);
fprintf(stderr, "[LightGBM] [Fatal] %s\n", str_buf); fprintf(stderr, "[LightGBM] [Fatal] %s\n", str_buf);
fflush(stderr); fflush(stderr);
if (UseException()) { throw std::runtime_error(std::string(str_buf));
throw std::runtime_error(std::string(str_buf));
} else {
std::exit(-1);
}
} }
private: private:
...@@ -106,8 +98,6 @@ private: ...@@ -106,8 +98,6 @@ private:
static LogLevel& GetLevel() { static thread_local LogLevel level = LogLevel::Info; return level; } static LogLevel& GetLevel() { static thread_local LogLevel level = LogLevel::Info; return level; }
#endif #endif
static bool& UseException() { static bool use_ex = false; return use_ex; }
}; };
} // namespace LightGBM } // namespace LightGBM
......
#ifndef LIGHTGBM_OPENMP_WRAPPER_H_ #ifndef LIGHTGBM_OPENMP_WRAPPER_H_
#define LIGHTGBM_OPENMP_WRAPPER_H_ #define LIGHTGBM_OPENMP_WRAPPER_H_
#ifdef _OPENMP #ifdef _OPENMP
#include <omp.h>
#include <omp.h>
#include <exception>
#include <stdexcept>
#include <mutex>
#include <vector>
#include <memory>
#include "log.h"
/*!
* \brief Captures the first exception thrown inside an OpenMP parallel
*        region so it can be re-thrown on the spawning thread afterwards.
*        Exceptions must never escape an OpenMP worker thread, so each
*        loop iteration catches locally and records the exception here.
*/
class ThreadExceptionHelper {
public:
  ThreadExceptionHelper() {
    ex_ptr_ = nullptr;
  }
  /*! \brief Safety net: re-throw on destruction if ReThrow() was skipped. */
  ~ThreadExceptionHelper() {
    ReThrow();
  }
  /*!
  * \brief Re-throw the captured exception (if any) on the calling thread.
  *        The stored pointer is cleared *before* throwing: otherwise the
  *        destructor would re-throw again during stack unwinding and, since
  *        destructors are implicitly noexcept, call std::terminate().
  *        (The original `ex_ptr_ = nullptr;` after rethrow_exception was
  *        unreachable, leaving the pointer set.)
  */
  void ReThrow() {
    if (ex_ptr_ != nullptr) {
      std::exception_ptr tmp = ex_ptr_;
      ex_ptr_ = nullptr;
      std::rethrow_exception(tmp);
    }
  }
  /*! \brief Record the current in-flight exception; only the first one is kept. */
  void CaptureException() {
    // Check-and-store must both happen under the lock: an unlocked
    // fast-path read of ex_ptr_ would race with the locked write (UB).
    std::unique_lock<std::mutex> guard(lock_);
    if (ex_ptr_ != nullptr) { return; }
    ex_ptr_ = std::current_exception();
  }
private:
  std::exception_ptr ex_ptr_;  // first captured exception, nullptr if none
  std::mutex lock_;            // guards ex_ptr_ across OpenMP worker threads
};
// Helpers to marshal exceptions out of OpenMP parallel loops:
//   OMP_INIT_EX()           declares the per-region exception collector
//                           (must precede the #pragma omp parallel for);
//   OMP_LOOP_EX_BEGIN/END() wrap each loop body in try/catch so no
//                           exception escapes a worker thread;
//   OMP_THROW_EX()          re-throws the first captured exception on the
//                           spawning thread after the loop completes.
// std::exception is logged via Log::Warning before capture; any other
// thrown type is captured silently.
#define OMP_INIT_EX() ThreadExceptionHelper omp_except_helper
#define OMP_LOOP_EX_BEGIN() try {
#define OMP_LOOP_EX_END() } \
catch(std::exception& ex) { Log::Warning(ex.what()); omp_except_helper.CaptureException(); } \
catch(...) { omp_except_helper.CaptureException(); }
#define OMP_THROW_EX() omp_except_helper.ReThrow()
#else #else
#ifdef _MSC_VER
#pragma warning( disable : 4068 ) // disable unknown pragma warning #ifdef _MSC_VER
#endif #pragma warning( disable : 4068 ) // disable unknown pragma warning
#endif
#ifdef __cplusplus
extern "C" { #ifdef __cplusplus
#endif extern "C" {
/** Fall here if no OPENMP support, so just #endif
simulate a single thread running. /** Fall here if no OPENMP support, so just
All #pragma omp should be ignored by the compiler **/ simulate a single thread running.
inline void omp_set_num_threads(int) {} All #pragma omp should be ignored by the compiler **/
inline int omp_get_num_threads() {return 1;} inline void omp_set_num_threads(int) {}
inline int omp_get_thread_num() {return 0;} inline int omp_get_num_threads() {return 1;}
#ifdef __cplusplus inline int omp_get_thread_num() {return 0;}
}; // extern "C" #ifdef __cplusplus
#endif }; // extern "C"
#endif
// No-op variants for builds without OpenMP: the loops run single-threaded,
// so exceptions propagate naturally and no capture machinery is needed.
#define OMP_INIT_EX()
#define OMP_LOOP_EX_BEGIN()
#define OMP_LOOP_EX_END()
#define OMP_THROW_EX()
#endif #endif
......
...@@ -48,7 +48,7 @@ public: ...@@ -48,7 +48,7 @@ public:
read_cnt = fread(buffer_process.data(), 1, buffer_size, file); read_cnt = fread(buffer_process.data(), 1, buffer_size, file);
size_t last_read_cnt = 0; size_t last_read_cnt = 0;
while (read_cnt > 0) { while (read_cnt > 0) {
// strat read thread // start read thread
std::thread read_worker = std::thread( std::thread read_worker = std::thread(
[file, &buffer_read, buffer_size, &last_read_cnt] { [file, &buffer_read, buffer_size, &last_read_cnt] {
last_read_cnt = fread(buffer_read.data(), 1, buffer_size, file); last_read_cnt = fread(buffer_read.data(), 1, buffer_size, file);
......
...@@ -21,15 +21,19 @@ public: ...@@ -21,15 +21,19 @@ public:
} }
INDEX_T num_inner = (end - start + num_threads - 1) / num_threads; INDEX_T num_inner = (end - start + num_threads - 1) / num_threads;
if (num_inner <= 0) { num_inner = 1; } if (num_inner <= 0) { num_inner = 1; }
OMP_INIT_EX();
#pragma omp parallel for schedule(static,1) #pragma omp parallel for schedule(static,1)
for (int i = 0; i < num_threads; ++i) { for (int i = 0; i < num_threads; ++i) {
OMP_LOOP_EX_BEGIN();
INDEX_T inner_start = start + num_inner * i; INDEX_T inner_start = start + num_inner * i;
INDEX_T inner_end = inner_start + num_inner; INDEX_T inner_end = inner_start + num_inner;
if (inner_end > end) { inner_end = end; } if (inner_end > end) { inner_end = end; }
if (inner_start < end) { if (inner_start < end) {
inner_fun(i, inner_start, inner_end); inner_fun(i, inner_start, inner_end);
} }
OMP_LOOP_EX_END();
} }
OMP_THROW_EX();
} }
}; };
......
...@@ -109,15 +109,18 @@ public: ...@@ -109,15 +109,18 @@ public:
(data_size_t, const std::vector<std::string>& lines) { (data_size_t, const std::vector<std::string>& lines) {
std::vector<std::pair<int, double>> oneline_features; std::vector<std::pair<int, double>> oneline_features;
std::vector<std::string> pred_result(lines.size(), ""); std::vector<std::string> pred_result(lines.size(), "");
OMP_INIT_EX();
#pragma omp parallel for schedule(static) private(oneline_features) #pragma omp parallel for schedule(static) private(oneline_features)
for (data_size_t i = 0; i < static_cast<data_size_t>(lines.size()); ++i) { for (data_size_t i = 0; i < static_cast<data_size_t>(lines.size()); ++i) {
OMP_LOOP_EX_BEGIN();
oneline_features.clear(); oneline_features.clear();
// parser // parser
parser_fun(lines[i].c_str(), &oneline_features); parser_fun(lines[i].c_str(), &oneline_features);
// predict // predict
pred_result[i] = Common::Join<double>(predict_fun_(oneline_features), "\t"); pred_result[i] = Common::Join<double>(predict_fun_(oneline_features), "\t");
OMP_LOOP_EX_END();
} }
OMP_THROW_EX();
for (size_t i = 0; i < pred_result.size(); ++i) { for (size_t i = 0; i < pred_result.size(); ++i) {
fprintf(result_file, "%s\n", pred_result[i].c_str()); fprintf(result_file, "%s\n", pred_result[i].c_str());
} }
......
...@@ -229,9 +229,10 @@ void GBDT::Bagging(int iter) { ...@@ -229,9 +229,10 @@ void GBDT::Bagging(int iter) {
const data_size_t min_inner_size = 1000; const data_size_t min_inner_size = 1000;
data_size_t inner_size = (num_data_ + num_threads_ - 1) / num_threads_; data_size_t inner_size = (num_data_ + num_threads_ - 1) / num_threads_;
if (inner_size < min_inner_size) { inner_size = min_inner_size; } if (inner_size < min_inner_size) { inner_size = min_inner_size; }
OMP_INIT_EX();
#pragma omp parallel for schedule(static,1) #pragma omp parallel for schedule(static,1)
for (int i = 0; i < num_threads_; ++i) { for (int i = 0; i < num_threads_; ++i) {
OMP_LOOP_EX_BEGIN();
left_cnts_buf_[i] = 0; left_cnts_buf_[i] = 0;
right_cnts_buf_[i] = 0; right_cnts_buf_[i] = 0;
data_size_t cur_start = i * inner_size; data_size_t cur_start = i * inner_size;
...@@ -243,7 +244,9 @@ void GBDT::Bagging(int iter) { ...@@ -243,7 +244,9 @@ void GBDT::Bagging(int iter) {
offsets_buf_[i] = cur_start; offsets_buf_[i] = cur_start;
left_cnts_buf_[i] = cur_left_count; left_cnts_buf_[i] = cur_left_count;
right_cnts_buf_[i] = cur_cnt - cur_left_count; right_cnts_buf_[i] = cur_cnt - cur_left_count;
OMP_LOOP_EX_END();
} }
OMP_THROW_EX();
data_size_t left_cnt = 0; data_size_t left_cnt = 0;
left_write_pos_buf_[0] = 0; left_write_pos_buf_[0] = 0;
right_write_pos_buf_[0] = 0; right_write_pos_buf_[0] = 0;
...@@ -255,6 +258,7 @@ void GBDT::Bagging(int iter) { ...@@ -255,6 +258,7 @@ void GBDT::Bagging(int iter) {
#pragma omp parallel for schedule(static, 1) #pragma omp parallel for schedule(static, 1)
for (int i = 0; i < num_threads_; ++i) { for (int i = 0; i < num_threads_; ++i) {
OMP_LOOP_EX_BEGIN();
if (left_cnts_buf_[i] > 0) { if (left_cnts_buf_[i] > 0) {
std::memcpy(bag_data_indices_.data() + left_write_pos_buf_[i], std::memcpy(bag_data_indices_.data() + left_write_pos_buf_[i],
tmp_indices_.data() + offsets_buf_[i], left_cnts_buf_[i] * sizeof(data_size_t)); tmp_indices_.data() + offsets_buf_[i], left_cnts_buf_[i] * sizeof(data_size_t));
...@@ -263,7 +267,9 @@ void GBDT::Bagging(int iter) { ...@@ -263,7 +267,9 @@ void GBDT::Bagging(int iter) {
std::memcpy(bag_data_indices_.data() + left_cnt + right_write_pos_buf_[i], std::memcpy(bag_data_indices_.data() + left_cnt + right_write_pos_buf_[i],
tmp_indices_.data() + offsets_buf_[i] + left_cnts_buf_[i], right_cnts_buf_[i] * sizeof(data_size_t)); tmp_indices_.data() + offsets_buf_[i] + left_cnts_buf_[i], right_cnts_buf_[i] * sizeof(data_size_t));
} }
OMP_LOOP_EX_END();
} }
OMP_THROW_EX();
bag_data_cnt_ = left_cnt; bag_data_cnt_ = left_cnt;
CHECK(bag_data_indices_[bag_data_cnt_ - 1] > bag_data_indices_[bag_data_cnt_]); CHECK(bag_data_indices_[bag_data_cnt_ - 1] > bag_data_indices_[bag_data_cnt_]);
Log::Debug("Re-bagging, using %d data to train", bag_data_cnt_); Log::Debug("Re-bagging, using %d data to train", bag_data_cnt_);
......
...@@ -131,9 +131,10 @@ public: ...@@ -131,9 +131,10 @@ public:
const data_size_t min_inner_size = 100; const data_size_t min_inner_size = 100;
data_size_t inner_size = (num_data_ + num_threads_ - 1) / num_threads_; data_size_t inner_size = (num_data_ + num_threads_ - 1) / num_threads_;
if (inner_size < min_inner_size) { inner_size = min_inner_size; } if (inner_size < min_inner_size) { inner_size = min_inner_size; }
OMP_INIT_EX();
#pragma omp parallel for schedule(static, 1) #pragma omp parallel for schedule(static, 1)
for (int i = 0; i < num_threads_; ++i) { for (int i = 0; i < num_threads_; ++i) {
OMP_LOOP_EX_BEGIN();
left_cnts_buf_[i] = 0; left_cnts_buf_[i] = 0;
right_cnts_buf_[i] = 0; right_cnts_buf_[i] = 0;
data_size_t cur_start = i * inner_size; data_size_t cur_start = i * inner_size;
...@@ -146,7 +147,9 @@ public: ...@@ -146,7 +147,9 @@ public:
offsets_buf_[i] = cur_start; offsets_buf_[i] = cur_start;
left_cnts_buf_[i] = cur_left_count; left_cnts_buf_[i] = cur_left_count;
right_cnts_buf_[i] = cur_cnt - cur_left_count; right_cnts_buf_[i] = cur_cnt - cur_left_count;
OMP_LOOP_EX_END();
} }
OMP_THROW_EX();
data_size_t left_cnt = 0; data_size_t left_cnt = 0;
left_write_pos_buf_[0] = 0; left_write_pos_buf_[0] = 0;
right_write_pos_buf_[0] = 0; right_write_pos_buf_[0] = 0;
...@@ -158,6 +161,7 @@ public: ...@@ -158,6 +161,7 @@ public:
#pragma omp parallel for schedule(static, 1) #pragma omp parallel for schedule(static, 1)
for (int i = 0; i < num_threads_; ++i) { for (int i = 0; i < num_threads_; ++i) {
OMP_LOOP_EX_BEGIN();
if (left_cnts_buf_[i] > 0) { if (left_cnts_buf_[i] > 0) {
std::memcpy(bag_data_indices_.data() + left_write_pos_buf_[i], std::memcpy(bag_data_indices_.data() + left_write_pos_buf_[i],
tmp_indices_.data() + offsets_buf_[i], left_cnts_buf_[i] * sizeof(data_size_t)); tmp_indices_.data() + offsets_buf_[i], left_cnts_buf_[i] * sizeof(data_size_t));
...@@ -166,7 +170,9 @@ public: ...@@ -166,7 +170,9 @@ public:
std::memcpy(bag_data_indices_.data() + left_cnt + right_write_pos_buf_[i], std::memcpy(bag_data_indices_.data() + left_cnt + right_write_pos_buf_[i],
tmp_indice_right_.data() + offsets_buf_[i], right_cnts_buf_[i] * sizeof(data_size_t)); tmp_indice_right_.data() + offsets_buf_[i], right_cnts_buf_[i] * sizeof(data_size_t));
} }
OMP_LOOP_EX_END();
} }
OMP_THROW_EX();
bag_data_cnt_ = left_cnt; bag_data_cnt_ = left_cnt;
// set bagging data to tree learner // set bagging data to tree learner
if (!is_use_subset_) { if (!is_use_subset_) {
......
...@@ -351,12 +351,16 @@ LIGHTGBM_C_EXPORT int LGBM_DatasetPushRows(DatasetHandle dataset, ...@@ -351,12 +351,16 @@ LIGHTGBM_C_EXPORT int LGBM_DatasetPushRows(DatasetHandle dataset,
API_BEGIN(); API_BEGIN();
auto p_dataset = reinterpret_cast<Dataset*>(dataset); auto p_dataset = reinterpret_cast<Dataset*>(dataset);
auto get_row_fun = RowFunctionFromDenseMatric(data, nrow, ncol, data_type, 1); auto get_row_fun = RowFunctionFromDenseMatric(data, nrow, ncol, data_type, 1);
OMP_INIT_EX();
#pragma omp parallel for schedule(static) #pragma omp parallel for schedule(static)
for (int i = 0; i < nrow; ++i) { for (int i = 0; i < nrow; ++i) {
OMP_LOOP_EX_BEGIN();
const int tid = omp_get_thread_num(); const int tid = omp_get_thread_num();
auto one_row = get_row_fun(i); auto one_row = get_row_fun(i);
p_dataset->PushOneRow(tid, start_row + i, one_row); p_dataset->PushOneRow(tid, start_row + i, one_row);
OMP_LOOP_EX_END();
} }
OMP_THROW_EX();
if (start_row + nrow == p_dataset->num_data()) { if (start_row + nrow == p_dataset->num_data()) {
p_dataset->FinishLoad(); p_dataset->FinishLoad();
} }
...@@ -377,13 +381,17 @@ LIGHTGBM_C_EXPORT int LGBM_DatasetPushRowsByCSR(DatasetHandle dataset, ...@@ -377,13 +381,17 @@ LIGHTGBM_C_EXPORT int LGBM_DatasetPushRowsByCSR(DatasetHandle dataset,
auto p_dataset = reinterpret_cast<Dataset*>(dataset); auto p_dataset = reinterpret_cast<Dataset*>(dataset);
auto get_row_fun = RowFunctionFromCSR(indptr, indptr_type, indices, data, data_type, nindptr, nelem); auto get_row_fun = RowFunctionFromCSR(indptr, indptr_type, indices, data, data_type, nindptr, nelem);
int32_t nrow = static_cast<int32_t>(nindptr - 1); int32_t nrow = static_cast<int32_t>(nindptr - 1);
OMP_INIT_EX();
#pragma omp parallel for schedule(static) #pragma omp parallel for schedule(static)
for (int i = 0; i < nrow; ++i) { for (int i = 0; i < nrow; ++i) {
OMP_LOOP_EX_BEGIN();
const int tid = omp_get_thread_num(); const int tid = omp_get_thread_num();
auto one_row = get_row_fun(i); auto one_row = get_row_fun(i);
p_dataset->PushOneRow(tid, p_dataset->PushOneRow(tid,
static_cast<data_size_t>(start_row + i), one_row); static_cast<data_size_t>(start_row + i), one_row);
OMP_LOOP_EX_END();
} }
OMP_THROW_EX();
if (start_row + nrow == static_cast<int64_t>(p_dataset->num_data())) { if (start_row + nrow == static_cast<int64_t>(p_dataset->num_data())) {
p_dataset->FinishLoad(); p_dataset->FinishLoad();
} }
...@@ -433,13 +441,16 @@ LIGHTGBM_C_EXPORT int LGBM_DatasetCreateFromMat(const void* data, ...@@ -433,13 +441,16 @@ LIGHTGBM_C_EXPORT int LGBM_DatasetCreateFromMat(const void* data,
ret->CreateValid( ret->CreateValid(
reinterpret_cast<const Dataset*>(reference)); reinterpret_cast<const Dataset*>(reference));
} }
OMP_INIT_EX();
#pragma omp parallel for schedule(static) #pragma omp parallel for schedule(static)
for (int i = 0; i < nrow; ++i) { for (int i = 0; i < nrow; ++i) {
OMP_LOOP_EX_BEGIN();
const int tid = omp_get_thread_num(); const int tid = omp_get_thread_num();
auto one_row = get_row_fun(i); auto one_row = get_row_fun(i);
ret->PushOneRow(tid, i, one_row); ret->PushOneRow(tid, i, one_row);
OMP_LOOP_EX_END();
} }
OMP_THROW_EX();
ret->FinishLoad(); ret->FinishLoad();
*out = ret.release(); *out = ret.release();
API_END(); API_END();
...@@ -497,13 +508,16 @@ LIGHTGBM_C_EXPORT int LGBM_DatasetCreateFromCSR(const void* indptr, ...@@ -497,13 +508,16 @@ LIGHTGBM_C_EXPORT int LGBM_DatasetCreateFromCSR(const void* indptr,
ret->CreateValid( ret->CreateValid(
reinterpret_cast<const Dataset*>(reference)); reinterpret_cast<const Dataset*>(reference));
} }
OMP_INIT_EX();
#pragma omp parallel for schedule(static) #pragma omp parallel for schedule(static)
for (int i = 0; i < nindptr - 1; ++i) { for (int i = 0; i < nindptr - 1; ++i) {
OMP_LOOP_EX_BEGIN();
const int tid = omp_get_thread_num(); const int tid = omp_get_thread_num();
auto one_row = get_row_fun(i); auto one_row = get_row_fun(i);
ret->PushOneRow(tid, i, one_row); ret->PushOneRow(tid, i, one_row);
OMP_LOOP_EX_END();
} }
OMP_THROW_EX();
ret->FinishLoad(); ret->FinishLoad();
*out = ret.release(); *out = ret.release();
API_END(); API_END();
...@@ -534,8 +548,10 @@ LIGHTGBM_C_EXPORT int LGBM_DatasetCreateFromCSC(const void* col_ptr, ...@@ -534,8 +548,10 @@ LIGHTGBM_C_EXPORT int LGBM_DatasetCreateFromCSC(const void* col_ptr,
sample_cnt = static_cast<int>(sample_indices.size()); sample_cnt = static_cast<int>(sample_indices.size());
std::vector<std::vector<double>> sample_values(ncol_ptr - 1); std::vector<std::vector<double>> sample_values(ncol_ptr - 1);
std::vector<std::vector<int>> sample_idx(ncol_ptr - 1); std::vector<std::vector<int>> sample_idx(ncol_ptr - 1);
OMP_INIT_EX();
#pragma omp parallel for schedule(static) #pragma omp parallel for schedule(static)
for (int i = 0; i < static_cast<int>(sample_values.size()); ++i) { for (int i = 0; i < static_cast<int>(sample_values.size()); ++i) {
OMP_LOOP_EX_BEGIN();
CSC_RowIterator col_it(col_ptr, col_ptr_type, indices, data, data_type, ncol_ptr, nelem, i); CSC_RowIterator col_it(col_ptr, col_ptr_type, indices, data, data_type, ncol_ptr, nelem, i);
for (int j = 0; j < sample_cnt; j++) { for (int j = 0; j < sample_cnt; j++) {
auto val = col_it.Get(sample_indices[j]); auto val = col_it.Get(sample_indices[j]);
...@@ -544,7 +560,9 @@ LIGHTGBM_C_EXPORT int LGBM_DatasetCreateFromCSC(const void* col_ptr, ...@@ -544,7 +560,9 @@ LIGHTGBM_C_EXPORT int LGBM_DatasetCreateFromCSC(const void* col_ptr,
sample_idx[i].emplace_back(j); sample_idx[i].emplace_back(j);
} }
} }
OMP_LOOP_EX_END();
} }
OMP_THROW_EX();
DatasetLoader loader(io_config, nullptr, 1, nullptr); DatasetLoader loader(io_config, nullptr, 1, nullptr);
ret.reset(loader.CostructFromSampleData(Common::Vector2Ptr<double>(sample_values).data(), ret.reset(loader.CostructFromSampleData(Common::Vector2Ptr<double>(sample_values).data(),
Common::Vector2Ptr<int>(sample_idx).data(), Common::Vector2Ptr<int>(sample_idx).data(),
...@@ -556,9 +574,10 @@ LIGHTGBM_C_EXPORT int LGBM_DatasetCreateFromCSC(const void* col_ptr, ...@@ -556,9 +574,10 @@ LIGHTGBM_C_EXPORT int LGBM_DatasetCreateFromCSC(const void* col_ptr,
ret->CreateValid( ret->CreateValid(
reinterpret_cast<const Dataset*>(reference)); reinterpret_cast<const Dataset*>(reference));
} }
OMP_INIT_EX();
#pragma omp parallel for schedule(static) #pragma omp parallel for schedule(static)
for (int i = 0; i < ncol_ptr - 1; ++i) { for (int i = 0; i < ncol_ptr - 1; ++i) {
OMP_LOOP_EX_BEGIN();
const int tid = omp_get_thread_num(); const int tid = omp_get_thread_num();
int feature_idx = ret->InnerFeatureIndex(i); int feature_idx = ret->InnerFeatureIndex(i);
if (feature_idx < 0) { continue; } if (feature_idx < 0) { continue; }
...@@ -573,7 +592,9 @@ LIGHTGBM_C_EXPORT int LGBM_DatasetCreateFromCSC(const void* col_ptr, ...@@ -573,7 +592,9 @@ LIGHTGBM_C_EXPORT int LGBM_DatasetCreateFromCSC(const void* col_ptr,
if (row_idx < 0) { break; } if (row_idx < 0) { break; }
ret->PushOneData(tid, row_idx, group, sub_feature, pair.second); ret->PushOneData(tid, row_idx, group, sub_feature, pair.second);
} }
OMP_LOOP_EX_END();
} }
OMP_THROW_EX();
ret->FinishLoad(); ret->FinishLoad();
*out = ret.release(); *out = ret.release();
API_END(); API_END();
...@@ -937,14 +958,18 @@ LIGHTGBM_C_EXPORT int LGBM_BoosterPredictForCSR(BoosterHandle handle, ...@@ -937,14 +958,18 @@ LIGHTGBM_C_EXPORT int LGBM_BoosterPredictForCSR(BoosterHandle handle,
auto get_row_fun = RowFunctionFromCSR(indptr, indptr_type, indices, data, data_type, nindptr, nelem); auto get_row_fun = RowFunctionFromCSR(indptr, indptr_type, indices, data, data_type, nindptr, nelem);
int64_t num_preb_in_one_row = GetNumPredOneRow(ref_booster, predict_type, num_iteration); int64_t num_preb_in_one_row = GetNumPredOneRow(ref_booster, predict_type, num_iteration);
int nrow = static_cast<int>(nindptr - 1); int nrow = static_cast<int>(nindptr - 1);
OMP_INIT_EX();
#pragma omp parallel for schedule(static) #pragma omp parallel for schedule(static)
for (int i = 0; i < nrow; ++i) { for (int i = 0; i < nrow; ++i) {
OMP_LOOP_EX_BEGIN();
auto one_row = get_row_fun(i); auto one_row = get_row_fun(i);
auto predicton_result = predictor.GetPredictFunction()(one_row); auto predicton_result = predictor.GetPredictFunction()(one_row);
for (int j = 0; j < static_cast<int>(predicton_result.size()); ++j) { for (int j = 0; j < static_cast<int>(predicton_result.size()); ++j) {
out_result[i * num_preb_in_one_row + j] = static_cast<double>(predicton_result[j]); out_result[i * num_preb_in_one_row + j] = static_cast<double>(predicton_result[j]);
} }
OMP_LOOP_EX_END();
} }
OMP_THROW_EX();
*out_len = nrow * num_preb_in_one_row; *out_len = nrow * num_preb_in_one_row;
API_END(); API_END();
} }
...@@ -1009,14 +1034,18 @@ LIGHTGBM_C_EXPORT int LGBM_BoosterPredictForMat(BoosterHandle handle, ...@@ -1009,14 +1034,18 @@ LIGHTGBM_C_EXPORT int LGBM_BoosterPredictForMat(BoosterHandle handle,
auto predictor = ref_booster->NewPredictor(static_cast<int>(num_iteration), predict_type); auto predictor = ref_booster->NewPredictor(static_cast<int>(num_iteration), predict_type);
auto get_row_fun = RowPairFunctionFromDenseMatric(data, nrow, ncol, data_type, is_row_major); auto get_row_fun = RowPairFunctionFromDenseMatric(data, nrow, ncol, data_type, is_row_major);
int64_t num_preb_in_one_row = GetNumPredOneRow(ref_booster, predict_type, num_iteration); int64_t num_preb_in_one_row = GetNumPredOneRow(ref_booster, predict_type, num_iteration);
OMP_INIT_EX();
#pragma omp parallel for schedule(static) #pragma omp parallel for schedule(static)
for (int i = 0; i < nrow; ++i) { for (int i = 0; i < nrow; ++i) {
OMP_LOOP_EX_BEGIN();
auto one_row = get_row_fun(i); auto one_row = get_row_fun(i);
auto predicton_result = predictor.GetPredictFunction()(one_row); auto predicton_result = predictor.GetPredictFunction()(one_row);
for (int j = 0; j < static_cast<int>(predicton_result.size()); ++j) { for (int j = 0; j < static_cast<int>(predicton_result.size()); ++j) {
out_result[i * num_preb_in_one_row + j] = static_cast<double>(predicton_result[j]); out_result[i * num_preb_in_one_row + j] = static_cast<double>(predicton_result[j]);
} }
OMP_LOOP_EX_END();
} }
OMP_THROW_EX();
*out_len = nrow * num_preb_in_one_row; *out_len = nrow * num_preb_in_one_row;
API_END(); API_END();
} }
......
...@@ -114,10 +114,14 @@ void Dataset::Construct( ...@@ -114,10 +114,14 @@ void Dataset::Construct(
void Dataset::FinishLoad() { void Dataset::FinishLoad() {
if (is_finish_load_) { return; } if (is_finish_load_) { return; }
OMP_INIT_EX();
#pragma omp parallel for schedule(guided) #pragma omp parallel for schedule(guided)
for (int i = 0; i < num_groups_; ++i) { for (int i = 0; i < num_groups_; ++i) {
OMP_LOOP_EX_BEGIN();
feature_groups_[i]->bin_data_->FinishLoad(); feature_groups_[i]->bin_data_->FinishLoad();
OMP_LOOP_EX_END();
} }
OMP_THROW_EX();
is_finish_load_ = true; is_finish_load_ = true;
} }
...@@ -210,19 +214,27 @@ void Dataset::CreateValid(const Dataset* dataset) { ...@@ -210,19 +214,27 @@ void Dataset::CreateValid(const Dataset* dataset) {
void Dataset::ReSize(data_size_t num_data) { void Dataset::ReSize(data_size_t num_data) {
if (num_data_ != num_data) { if (num_data_ != num_data) {
num_data_ = num_data; num_data_ = num_data;
OMP_INIT_EX();
#pragma omp parallel for schedule(static) #pragma omp parallel for schedule(static)
for (int group = 0; group < num_groups_; ++group) { for (int group = 0; group < num_groups_; ++group) {
OMP_LOOP_EX_BEGIN();
feature_groups_[group]->bin_data_->ReSize(num_data_); feature_groups_[group]->bin_data_->ReSize(num_data_);
OMP_LOOP_EX_END();
} }
OMP_THROW_EX();
} }
} }
void Dataset::CopySubset(const Dataset* fullset, const data_size_t* used_indices, data_size_t num_used_indices, bool need_meta_data) { void Dataset::CopySubset(const Dataset* fullset, const data_size_t* used_indices, data_size_t num_used_indices, bool need_meta_data) {
CHECK(num_used_indices == num_data_); CHECK(num_used_indices == num_data_);
OMP_INIT_EX();
#pragma omp parallel for schedule(static) #pragma omp parallel for schedule(static)
for (int group = 0; group < num_groups_; ++group) { for (int group = 0; group < num_groups_; ++group) {
OMP_LOOP_EX_BEGIN();
feature_groups_[group]->CopySubset(fullset->feature_groups_[group].get(), used_indices, num_used_indices); feature_groups_[group]->CopySubset(fullset->feature_groups_[group].get(), used_indices, num_used_indices);
OMP_LOOP_EX_END();
} }
OMP_THROW_EX();
if (need_meta_data) { if (need_meta_data) {
metadata_.Init(fullset->metadata_, used_indices, num_used_indices); metadata_.Init(fullset->metadata_, used_indices, num_used_indices);
} }
...@@ -412,9 +424,10 @@ void Dataset::ConstructHistograms( ...@@ -412,9 +424,10 @@ void Dataset::ConstructHistograms(
ptr_ordered_grad = ordered_gradients; ptr_ordered_grad = ordered_gradients;
ptr_ordered_hess = ordered_hessians; ptr_ordered_hess = ordered_hessians;
} }
OMP_INIT_EX();
#pragma omp parallel for schedule(static) #pragma omp parallel for schedule(static)
for (int group = 0; group < num_groups_; ++group) { for (int group = 0; group < num_groups_; ++group) {
OMP_LOOP_EX_BEGIN();
bool is_groud_used = false; bool is_groud_used = false;
const int f_cnt = group_feature_cnt_[group]; const int f_cnt = group_feature_cnt_[group];
for (int j = 0; j < f_cnt; ++j) { for (int j = 0; j < f_cnt; ++j) {
...@@ -445,7 +458,9 @@ void Dataset::ConstructHistograms( ...@@ -445,7 +458,9 @@ void Dataset::ConstructHistograms(
hessians, hessians,
data_ptr); data_ptr);
} }
OMP_LOOP_EX_END();
} }
OMP_THROW_EX();
} }
void Dataset::FixHistogram(int feature_idx, double sum_gradient, double sum_hessian, data_size_t num_data, void Dataset::FixHistogram(int feature_idx, double sum_gradient, double sum_hessian, data_size_t num_data,
......
...@@ -490,9 +490,10 @@ Dataset* DatasetLoader::CostructFromSampleData(double** sample_values, ...@@ -490,9 +490,10 @@ Dataset* DatasetLoader::CostructFromSampleData(double** sample_values,
const data_size_t filter_cnt = static_cast<data_size_t>( const data_size_t filter_cnt = static_cast<data_size_t>(
static_cast<double>(io_config_.min_data_in_leaf * total_sample_size) / num_data); static_cast<double>(io_config_.min_data_in_leaf * total_sample_size) / num_data);
OMP_INIT_EX();
#pragma omp parallel for schedule(guided) #pragma omp parallel for schedule(guided)
for (int i = 0; i < num_col; ++i) { for (int i = 0; i < num_col; ++i) {
OMP_LOOP_EX_BEGIN();
if (ignore_features_.count(i) > 0) { if (ignore_features_.count(i) > 0) {
bin_mappers[i] = nullptr; bin_mappers[i] = nullptr;
continue; continue;
...@@ -504,7 +505,9 @@ Dataset* DatasetLoader::CostructFromSampleData(double** sample_values, ...@@ -504,7 +505,9 @@ Dataset* DatasetLoader::CostructFromSampleData(double** sample_values,
bin_mappers[i].reset(new BinMapper()); bin_mappers[i].reset(new BinMapper());
bin_mappers[i]->FindBin(sample_values[i], num_per_col[i], total_sample_size, bin_mappers[i]->FindBin(sample_values[i], num_per_col[i], total_sample_size,
io_config_.max_bin, io_config_.min_data_in_bin, filter_cnt, bin_type); io_config_.max_bin, io_config_.min_data_in_bin, filter_cnt, bin_type);
OMP_LOOP_EX_END();
} }
OMP_THROW_EX();
auto dataset = std::unique_ptr<Dataset>(new Dataset(num_data)); auto dataset = std::unique_ptr<Dataset>(new Dataset(num_data));
dataset->feature_names_ = feature_names_; dataset->feature_names_ = feature_names_;
dataset->Construct(bin_mappers, sample_indices, num_per_col, total_sample_size, io_config_); dataset->Construct(bin_mappers, sample_indices, num_per_col, total_sample_size, io_config_);
...@@ -708,9 +711,11 @@ void DatasetLoader::ConstructBinMappersFromTextData(int rank, int num_machines, ...@@ -708,9 +711,11 @@ void DatasetLoader::ConstructBinMappersFromTextData(int rank, int num_machines,
// start find bins // start find bins
if (num_machines == 1) { if (num_machines == 1) {
OMP_INIT_EX();
// if only one machine, find bin locally // if only one machine, find bin locally
#pragma omp parallel for schedule(guided) #pragma omp parallel for schedule(guided)
for (int i = 0; i < static_cast<int>(sample_values.size()); ++i) { for (int i = 0; i < static_cast<int>(sample_values.size()); ++i) {
OMP_LOOP_EX_BEGIN();
if (ignore_features_.count(i) > 0) { if (ignore_features_.count(i) > 0) {
bin_mappers[i] = nullptr; bin_mappers[i] = nullptr;
continue; continue;
...@@ -722,7 +727,9 @@ void DatasetLoader::ConstructBinMappersFromTextData(int rank, int num_machines, ...@@ -722,7 +727,9 @@ void DatasetLoader::ConstructBinMappersFromTextData(int rank, int num_machines,
bin_mappers[i].reset(new BinMapper()); bin_mappers[i].reset(new BinMapper());
bin_mappers[i]->FindBin(sample_values[i].data(), static_cast<int>(sample_values[i].size()), bin_mappers[i]->FindBin(sample_values[i].data(), static_cast<int>(sample_values[i].size()),
sample_data.size(), io_config_.max_bin, io_config_.min_data_in_bin, filter_cnt, bin_type); sample_data.size(), io_config_.max_bin, io_config_.min_data_in_bin, filter_cnt, bin_type);
OMP_LOOP_EX_END();
} }
OMP_THROW_EX();
} else { } else {
// if have multi-machines, need to find bin distributed // if have multi-machines, need to find bin distributed
// different machines will find bin for different features // different machines will find bin for different features
...@@ -741,8 +748,10 @@ void DatasetLoader::ConstructBinMappersFromTextData(int rank, int num_machines, ...@@ -741,8 +748,10 @@ void DatasetLoader::ConstructBinMappersFromTextData(int rank, int num_machines,
start[i + 1] = start[i] + len[i]; start[i + 1] = start[i] + len[i];
} }
len[num_machines - 1] = total_num_feature - start[num_machines - 1]; len[num_machines - 1] = total_num_feature - start[num_machines - 1];
OMP_INIT_EX();
#pragma omp parallel for schedule(guided) #pragma omp parallel for schedule(guided)
for (int i = 0; i < len[rank]; ++i) { for (int i = 0; i < len[rank]; ++i) {
OMP_LOOP_EX_BEGIN();
if (ignore_features_.count(start[rank] + i) > 0) { if (ignore_features_.count(start[rank] + i) > 0) {
continue; continue;
} }
...@@ -753,7 +762,9 @@ void DatasetLoader::ConstructBinMappersFromTextData(int rank, int num_machines, ...@@ -753,7 +762,9 @@ void DatasetLoader::ConstructBinMappersFromTextData(int rank, int num_machines,
bin_mappers[i].reset(new BinMapper()); bin_mappers[i].reset(new BinMapper());
bin_mappers[i]->FindBin(sample_values[start[rank] + i].data(), static_cast<int>(sample_values[start[rank] + i].size()), bin_mappers[i]->FindBin(sample_values[start[rank] + i].data(), static_cast<int>(sample_values[start[rank] + i].size()),
sample_data.size(), io_config_.max_bin, io_config_.min_data_in_bin, filter_cnt, bin_type); sample_data.size(), io_config_.max_bin, io_config_.min_data_in_bin, filter_cnt, bin_type);
OMP_LOOP_EX_END();
} }
OMP_THROW_EX();
// get max_bin // get max_bin
int local_max_bin = 0; int local_max_bin = 0;
for (int i = 0; i < len[rank]; ++i) { for (int i = 0; i < len[rank]; ++i) {
...@@ -793,13 +804,16 @@ void DatasetLoader::ConstructBinMappersFromTextData(int rank, int num_machines, ...@@ -793,13 +804,16 @@ void DatasetLoader::ConstructBinMappersFromTextData(int rank, int num_machines,
// find local feature bins and copy to buffer // find local feature bins and copy to buffer
#pragma omp parallel for schedule(guided) #pragma omp parallel for schedule(guided)
for (int i = 0; i < len[rank]; ++i) { for (int i = 0; i < len[rank]; ++i) {
OMP_LOOP_EX_BEGIN();
if (ignore_features_.count(start[rank] + i) > 0) { if (ignore_features_.count(start[rank] + i) > 0) {
continue; continue;
} }
bin_mappers[i]->CopyTo(input_buffer.data() + i * type_size); bin_mappers[i]->CopyTo(input_buffer.data() + i * type_size);
// free // free
bin_mappers[i].reset(nullptr); bin_mappers[i].reset(nullptr);
OMP_LOOP_EX_END();
} }
OMP_THROW_EX();
// convert to binary size // convert to binary size
for (int i = 0; i < num_machines; ++i) { for (int i = 0; i < num_machines; ++i) {
start[i] *= type_size; start[i] *= type_size;
...@@ -827,9 +841,11 @@ void DatasetLoader::ExtractFeaturesFromMemory(std::vector<std::string>& text_dat ...@@ -827,9 +841,11 @@ void DatasetLoader::ExtractFeaturesFromMemory(std::vector<std::string>& text_dat
std::vector<std::pair<int, double>> oneline_features; std::vector<std::pair<int, double>> oneline_features;
double tmp_label = 0.0f; double tmp_label = 0.0f;
if (predict_fun_ == nullptr) { if (predict_fun_ == nullptr) {
OMP_INIT_EX();
// if doesn't need to prediction with initial model // if doesn't need to prediction with initial model
#pragma omp parallel for schedule(static) private(oneline_features) firstprivate(tmp_label) #pragma omp parallel for schedule(static) private(oneline_features) firstprivate(tmp_label)
for (data_size_t i = 0; i < dataset->num_data_; ++i) { for (data_size_t i = 0; i < dataset->num_data_; ++i) {
OMP_LOOP_EX_BEGIN();
const int tid = omp_get_thread_num(); const int tid = omp_get_thread_num();
oneline_features.clear(); oneline_features.clear();
// parser // parser
...@@ -857,12 +873,16 @@ void DatasetLoader::ExtractFeaturesFromMemory(std::vector<std::string>& text_dat ...@@ -857,12 +873,16 @@ void DatasetLoader::ExtractFeaturesFromMemory(std::vector<std::string>& text_dat
} }
} }
} }
OMP_LOOP_EX_END();
} }
OMP_THROW_EX();
} else { } else {
OMP_INIT_EX();
// if need to prediction with initial model // if need to prediction with initial model
std::vector<double> init_score(dataset->num_data_ * num_class_); std::vector<double> init_score(dataset->num_data_ * num_class_);
#pragma omp parallel for schedule(static) private(oneline_features) firstprivate(tmp_label) #pragma omp parallel for schedule(static) private(oneline_features) firstprivate(tmp_label)
for (data_size_t i = 0; i < dataset->num_data_; ++i) { for (data_size_t i = 0; i < dataset->num_data_; ++i) {
OMP_LOOP_EX_BEGIN();
const int tid = omp_get_thread_num(); const int tid = omp_get_thread_num();
oneline_features.clear(); oneline_features.clear();
// parser // parser
...@@ -895,7 +915,9 @@ void DatasetLoader::ExtractFeaturesFromMemory(std::vector<std::string>& text_dat ...@@ -895,7 +915,9 @@ void DatasetLoader::ExtractFeaturesFromMemory(std::vector<std::string>& text_dat
} }
} }
} }
OMP_LOOP_EX_END();
} }
OMP_THROW_EX();
// metadata_ will manage space of init_score // metadata_ will manage space of init_score
dataset->metadata_.SetInitScore(init_score.data(), dataset->num_data_ * num_class_); dataset->metadata_.SetInitScore(init_score.data(), dataset->num_data_ * num_class_);
} }
...@@ -915,8 +937,10 @@ void DatasetLoader::ExtractFeaturesFromFile(const char* filename, const Parser* ...@@ -915,8 +937,10 @@ void DatasetLoader::ExtractFeaturesFromFile(const char* filename, const Parser*
(data_size_t start_idx, const std::vector<std::string>& lines) { (data_size_t start_idx, const std::vector<std::string>& lines) {
std::vector<std::pair<int, double>> oneline_features; std::vector<std::pair<int, double>> oneline_features;
double tmp_label = 0.0f; double tmp_label = 0.0f;
OMP_INIT_EX();
#pragma omp parallel for schedule(static) private(oneline_features) firstprivate(tmp_label) #pragma omp parallel for schedule(static) private(oneline_features) firstprivate(tmp_label)
for (data_size_t i = 0; i < static_cast<data_size_t>(lines.size()); ++i) { for (data_size_t i = 0; i < static_cast<data_size_t>(lines.size()); ++i) {
OMP_LOOP_EX_BEGIN();
const int tid = omp_get_thread_num(); const int tid = omp_get_thread_num();
oneline_features.clear(); oneline_features.clear();
// parser // parser
...@@ -947,7 +971,9 @@ void DatasetLoader::ExtractFeaturesFromFile(const char* filename, const Parser* ...@@ -947,7 +971,9 @@ void DatasetLoader::ExtractFeaturesFromFile(const char* filename, const Parser*
} }
} }
} }
OMP_LOOP_EX_END();
} }
OMP_THROW_EX();
}; };
TextReader<data_size_t> text_reader(filename, io_config_.has_header); TextReader<data_size_t> text_reader(filename, io_config_.has_header);
if (!used_data_indices.empty()) { if (!used_data_indices.empty()) {
......
...@@ -2,6 +2,22 @@ ...@@ -2,6 +2,22 @@
#include <LightGBM/application.h> #include <LightGBM/application.h>
int main(int argc, char** argv) { int main(int argc, char** argv) {
LightGBM::Application app(argc, argv); try {
app.Run(); LightGBM::Application app(argc, argv);
} app.Run();
}
catch (const std::exception& ex) {
std::cerr << "Met Exceptions:" << std::endl;
std::cerr << ex.what() << std::endl;
exit(-1);
}
catch (const std::string& ex) {
std::cerr << "Met Exceptions:" << std::endl;
std::cerr << ex << std::endl;
exit(-1);
}
catch (...) {
std::cerr << "Unknown Exceptions" << std::endl;
exit(-1);
}
}
\ No newline at end of file
...@@ -60,7 +60,7 @@ public: ...@@ -60,7 +60,7 @@ public:
std::vector<double> Eval(const double* score) const override { std::vector<double> Eval(const double* score) const override {
double sum_loss = 0.0f; double sum_loss = 0.0f;
if (weights_ == nullptr) { if (weights_ == nullptr) {
#pragma omp parallel for schedule(static) reduction(+:sum_loss) #pragma omp parallel for schedule(static) reduction(+:sum_loss)
for (data_size_t i = 0; i < num_data_; ++i) { for (data_size_t i = 0; i < num_data_; ++i) {
// sigmoid transform // sigmoid transform
double prob = 1.0f / (1.0f + std::exp(-sigmoid_ * score[i])); double prob = 1.0f / (1.0f + std::exp(-sigmoid_ * score[i]));
...@@ -68,7 +68,7 @@ public: ...@@ -68,7 +68,7 @@ public:
sum_loss += PointWiseLossCalculator::LossOnPoint(label_[i], prob); sum_loss += PointWiseLossCalculator::LossOnPoint(label_[i], prob);
} }
} else { } else {
#pragma omp parallel for schedule(static) reduction(+:sum_loss) #pragma omp parallel for schedule(static) reduction(+:sum_loss)
for (data_size_t i = 0; i < num_data_; ++i) { for (data_size_t i = 0; i < num_data_; ++i) {
// sigmoid transform // sigmoid transform
double prob = 1.0f / (1.0f + std::exp(-sigmoid_ * score[i])); double prob = 1.0f / (1.0f + std::exp(-sigmoid_ * score[i]));
......
...@@ -28,7 +28,7 @@ public: ...@@ -28,7 +28,7 @@ public:
data_size_t cnt_positive = 0; data_size_t cnt_positive = 0;
data_size_t cnt_negative = 0; data_size_t cnt_negative = 0;
// count for positive and negative samples // count for positive and negative samples
#pragma omp parallel for schedule(static) reduction(+:cnt_positive, cnt_negative) #pragma omp parallel for schedule(static) reduction(+:cnt_positive, cnt_negative)
for (data_size_t i = 0; i < num_data_; ++i) { for (data_size_t i = 0; i < num_data_; ++i) {
if (label_[i] > 0) { if (label_[i] > 0) {
++cnt_positive; ++cnt_positive;
......
...@@ -23,15 +23,15 @@ public: ...@@ -23,15 +23,15 @@ public:
} }
void GetGradients(const double* score, score_t* gradients, void GetGradients(const double* score, score_t* gradients,
score_t* hessians) const override { score_t* hessians) const override {
if (weights_ == nullptr) { if (weights_ == nullptr) {
#pragma omp parallel for schedule(static) #pragma omp parallel for schedule(static)
for (data_size_t i = 0; i < num_data_; ++i) { for (data_size_t i = 0; i < num_data_; ++i) {
gradients[i] = static_cast<score_t>(score[i] - label_[i]); gradients[i] = static_cast<score_t>(score[i] - label_[i]);
hessians[i] = 1.0f; hessians[i] = 1.0f;
} }
} else { } else {
#pragma omp parallel for schedule(static) #pragma omp parallel for schedule(static)
for (data_size_t i = 0; i < num_data_; ++i) { for (data_size_t i = 0; i < num_data_; ++i) {
gradients[i] = static_cast<score_t>(score[i] - label_[i]) * weights_[i]; gradients[i] = static_cast<score_t>(score[i] - label_[i]) * weights_[i];
hessians[i] = weights_[i]; hessians[i] = weights_[i];
...@@ -70,9 +70,9 @@ public: ...@@ -70,9 +70,9 @@ public:
} }
void GetGradients(const double* score, score_t* gradients, void GetGradients(const double* score, score_t* gradients,
score_t* hessians) const override { score_t* hessians) const override {
if (weights_ == nullptr) { if (weights_ == nullptr) {
#pragma omp parallel for schedule(static) #pragma omp parallel for schedule(static)
for (data_size_t i = 0; i < num_data_; ++i) { for (data_size_t i = 0; i < num_data_; ++i) {
const double diff = score[i] - label_[i]; const double diff = score[i] - label_[i];
if (diff >= 0.0f) { if (diff >= 0.0f) {
...@@ -83,7 +83,7 @@ public: ...@@ -83,7 +83,7 @@ public:
hessians[i] = static_cast<score_t>(Common::ApproximateHessianWithGaussian(score[i], label_[i], gradients[i], eta_)); hessians[i] = static_cast<score_t>(Common::ApproximateHessianWithGaussian(score[i], label_[i], gradients[i], eta_));
} }
} else { } else {
#pragma omp parallel for schedule(static) #pragma omp parallel for schedule(static)
for (data_size_t i = 0; i < num_data_; ++i) { for (data_size_t i = 0; i < num_data_; ++i) {
const double diff = score[i] - label_[i]; const double diff = score[i] - label_[i];
if (diff >= 0.0f) { if (diff >= 0.0f) {
...@@ -131,9 +131,9 @@ public: ...@@ -131,9 +131,9 @@ public:
} }
void GetGradients(const double* score, score_t* gradients, void GetGradients(const double* score, score_t* gradients,
score_t* hessians) const override { score_t* hessians) const override {
if (weights_ == nullptr) { if (weights_ == nullptr) {
#pragma omp parallel for schedule(static) #pragma omp parallel for schedule(static)
for (data_size_t i = 0; i < num_data_; ++i) { for (data_size_t i = 0; i < num_data_; ++i) {
const double diff = score[i] - label_[i]; const double diff = score[i] - label_[i];
...@@ -150,7 +150,7 @@ public: ...@@ -150,7 +150,7 @@ public:
} }
} }
} else { } else {
#pragma omp parallel for schedule(static) #pragma omp parallel for schedule(static)
for (data_size_t i = 0; i < num_data_; ++i) { for (data_size_t i = 0; i < num_data_; ++i) {
const double diff = score[i] - label_[i]; const double diff = score[i] - label_[i];
...@@ -203,16 +203,16 @@ public: ...@@ -203,16 +203,16 @@ public:
} }
void GetGradients(const double* score, score_t* gradients, void GetGradients(const double* score, score_t* gradients,
score_t* hessians) const override { score_t* hessians) const override {
if (weights_ == nullptr) { if (weights_ == nullptr) {
#pragma omp parallel for schedule(static) #pragma omp parallel for schedule(static)
for (data_size_t i = 0; i < num_data_; ++i) { for (data_size_t i = 0; i < num_data_; ++i) {
const double x = score[i] - label_[i]; const double x = score[i] - label_[i];
gradients[i] = static_cast<score_t>(c_ * x / (std::fabs(x) + c_)); gradients[i] = static_cast<score_t>(c_ * x / (std::fabs(x) + c_));
hessians[i] = static_cast<score_t>(c_ * c_ / ((std::fabs(x) + c_) * (std::fabs(x) + c_))); hessians[i] = static_cast<score_t>(c_ * c_ / ((std::fabs(x) + c_) * (std::fabs(x) + c_)));
} }
} else { } else {
#pragma omp parallel for schedule(static) #pragma omp parallel for schedule(static)
for (data_size_t i = 0; i < num_data_; ++i) { for (data_size_t i = 0; i < num_data_; ++i) {
const double x = score[i] - label_[i]; const double x = score[i] - label_[i];
gradients[i] = static_cast<score_t>(c_ * x / (std::fabs(x) + c_) * weights_[i]); gradients[i] = static_cast<score_t>(c_ * x / (std::fabs(x) + c_) * weights_[i]);
...@@ -243,7 +243,7 @@ private: ...@@ -243,7 +243,7 @@ private:
class RegressionPoissonLoss: public ObjectiveFunction { class RegressionPoissonLoss: public ObjectiveFunction {
public: public:
explicit RegressionPoissonLoss(const ObjectiveConfig& config) { explicit RegressionPoissonLoss(const ObjectiveConfig& config) {
max_delta_step_ = static_cast<double>(config.poisson_max_delta_step); max_delta_step_ = static_cast<double>(config.poisson_max_delta_step);
} }
~RegressionPoissonLoss() {} ~RegressionPoissonLoss() {}
...@@ -255,15 +255,15 @@ public: ...@@ -255,15 +255,15 @@ public:
} }
void GetGradients(const double* score, score_t* gradients, void GetGradients(const double* score, score_t* gradients,
score_t* hessians) const override { score_t* hessians) const override {
if (weights_ == nullptr) { if (weights_ == nullptr) {
#pragma omp parallel for schedule(static) #pragma omp parallel for schedule(static)
for (data_size_t i = 0; i < num_data_; ++i) { for (data_size_t i = 0; i < num_data_; ++i) {
gradients[i] = static_cast<score_t>(score[i] - label_[i]); gradients[i] = static_cast<score_t>(score[i] - label_[i]);
hessians[i] = static_cast<score_t>(score[i] + max_delta_step_); hessians[i] = static_cast<score_t>(score[i] + max_delta_step_);
} }
} else { } else {
#pragma omp parallel for schedule(static) #pragma omp parallel for schedule(static)
for (data_size_t i = 0; i < num_data_; ++i) { for (data_size_t i = 0; i < num_data_; ++i) {
gradients[i] = static_cast<score_t>((score[i] - label_[i]) * weights_[i]); gradients[i] = static_cast<score_t>((score[i] - label_[i]) * weights_[i]);
hessians[i] = static_cast<score_t>((score[i] + max_delta_step_) * weights_[i]); hessians[i] = static_cast<score_t>((score[i] + max_delta_step_) * weights_[i]);
......
...@@ -111,7 +111,7 @@ void DataParallelTreeLearner::BeforeTrain() { ...@@ -111,7 +111,7 @@ void DataParallelTreeLearner::BeforeTrain() {
// sync global data sumup info // sync global data sumup info
std::tuple<data_size_t, double, double> data(smaller_leaf_splits_->num_data_in_leaf(), std::tuple<data_size_t, double, double> data(smaller_leaf_splits_->num_data_in_leaf(),
smaller_leaf_splits_->sum_gradients(), smaller_leaf_splits_->sum_hessians()); smaller_leaf_splits_->sum_gradients(), smaller_leaf_splits_->sum_hessians());
int size = sizeof(data); int size = sizeof(data);
std::memcpy(input_buffer_.data(), &data, size); std::memcpy(input_buffer_.data(), &data, size);
// global sumup reduce // global sumup reduce
...@@ -141,28 +141,30 @@ void DataParallelTreeLearner::BeforeTrain() { ...@@ -141,28 +141,30 @@ void DataParallelTreeLearner::BeforeTrain() {
void DataParallelTreeLearner::FindBestThresholds() { void DataParallelTreeLearner::FindBestThresholds() {
train_data_->ConstructHistograms(is_feature_used_, train_data_->ConstructHistograms(is_feature_used_,
smaller_leaf_splits_->data_indices(), smaller_leaf_splits_->num_data_in_leaf(), smaller_leaf_splits_->data_indices(), smaller_leaf_splits_->num_data_in_leaf(),
smaller_leaf_splits_->LeafIndex(), smaller_leaf_splits_->LeafIndex(),
ordered_bins_, gradients_, hessians_, ordered_bins_, gradients_, hessians_,
ordered_gradients_.data(), ordered_hessians_.data(), ordered_gradients_.data(), ordered_hessians_.data(),
smaller_leaf_histogram_array_[0].RawData() - 1); smaller_leaf_histogram_array_[0].RawData() - 1);
// construct local histograms // construct local histograms
#pragma omp parallel for schedule(static) #pragma omp parallel for schedule(static)
for (int feature_index = 0; feature_index < num_features_; ++feature_index) { for (int feature_index = 0; feature_index < num_features_; ++feature_index) {
if ((!is_feature_used_.empty() && is_feature_used_[feature_index] == false)) continue; if ((!is_feature_used_.empty() && is_feature_used_[feature_index] == false)) continue;
// copy to buffer // copy to buffer
std::memcpy(input_buffer_.data() + buffer_write_start_pos_[feature_index], std::memcpy(input_buffer_.data() + buffer_write_start_pos_[feature_index],
smaller_leaf_histogram_array_[feature_index].RawData(), smaller_leaf_histogram_array_[feature_index].RawData(),
smaller_leaf_histogram_array_[feature_index].SizeOfHistgram()); smaller_leaf_histogram_array_[feature_index].SizeOfHistgram());
} }
// Reduce scatter for histogram // Reduce scatter for histogram
Network::ReduceScatter(input_buffer_.data(), reduce_scatter_size_, block_start_.data(), Network::ReduceScatter(input_buffer_.data(), reduce_scatter_size_, block_start_.data(),
block_len_.data(), output_buffer_.data(), &HistogramBinEntry::SumReducer); block_len_.data(), output_buffer_.data(), &HistogramBinEntry::SumReducer);
std::vector<SplitInfo> smaller_best(num_threads_, SplitInfo()); std::vector<SplitInfo> smaller_best(num_threads_, SplitInfo());
std::vector<SplitInfo> larger_best(num_threads_, SplitInfo()); std::vector<SplitInfo> larger_best(num_threads_, SplitInfo());
#pragma omp parallel for schedule(static) OMP_INIT_EX();
#pragma omp parallel for schedule(static)
for (int feature_index = 0; feature_index < num_features_; ++feature_index) { for (int feature_index = 0; feature_index < num_features_; ++feature_index) {
OMP_LOOP_EX_BEGIN();
if (!is_feature_aggregated_[feature_index]) continue; if (!is_feature_aggregated_[feature_index]) continue;
const int tid = omp_get_thread_num(); const int tid = omp_get_thread_num();
// restore global histograms from buffer // restore global histograms from buffer
...@@ -170,9 +172,9 @@ void DataParallelTreeLearner::FindBestThresholds() { ...@@ -170,9 +172,9 @@ void DataParallelTreeLearner::FindBestThresholds() {
output_buffer_.data() + buffer_read_start_pos_[feature_index]); output_buffer_.data() + buffer_read_start_pos_[feature_index]);
train_data_->FixHistogram(feature_index, train_data_->FixHistogram(feature_index,
smaller_leaf_splits_->sum_gradients(), smaller_leaf_splits_->sum_hessians(), smaller_leaf_splits_->sum_gradients(), smaller_leaf_splits_->sum_hessians(),
GetGlobalDataCountInLeaf(smaller_leaf_splits_->LeafIndex()), GetGlobalDataCountInLeaf(smaller_leaf_splits_->LeafIndex()),
smaller_leaf_histogram_array_[feature_index].RawData()); smaller_leaf_histogram_array_[feature_index].RawData());
SplitInfo smaller_split; SplitInfo smaller_split;
// find best threshold for smaller child // find best threshold for smaller child
smaller_leaf_histogram_array_[feature_index].FindBestThreshold( smaller_leaf_histogram_array_[feature_index].FindBestThreshold(
...@@ -202,7 +204,9 @@ void DataParallelTreeLearner::FindBestThresholds() { ...@@ -202,7 +204,9 @@ void DataParallelTreeLearner::FindBestThresholds() {
larger_best[tid] = larger_split; larger_best[tid] = larger_split;
larger_best[tid].feature = train_data_->RealFeatureIndex(feature_index); larger_best[tid].feature = train_data_->RealFeatureIndex(feature_index);
} }
OMP_LOOP_EX_END();
} }
OMP_THROW_EX();
auto smaller_best_idx = ArrayArgs<SplitInfo>::ArgMax(smaller_best); auto smaller_best_idx = ArrayArgs<SplitInfo>::ArgMax(smaller_best);
int leaf = smaller_leaf_splits_->LeafIndex(); int leaf = smaller_leaf_splits_->LeafIndex();
best_split_per_leaf_[leaf] = smaller_best[smaller_best_idx]; best_split_per_leaf_[leaf] = smaller_best[smaller_best_idx];
...@@ -229,7 +233,7 @@ void DataParallelTreeLearner::FindBestSplitsForLeaves() { ...@@ -229,7 +233,7 @@ void DataParallelTreeLearner::FindBestSplitsForLeaves() {
std::memcpy(input_buffer_.data() + sizeof(SplitInfo), &larger_best, sizeof(SplitInfo)); std::memcpy(input_buffer_.data() + sizeof(SplitInfo), &larger_best, sizeof(SplitInfo));
Network::Allreduce(input_buffer_.data(), sizeof(SplitInfo) * 2, sizeof(SplitInfo), Network::Allreduce(input_buffer_.data(), sizeof(SplitInfo) * 2, sizeof(SplitInfo),
output_buffer_.data(), &SplitInfo::MaxReducer); output_buffer_.data(), &SplitInfo::MaxReducer);
std::memcpy(&smaller_best, output_buffer_.data(), sizeof(SplitInfo)); std::memcpy(&smaller_best, output_buffer_.data(), sizeof(SplitInfo));
std::memcpy(&larger_best, output_buffer_.data() + sizeof(SplitInfo), sizeof(SplitInfo)); std::memcpy(&larger_best, output_buffer_.data() + sizeof(SplitInfo), sizeof(SplitInfo));
......
...@@ -102,8 +102,10 @@ public: ...@@ -102,8 +102,10 @@ public:
data_size_t inner_size = (cnt + num_threads_ - 1) / num_threads_; data_size_t inner_size = (cnt + num_threads_ - 1) / num_threads_;
if (inner_size < min_inner_size) { inner_size = min_inner_size; } if (inner_size < min_inner_size) { inner_size = min_inner_size; }
// split data multi-threading // split data multi-threading
OMP_INIT_EX();
#pragma omp parallel for schedule(static, 1) #pragma omp parallel for schedule(static, 1)
for (int i = 0; i < num_threads_; ++i) { for (int i = 0; i < num_threads_; ++i) {
OMP_LOOP_EX_BEGIN();
left_cnts_buf_[i] = 0; left_cnts_buf_[i] = 0;
right_cnts_buf_[i] = 0; right_cnts_buf_[i] = 0;
data_size_t cur_start = i * inner_size; data_size_t cur_start = i * inner_size;
...@@ -116,7 +118,9 @@ public: ...@@ -116,7 +118,9 @@ public:
offsets_buf_[i] = cur_start; offsets_buf_[i] = cur_start;
left_cnts_buf_[i] = cur_left_count; left_cnts_buf_[i] = cur_left_count;
right_cnts_buf_[i] = cur_cnt - cur_left_count; right_cnts_buf_[i] = cur_cnt - cur_left_count;
OMP_LOOP_EX_END();
} }
OMP_THROW_EX();
data_size_t left_cnt = 0; data_size_t left_cnt = 0;
left_write_pos_buf_[0] = 0; left_write_pos_buf_[0] = 0;
right_write_pos_buf_[0] = 0; right_write_pos_buf_[0] = 0;
......
...@@ -377,8 +377,10 @@ public: ...@@ -377,8 +377,10 @@ public:
Reset(cache_size, total_size); Reset(cache_size, total_size);
pool_.resize(cache_size); pool_.resize(cache_size);
data_.resize(cache_size); data_.resize(cache_size);
OMP_INIT_EX();
#pragma omp parallel for schedule(static) #pragma omp parallel for schedule(static)
for (int i = old_cache_size; i < cache_size_; ++i) { for (int i = old_cache_size; i < cache_size_; ++i) {
OMP_LOOP_EX_BEGIN();
pool_[i].reset(new FeatureHistogram[train_data->num_features()]); pool_[i].reset(new FeatureHistogram[train_data->num_features()]);
data_[i].resize(num_total_bin); data_[i].resize(num_total_bin);
uint64_t offset = 0; uint64_t offset = 0;
...@@ -392,7 +394,9 @@ public: ...@@ -392,7 +394,9 @@ public:
offset += static_cast<uint64_t>(num_bin); offset += static_cast<uint64_t>(num_bin);
} }
CHECK(offset == num_total_bin); CHECK(offset == num_total_bin);
OMP_LOOP_EX_END();
} }
OMP_THROW_EX();
} }
void ResetConfig(const TreeConfig* tree_config) { void ResetConfig(const TreeConfig* tree_config) {
......
...@@ -19,22 +19,22 @@ std::chrono::duration<double, std::milli> ordered_bin_time; ...@@ -19,22 +19,22 @@ std::chrono::duration<double, std::milli> ordered_bin_time;
SerialTreeLearner::SerialTreeLearner(const TreeConfig* tree_config) SerialTreeLearner::SerialTreeLearner(const TreeConfig* tree_config)
:tree_config_(tree_config) { :tree_config_(tree_config) {
random_ = Random(tree_config_->feature_fraction_seed); random_ = Random(tree_config_->feature_fraction_seed);
#pragma omp parallel #pragma omp parallel
#pragma omp master #pragma omp master
{ {
num_threads_ = omp_get_num_threads(); num_threads_ = omp_get_num_threads();
} }
} }
SerialTreeLearner::~SerialTreeLearner() { SerialTreeLearner::~SerialTreeLearner() {
#ifdef TIMETAG #ifdef TIMETAG
Log::Info("SerialTreeLearner::init_train costs %f", init_train_time * 1e-3); Log::Info("SerialTreeLearner::init_train costs %f", init_train_time * 1e-3);
Log::Info("SerialTreeLearner::init_split costs %f", init_split_time * 1e-3); Log::Info("SerialTreeLearner::init_split costs %f", init_split_time * 1e-3);
Log::Info("SerialTreeLearner::hist_build costs %f", hist_time * 1e-3); Log::Info("SerialTreeLearner::hist_build costs %f", hist_time * 1e-3);
Log::Info("SerialTreeLearner::find_split costs %f", find_split_time * 1e-3); Log::Info("SerialTreeLearner::find_split costs %f", find_split_time * 1e-3);
Log::Info("SerialTreeLearner::split costs %f", split_time * 1e-3); Log::Info("SerialTreeLearner::split costs %f", split_time * 1e-3);
Log::Info("SerialTreeLearner::ordered_bin costs %f", ordered_bin_time * 1e-3); Log::Info("SerialTreeLearner::ordered_bin costs %f", ordered_bin_time * 1e-3);
#endif #endif
} }
void SerialTreeLearner::Init(const Dataset* train_data) { void SerialTreeLearner::Init(const Dataset* train_data) {
...@@ -168,15 +168,15 @@ Tree* SerialTreeLearner::Train(const score_t* gradients, const score_t *hessians ...@@ -168,15 +168,15 @@ Tree* SerialTreeLearner::Train(const score_t* gradients, const score_t *hessians
gradients_ = gradients; gradients_ = gradients;
hessians_ = hessians; hessians_ = hessians;
#ifdef TIMETAG #ifdef TIMETAG
auto start_time = std::chrono::steady_clock::now(); auto start_time = std::chrono::steady_clock::now();
#endif #endif
// some initial works before training // some initial works before training
BeforeTrain(); BeforeTrain();
#ifdef TIMETAG #ifdef TIMETAG
init_train_time += std::chrono::steady_clock::now() - start_time; init_train_time += std::chrono::steady_clock::now() - start_time;
#endif #endif
auto tree = std::unique_ptr<Tree>(new Tree(tree_config_->num_leaves)); auto tree = std::unique_ptr<Tree>(new Tree(tree_config_->num_leaves));
// root leaf // root leaf
...@@ -185,14 +185,14 @@ Tree* SerialTreeLearner::Train(const score_t* gradients, const score_t *hessians ...@@ -185,14 +185,14 @@ Tree* SerialTreeLearner::Train(const score_t* gradients, const score_t *hessians
// only root leaf can be splitted on first time // only root leaf can be splitted on first time
int right_leaf = -1; int right_leaf = -1;
for (int split = 0; split < tree_config_->num_leaves - 1; ++split) { for (int split = 0; split < tree_config_->num_leaves - 1; ++split) {
#ifdef TIMETAG #ifdef TIMETAG
start_time = std::chrono::steady_clock::now(); start_time = std::chrono::steady_clock::now();
#endif #endif
// some initial works before finding best split // some initial works before finding best split
if (BeforeFindBestSplit(tree.get(), left_leaf, right_leaf)) { if (BeforeFindBestSplit(tree.get(), left_leaf, right_leaf)) {
#ifdef TIMETAG #ifdef TIMETAG
init_split_time += std::chrono::steady_clock::now() - start_time; init_split_time += std::chrono::steady_clock::now() - start_time;
#endif #endif
// find best threshold for every feature // find best threshold for every feature
FindBestThresholds(); FindBestThresholds();
// find best split from all features // find best split from all features
...@@ -207,14 +207,14 @@ Tree* SerialTreeLearner::Train(const score_t* gradients, const score_t *hessians ...@@ -207,14 +207,14 @@ Tree* SerialTreeLearner::Train(const score_t* gradients, const score_t *hessians
Log::Info("No further splits with positive gain, best gain: %f", best_leaf_SplitInfo.gain); Log::Info("No further splits with positive gain, best gain: %f", best_leaf_SplitInfo.gain);
break; break;
} }
#ifdef TIMETAG #ifdef TIMETAG
start_time = std::chrono::steady_clock::now(); start_time = std::chrono::steady_clock::now();
#endif #endif
// split tree with best leaf // split tree with best leaf
Split(tree.get(), best_leaf, &left_leaf, &right_leaf); Split(tree.get(), best_leaf, &left_leaf, &right_leaf);
#ifdef TIMETAG #ifdef TIMETAG
split_time += std::chrono::steady_clock::now() - start_time; split_time += std::chrono::steady_clock::now() - start_time;
#endif #endif
cur_depth = std::max(cur_depth, tree->leaf_depth(left_leaf)); cur_depth = std::max(cur_depth, tree->leaf_depth(left_leaf));
} }
Log::Info("Trained a tree with leaves=%d and max_depth=%d", tree->num_leaves(), cur_depth); Log::Info("Trained a tree with leaves=%d and max_depth=%d", tree->num_leaves(), cur_depth);
...@@ -224,8 +224,10 @@ Tree* SerialTreeLearner::Train(const score_t* gradients, const score_t *hessians ...@@ -224,8 +224,10 @@ Tree* SerialTreeLearner::Train(const score_t* gradients, const score_t *hessians
Tree* SerialTreeLearner::FitByExistingTree(const Tree* old_tree, const score_t* gradients, const score_t *hessians) const { Tree* SerialTreeLearner::FitByExistingTree(const Tree* old_tree, const score_t* gradients, const score_t *hessians) const {
auto tree = std::unique_ptr<Tree>(new Tree(*old_tree)); auto tree = std::unique_ptr<Tree>(new Tree(*old_tree));
CHECK(data_partition_->num_leaves() >= tree->num_leaves()); CHECK(data_partition_->num_leaves() >= tree->num_leaves());
OMP_INIT_EX();
#pragma omp parallel for schedule(static) #pragma omp parallel for schedule(static)
for (int i = 0; i < data_partition_->num_leaves(); ++i) { for (int i = 0; i < data_partition_->num_leaves(); ++i) {
OMP_LOOP_EX_BEGIN();
data_size_t cnt_leaf_data = 0; data_size_t cnt_leaf_data = 0;
auto tmp_idx = data_partition_->GetIndexOnLeaf(i, &cnt_leaf_data); auto tmp_idx = data_partition_->GetIndexOnLeaf(i, &cnt_leaf_data);
double sum_grad = 0.0f; double sum_grad = 0.0f;
...@@ -240,7 +242,9 @@ Tree* SerialTreeLearner::FitByExistingTree(const Tree* old_tree, const score_t* ...@@ -240,7 +242,9 @@ Tree* SerialTreeLearner::FitByExistingTree(const Tree* old_tree, const score_t*
double output = FeatureHistogram::CalculateSplittedLeafOutput(sum_grad, sum_hess, double output = FeatureHistogram::CalculateSplittedLeafOutput(sum_grad, sum_hess,
tree_config_->lambda_l1, tree_config_->lambda_l2); tree_config_->lambda_l1, tree_config_->lambda_l2);
tree->SetLeafOutput(i, output); tree->SetLeafOutput(i, output);
OMP_LOOP_EX_END();
} }
OMP_THROW_EX();
return tree.release(); return tree.release();
} }
...@@ -255,14 +259,14 @@ void SerialTreeLearner::BeforeTrain() { ...@@ -255,14 +259,14 @@ void SerialTreeLearner::BeforeTrain() {
std::memset(is_feature_used_.data(), 0, sizeof(int8_t) * num_features_); std::memset(is_feature_used_.data(), 0, sizeof(int8_t) * num_features_);
// Get used feature at current tree // Get used feature at current tree
auto used_feature_indices = random_.Sample(train_data_->num_total_features(), used_feature_cnt); auto used_feature_indices = random_.Sample(train_data_->num_total_features(), used_feature_cnt);
#pragma omp parallel for schedule(static) #pragma omp parallel for schedule(static)
for (int i = 0; i < static_cast<int>(used_feature_indices.size()); ++i) { for (int i = 0; i < static_cast<int>(used_feature_indices.size()); ++i) {
int inner_feature_index = train_data_->InnerFeatureIndex(used_feature_indices[i]); int inner_feature_index = train_data_->InnerFeatureIndex(used_feature_indices[i]);
if (inner_feature_index < 0) { continue; } if (inner_feature_index < 0) { continue; }
is_feature_used_[inner_feature_index] = 1; is_feature_used_[inner_feature_index] = 1;
} }
} else { } else {
#pragma omp parallel for schedule(static) #pragma omp parallel for schedule(static)
for (int i = 0; i < num_features_; ++i) { for (int i = 0; i < num_features_; ++i) {
is_feature_used_[i] = 1; is_feature_used_[i] = 1;
} }
...@@ -290,15 +294,19 @@ void SerialTreeLearner::BeforeTrain() { ...@@ -290,15 +294,19 @@ void SerialTreeLearner::BeforeTrain() {
// if has ordered bin, need to initialize the ordered bin // if has ordered bin, need to initialize the ordered bin
if (has_ordered_bin_) { if (has_ordered_bin_) {
#ifdef TIMETAG #ifdef TIMETAG
auto start_time = std::chrono::steady_clock::now(); auto start_time = std::chrono::steady_clock::now();
#endif #endif
if (data_partition_->leaf_count(0) == num_data_) { if (data_partition_->leaf_count(0) == num_data_) {
// use all data, pass nullptr // use all data, pass nullptr
#pragma omp parallel for schedule(static) OMP_INIT_EX();
#pragma omp parallel for schedule(static)
for (int i = 0; i < static_cast<int>(ordered_bin_indices_.size()); ++i) { for (int i = 0; i < static_cast<int>(ordered_bin_indices_.size()); ++i) {
OMP_LOOP_EX_BEGIN();
ordered_bins_[ordered_bin_indices_[i]]->Init(nullptr, tree_config_->num_leaves); ordered_bins_[ordered_bin_indices_[i]]->Init(nullptr, tree_config_->num_leaves);
OMP_LOOP_EX_END();
} }
OMP_THROW_EX();
} else { } else {
// bagging, only use part of data // bagging, only use part of data
...@@ -306,23 +314,27 @@ void SerialTreeLearner::BeforeTrain() { ...@@ -306,23 +314,27 @@ void SerialTreeLearner::BeforeTrain() {
const data_size_t* indices = data_partition_->indices(); const data_size_t* indices = data_partition_->indices();
data_size_t begin = data_partition_->leaf_begin(0); data_size_t begin = data_partition_->leaf_begin(0);
data_size_t end = begin + data_partition_->leaf_count(0); data_size_t end = begin + data_partition_->leaf_count(0);
#pragma omp parallel for schedule(static) #pragma omp parallel for schedule(static)
for (data_size_t i = begin; i < end; ++i) { for (data_size_t i = begin; i < end; ++i) {
is_data_in_leaf_[indices[i]] = 1; is_data_in_leaf_[indices[i]] = 1;
} }
OMP_INIT_EX();
// initialize ordered bin // initialize ordered bin
#pragma omp parallel for schedule(static) #pragma omp parallel for schedule(static)
for (int i = 0; i < static_cast<int>(ordered_bin_indices_.size()); ++i) { for (int i = 0; i < static_cast<int>(ordered_bin_indices_.size()); ++i) {
OMP_LOOP_EX_BEGIN();
ordered_bins_[ordered_bin_indices_[i]]->Init(is_data_in_leaf_.data(), tree_config_->num_leaves); ordered_bins_[ordered_bin_indices_[i]]->Init(is_data_in_leaf_.data(), tree_config_->num_leaves);
OMP_LOOP_EX_END();
} }
#pragma omp parallel for schedule(static) OMP_THROW_EX();
#pragma omp parallel for schedule(static)
for (data_size_t i = begin; i < end; ++i) { for (data_size_t i = begin; i < end; ++i) {
is_data_in_leaf_[indices[i]] = 0; is_data_in_leaf_[indices[i]] = 0;
} }
} }
#ifdef TIMETAG #ifdef TIMETAG
ordered_bin_time += std::chrono::steady_clock::now() - start_time; ordered_bin_time += std::chrono::steady_clock::now() - start_time;
#endif #endif
} }
} }
...@@ -366,9 +378,9 @@ bool SerialTreeLearner::BeforeFindBestSplit(const Tree* tree, int left_leaf, int ...@@ -366,9 +378,9 @@ bool SerialTreeLearner::BeforeFindBestSplit(const Tree* tree, int left_leaf, int
} }
// split for the ordered bin // split for the ordered bin
if (has_ordered_bin_ && right_leaf >= 0) { if (has_ordered_bin_ && right_leaf >= 0) {
#ifdef TIMETAG #ifdef TIMETAG
auto start_time = std::chrono::steady_clock::now(); auto start_time = std::chrono::steady_clock::now();
#endif #endif
// mark data that at left-leaf // mark data that at left-leaf
const data_size_t* indices = data_partition_->indices(); const data_size_t* indices = data_partition_->indices();
const auto left_cnt = data_partition_->leaf_count(left_leaf); const auto left_cnt = data_partition_->leaf_count(left_leaf);
...@@ -381,32 +393,36 @@ bool SerialTreeLearner::BeforeFindBestSplit(const Tree* tree, int left_leaf, int ...@@ -381,32 +393,36 @@ bool SerialTreeLearner::BeforeFindBestSplit(const Tree* tree, int left_leaf, int
end = begin + right_cnt; end = begin + right_cnt;
mark = 0; mark = 0;
} }
#pragma omp parallel for schedule(static) #pragma omp parallel for schedule(static)
for (data_size_t i = begin; i < end; ++i) { for (data_size_t i = begin; i < end; ++i) {
is_data_in_leaf_[indices[i]] = 1; is_data_in_leaf_[indices[i]] = 1;
} }
OMP_INIT_EX();
// split the ordered bin // split the ordered bin
#pragma omp parallel for schedule(static) #pragma omp parallel for schedule(static)
for (int i = 0; i < static_cast<int>(ordered_bin_indices_.size()); ++i) { for (int i = 0; i < static_cast<int>(ordered_bin_indices_.size()); ++i) {
OMP_LOOP_EX_BEGIN();
ordered_bins_[ordered_bin_indices_[i]]->Split(left_leaf, right_leaf, is_data_in_leaf_.data(), mark); ordered_bins_[ordered_bin_indices_[i]]->Split(left_leaf, right_leaf, is_data_in_leaf_.data(), mark);
OMP_LOOP_EX_END();
} }
#pragma omp parallel for schedule(static) OMP_THROW_EX();
#pragma omp parallel for schedule(static)
for (data_size_t i = begin; i < end; ++i) { for (data_size_t i = begin; i < end; ++i) {
is_data_in_leaf_[indices[i]] = 0; is_data_in_leaf_[indices[i]] = 0;
} }
#ifdef TIMETAG #ifdef TIMETAG
ordered_bin_time += std::chrono::steady_clock::now() - start_time; ordered_bin_time += std::chrono::steady_clock::now() - start_time;
#endif #endif
} }
return true; return true;
} }
void SerialTreeLearner::FindBestThresholds() { void SerialTreeLearner::FindBestThresholds() {
#ifdef TIMETAG #ifdef TIMETAG
auto start_time = std::chrono::steady_clock::now(); auto start_time = std::chrono::steady_clock::now();
#endif #endif
std::vector<int8_t> is_feature_used(num_features_, 0); std::vector<int8_t> is_feature_used(num_features_, 0);
#pragma omp parallel for schedule(static) #pragma omp parallel for schedule(static)
for (int feature_index = 0; feature_index < num_features_; ++feature_index) { for (int feature_index = 0; feature_index < num_features_; ++feature_index) {
if (!is_feature_used_[feature_index]) continue; if (!is_feature_used_[feature_index]) continue;
if (parent_leaf_histogram_array_ != nullptr if (parent_leaf_histogram_array_ != nullptr
...@@ -439,17 +455,19 @@ void SerialTreeLearner::FindBestThresholds() { ...@@ -439,17 +455,19 @@ void SerialTreeLearner::FindBestThresholds() {
ordered_gradients_.data(), ordered_hessians_.data(), ordered_gradients_.data(), ordered_hessians_.data(),
ptr_larger_leaf_hist_data); ptr_larger_leaf_hist_data);
} }
#ifdef TIMETAG #ifdef TIMETAG
hist_time += std::chrono::steady_clock::now() - start_time; hist_time += std::chrono::steady_clock::now() - start_time;
#endif #endif
#ifdef TIMETAG #ifdef TIMETAG
start_time = std::chrono::steady_clock::now(); start_time = std::chrono::steady_clock::now();
#endif #endif
std::vector<SplitInfo> smaller_best(num_threads_); std::vector<SplitInfo> smaller_best(num_threads_);
std::vector<SplitInfo> larger_best(num_threads_); std::vector<SplitInfo> larger_best(num_threads_);
OMP_INIT_EX();
// find splits // find splits
#pragma omp parallel for schedule(static) #pragma omp parallel for schedule(static)
for (int feature_index = 0; feature_index < num_features_; ++feature_index) { for (int feature_index = 0; feature_index < num_features_; ++feature_index) {
OMP_LOOP_EX_BEGIN();
if (!is_feature_used[feature_index]) { continue; } if (!is_feature_used[feature_index]) { continue; }
const int tid = omp_get_thread_num(); const int tid = omp_get_thread_num();
SplitInfo smaller_split; SplitInfo smaller_split;
...@@ -488,7 +506,9 @@ void SerialTreeLearner::FindBestThresholds() { ...@@ -488,7 +506,9 @@ void SerialTreeLearner::FindBestThresholds() {
larger_best[tid] = larger_split; larger_best[tid] = larger_split;
larger_best[tid].feature = train_data_->RealFeatureIndex(feature_index); larger_best[tid].feature = train_data_->RealFeatureIndex(feature_index);
} }
OMP_LOOP_EX_END();
} }
OMP_THROW_EX();
auto smaller_best_idx = ArrayArgs<SplitInfo>::ArgMax(smaller_best); auto smaller_best_idx = ArrayArgs<SplitInfo>::ArgMax(smaller_best);
int leaf = smaller_leaf_splits_->LeafIndex(); int leaf = smaller_leaf_splits_->LeafIndex();
...@@ -499,9 +519,9 @@ void SerialTreeLearner::FindBestThresholds() { ...@@ -499,9 +519,9 @@ void SerialTreeLearner::FindBestThresholds() {
auto larger_best_idx = ArrayArgs<SplitInfo>::ArgMax(larger_best); auto larger_best_idx = ArrayArgs<SplitInfo>::ArgMax(larger_best);
best_split_per_leaf_[leaf] = larger_best[larger_best_idx]; best_split_per_leaf_[leaf] = larger_best[larger_best_idx];
} }
#ifdef TIMETAG #ifdef TIMETAG
find_split_time += std::chrono::steady_clock::now() - start_time; find_split_time += std::chrono::steady_clock::now() - start_time;
#endif #endif
} }
void SerialTreeLearner::FindBestSplitsForLeaves() { void SerialTreeLearner::FindBestSplitsForLeaves() {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment