// Commit 1c774687 authored by Guolin Ke
// first commit
#ifndef LIGHTGBM_IO_ORDERED_SPARSE_BIN_HPP_
#define LIGHTGBM_IO_ORDERED_SPARSE_BIN_HPP_
#include <LightGBM/bin.h>
#include <cstring>
#include <cstdint>
#include <vector>
#include <mutex>
#include <algorithm>
namespace LightGBM {
/*!
* \brief Ordered bin for a sparse feature. Efficient for constructing histograms, especially for sparse bins.
* There are 2 advantages to using an ordered bin:
* 1. it groups the data by leaf, improving the cache hit rate.
* 2. it only stores the non-zero bins, which speeds up histogram construction for sparse features.
* But it has an additional cost: it needs to re-order the bins after a leaf split, which would cost much for dense features.
* So we only use ordered bins for sparse features now.
*/
template <typename VAL_T>
class OrderedSparseBin:public OrderedBin {
public:
/*! \brief Pair to store one bin entry */
struct SparsePair {
data_size_t ridx; // data(row) index
VAL_T bin; // bin for this data
SparsePair(data_size_t r, VAL_T b) : ridx(r), bin(b) {}
};
/*!
* \brief Constructor. Expands the delta-encoded sparse column into (row, bin) pairs.
* \param delta Per-entry gap from the previous stored row; entries whose value
*        is 0 are padding used to encode gaps wider than one byte
*        (see SparseBin::LoadFromPair)
* \param vals Bin value of each entry; 0 marks a padding entry, not a real bin
* NOTE: references are stored, so the owning SparseBin must outlive this object.
*/
OrderedSparseBin(const std::vector<uint8_t>& delta, const std::vector<VAL_T>& vals)
:delta_(delta), vals_(vals) {
data_size_t cur_pos = 0;
for (size_t i = 0; i < vals_.size(); ++i) {
// accumulate deltas to recover the absolute row index
cur_pos += delta_[i];
// keep only real non-zero bins; 0 is padding / "no value"
if (vals_[i] > 0) {
ordered_pair_.emplace_back(cur_pos, vals_[i]);
}
}
ordered_pair_.shrink_to_fit();
}
~OrderedSparseBin() {
}
/*!
* \brief Re-fill ordered_pair_ in row order before training a new tree;
*        afterwards every pair belongs to leaf 0 (the root).
* \param used_idices If nullptr use all data; otherwise a per-row 0/1 mask
*        (bagging) and only masked-in rows are kept. (name is a typo of "used_indices")
* \param num_leaves Maximum number of leaves; sizes the per-leaf bookkeeping
*/
void Init(const char* used_idices, int num_leaves) override {
// initialize the leaf information
leaf_start_ = std::vector<data_size_t>(num_leaves, 0);
leaf_cnt_ = std::vector<data_size_t>(num_leaves, 0);
if (used_idices == nullptr) {
// if using all data, copy all non-zero pair
// (rewritten in place since Split() re-orders ordered_pair_ between trees)
data_size_t cur_pos = 0;
data_size_t j = 0;
for (size_t i = 0; i < vals_.size(); ++i) {
cur_pos += delta_[i];
if (vals_[i] > 0) {
ordered_pair_[j].ridx = cur_pos;
ordered_pair_[j].bin = vals_[i];
++j;
}
}
leaf_cnt_[0] = static_cast<data_size_t>(ordered_pair_.size());
} else {
// if using part of data(bagging)
data_size_t j = 0;
data_size_t cur_pos = 0;
for (size_t i = 0; i < vals_.size(); ++i) {
cur_pos += delta_[i];
if (vals_[i] > 0 && used_idices[cur_pos] != 0) {
ordered_pair_[j].ridx = cur_pos;
ordered_pair_[j].bin = vals_[i];
++j;
}
}
// only the first j pairs are valid this round
leaf_cnt_[0] = j;
}
}
/*!
* \brief Accumulate gradients/hessians/counts into one leaf's histogram.
* \param leaf Leaf whose range [leaf_start_, leaf_start_ + leaf_cnt_) is scanned
* \param gradient Per-row gradients, indexed by original row index
* \param hessian Per-row hessians, indexed by original row index
* \param out Histogram entries indexed by bin value (bin 0 never occurs here
*        since zero bins are not stored)
*/
void ConstructHistogram(int leaf, const score_t* gradient, const score_t* hessian,
HistogramBinEntry* out) const override {
// get current leaf boundary
const data_size_t start = leaf_start_[leaf];
const data_size_t end = start + leaf_cnt_[leaf];
// use data on current leaf to construct histogram
for (data_size_t i = start; i < end; ++i) {
const VAL_T bin = ordered_pair_[i].bin;
const data_size_t idx = ordered_pair_[i].ridx;
out[bin].sum_gradients += gradient[idx];
out[bin].sum_hessians += hessian[idx];
++out[bin].cnt;
}
}
/*!
* \brief Partition one leaf's pairs after a split: rows going left are swapped
*        to the front of the range, the remainder form the new right leaf.
* \param leaf Index of the leaf being split (keeps the left part of the range)
* \param right_leaf Index assigned to the new right leaf
* \param left_indices Per-row mask; non-zero means the row goes to the left leaf
*/
void Split(int leaf, int right_leaf, const char* left_indices) override {
// get current leaf boundary
const data_size_t l_start = leaf_start_[leaf];
const data_size_t l_end = l_start + leaf_cnt_[leaf];
// new left leaf end after split
data_size_t new_left_end = l_start;
for (data_size_t i = l_start; i < l_end; ++i) {
if (left_indices[ordered_pair_[i].ridx] != 0) {
std::swap(ordered_pair_[new_left_end], ordered_pair_[i]);
++new_left_end;
}
}
leaf_start_[right_leaf] = new_left_end;
leaf_cnt_[leaf] = new_left_end - l_start;
leaf_cnt_[right_leaf] = l_end - new_left_end;
}
/*! \brief Disable copy */
OrderedSparseBin<VAL_T>& operator=(const OrderedSparseBin<VAL_T>&) = delete;
/*! \brief Disable copy */
OrderedSparseBin<VAL_T>(const OrderedSparseBin<VAL_T>&) = delete;
private:
const std::vector<uint8_t>& delta_;
const std::vector<VAL_T>& vals_;
/*! \brief Store non-zero pair , group by leaf */
std::vector<SparsePair> ordered_pair_;
/*! \brief leaf_start_[i] means data in i-th leaf start from */
std::vector<data_size_t> leaf_start_;
/*! \brief leaf_cnt_[i] means number of data in i-th leaf */
std::vector<data_size_t> leaf_cnt_;
};
} // namespace LightGBM
#endif  // LIGHTGBM_IO_ORDERED_SPARSE_BIN_HPP_
#include "parser.hpp"
#include <iostream>
#include <fstream>
namespace LightGBM {
/*!
* \brief Count the separator characters (comma, tab and colon) in a C string.
* \param str NUL-terminated input line
* \param comma_cnt Receives the number of ',' characters
* \param tab_cnt Receives the number of '\t' characters
* \param colon_cnt Receives the number of ':' characters
*/
void GetStatistic(const char* str, int* comma_cnt, int* tab_cnt, int *colon_cnt) {
  *comma_cnt = 0;
  *tab_cnt = 0;
  *colon_cnt = 0;
  for (const char* p = str; *p != '\0'; ++p) {
    switch (*p) {
      case ',': ++(*comma_cnt); break;
      case '\t': ++(*tab_cnt); break;
      case ':': ++(*colon_cnt); break;
      default: break;
    }
  }
}
/*!
* \brief Guess the data file format (LibSVM / TSV / CSV) from its first two
*        lines and create the matching parser.
* \param filename Path to the data file
* \return Newly allocated parser owned by the caller, or nullptr when the
*         format cannot be recognized
*/
Parser* Parser::CreateParser(const char* filename) {
std::ifstream tmp_file;
tmp_file.open(filename);
if (!tmp_file.is_open()) {
Log::Stderr("Data file: %s doesn't exist", filename);
}
std::string line1, line2;
if (!tmp_file.eof()) {
std::getline(tmp_file, line1);
} else {
Log::Stderr("Data file: %s at least should have one line", filename);
}
if (!tmp_file.eof()) {
std::getline(tmp_file, line2);
} else {
Log::Stdout("Data file: %s only have one line", filename);
}
tmp_file.close();
int comma_cnt = 0, comma_cnt2 = 0;
int tab_cnt = 0, tab_cnt2 = 0;
int colon_cnt = 0, colon_cnt2 = 0;
// Get some statistic from 2 line
GetStatistic(line1.c_str(), &comma_cnt, &tab_cnt, &colon_cnt);
GetStatistic(line2.c_str(), &comma_cnt2, &tab_cnt2, &colon_cnt2);
if (line2.size() == 0) {
// if only have one line on file
// colons only appear in LibSVM's "index:value" tokens, so check them first
if (colon_cnt > 0) {
return new LibSVMParser();
} else if (tab_cnt > 0) {
return new TSVParser();
} else if (comma_cnt > 0) {
return new CSVParser();
} else {
return nullptr;
}
} else {
if (colon_cnt > 0 || colon_cnt2 > 0) {
return new LibSVMParser();
}
// for TSV/CSV both lines must agree on the separator count (same #columns)
else if (tab_cnt == tab_cnt2 && tab_cnt > 0) {
return new TSVParser();
} else if (comma_cnt == comma_cnt2 && comma_cnt > 0) {
return new CSVParser();
} else {
return nullptr;
}
}
}
} // namespace LightGBM
#ifndef LIGHTGBM_IO_PARSER_HPP_
#define LIGHTGBM_IO_PARSER_HPP_
#include <LightGBM/utils/common.h>
#include <LightGBM/utils/log.h>
#include <LightGBM/dataset.h>
#include <unordered_map>
#include <vector>
#include <utility>
namespace LightGBM {
/*! \brief Parser for comma-separated value (CSV) lines. */
class CSVParser: public Parser {
public:
  /*! \brief Parse every column of one CSV line into (column_index, value) pairs. */
  inline void ParseOneLine(const char* str,
    std::vector<std::pair<int, double>>* out_features) const override {
    double cur_val = 0.0;
    for (int col = 0; *str != '\0'; ++col) {
      str = Common::Atof(str, &cur_val);
      out_features->emplace_back(col, cur_val);
      if (*str == ',') {
        ++str;
      } else if (*str != '\0') {
        Log::Stderr("input format error, should be CSV");
      }
    }
  }
  /*! \brief Parse one CSV line whose first column is the label. */
  inline void ParseOneLine(const char* str, std::vector<std::pair<int, double>>* out_features,
    double* out_label) const override {
    // first column is label
    str = Common::Atof(str, out_label);
    if (*str == ',') {
      ++str;
    } else if (*str != '\0') {
      Log::Stderr("input format error, should be CSV");
    }
    return ParseOneLine(str, out_features);
  }
};
/*! \brief Parser for tab-separated value (TSV) lines. */
class TSVParser: public Parser {
public:
  /*! \brief Parse every column of one TSV line into (column_index, value) pairs. */
  inline void ParseOneLine(const char* str, std::vector<std::pair<int, double>>* out_features) const override {
    double cur_val = 0.0;
    for (int col = 0; *str != '\0'; ++col) {
      str = Common::Atof(str, &cur_val);
      out_features->emplace_back(col, cur_val);
      if (*str == '\t') {
        ++str;
      } else if (*str != '\0') {
        Log::Stderr("input format error, should be TSV");
      }
    }
  }
  /*! \brief Parse one TSV line whose first column is the label. */
  inline void ParseOneLine(const char* str, std::vector<std::pair<int, double>>* out_features,
    double* out_label) const override{
    // first column is label
    str = Common::Atof(str, out_label);
    if (*str == '\t') {
      ++str;
    } else if (*str != '\0') {
      Log::Stderr("input format error, should be TSV");
    }
    return ParseOneLine(str, out_features);
  }
};
/*! \brief Parser for LibSVM "index:value" formatted lines. */
class LibSVMParser: public Parser {
public:
  /*! \brief Parse all "index:value" tokens of one LibSVM line. */
  inline void ParseOneLine(const char* str, std::vector<std::pair<int, double>>* out_features) const override {
    int feature_idx = 0;
    double feature_val = 0.0;
    while (*str != '\0') {
      str = Common::Atoi(str, &feature_idx);
      str = Common::SkipSpaceAndTab(str);
      if (*str != ':') {
        Log::Stderr("input format error, should be LibSVM");
      } else {
        ++str;
        str = Common::Atof(str, &feature_val);
        out_features->emplace_back(feature_idx, feature_val);
      }
      str = Common::SkipSpaceAndTab(str);
    }
  }
  /*! \brief Parse one LibSVM line whose first token is the label. */
  inline void ParseOneLine(const char* str, std::vector<std::pair<int, double>>* out_features,
    double* out_label) const override{
    // first column is label
    str = Common::Atof(str, out_label);
    str = Common::SkipSpaceAndTab(str);
    return ParseOneLine(str, out_features);
  }
};
} // namespace LightGBM
#endif  // LIGHTGBM_IO_PARSER_HPP_
#ifndef LIGHTGBM_IO_SPARSE_BIN_HPP_
#define LIGHTGBM_IO_SPARSE_BIN_HPP_
#include <LightGBM/utils/log.h>
#include <LightGBM/bin.h>
#include "ordered_sparse_bin.hpp"
#include <omp.h>
#include <cstring>
#include <cstdint>
#include <vector>
namespace LightGBM {
const size_t kNumFastIndex = 64;
template <typename VAL_T> class SparseBinIterator;
template <typename VAL_T>
class SparseBin:public Bin {
public:
friend class SparseBinIterator<VAL_T>;
/*!
* \brief Constructor; allocates one push buffer per OpenMP thread so that
*        Push() can be called concurrently without locking.
* \param num_data Total number of rows in the dataset
*/
explicit SparseBin(data_size_t num_data)
: num_data_(num_data) {
#pragma omp parallel
#pragma omp master
{
num_threads_ = omp_get_num_threads();
}
for (int i = 0; i < num_threads_; ++i) {
push_buffers_.emplace_back();
}
}
~SparseBin() {
}
/*!
* \brief Record one (row, bin) observation into the calling thread's buffer.
* \param tid Thread id, selects a private buffer (no synchronization needed)
* \param idx Row index
* \param value Bin value; 0 means zero/missing and is not stored
*/
void Push(int tid, data_size_t idx, uint32_t value) override {
// not store zero data
if (value == 0) { return; }
push_buffers_[tid].emplace_back(idx, static_cast<VAL_T>(value));
}
BinIterator* GetIterator(data_size_t start_idx) const override;
void ConstructHistogram(data_size_t*, data_size_t , const score_t* ,
const score_t* , HistogramBinEntry*) const override {
// Will use OrderedSparseBin->ConstructHistogram() instead
Log::Stderr("Should use OrderedSparseBin->ConstructHistogram() instead");
}
/*!
* \brief Partition data_indices by comparing each row's bin against threshold:
*        bin <= threshold goes to lte_indices, bin > threshold to gt_indices.
* \note Walks the delta array forward only, starting from the fast-index
*       checkpoint of data_indices[0]; rows without a stored entry count as bin 0.
* \return Number of rows placed into lte_indices
*/
data_size_t Split(unsigned int threshold, data_size_t* data_indices, data_size_t num_data,
data_size_t* lte_indices, data_size_t* gt_indices) const override {
// jump close to the first queried row via the pre-built fast index
const auto fast_pair = fast_index_[(data_indices[0]) >> fast_index_shift_];
data_size_t j = fast_pair.first;
data_size_t cur_pos = fast_pair.second;
data_size_t lte_count = 0;
data_size_t gt_count = 0;
for (data_size_t i = 0; i < num_data; i++) {
const data_size_t idx = data_indices[i];
// advance through the delta array until reaching row idx
while (cur_pos < idx && j < num_vals_) {
++j;
cur_pos += delta_[j];
}
VAL_T bin = 0;
// rows with no stored entry implicitly have bin 0
if (cur_pos == idx && j < num_vals_) {
bin = vals_[j];
}
if (bin > threshold) {
gt_indices[gt_count++] = idx;
} else {
lte_indices[lte_count++] = idx;
}
}
return lte_count;
}
data_size_t num_data() const override { return num_data_; }
/*! \brief Create the ordered-bin view; it keeps references into delta_/vals_,
*          so this object must outlive the returned one. Caller owns it. */
OrderedBin* CreateOrderedBin() const override {
return new OrderedSparseBin<VAL_T>(delta_, vals_);
}
/*!
* \brief Merge the per-thread push buffers, sort by row index and build the
*        delta-encoded representation plus the fast index.
*/
void FinishLoad() override {
// get total non zero size
size_t non_zero_size = 0;
for (size_t i = 0; i < push_buffers_.size(); i++) {
non_zero_size += push_buffers_[i].size();
}
// merge
non_zero_pair_.reserve(non_zero_size);
for (size_t i = 0; i < push_buffers_.size(); i++) {
non_zero_pair_.insert(non_zero_pair_.end(), push_buffers_[i].begin(), push_buffers_[i].end());
push_buffers_[i].clear();
push_buffers_[i].shrink_to_fit();
}
push_buffers_.clear();
push_buffers_.shrink_to_fit();
// sort by data index
std::sort(non_zero_pair_.begin(), non_zero_pair_.end(),
[](const std::pair<data_size_t, VAL_T>& a, const std::pair<data_size_t, VAL_T>& b) {
return a.first < b.first;
});
// load detla array
LoadFromPair(non_zero_pair_);
// free memory
non_zero_pair_.clear();
non_zero_pair_.shrink_to_fit();
}
/*!
* \brief Build delta_/vals_ from (row, bin) pairs sorted by row index.
*        delta_[i] is the gap from the previous stored row; gaps wider than
*        255 rows are encoded by inserting padding entries with bin value 0.
*/
void LoadFromPair(const std::vector<std::pair<data_size_t, VAL_T>>& non_zero_pair) {
delta_.clear();
vals_.clear();
// transform to delta array
const uint8_t kMaxDelta = 255;
data_size_t last_idx = 0;
for (size_t i = 0; i < non_zero_pair.size(); i++) {
const data_size_t cur_idx = non_zero_pair[i].first;
const VAL_T bin = non_zero_pair[i].second;
data_size_t cur_delta = cur_idx - last_idx;
// gaps that do not fit in one byte are split into padding entries (bin 0)
while (cur_delta > kMaxDelta) {
delta_.push_back(255);
vals_.push_back(0);
cur_delta -= kMaxDelta;
}
delta_.push_back(static_cast<uint8_t>(cur_delta));
vals_.push_back(bin);
last_idx = cur_idx;
}
// avoid out of range
delta_.push_back(0);
num_vals_ = static_cast<data_size_t>(vals_.size());
// reduce memory cost
delta_.shrink_to_fit();
vals_.shrink_to_fit();
// generate fast index
GetFastIndex();
}
/*!
* \brief Build fast_index_: one (entry index, row position) checkpoint per
*        block of rows, where the block size is num_data_/kNumFastIndex
*        rounded up to a power of two so "row >> fast_index_shift_" selects
*        the slot.
*/
void GetFastIndex() {
fast_index_.clear();
// get shift cnt
data_size_t mod_size = (num_data_ + kNumFastIndex - 1) / kNumFastIndex;
data_size_t pow2_mod_size = 1;
fast_index_shift_ = 0;
while (pow2_mod_size < mod_size) {
pow2_mod_size <<= 1;
++fast_index_shift_;
}
// build fast index
data_size_t next_i = 0;
data_size_t cur_pos = 0;
for (data_size_t i = 0; i < num_vals_; ++i) {
cur_pos += delta_[i];
while (next_i < cur_pos) {
fast_index_.emplace_back(i, cur_pos);
next_i += pow2_mod_size;
}
}
// avoid out of range
while (next_i < num_data_) {
fast_index_.emplace_back(num_vals_ - 1, cur_pos);
next_i += pow2_mod_size;
}
fast_index_.shrink_to_fit();
}
/*! \brief Serialize num_vals_, the delta array (num_vals_ + 1 bytes) and vals_. */
void SaveBinaryToFile(FILE* file) const override {
fwrite(&num_vals_, sizeof(num_vals_), 1, file);
fwrite(delta_.data(), sizeof(uint8_t), num_vals_ + 1, file);
fwrite(vals_.data(), sizeof(VAL_T), num_vals_, file);
}
/*! \brief Number of bytes SaveBinaryToFile() will write. */
size_t SizesInByte() const override {
return sizeof(num_vals_) + sizeof(uint8_t) * (num_vals_ + 1)
+ sizeof(VAL_T) * num_vals_;
}
/*!
* \brief Restore this bin from a memory image written by SaveBinaryToFile().
* \param memory Serialized image: num_vals, delta array, vals array
* \param local_used_indices If non-empty, a sorted subset of rows to keep;
*        kept rows are re-numbered 0..n-1 and the structure is rebuilt
*/
void LoadFromMemory(const void* memory, const std::vector<data_size_t>& local_used_indices) override {
const char* mem_ptr = reinterpret_cast<const char*>(memory);
data_size_t tmp_num_vals = *(reinterpret_cast<const data_size_t*>(mem_ptr));
mem_ptr += sizeof(tmp_num_vals);
const uint8_t* tmp_delta = reinterpret_cast<const uint8_t*>(mem_ptr);
mem_ptr += sizeof(uint8_t) * (tmp_num_vals + 1);
const VAL_T* tmp_vals = reinterpret_cast<const VAL_T*>(mem_ptr);
if (local_used_indices.size() <= 0) {
// using all data: copy the arrays verbatim
delta_.clear();
vals_.clear();
num_vals_ = tmp_num_vals;
for (data_size_t i = 0; i < num_vals_; i++) {
delta_.push_back(tmp_delta[i]);
vals_.push_back(tmp_vals[i]);
}
delta_.push_back(0);
// reduce memory cost
delta_.shrink_to_fit();
vals_.shrink_to_fit();
// generate fast index
GetFastIndex();
} else {
// keep only the used rows, re-indexed consecutively
std::vector<std::pair<data_size_t, VAL_T>> tmp_pair;
data_size_t cur_pos = tmp_delta[0];
data_size_t j = 0;
for (data_size_t i = 0; i < static_cast<data_size_t>(local_used_indices.size()); ++i) {
const data_size_t idx = local_used_indices[i];
// walk forward to the stored entry at (or past) row idx
while (cur_pos < idx && j < tmp_num_vals) {
++j;
cur_pos += tmp_delta[j];
}
VAL_T bin = 0;
if (cur_pos == idx && j < tmp_num_vals) {
bin = tmp_vals[j];
}
if (bin > 0) {
// new row index is i
tmp_pair.emplace_back(i, bin);
}
}
LoadFromPair(tmp_pair);
}
}
private:
/*! \brief Total number of rows */
data_size_t num_data_;
/*! \brief Temporary merged (row, bin) storage used during FinishLoad() */
std::vector<std::pair<data_size_t, VAL_T>> non_zero_pair_;
/*! \brief Byte gaps between consecutive stored rows (num_vals_ + 1 entries) */
std::vector<uint8_t> delta_;
/*! \brief Bin values; 0 marks a gap-padding entry */
std::vector<VAL_T> vals_;
/*! \brief Number of stored entries in vals_ */
data_size_t num_vals_;
/*! \brief Number of OpenMP threads observed at construction */
int num_threads_;
/*! \brief One push buffer per thread, merged by FinishLoad() */
std::vector<std::vector<std::pair<data_size_t, VAL_T>>> push_buffers_;
/*! \brief (entry index, row position) checkpoints for fast skipping */
std::vector<std::pair<data_size_t, data_size_t>> fast_index_;
/*! \brief Right-shift applied to a row index to find its fast_index_ slot */
data_size_t fast_index_shift_;
};
template <typename VAL_T>
class SparseBinIterator: public BinIterator {
public:
SparseBinIterator(const SparseBin<VAL_T>* bin_data, data_size_t start_idx)
: bin_data_(bin_data) {
const auto fast_pair = bin_data->fast_index_[start_idx >> bin_data->fast_index_shift_];
i_delta_ = fast_pair.first;
cur_pos_ = fast_pair.second;
}
uint32_t Get(data_size_t idx) override {
while (cur_pos_ < idx && i_delta_ < bin_data_->num_vals_) {
++i_delta_;
cur_pos_ += bin_data_->delta_[i_delta_];
}
if (idx == cur_pos_ && i_delta_ >= 0
&& i_delta_ < bin_data_->vals_.size()) {
return bin_data_->vals_[i_delta_];
} else { return 0; }
}
private:
const SparseBin<VAL_T>* bin_data_;
data_size_t cur_pos_ = 0;
data_size_t i_delta_ = 0;
};
template <typename VAL_T>
BinIterator* SparseBin<VAL_T>::GetIterator(data_size_t start_idx) const {
// caller takes ownership of the returned iterator
return new SparseBinIterator<VAL_T>(this, start_idx);
}
} // namespace LightGBM
#endif  // LIGHTGBM_IO_SPARSE_BIN_HPP_
#include <LightGBM/tree.h>
#include <LightGBM/utils/threading.h>
#include <LightGBM/utils/common.h>
#include <LightGBM/dataset.h>
#include <LightGBM/feature.h>
#include <sstream>
#include <unordered_map>
#include <functional>
#include <vector>
#include <string>
namespace LightGBM {
/*!
* \brief Construct a tree with capacity for max_leaves leaves; a tree with
*        L leaves has L - 1 internal nodes, hence the array sizes.
* \param max_leaves Maximum number of leaves this tree may grow to
*/
Tree::Tree(int max_leaves)
:max_leaves_(max_leaves) {
  const int max_inner_nodes = max_leaves_ - 1;
  left_child_ = new int[max_inner_nodes];
  right_child_ = new int[max_inner_nodes];
  split_feature_ = new int[max_inner_nodes];
  split_feature_real_ = new int[max_inner_nodes];
  threshold_in_bin_ = new unsigned int[max_inner_nodes];
  threshold_ = new double[max_inner_nodes];
  split_gain_ = new double[max_inner_nodes];
  leaf_parent_ = new int[max_leaves_];
  leaf_value_ = new score_t[max_leaves_];
  // the tree starts as a single root leaf with no parent
  num_leaves_ = 1;
  leaf_parent_[0] = -1;
}
/*! \brief Release all node/leaf arrays. */
Tree::~Tree() {
  // delete[] on a null pointer is a no-op, so explicit checks are unnecessary
  delete[] leaf_parent_;
  delete[] left_child_;
  delete[] right_child_;
  delete[] split_feature_;
  delete[] split_feature_real_;
  delete[] threshold_in_bin_;
  delete[] threshold_;
  delete[] split_gain_;
  delete[] leaf_value_;
}
/*!
* \brief Split a leaf: it becomes the left child of a new internal node and a
*        new right leaf is appended.
* \param leaf Index of the leaf to split (kept as the left child's leaf index)
* \param feature Split feature index in bin space
* \param threshold_bin Split threshold in bin space
* \param real_feature Split feature index in the original data
* \param threshold Split threshold in original feature values
* \param left_value Output value of the left leaf
* \param right_value Output value of the new right leaf
* \param gain Gain of this split
* \return Index of the new right leaf
* Children are encoded with one's complement: child >= 0 is an internal node
* index, child < 0 encodes leaf index ~child.
*/
int Tree::Split(int leaf, int feature, unsigned int threshold_bin, int real_feature,
double threshold, score_t left_value, score_t right_value, double gain) {
int new_node_idx = num_leaves_ - 1;
// update parent info
int parent = leaf_parent_[leaf];
if (parent >= 0) {
// if cur node is left child
if (left_child_[parent] == ~leaf) {
left_child_[parent] = new_node_idx;
} else {
right_child_[parent] = new_node_idx;
}
}
// add new node
split_feature_[new_node_idx] = feature;
split_feature_real_[new_node_idx] = real_feature;
threshold_in_bin_[new_node_idx] = threshold_bin;
threshold_[new_node_idx] = threshold;
split_gain_[new_node_idx] = gain;
// add two new leaves
left_child_[new_node_idx] = ~leaf;
right_child_[new_node_idx] = ~num_leaves_;
// update new leaves
leaf_parent_[leaf] = new_node_idx;
leaf_parent_[num_leaves_] = new_node_idx;
leaf_value_[leaf] = left_value;
leaf_value_[num_leaves_] = right_value;
++num_leaves_;
return num_leaves_ - 1;
}
void Tree::AddPredictionToScore(const Dataset* data, data_size_t num_data, score_t* score) const {
Threading::For<data_size_t>(0, num_data, [this, data, score](int, data_size_t start, data_size_t end) {
std::vector<BinIterator*> iterators;
for (int i = 0; i < data->num_features(); i++) {
iterators.push_back(data->FeatureAt(i)->bin_data()->GetIterator(start));
}
for (data_size_t i = start; i < end; i++) {
score[i] += leaf_value_[GetLeaf(iterators, i)];
}
});
}
void Tree::AddPredictionToScore(const Dataset* data, const data_size_t* used_data_indices,
data_size_t num_data, score_t* score) const {
Threading::For<data_size_t>(0, num_data,
[this, data, used_data_indices, score](int, data_size_t start, data_size_t end) {
std::vector<BinIterator*> iterators;
for (int i = 0; i < data->num_features(); i++) {
iterators.push_back(data->FeatureAt(i)->bin_data()->GetIterator(used_data_indices[start]));
}
for (data_size_t i = start; i < end; i++) {
score[used_data_indices[i]] += leaf_value_[GetLeaf(iterators, used_data_indices[i])];
}
});
}
std::string Tree::ToString() {
std::stringstream ss;
ss << "num_leaves=" << num_leaves_ << std::endl;
ss << "split_feature="
<< Common::ArrayToString<int>(split_feature_real_, num_leaves_ - 1, ' ') << std::endl;
ss << "split_gain="
<< Common::ArrayToString<double>(split_gain_, num_leaves_ - 1, ' ') << std::endl;
ss << "threshold="
<< Common::ArrayToString<double>(threshold_, num_leaves_ - 1, ' ') << std::endl;
ss << "left_child="
<< Common::ArrayToString<int>(left_child_, num_leaves_ - 1, ' ') << std::endl;
ss << "right_child="
<< Common::ArrayToString<int>(right_child_, num_leaves_ - 1, ' ') << std::endl;
ss << "leaf_parent="
<< Common::ArrayToString<int>(leaf_parent_, num_leaves_, ' ') << std::endl;
ss << "leaf_value="
<< Common::ArrayToString<score_t>(leaf_value_, num_leaves_, ' ') << std::endl;
ss << std::endl;
return ss.str();
}
/*!
* \brief Reconstruct a tree from the text produced by ToString().
* \param str Model text: one "key=value(s)" entry per line
* Note: split_feature_ and threshold_in_bin_ are bin-space data not present in
* the model text; they are left null, so a loaded tree is prediction-only.
*/
Tree::Tree(const std::string& str) {
std::vector<std::string> lines = Common::Split(str.c_str(), '\n');
std::unordered_map<std::string, std::string> key_vals;
// parse "key=value" lines into a lookup table
for (const std::string& line : lines) {
std::vector<std::string> tmp_strs = Common::Split(line.c_str(), '=');
if (tmp_strs.size() == 2) {
std::string key = Common::Trim(tmp_strs[0]);
std::string val = Common::Trim(tmp_strs[1]);
if (key.size() > 0 && val.size() > 0) {
key_vals[key] = val;
}
}
}
// all fields below are required for a usable model
if (key_vals.count("num_leaves") <= 0 || key_vals.count("split_feature") <= 0
|| key_vals.count("split_gain") <= 0 || key_vals.count("threshold") <= 0
|| key_vals.count("left_child") <= 0 || key_vals.count("right_child") <= 0
|| key_vals.count("leaf_parent") <= 0 || key_vals.count("leaf_value") <= 0) {
Log::Stderr("tree model string format error");
}
Common::Atoi(key_vals["num_leaves"].c_str(), &num_leaves_);
// a tree with num_leaves_ leaves has num_leaves_ - 1 internal nodes
left_child_ = new int[num_leaves_ - 1];
right_child_ = new int[num_leaves_ - 1];
split_feature_real_ = new int[num_leaves_ - 1];
threshold_ = new double[num_leaves_ - 1];
split_gain_ = new double[num_leaves_ - 1];
leaf_parent_ = new int[num_leaves_];
leaf_value_ = new score_t[num_leaves_];
// bin-space fields are not serialized (see ToString())
split_feature_ = nullptr;
threshold_in_bin_ = nullptr;
Common::StringToIntArray(key_vals["split_feature"], ' ',
num_leaves_ - 1, split_feature_real_);
Common::StringToDoubleArray(key_vals["split_gain"], ' ',
num_leaves_ - 1, split_gain_);
Common::StringToDoubleArray(key_vals["threshold"], ' ',
num_leaves_ - 1, threshold_);
Common::StringToIntArray(key_vals["left_child"], ' ',
num_leaves_ - 1, left_child_);
Common::StringToIntArray(key_vals["right_child"], ' ',
num_leaves_ - 1, right_child_);
Common::StringToIntArray(key_vals["leaf_parent"], ' ',
num_leaves_ , leaf_parent_);
Common::StringToDoubleArray(key_vals["leaf_value"], ' ',
num_leaves_ , leaf_value_);
}
} // namespace LightGBM
#include <LightGBM/application.h>
/*! \brief Program entry point: build the application from the command line
*          arguments and run it. */
int main(int argc, char** argv) {
  LightGBM::Application app(argc, argv);
  app.Run();
  return 0;
}
#ifndef LIGHTGBM_METRIC_BINARY_METRIC_HPP_
#define LIGHTGBM_METRIC_BINARY_METRIC_HPP_
#include <LightGBM/utils/log.h>
#include <LightGBM/metric.h>
#include <algorithm>
#include <vector>
namespace LightGBM {
/*!
* \brief Metric for binary classification task.
* Use static class "PointWiseLossCalculator" to calculate loss point-wise
*/
template<typename PointWiseLossCalculator>
class BinaryMetric: public Metric {
public:
/*!
* \brief Constructor; reads output frequency and the sigmoid parameter used
*        to map raw scores to probabilities: 1 / (1 + exp(-sigmoid * score)).
*        sigmoid must be strictly positive.
*/
explicit BinaryMetric(const MetricConfig& config) {
output_freq_ = config.output_freq;
sigmoid_ = static_cast<score_t>(config.sigmoid);
if (sigmoid_ <= 0.0f) {
Log::Stderr("sigmoid param %f should greater than zero", sigmoid_);
}
}
virtual ~BinaryMetric() {
}
/*!
* \brief Bind this metric to one test set.
* \param test_name Name used when logging results
* \param metadata Provides labels and optional per-row weights
* \param num_data Number of rows in the test set
*/
void Init(const char* test_name, const Metadata& metadata, data_size_t num_data) override {
name = test_name;
num_data_ = num_data;
// get label
label_ = metadata.label();
// get weights
weights_ = metadata.weights();
if (weights_ == nullptr) {
// unweighted: every row counts as weight 1
sum_weights_ = static_cast<double>(num_data_);
} else {
sum_weights_ = 0.0f;
for (data_size_t i = 0; i < num_data; ++i) {
sum_weights_ += weights_[i];
}
}
}
/*!
* \brief Compute and log the weighted average point-wise loss, every
*        output_freq_ iterations.
* \param iter Current boosting iteration
* \param score Raw model scores, converted to probabilities via the sigmoid
*/
void Print(int iter, const score_t* score) const override {
score_t sum_loss = 0.0f;
if (output_freq_ > 0 && iter % output_freq_ == 0) {
if (weights_ == nullptr) {
#pragma omp parallel for schedule(static) reduction(+:sum_loss)
for (data_size_t i = 0; i < num_data_; ++i) {
// sigmoid transform
score_t prob = 1.0f / (1.0f + std::exp(-sigmoid_ * score[i]));
// add loss
sum_loss += PointWiseLossCalculator::LossOnPoint(label_[i], prob);
}
} else {
#pragma omp parallel for schedule(static) reduction(+:sum_loss)
for (data_size_t i = 0; i < num_data_; ++i) {
// sigmoid transform
score_t prob = 1.0f / (1.0f + std::exp(-sigmoid_ * score[i]));
// add loss
sum_loss += PointWiseLossCalculator::LossOnPoint(label_[i], prob) * weights_[i];
}
}
Log::Stdout("Iteration:%d, %s's %s: %f", iter, name, PointWiseLossCalculator::Name(), sum_loss / sum_weights_);
}
}
private:
/*! \brief Output frequently */
int output_freq_;
/*! \brief Number of data */
data_size_t num_data_;
/*! \brief Pointer of label */
const float* label_;
/*! \brief Pointer of weighs */
const float* weights_;
/*! \brief Sum weights */
double sum_weights_;
/*! \brief Name of test set */
const char* name;
/*! \brief Sigmoid parameter */
score_t sigmoid_;
};
/*!
* \brief Log loss metric for binary classification task.
*/
/*! \brief Log-loss (cross entropy) metric for binary classification. */
class BinaryLoglossMetric: public BinaryMetric<BinaryLoglossMetric> {
public:
  explicit BinaryLoglossMetric(const MetricConfig& config) :BinaryMetric<BinaryLoglossMetric>(config) {}
  /*!
  * \brief Negative log-likelihood of the probability assigned to the true
  *        class, clipped so the result never exceeds -log(kEpsilon).
  */
  inline static score_t LossOnPoint(float label, score_t prob) {
    // probability the model assigned to the true class
    const score_t true_class_prob = (label == 0) ? (1.0f - prob) : prob;
    if (true_class_prob > kEpsilon) {
      return -std::log(true_class_prob);
    }
    return -std::log(kEpsilon);
  }
  inline static const char* Name() {
    return "log loss";
  }
};
/*!
* \brief Error rate metric for binary classification task.
*/
/*! \brief Classification error-rate metric for binary classification. */
class BinaryErrorMetric: public BinaryMetric<BinaryErrorMetric> {
public:
  explicit BinaryErrorMetric(const MetricConfig& config) :BinaryMetric<BinaryErrorMetric>(config) {}
  /*!
  * \brief 0/1-style loss: predicted class is 1 when prob >= 0.5; the loss is
  *        the label mass on the other class.
  */
  inline static score_t LossOnPoint(float label, score_t prob) {
    return (prob < 0.5f) ? label : (1.0f - label);
  }
  inline static const char* Name() {
    return "error rate";
  }
};
/*!
* \brief Auc Metric for binary classification task.
*/
class AUCMetric: public Metric {
public:
explicit AUCMetric(const MetricConfig& config) {
output_freq_ = config.output_freq;
}
virtual ~AUCMetric() {
}
/*!
* \brief Bind this metric to one test set.
* \param test_name Name used when logging results
* \param metadata Provides labels and optional per-row weights
* \param num_data Number of rows in the test set
*/
void Init(const char* test_name, const Metadata& metadata, data_size_t num_data) override {
name = test_name;
num_data_ = num_data;
// get label
label_ = metadata.label();
// get weights
weights_ = metadata.weights();
if (weights_ == nullptr) {
sum_weights_ = static_cast<double>(num_data_);
} else {
sum_weights_ = 0.0f;
for (data_size_t i = 0; i < num_data; ++i) {
sum_weights_ += weights_[i];
}
}
}
/*!
* \brief Compute and log AUC every output_freq_ iterations.
*        Scans rows in descending score order, counting, for each negative,
*        the positives ranked above it; tied scores receive half credit
*        (accum += neg * (pos_in_tie * 0.5 + pos_above_tie)).
* \param iter Current boosting iteration
* \param score Raw model scores (only their ordering matters for AUC)
*/
void Print(int iter, const score_t* score) const override {
if (output_freq_ > 0 && iter % output_freq_ == 0) {
// get indices sorted by score, descent order
std::vector<data_size_t> sorted_idx;
for (data_size_t i = 0; i < num_data_; ++i) {
sorted_idx.emplace_back(i);
}
std::sort(sorted_idx.begin(), sorted_idx.end(), [score](data_size_t a, data_size_t b) {return score[a] > score[b]; });
// temp sum of postive label
double cur_pos = 0.0;
// total sum of postive label
double sum_pos = 0.0;
// accumlate of auc
double accum = 0.0;
// temp sum of negative label
double cur_neg = 0.0;
score_t threshold = score[sorted_idx[0]];
if (weights_ == nullptr) { // not weights
for (data_size_t i = 0; i < num_data_; ++i) {
const float cur_label = label_[sorted_idx[i]];
const score_t cur_score = score[sorted_idx[i]];
// new threshold
if (cur_score != threshold) {
threshold = cur_score;
// accmulate
accum += cur_neg*(cur_pos * 0.5 + sum_pos);
sum_pos += cur_pos;
// reset
cur_neg = cur_pos = 0.0;
}
cur_neg += 1.0 - cur_label;
cur_pos += cur_label;
}
} else { // has weights
for (data_size_t i = 0; i < num_data_; ++i) {
const float cur_label = label_[sorted_idx[i]];
const score_t cur_score = score[sorted_idx[i]];
const float cur_weight = weights_[sorted_idx[i]];
// new threshold
if (cur_score != threshold) {
threshold = cur_score;
// accmulate
accum += cur_neg*(cur_pos * 0.5 + sum_pos);
sum_pos += cur_pos;
// reset
cur_neg = cur_pos = 0.0;
}
cur_neg += (1.0 - cur_label)*cur_weight;
cur_pos += cur_label*cur_weight;
}
}
// flush the final tie group
accum += cur_neg*(cur_pos * 0.5 + sum_pos);
sum_pos += cur_pos;
double auc = 1.0;
// degenerate sets (all positive or all negative) report AUC = 1
if (sum_pos > 0.0f && sum_pos != sum_weights_) {
auc = accum / (sum_pos *(sum_weights_ - sum_pos));
}
Log::Stdout("iteration:%d, %s's %s: %f", iter, name, "auc", auc);
}
}
private:
/*! \brief Output frequently */
int output_freq_;
/*! \brief Number of data */
data_size_t num_data_;
/*! \brief Pointer of label */
const float* label_;
/*! \brief Pointer of weighs */
const float* weights_;
/*! \brief Sum weights */
double sum_weights_;
/*! \brief Name of test set */
const char* name;
};
} // namespace LightGBM
#endif  // LIGHTGBM_METRIC_BINARY_METRIC_HPP_
#include <LightGBM/metric.h>
#include <LightGBM/utils/log.h>
#include <cmath>
#include <vector>
#include <algorithm>
namespace LightGBM {
/*! \brief Declaration for some static members */
bool DCGCalculator::is_inited_ = false;
std::vector<double> DCGCalculator::label_gain_;
std::vector<double> DCGCalculator::discount_;
const data_size_t DCGCalculator::kMaxPosition = 10000;
void DCGCalculator::Init(std::vector<double> input_label_gain) {
// only inited one time
if (is_inited_) { return; }
label_gain_ = input_label_gain;
discount_.clear();
for (data_size_t i = 0; i < kMaxPosition; ++i) {
discount_.emplace_back(1.0 / std::log(2.0 + i));
}
is_inited_ = true;
}
/*!
* \brief Compute the maximum possible (ideal) DCG at cut-off k for one query,
*        by greedily placing the highest-label documents first.
* \param k Cut-off position (clamped to num_data)
* \param label Relevance labels of the query's documents
* \param num_data Number of documents in the query
* \return Ideal DCG@k
*/
double DCGCalculator::CalMaxDCGAtK(data_size_t k, const float* label, data_size_t num_data) {
  double ret = 0.0;
  // counts for all labels
  std::vector<data_size_t> label_cnt(label_gain_.size(), 0);
  for (data_size_t i = 0; i < num_data; ++i) {
    ++label_cnt[static_cast<int>(label[i])];
  }
  size_t top_label = label_gain_.size() - 1;
  if (k > num_data) { k = num_data; }
  // start from top label, and accumulate DCG
  for (data_size_t j = 0; j < k; ++j) {
    while (top_label > 0 && label_cnt[top_label] <= 0) {
      top_label -= 1;
    }
    // fix: top_label is unsigned, so the old `top_label < 0` test was always
    // false; stop instead when even the lowest label has no documents left
    if (label_cnt[top_label] <= 0) {
      break;
    }
    ret += discount_[j] * label_gain_[top_label];
    label_cnt[top_label] -= 1;
  }
  return ret;
}
/*!
* \brief Compute the maximum possible (ideal) DCG at several cut-offs in one
*        pass over the label counts.
* \param ks Cut-off positions, must be in increasing order
* \param label Relevance labels of the query's documents
* \param num_data Number of documents in the query
* \param out out[i] receives the ideal DCG at ks[i]
*/
void DCGCalculator::CalMaxDCG(const std::vector<data_size_t>& ks,
  const float* label,
  data_size_t num_data,
  std::vector<double>* out) {
  std::vector<data_size_t> label_cnt(label_gain_.size(), 0);
  // counts for all labels
  for (data_size_t i = 0; i < num_data; ++i) {
    // fix: labels are float, so the format specifier must be %f (the old %d
    // on a float vararg was undefined behavior)
    if (static_cast<size_t>(label[i]) >= label_cnt.size()) { Log::Stderr("label excel %f\n", label[i]); }
    ++label_cnt[static_cast<int>(label[i])];
  }
  double cur_result = 0.0;
  data_size_t cur_left = 0;
  size_t top_label = label_gain_.size() - 1;
  // calculate k Max DCG by one pass
  for (size_t i = 0; i < ks.size(); ++i) {
    data_size_t cur_k = ks[i];
    if (cur_k > num_data) { cur_k = num_data; }
    for (data_size_t j = cur_left; j < cur_k; ++j) {
      while (top_label > 0 && label_cnt[top_label] <= 0) {
        top_label -= 1;
      }
      // fix: top_label is unsigned, so the old `top_label < 0` test was always
      // false; stop instead when even the lowest label has no documents left
      if (label_cnt[top_label] <= 0) {
        break;
      }
      cur_result += discount_[j] * label_gain_[top_label];
      label_cnt[top_label] -= 1;
    }
    (*out)[i] = cur_result;
    cur_left = cur_k;
  }
}
/*!
* \brief Compute DCG at cut-off k for one query given predicted scores.
* \param k Cut-off position (clamped to num_data)
* \param label Relevance labels of the query's documents
* \param score Predicted scores used to rank the documents
* \param num_data Number of documents in the query
* \return DCG@k
*/
double DCGCalculator::CalDCGAtK(data_size_t k, const float* label,
  const score_t* score, data_size_t num_data) {
  // rank documents by predicted score, best first
  std::vector<data_size_t> ranked(num_data);
  for (data_size_t i = 0; i < num_data; ++i) {
    ranked[i] = i;
  }
  auto by_score_desc = [score](data_size_t a, data_size_t b) { return score[a] > score[b]; };
  std::sort(ranked.begin(), ranked.end(), by_score_desc);
  if (k > num_data) { k = num_data; }
  double dcg = 0.0;
  // accumulate gain * positional discount over the top-k documents
  for (data_size_t pos = 0; pos < k; ++pos) {
    const data_size_t doc = ranked[pos];
    dcg += label_gain_[static_cast<int>(label[doc])] * discount_[pos];
  }
  return dcg;
}
void DCGCalculator::CalDCG(const std::vector<data_size_t>& ks, const float* label,
const score_t * score, data_size_t num_data, std::vector<double>* out) {
// get sorted indices by score
std::vector<data_size_t> sorted_idx;
for (data_size_t i = 0; i < num_data; ++i) {
sorted_idx.emplace_back(i);
}
std::sort(sorted_idx.begin(), sorted_idx.end(),
[score](data_size_t a, data_size_t b) {return score[a] > score[b]; });
double cur_result = 0.0;
data_size_t cur_left = 0;
// calculate multi dcg by one pass
for (size_t i = 0; i < ks.size(); ++i) {
data_size_t cur_k = ks[i];
if (cur_k > num_data) { cur_k = num_data; }
for (data_size_t j = cur_left; j < cur_k; ++j) {
data_size_t idx = sorted_idx[j];
cur_result += label_gain_[static_cast<int>(label[idx])] * discount_[j];
}
(*out)[i] = cur_result;
cur_left = cur_k;
}
}
} // namespace LightGBM
#include <LightGBM/metric.h>
#include "regression_metric.hpp"
#include "binary_metric.hpp"
#include "rank_metric.hpp"
namespace LightGBM {
/*!
* \brief Factory: create a metric object from its type name.
* \param type Metric name ("l2", "l1", "binary_logloss", "binary_error",
*        "auc" or "ndcg")
* \param config Metric configuration forwarded to the constructor
* \return Newly allocated metric owned by the caller, or nullptr if unknown
*/
Metric* Metric::CreateMetric(const std::string& type, const MetricConfig& config) {
  if (type == "l2") { return new L2Metric(config); }
  if (type == "l1") { return new L1Metric(config); }
  if (type == "binary_logloss") { return new BinaryLoglossMetric(config); }
  if (type == "binary_error") { return new BinaryErrorMetric(config); }
  if (type == "auc") { return new AUCMetric(config); }
  if (type == "ndcg") { return new NDCGMetric(config); }
  return nullptr;
}
} // namespace LightGBM
#ifndef LIGHTGBM_METRIC_RANK_METRIC_HPP_
#define LIGHTGBM_METRIC_RANK_METRIC_HPP_
#include <LightGBM/utils/common.h>
#include <LightGBM/utils/log.h>
#include <LightGBM/metric.h>
#include <omp.h>
#include <sstream>
#include <vector>
namespace LightGBM {
/*!
* \brief NDCG (Normalized Discounted Cumulative Gain) metric for ranking tasks.
* Evaluates per-query DCG at the positions in eval_at_, each normalized
* by that query's maximum achievable DCG (cached inverted in Init()).
*/
class NDCGMetric:public Metric {
 public:
  explicit NDCGMetric(const MetricConfig& config) {
    output_freq_ = config.output_freq;
    // get evaluation positions (the "k" of NDCG@k)
    for (auto k : config.eval_at) {
      eval_at_.push_back(static_cast<data_size_t>(k));
    }
    // initialize DCG calculator with per-label gains
    DCGCalculator::Init(config.label_gain);
    // get number of OpenMP threads, used to size per-thread buffers in Print()
    #pragma omp parallel
    #pragma omp master
    {
      num_threads_ = omp_get_num_threads();
    }
  }
  ~NDCGMetric() {
  }
  /*!
  * \brief Bind this metric to a test set and pre-compute per-query 1/maxDCG.
  * \param test_name Name used in log output
  * \param metadata Labels, query boundaries and optional query weights
  * \param num_data Number of data points in the test set
  */
  void Init(const char* test_name, const Metadata& metadata, data_size_t num_data) override {
    name = test_name;
    num_data_ = num_data;
    // get label
    label_ = metadata.label();
    // query boundaries are mandatory for a ranking metric (fatal if missing)
    query_boundaries_ = metadata.query_boundaries();
    if (query_boundaries_ == nullptr) {
      Log::Stderr("For NDCG metric, should have query information");
    }
    num_queries_ = metadata.num_queries();
    // query weights are optional: without them every query counts as weight 1
    query_weights_ = metadata.query_weights();
    if (query_weights_ == nullptr) {
      sum_query_weights_ = static_cast<double>(num_queries_);
    } else {
      sum_query_weights_ = 0.0f;
      for (data_size_t i = 0; i < num_queries_; ++i) {
        sum_query_weights_ += query_weights_[i];
      }
    }
    // cache the inverse max DCG for all queries, used to normalize DCG into NDCG
    for (data_size_t i = 0; i < num_queries_; ++i) {
      inverse_max_dcgs_.emplace_back(eval_at_.size(), 0.0);
      DCGCalculator::CalMaxDCG(eval_at_, label_ + query_boundaries_[i],
                               query_boundaries_[i + 1] - query_boundaries_[i],
                               &inverse_max_dcgs_[i]);
      for (size_t j = 0; j < inverse_max_dcgs_[i].size(); ++j) {
        if (inverse_max_dcgs_[i][j] > 0.0) {
          inverse_max_dcgs_[i][j] = 1.0 / inverse_max_dcgs_[i][j];
        }
        else {
          // mark all-negative queries (max DCG == 0) with a negative sentinel;
          // Print() treats such queries as NDCG = 1
          inverse_max_dcgs_[i][j] = -1.0;
        }
      }
    }
  }
  /*!
  * \brief Log average NDCG@k over all queries every output_freq_ iterations.
  * \param iter Current iteration number
  * \param score Predicted scores for the whole test set
  */
  void Print(int iter, const score_t* score) const override {
    if (output_freq_ > 0 && iter % output_freq_ == 0) {
      // per-thread accumulators, summed at the end to avoid contention
      std::vector<std::vector<double>> result_buffer_;
      for (int i = 0; i < num_threads_; ++i) {
        result_buffer_.emplace_back(eval_at_.size(), 0.0);
      }
      // scratch vector; firstprivate gives each thread its own copy
      std::vector<double> tmp_dcg(eval_at_.size(), 0.0);
      if (query_weights_ == nullptr) {
        #pragma omp parallel for schedule(guided) firstprivate(tmp_dcg)
        for (data_size_t i = 0; i < num_queries_; ++i) {
          const int tid = omp_get_thread_num();
          // if all docs in this query are negative (maxDCG sentinel), let its NDCG = 1
          if (inverse_max_dcgs_[i][0] <= 0.0) {
            for (size_t j = 0; j < eval_at_.size(); ++j) {
              result_buffer_[tid][j] += 1.0;
            }
          } else {
            // calculate DCG for this query
            DCGCalculator::CalDCG(eval_at_, label_ + query_boundaries_[i],
                                  score + query_boundaries_[i],
                                  query_boundaries_[i + 1] - query_boundaries_[i], &tmp_dcg);
            // normalize: NDCG = DCG / maxDCG
            for (size_t j = 0; j < eval_at_.size(); ++j) {
              result_buffer_[tid][j] += tmp_dcg[j] * inverse_max_dcgs_[i][j];
            }
          }
        }
      } else {
        // weighted variant: identical except each query's NDCG is scaled by its weight
        #pragma omp parallel for schedule(guided) firstprivate(tmp_dcg)
        for (data_size_t i = 0; i < num_queries_; ++i) {
          const int tid = omp_get_thread_num();
          // if all docs in this query are negative, let its NDCG = 1
          if (inverse_max_dcgs_[i][0] <= 0.0) {
            for (size_t j = 0; j < eval_at_.size(); ++j) {
              result_buffer_[tid][j] += 1.0;
            }
          } else {
            // calculate DCG for this query
            DCGCalculator::CalDCG(eval_at_, label_ + query_boundaries_[i],
                                  score + query_boundaries_[i],
                                  query_boundaries_[i + 1] - query_boundaries_[i], &tmp_dcg);
            // normalize and weight
            for (size_t j = 0; j < eval_at_.size(); ++j) {
              result_buffer_[tid][j] += tmp_dcg[j] * inverse_max_dcgs_[i][j] * query_weights_[i];
            }
          }
        }
      }
      // reduce per-thread sums into the final (weight-)averaged NDCG
      std::vector<double> result(eval_at_.size(), 0.0);
      std::stringstream result_ss;
      for (size_t j = 0; j < result.size(); ++j) {
        for (int i = 0; i < num_threads_; ++i) {
          result[j] += result_buffer_[i][j];
        }
        result[j] /= sum_query_weights_;
        result_ss << "NDCG@" << eval_at_[j] << ":" << result[j] << "\t";
      }
      Log::Stdout("Iteration:%d, Test:%s, %s ", iter, name, result_ss.str().c_str());
    }
  }
 private:
  /*! \brief Output frequency (log every this many iterations; <= 0 disables) */
  int output_freq_;
  /*! \brief Number of data */
  data_size_t num_data_;
  /*! \brief Pointer of label */
  const float* label_;
  /*! \brief Name of test set */
  const char* name;
  /*! \brief Query boundaries: data range of query i is [boundaries[i], boundaries[i+1]) */
  const data_size_t* query_boundaries_;
  /*! \brief Number of queries */
  data_size_t num_queries_;
  /*! \brief Weights of queries (nullptr means unweighted) */
  const float* query_weights_;
  /*! \brief Sum of query weights (== num_queries_ when unweighted) */
  double sum_query_weights_;
  /*! \brief Evaluation positions of NDCG */
  std::vector<data_size_t> eval_at_;
  /*! \brief Cached 1/maxDCG per query and position; negative marks all-negative queries */
  std::vector<std::vector<double>> inverse_max_dcgs_;
  /*! \brief Number of OpenMP threads */
  int num_threads_;
};
} // namespace LightGBM
#endif  // LIGHTGBM_METRIC_RANK_METRIC_HPP_
#ifndef LIGHTGBM_METRIC_REGRESSION_METRIC_HPP_
#define LIGHTGBM_METRIC_REGRESSION_METRIC_HPP_
#include <LightGBM/utils/log.h>
#include <LightGBM/metric.h>
#include <cmath>
namespace LightGBM {
/*!
* \brief Metric for regression task.
* Use static class "PointWiseLossCalculator" to calculate loss point-wise
*/
template<typename PointWiseLossCalculator>
class RegressionMetric: public Metric {
 public:
  explicit RegressionMetric(const MetricConfig& config) {
    output_freq_ = config.output_freq;
  }
  virtual ~RegressionMetric() {
  }
  /*!
  * \brief Bind this metric to a test set and pre-compute the weight sum.
  * \param test_name Name used in log output
  * \param metadata Labels and optional per-data weights
  * \param num_data Number of data points
  */
  void Init(const char* test_name, const Metadata& metadata, data_size_t num_data) override {
    name = test_name;
    num_data_ = num_data;
    // get label
    label_ = metadata.label();
    // weights are optional: without them every data point counts as weight 1
    weights_ = metadata.weights();
    if (weights_ == nullptr) {
      sum_weights_ = static_cast<double>(num_data_);
    } else {
      sum_weights_ = 0.0f;
      for (data_size_t i = 0; i < num_data_; ++i) {
        sum_weights_ += weights_[i];
      }
    }
  }
  /*!
  * \brief Log the (weighted) average point-wise loss every output_freq_ iterations.
  * \param iter Current iteration number
  * \param score Predicted scores for the whole test set
  */
  void Print(int iter, const score_t* score) const override {
    if (output_freq_ > 0 && iter % output_freq_ == 0) {
      score_t sum_loss = 0.0;
      if (weights_ == nullptr) {
        #pragma omp parallel for schedule(static) reduction(+:sum_loss)
        for (data_size_t i = 0; i < num_data_; ++i) {
          // add unweighted point-wise loss
          sum_loss += PointWiseLossCalculator::LossOnPoint(label_[i], score[i]);
        }
      } else {
        #pragma omp parallel for schedule(static) reduction(+:sum_loss)
        for (data_size_t i = 0; i < num_data_; ++i) {
          // add weighted point-wise loss
          sum_loss += PointWiseLossCalculator::LossOnPoint(label_[i], score[i]) * weights_[i];
        }
      }
      // static dispatch to the derived class's AverageLoss (may override this one)
      Log::Stdout("Iteration:%d, %s's %s : %f", iter, name, PointWiseLossCalculator::Name(), PointWiseLossCalculator::AverageLoss(sum_loss, sum_weights_));
    }
  }
  // default averaging; derived classes may shadow this (e.g. L2 takes a sqrt)
  inline static score_t AverageLoss(score_t sum_loss, score_t sum_weights) {
    return sum_loss / sum_weights;
  }
 private:
  /*! \brief Output frequency (log every this many iterations; <= 0 disables) */
  int output_freq_;
  /*! \brief Number of data */
  data_size_t num_data_;
  /*! \brief Pointer of label */
  const float* label_;
  /*! \brief Pointer of weights (nullptr means unweighted) */
  const float* weights_;
  /*! \brief Sum of weights (== num_data_ when unweighted) */
  double sum_weights_;
  /*! \brief Name of this test set */
  const char* name;
};
/*! \brief L2 loss for regression task */
/*! \brief L2 loss for regression task (reported as RMSE: sqrt of mean squared error) */
class L2Metric: public RegressionMetric<L2Metric> {
 public:
  explicit L2Metric(const MetricConfig& config) :RegressionMetric<L2Metric>(config) {}
  // squared residual for a single data point
  inline static score_t LossOnPoint(float label, score_t score) {
    const score_t residual = score - label;
    return residual * residual;
  }
  // shadow the base averaging: L2 reports sqrt(mean squared error)
  inline static score_t AverageLoss(score_t sum_loss, score_t sum_weights) {
    return std::sqrt(sum_loss / sum_weights);
  }
  inline static const char* Name() {
    return "l2 loss";
  }
};
/*! \brief L1 loss for regression task */
/*! \brief L1 loss for regression task (mean absolute error) */
class L1Metric: public RegressionMetric<L1Metric> {
 public:
  explicit L1Metric(const MetricConfig& config) :RegressionMetric<L1Metric>(config) {}
  // absolute residual for a single data point
  inline static score_t LossOnPoint(float label, score_t score) {
    return std::fabs(score - label);
  }
  inline static const char* Name() {
    return "l1 loss";
  }
};
} // namespace LightGBM
#endif  // LIGHTGBM_METRIC_REGRESSION_METRIC_HPP_
#include <LightGBM/network.h>
#include <LightGBM/utils/common.h>
#include <LightGBM/utils/log.h>
#include <string>
#include <vector>
#include <unordered_map>
namespace LightGBM {
// Empty map: zero communication steps.
BruckMap::BruckMap() {
  k = 0;
}
/*!
* \brief Allocate a map with n communication steps.
* \param n Number of steps (log2 of machine count, rounded up)
*/
BruckMap::BruckMap(int n) {
  k = n;
  // -1 marks "no partner yet"; Construct() fills in the real ranks
  in_ranks.assign(n, -1);
  out_ranks.assign(n, -1);
}
/*!
* \brief Build the Bruck all-gather communication schedule for one machine.
* \param rank Rank of the local machine
* \param num_machines Total number of machines
* \return Map with in/out partner ranks for each of the k = ceil(log2(n)) steps
*/
BruckMap BruckMap::Construct(int rank, int num_machines) {
  // communication distances are powers of two: 1, 2, 4, ... while < num_machines
  std::vector<int> distance;
  int k = 0;
  while ((1 << k) < num_machines) {
    distance.push_back(1 << k);
    ++k;
  }
  BruckMap bruckMap(k);
  for (int step = 0; step < k; ++step) {
    // at each step: receive from rank + 2^step, send to rank - 2^step (mod n)
    bruckMap.in_ranks[step] = (rank + distance[step]) % num_machines;
    bruckMap.out_ranks[step] = (rank - distance[step] + num_machines) % num_machines;
  }
  return bruckMap;
}
// Empty map: zero communication steps.
RecursiveHalvingMap::RecursiveHalvingMap() {
  k = 0;
}
/*!
* \brief Allocate a map with n communication steps for a node of the given role.
* \param _type Node role (Normal / GroupLeader / Other)
* \param n Number of steps
*/
RecursiveHalvingMap::RecursiveHalvingMap(RecursiveHalvingNodeType _type, int n) {
  type = _type;
  k = n;
  // "Other" nodes only talk to their group leader, so they need no step tables
  if (type != RecursiveHalvingNodeType::Other) {
    // -1 marks unset entries; Construct() fills in the real values
    ranks.assign(n, -1);
    send_block_start.assign(n, -1);
    send_block_len.assign(n, -1);
    recv_block_start.assign(n, -1);
    recv_block_len.assign(n, -1);
  }
}
/*!
* \brief Build the recursive-halving reduce-scatter schedule for one machine.
* When num_machines is not a power of two, trailing machines are paired into
* two-machine groups (leader + other) so the group count is a power of two.
* \param rank Rank of the local machine
* \param num_machines Total number of machines
* \return Schedule with partner ranks and send/recv block ranges per step
*/
RecursiveHalvingMap RecursiveHalvingMap::Construct(int rank, int num_machines) {
  // construct all recursive halving map for all machines
  int k = 0;
  while ((1 << k) <= num_machines) { ++k; }
  // let 1 << k <= num_machines (largest power of two not exceeding it)
  --k;
  // distance of each communication step: halves every step (2^(k-1), ..., 1)
  std::vector<int> distance;
  for (int i = 0; i < k; ++i) {
    distance.push_back(1 << (k - 1 - i));
  }
  if ((1 << k) == num_machines) {
    RecursiveHalvingMap rec_map(RecursiveHalvingNodeType::Normal, k);
    // if num_machines = 2^k, don't need to group machines
    for (int i = 0; i < k; ++i) {
      // communication direction, %2 == 0 is positive
      const int dir = ((rank / distance[i]) % 2 == 0) ? 1 : -1;
      // neighbor at i-th communication step
      const int next_node_idx = rank + dir * distance[i];
      rec_map.ranks[i] = next_node_idx;
      // receive data block at i-th communication step (aligned to distance[i])
      const int recv_block_start = rank / distance[i];
      rec_map.recv_block_start[i] = recv_block_start * distance[i];
      rec_map.recv_block_len[i] = distance[i];
      // send data block at i-th communication step (the partner's half)
      const int send_block_start = next_node_idx / distance[i];
      rec_map.send_block_start[i] = send_block_start * distance[i];
      rec_map.send_block_len[i] = distance[i];
    }
    return rec_map;
  } else {
    // if num_machines != 2^k, need to group machines
    int lower_power_of_2 = 1 << k;
    int rest = num_machines - lower_power_of_2;
    std::vector<RecursiveHalvingNodeType> node_type;
    for (int i = 0; i < num_machines; ++i) {
      node_type.push_back(RecursiveHalvingNodeType::Normal);
    }
    // pair up the last 2*rest machines; "rest" groups end up with 2 machines
    for (int i = 0; i < rest; ++i) {
      int right = num_machines - i * 2 - 1;
      int left = num_machines - i * 2 - 2;
      // let left machine be the group leader
      node_type[left] = RecursiveHalvingNodeType::GroupLeader;
      node_type[right] = RecursiveHalvingNodeType::Other;
    }
    int group_cnt = 0;
    // cache block information for groups; a 2-machine group has double block size
    std::vector<int> group_block_start(lower_power_of_2);
    std::vector<int> group_block_len(lower_power_of_2, 0);
    // convert from group index to its leader node
    std::vector<int> group_to_node(lower_power_of_2);
    // convert from node to its group index
    std::vector<int> node_to_group(num_machines);
    for (int i = 0; i < num_machines; ++i) {
      // a Normal or GroupLeader node starts a new group
      if (node_type[i] == RecursiveHalvingNodeType::Normal || node_type[i] == RecursiveHalvingNodeType::GroupLeader) {
        group_to_node[group_cnt++] = i;
      }
      node_to_group[i] = group_cnt - 1;
      // add block len for this group
      group_block_len[group_cnt - 1]++;
    }
    // prefix-sum the group block starts
    group_block_start[0] = 0;
    for (int i = 1; i < lower_power_of_2; ++i) {
      group_block_start[i] = group_block_start[i - 1] + group_block_len[i - 1];
    }
    RecursiveHalvingMap rec_map(node_type[rank], k);
    if (node_type[rank] == RecursiveHalvingNodeType::Other) {
      // "Other" nodes only exchange with their leader; no schedule needed
      rec_map.neighbor = rank - 1;
      return rec_map;
    }
    if (node_type[rank] == RecursiveHalvingNodeType::GroupLeader) {
      rec_map.neighbor = rank + 1;
    }
    const int cur_group_idx = node_to_group[rank];
    for (int i = 0; i < k; ++i) {
      // same halving pattern as the power-of-two case, but over groups
      const int dir = ((cur_group_idx / distance[i]) % 2 == 0) ? 1 : -1;
      const int next_node_idx = group_to_node[(cur_group_idx + dir * distance[i])];
      rec_map.ranks[i] = next_node_idx;
      // get receive block information
      const int recv_block_start = cur_group_idx / distance[i];
      rec_map.recv_block_start[i] = group_block_start[recv_block_start * distance[i]];
      int recv_block_len = 0;
      // accumulate block len over the distance[i] groups being received
      for (int j = 0; j < distance[i]; ++j) {
        recv_block_len += group_block_len[recv_block_start * distance[i] + j];
      }
      rec_map.recv_block_len[i] = recv_block_len;
      // get send block information
      const int send_block_start = (cur_group_idx + dir * distance[i]) / distance[i];
      rec_map.send_block_start[i] = group_block_start[send_block_start * distance[i]];
      int send_block_len = 0;
      // accumulate block len over the distance[i] groups being sent
      for (int j = 0; j < distance[i]; ++j) {
        send_block_len += group_block_len[send_block_start * distance[i] + j];
      }
      rec_map.send_block_len[i] = send_block_len;
    }
    return rec_map;
  }
}
} // namespace LightGBM
#ifndef LIGHTGBM_NETWORK_LINKERS_H_
#define LIGHTGBM_NETWORK_LINKERS_H_
#include <LightGBM/meta.h>
#include <LightGBM/config.h>
#include <LightGBM/network.h>
#include <chrono>
#include <ctime>
#ifdef USE_SOCKET
#include "socket_wrapper.hpp"
#include <LightGBM/utils/common.h>
#include <thread>
#include <vector>
#include <string>
#endif
#ifdef USE_MPI
#include <mpi.h>
#define MPI_SAFE_CALL(mpi_return) CHECK((mpi_return) == MPI_SUCCESS)
#endif
namespace LightGBM {
/*!
* \brief An network basic communication warpper.
* Will warp low level communication methods, e.g. mpi, socket and so on.
* This class will wrap all linkers to other machines if needs
*/
class Linkers {
 public:
  /*!
  * \brief Constructor
  * \param config Config of network settings
  */
  explicit Linkers(NetworkConfig config);
  /*!
  * \brief Destructor
  */
  ~Linkers();
  /*!
  * \brief Receive data, blocking
  * \param rank Which rank will send data to local machine
  * \param data Pointer of receive data
  * \param len Receive size, will block until len bytes have been received
  */
  inline void Recv(int rank, char* data, int len) const;
  /*!
  * \brief Send data, blocking
  * \param rank Which rank local machine will send to
  * \param data Pointer of send data
  * \param len Send size
  */
  inline void Send(int rank, char* data, int len) const;
  /*!
  * \brief Send and Receive at same time, blocking
  * \param send_rank Rank to send to
  * \param send_data Pointer of send data
  * \param send_len Send size
  * \param recv_rank Rank to receive from
  * \param recv_data Pointer of receive data
  * \param recv_len Receive size
  */
  inline void SendRecv(int send_rank, char* send_data, int send_len,
    int recv_rank, char* recv_data, int recv_len);
  /*!
  * \brief Get rank of local machine
  */
  inline int rank();
  /*!
  * \brief Get total number of machines
  */
  inline int num_machines();
  /*!
  * \brief Get Bruck map of this network
  */
  inline const BruckMap& bruck_map();
  /*!
  * \brief Get Recursive Halving map of this network
  */
  inline const RecursiveHalvingMap& recursive_halving_map();
  #ifdef USE_SOCKET
  /*!
  * \brief Bind local listener to port
  * \param port Local listen port
  */
  void TryBind(int port);
  /*!
  * \brief Set socket for rank
  * \param rank Rank of the remote machine
  * \param socket Connected socket to that machine
  */
  void SetLinker(int rank, const TcpSocket& socket);
  /*!
  * \brief Thread for listening
  * \param incoming_cnt Number of incoming machines
  */
  void ListenThread(int incoming_cnt);
  /*!
  * \brief Construct network topology (connect to all needed peers)
  */
  void Construct();
  /*!
  * \brief Parse machines information from file
  * \param filename Path of the machine list file
  */
  void ParseMachineList(const char * filename);
  /*!
  * \brief Check whether one linker is connected
  * \param rank Rank to check
  * \return True if linker is connected
  */
  bool CheckLinker(int rank);
  /*!
  * \brief Print connected linkers
  */
  void PrintLinkers();
  #endif // USE_SOCKET
 private:
  /*! \brief Rank of local machine */
  int rank_;
  /*! \brief Total number of machines */
  int num_machines_;
  /*! \brief Bruck map */
  BruckMap bruck_map_;
  /*! \brief Recursive Halving map */
  RecursiveHalvingMap recursive_halving_map_;
  /*! \brief Accumulated time spent in SendRecv, in milliseconds */
  std::chrono::duration<double, std::milli> network_time_;
  #ifdef USE_SOCKET
  /*! \brief used to store client ips */
  std::vector<std::string> client_ips_;
  /*! \brief used to store client ports */
  std::vector<int> client_ports_;
  /*! \brief timeout for sockets, in minutes */
  int socket_timeout_;
  /*! \brief Local listen port */
  int local_listen_port_;
  /*! \brief Linkers (owning raw pointers; one slot per rank, nullptr if unconnected) */
  std::vector<TcpSocket*> linkers_;
  /*! \brief Local socket listener */
  TcpSocket* listener_;
  #endif // USE_SOCKET
};
/*! \brief Rank (id) of this machine within the cluster */
inline int Linkers::rank() {
  return rank_;
}
/*! \brief Total number of machines in the cluster */
inline int Linkers::num_machines() {
  return num_machines_;
}
/*! \brief Bruck communication map used by all-gather */
inline const BruckMap& Linkers::bruck_map() {
  return bruck_map_;
}
/*! \brief Recursive-halving communication map used by reduce-scatter */
inline const RecursiveHalvingMap& Linkers::recursive_halving_map() {
  return recursive_halving_map_;
}
#ifdef USE_SOCKET
/*!
* \brief Blocking receive over a socket: loop until exactly len bytes arrived.
* Reads at most kMaxReceiveSize bytes per call to the underlying socket.
* \param rank Rank to receive from
* \param data Destination buffer (must hold at least len bytes)
* \param len Number of bytes to receive
*/
inline void Linkers::Recv(int rank, char* data, int len) const {
  int recv_cnt = 0;
  while (recv_cnt < len) {
    // NOTE(review): assumes TcpSocket::Recv never returns a negative value;
    // an error return would make this loop spin — TODO confirm
    recv_cnt += linkers_[rank]->Recv(data + recv_cnt,
      Common::Min<int>(len - recv_cnt, SocketConfig::kMaxReceiveSize));
  }
}
/*!
* \brief Blocking send over a socket: loop until the whole buffer is out.
* \param rank Rank to send to
* \param data Source buffer
* \param len Number of bytes to send (no-op when <= 0)
*/
inline void Linkers::Send(int rank, char* data, int len) const {
  for (int sent = 0; sent < len; ) {
    sent += linkers_[rank]->Send(data + sent, len - sent);
  }
}
/*!
* \brief Send to one rank and receive from another at the same time (socket).
* Small sends fit the kernel socket buffer and so won't block; large sends
* are moved to a helper thread to avoid deadlocking with the blocking Recv.
* Accumulates elapsed time into network_time_.
*/
inline void Linkers::SendRecv(int send_rank, char* send_data, int send_len,
  int recv_rank, char* recv_data, int recv_len) {
  auto start_time = std::chrono::high_resolution_clock::now();
  if (send_len < SocketConfig::kSocketBufferSize) {
    // if buffer is enough, send will be non-blocking
    Send(send_rank, send_data, send_len);
    Recv(recv_rank, recv_data, recv_len);
  } else {
    // if buffer is not enough, use another thread to send, since send will block
    std::thread send_worker(
      [this, send_rank, send_data, send_len]() {
      Send(send_rank, send_data, send_len);
    });
    Recv(recv_rank, recv_data, recv_len);
    // wait for send complete
    send_worker.join();
  }
  auto end_time = std::chrono::high_resolution_clock::now();
  // accumulate total network time across all calls
  network_time_ += std::chrono::duration<double, std::milli>(end_time - start_time);
}
#endif // USE_SOCKET
#ifdef USE_MPI
/*!
* \brief Blocking receive over MPI: loop until exactly len bytes arrived.
* \param rank Rank to receive from
* \param data Destination buffer (must hold at least len bytes)
* \param len Number of bytes to receive
*/
inline void Linkers::Recv(int rank, char* data, int len) const {
  MPI_Status status;
  int read_cnt = 0;
  while (read_cnt < len) {
    MPI_SAFE_CALL(MPI_Recv(data + read_cnt, len - read_cnt, MPI_BYTE, rank, MPI_ANY_TAG, MPI_COMM_WORLD, &status));
    // a message may be shorter than requested; query the actual byte count
    int cur_cnt;
    MPI_SAFE_CALL(MPI_Get_count(&status, MPI_BYTE, &cur_cnt));
    read_cnt += cur_cnt;
  }
}
inline void Linkers::Send(int rank, char* data, int len) const {
if (len <= 0) {
return;
}
MPI_Status status;
MPI_Request send_request;
MPI_SAFE_CALL(MPI_Isend(data, len, MPI_BYTE, rank, 0, MPI_COMM_WORLD, &send_request));
MPI_SAFE_CALL(MPI_Wait(&send_request, &status));
}
/*!
* \brief Send to one rank and receive from another at the same time (MPI).
* Starts a non-blocking send, then blocks on the receive, then waits for
* the send to complete.
*/
inline void Linkers::SendRecv(int send_rank, char* send_data, int send_len,
  int recv_rank, char* recv_data, int recv_len) {
  MPI_Request send_request;
  // send first, non-blocking
  MPI_SAFE_CALL(MPI_Isend(send_data, send_len, MPI_BYTE, send_rank, 0, MPI_COMM_WORLD, &send_request));
  // then receive, blocking, until recv_len bytes arrived
  MPI_Status status;
  int read_cnt = 0;
  while (read_cnt < recv_len) {
    MPI_SAFE_CALL(MPI_Recv(recv_data + read_cnt, recv_len - read_cnt, MPI_BYTE, recv_rank, 0, MPI_COMM_WORLD, &status));
    int cur_cnt;
    MPI_SAFE_CALL(MPI_Get_count(&status, MPI_BYTE, &cur_cnt));
    read_cnt += cur_cnt;
  }
  // wait for send complete
  MPI_SAFE_CALL(MPI_Wait(&send_request, &status));
}
#endif // USE_MPI
} // namespace LightGBM
#endif  // LIGHTGBM_NETWORK_LINKERS_H_
#ifdef USE_MPI
#include "linkers.h"
namespace LightGBM {
/*!
* \brief MPI-backed constructor: initialize MPI if needed, query rank/size,
* and build the communication maps.
* \param config Network settings (unused for MPI; topology comes from the launcher)
*/
Linkers::Linkers(NetworkConfig config) {
  int argc = 0;
  char** argv = nullptr;
  int flag = 0;
  MPI_SAFE_CALL(MPI_Initialized(&flag));  // test if MPI has been initialized
  if (!flag) {  // if MPI not started, start it
    // use a dedicated variable for the provided thread level instead of
    // reusing "flag"; the provided level is not otherwise acted upon here
    int provided = 0;
    MPI_SAFE_CALL(MPI_Init_thread(&argc, &argv, MPI_THREAD_SERIALIZED, &provided));
  }
  MPI_SAFE_CALL(MPI_Comm_size(MPI_COMM_WORLD, &num_machines_));
  MPI_SAFE_CALL(MPI_Comm_rank(MPI_COMM_WORLD, &rank_));
  // wait for all clients to start up
  MPI_SAFE_CALL(MPI_Barrier(MPI_COMM_WORLD));
  bruck_map_ = BruckMap::Construct(rank_, num_machines_);
  recursive_halving_map_ = RecursiveHalvingMap::Construct(rank_, num_machines_);
}
// Shut down MPI on destruction.
Linkers::~Linkers() {
  MPI_SAFE_CALL(MPI_Finalize());
}
} // namespace LightGBM
#endif // USE_MPI
#ifdef USE_SOCKET
#include "linkers.h"
#include <LightGBM/utils/common.h>
#include <LightGBM/utils/text_reader.h>
#include <LightGBM/config.h>
#include <cstring>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include <thread>
#include <chrono>
#include <string>
namespace LightGBM {
/*!
* \brief Socket-backed constructor: parse the machine list, determine the local
* rank, bind the listener, build the communication maps and connect all peers.
* \param config Network settings (machine list file, listen port, timeout)
*/
Linkers::Linkers(NetworkConfig config) {
  // start up socket layer (WSAStartup on Windows, no-op elsewhere)
  TcpSocket::Startup();
  network_time_ = std::chrono::duration<double, std::milli>(0);
  num_machines_ = config.num_machines;
  local_listen_port_ = config.local_listen_port;
  socket_timeout_ = config.time_out;
  rank_ = -1;
  // parse clients from file (may also set rank_ via a "rank=" line)
  ParseMachineList(config.machine_list_filename.c_str());
  if (num_machines_ <= 1) {
    // single machine: nothing to connect
    return;
  }
  if (rank_ == -1) {
    // rank not given explicitly: find the entry matching a local ip + our port
    std::unordered_set<std::string> local_ip_list = TcpSocket::GetLocalIpList();
    for (size_t i = 0; i < client_ips_.size(); ++i) {
      if (local_ip_list.count(client_ips_[i]) > 0 && client_ports_[i] == local_listen_port_) {
        rank_ = static_cast<int>(i);
        break;
      }
    }
  }
  if (rank_ == -1) {
    // fatal: Log::Stderr aborts (presumably — control does not continue meaningfully)
    Log::Stderr("machine list file doesn't contain local machine, app quit");
  }
  // construct listener
  listener_ = new TcpSocket();
  TryBind(local_listen_port_);
  // one (possibly null) linker slot per rank
  for (int i = 0; i < num_machines_; ++i) {
    linkers_.push_back(nullptr);
  }
  // construct communication topology
  bruck_map_ = BruckMap::Construct(rank_, num_machines_);
  recursive_halving_map_ = RecursiveHalvingMap::Construct(rank_, num_machines_);
  // connect to all peers required by the topology
  Construct();
  // all connections established; the listener is no longer needed
  listener_->Close();
  delete listener_;
}
/*!
* \brief Close and free all peer sockets, shut down the socket layer,
* and report the total time spent in network communication.
*/
Linkers::~Linkers() {
  for (size_t i = 0; i < linkers_.size(); ++i) {
    if (linkers_[i] != nullptr) {
      linkers_[i]->Close();
      delete linkers_[i];
    }
  }
  TcpSocket::Finalize();
  // network_time_ is a std::chrono::duration in milliseconds; extract the raw
  // double with .count() — passing the duration object itself through
  // printf-style varargs (as the original did) is undefined behavior.
  Log::Stdout("network used %f seconds", network_time_.count() * 1e-3);
}
/*!
* \brief Parse the machine list file: one "ip port" pair per line, with an
* optional "rank=<id>" line that overrides local-rank auto-detection.
* \param filename Path of the machine list file
*/
void Linkers::ParseMachineList(const char * filename) {
  TextReader<size_t> machine_list_reader(filename);
  machine_list_reader.ReadAllLines();
  if (machine_list_reader.Lines().size() <= 0) {
    Log::Stderr("machine list file:%s doesn't exist", filename);
  }
  for (auto& line : machine_list_reader.Lines()) {
    line = Common::Trim(line);
    // BUGFIX: the original used line.find_first_of("rank="), which matches any
    // line containing ANY of the characters 'r','a','n','k','=' (e.g. a
    // hostname such as "server1 8080") and then indexed the missing '=' split
    // out of bounds. Require the "rank=" prefix instead.
    if (line.find("rank=") == 0) {
      std::vector<std::string> str_after_split = Common::Split(line.c_str(), '=');
      if (str_after_split.size() == 2) {
        Common::Atoi(str_after_split[1].c_str(), &rank_);
      }
      continue;
    }
    std::vector<std::string> str_after_split = Common::Split(line.c_str(), ' ');
    if (str_after_split.size() != 2) {
      // not an "ip port" pair; skip
      continue;
    }
    if (client_ips_.size() >= static_cast<size_t>(num_machines_)) {
      Log::Stdout("The #machine in machine list is larger than parameter num_machines, will ignore rest");
      break;
    }
    str_after_split[0] = Common::Trim(str_after_split[0]);
    str_after_split[1] = Common::Trim(str_after_split[1]);
    client_ips_.push_back(str_after_split[0]);
    client_ports_.push_back(atoi(str_after_split[1].c_str()));
  }
  if (client_ips_.size() != static_cast<size_t>(num_machines_)) {
    // cast size_t to int so the %d varargs conversion is well-defined
    Log::Stdout("The world size is bigger the #machine in machine list, change world size to %d .", static_cast<int>(client_ips_.size()));
    num_machines_ = static_cast<int>(client_ips_.size());
  }
}
/*!
* \brief Bind the local listener to a port; fatal error on failure.
* \param port Local listen port
*/
void Linkers::TryBind(int port) {
  Log::Stdout("try to bind port %d.", port);
  const bool bound = listener_->Bind(port);
  if (!bound) {
    Log::Stderr("bind port %d failed.", port);
  } else {
    Log::Stdout("bind port %d success.", port);
  }
}
/*!
* \brief Store the connected socket for a rank (takes a copy).
* \param rank Rank of the remote machine
* \param socket Connected socket to that machine
*/
void Linkers::SetLinker(int rank, const TcpSocket& socket) {
  linkers_[rank] = new TcpSocket(socket);
  // socket_timeout_ is in minutes; convert to milliseconds
  linkers_[rank]->SetTimeout(socket_timeout_ * 1000 * 60);
}
/*!
* \brief Accept incoming connections until incoming_cnt peers have identified
* themselves. Each peer first sends its rank as a raw int.
* \param incoming_cnt Number of incoming machines to wait for
*/
void Linkers::ListenThread(int incoming_cnt) {
  Log::Stdout("Listening...");
  char buffer[100];
  int connected_cnt = 0;
  while (connected_cnt < incoming_cnt) {
    // accept incoming socket
    TcpSocket handler = listener_->Accept();
    if (handler.IsClosed()) {
      continue;
    }
    // read the peer's rank (sent as sizeof(int) raw bytes)
    int read_cnt = 0;
    const int size_of_int = static_cast<int>(sizeof(int));
    while (read_cnt < size_of_int) {
      // NOTE(review): a non-positive return (peer closed / error) would make
      // this loop spin — TODO confirm TcpSocket::Recv cannot return < 0 here
      int cur_read_cnt = handler.Recv(buffer + read_cnt, size_of_int - read_cnt);
      read_cnt += cur_read_cnt;
    }
    // memcpy avoids the type-punned reinterpret_cast read the original used
    int in_rank = 0;
    std::memcpy(&in_rank, buffer, sizeof(in_rank));
    // add new socket
    SetLinker(in_rank, handler);
    ++connected_cnt;
  }
}
/*!
* \brief Establish TCP connections to every peer the communication maps need.
* Smaller ranks actively connect to larger ranks; larger ranks accept via
* the listen thread. Each connector sends its rank so the acceptor can
* identify it.
*/
void Linkers::Construct() {
  // collect the set of ranks we must be connected with
  std::unordered_map<int, int> need_connect;
  for (int i = 0; i < bruck_map_.k; ++i) {
    need_connect[bruck_map_.out_ranks[i]] = 1;
    need_connect[bruck_map_.in_ranks[i]] = 1;
  }
  if (recursive_halving_map_.type != RecursiveHalvingNodeType::Normal) {
    // grouped nodes also talk to their in-group neighbor
    need_connect[recursive_halving_map_.neighbor] = 1;
  }
  if (recursive_halving_map_.type != RecursiveHalvingNodeType::Other) {
    for (int i = 0; i < recursive_halving_map_.k; ++i) {
      need_connect[recursive_halving_map_.ranks[i]] = 1;
    }
  }
  int need_connect_cnt = 0;
  int incoming_cnt = 0;
  for (auto it = need_connect.begin(); it != need_connect.end(); ++it) {
    int machine_rank = it->first;
    if (machine_rank >= 0 && machine_rank != rank_) {
      ++need_connect_cnt;
    }
    // peers with a smaller rank will connect to us
    if (machine_rank < rank_) {
      ++incoming_cnt;
    }
  }
  // start listener thread to accept the incoming connections
  listener_->SetTimeout(socket_timeout_);
  listener_->Listen(incoming_cnt);
  std::thread listen_thread(&Linkers::ListenThread, this, incoming_cnt);
  const int connect_fail_retry_cnt = 20;
  const int connect_fail_delay_time = 10 * 1000;  // 10s
  // actively connect to peers with a larger rank
  for (auto it = need_connect.begin(); it != need_connect.end(); ++it) {
    int out_rank = it->first;
    // let smaller rank connect to larger rank
    if (out_rank > rank_) {
      TcpSocket cur_socket;
      for (int i = 0; i < connect_fail_retry_cnt; ++i) {
        if (cur_socket.Connect(client_ips_[out_rank].c_str(), client_ports_[out_rank])) {
          break;
        } else {
          Log::Stdout("connect to rank %d failed, wait for %d milliseconds", out_rank, connect_fail_delay_time);
          std::this_thread::sleep_for(std::chrono::milliseconds(connect_fail_delay_time));
        }
      }
      // NOTE(review): if all retries fail, this still Sends on an unconnected
      // socket — TODO confirm intended failure behavior
      // identify ourselves by sending our rank
      cur_socket.Send(reinterpret_cast<const char*>(&rank_), sizeof(rank_));
      SetLinker(out_rank, cur_socket);
    }
  }
  // wait for listener
  listen_thread.join();
  // print connected linkers
  PrintLinkers();
}
/*!
* \brief Check whether the linker for a rank is connected.
* \param rank Rank to check
* \return True if a live (non-null, open) socket exists for that rank
*/
bool Linkers::CheckLinker(int rank) {
  return linkers_[rank] != nullptr && !linkers_[rank]->IsClosed();
}
/*! \brief Log every rank we currently hold a live connection to. */
void Linkers::PrintLinkers() {
  for (int peer = 0; peer < num_machines_; ++peer) {
    if (!CheckLinker(peer)) {
      continue;
    }
    Log::Stdout("Connected to rank %d.", peer);
  }
}
} // namespace LightGBM
#endif // USE_SOCKET
#include <LightGBM/network.h>
#include <LightGBM/utils/common.h>
#include "linkers.h"
#include <cstring>
#include <cstdlib>
namespace LightGBM {
// static member definitions
int Network::num_machines_;
int Network::rank_;
Linkers* Network::linkers_;
BruckMap Network::bruck_map_;
RecursiveHalvingMap Network::recursive_halving_map_;
// per-rank block layout scratch arrays, sized num_machines_ in Init()
int* Network::block_start_;
int* Network::block_len_;
// reusable receive buffer for AllreduceByAllGather, grown on demand
int Network::buffer_size_;
char* Network::buffer_;
/*!
* \brief Initialize the network layer: create linkers, query rank/size,
* fetch the communication maps and allocate scratch buffers.
* \param config Network settings
*/
void Network::Init(NetworkConfig config) {
  linkers_ = new Linkers(config);
  rank_ = linkers_->rank();
  num_machines_ = linkers_->num_machines();
  bruck_map_ = linkers_->bruck_map();
  recursive_halving_map_ = linkers_->recursive_halving_map();
  // one block-layout slot per machine
  block_start_ = new int[num_machines_];
  block_len_ = new int[num_machines_];
  // initial 1MB gather buffer; AllreduceByAllGather grows it when needed
  buffer_size_ = 1024 * 1024;
  buffer_ = new char[buffer_size_];
  Log::Stdout("local rank %d, total number of machines %d", rank_, num_machines_);
}
void Network::Dispose() {
delete[]block_start_;
delete[]block_len_;
delete[] buffer_;
delete linkers_;
}
/*!
* \brief All-reduce: combine input across all machines with reducer; every
* machine ends up with the full reduced result in output.
* \param input Local input buffer
* \param input_size Size of input in bytes
* \param type_size Size of one element in bytes
* \param output Output buffer (same size as input)
* \param reducer Element-wise reduction function
*/
void Network::Allreduce(char* input, int input_size, int type_size, char* output, const ReduceFunction& reducer) {
  int count = input_size / type_size;
  // for small payloads, reduce-scatter overhead isn't worth it:
  // do a single all-gather and reduce locally instead
  if (count < num_machines_ || input_size < 4096) {
    AllreduceByAllGather(input, input_size, output, reducer);
    return;
  }
  // assign a contiguous block of elements to every rank
  int step = (count + num_machines_ - 1) / num_machines_;
  if (step < 1) {
    step = 1;
  }
  block_start_[0] = 0;
  for (int i = 0; i < num_machines_ - 1; ++i) {
    // clamp the last blocks so the layout never exceeds input_size
    block_len_[i] = Common::Min<int>(step * type_size, input_size - block_start_[i]);
    block_start_[i + 1] = block_start_[i] + block_len_[i];
  }
  block_len_[num_machines_ - 1] = input_size - block_start_[num_machines_ - 1];
  // reduce-scatter: each rank ends up owning the reduced value of its block
  ReduceScatter(input, input_size, block_start_, block_len_, output, reducer);
  // all-gather the reduced blocks so every rank has the full result
  Allgather(output, input_size, block_start_, block_len_, output);
}
/*!
* \brief All-reduce implemented as all-gather + local reduction.
* Used for small payloads where fewer communication rounds win.
* \param input Local input buffer
* \param input_size Size of input in bytes
* \param output Output buffer (same size as input)
* \param reducer Element-wise reduction function
*/
void Network::AllreduceByAllGather(char* input, int input_size, char* output, const ReduceFunction& reducer) {
  // every rank contributes an equal-sized block
  int all_size = input_size * num_machines_;
  block_start_[0] = 0;
  block_len_[0] = input_size;
  for (int i = 1; i < num_machines_; ++i) {
    block_start_[i] = block_start_[i - 1] + block_len_[i - 1];
    block_len_[i] = input_size;
  }
  // need the scratch buffer here, since "output" is smaller than the gathered size
  if (input_size*num_machines_ > buffer_size_) {
    delete[] buffer_;
    buffer_size_ = input_size*num_machines_;
    buffer_ = new char[buffer_size_];
  }
  Allgather(input, all_size, block_start_, block_len_, buffer_);
  // fold every remote block into block 0
  for (int i = 1; i < num_machines_; ++i) {
    reducer(buffer_ + block_start_[i], buffer_ + block_start_[0], input_size);
  }
  // copy the reduced block back to the caller's output
  std::memcpy(output, buffer_, input_size);
}
/*!
* \brief All-gather with equal contribution sizes: lay out uniform blocks
* and delegate to the general block-wise version.
* \param input Local input buffer
* \param send_size Bytes contributed by every machine
* \param output Output buffer of size send_size * num_machines
*/
void Network::Allgather(char* input, int send_size, char* output) {
  // equal-sized blocks: block i starts at i * send_size
  for (int i = 0; i < num_machines_; ++i) {
    block_start_[i] = i * send_size;
    block_len_[i] = send_size;
  }
  Allgather(input, send_size * num_machines_, block_start_, block_len_, output);
}
/*!
* \brief Block-wise all-gather using the Bruck algorithm: log2(n) rounds,
* doubling the amount of accumulated data each round, then an in-place
* rotation to put the blocks in rank order.
* \param input Local input buffer (this rank's block)
* \param all_size Total gathered size in bytes
* \param block_start Per-rank block offsets in output
* \param block_len Per-rank block lengths
* \param output Output buffer of all_size bytes
*/
void Network::Allgather(char* input, int all_size, int* block_start, int* block_len, char* output) {
  int write_pos = 0;
  // use output as the receive buffer; our own block goes first
  std::memcpy(output, input, block_len[rank_]);
  write_pos += block_len[rank_];
  int accumulated_block = 1;
  for (int i = 0; i < bruck_map_.k; ++i) {
    // number of blocks exchanged this round (doubles, clamped at the end)
    int cur_block_size = Common::Min<int>(1 << i, num_machines_ - accumulated_block);
    // get out rank
    int out_rank = bruck_map_.out_ranks[i];
    // total bytes to send: the cur_block_size blocks we have, starting at our own
    int send_len = 0;
    for (int j = 0; j < cur_block_size; ++j) {
      send_len += block_len[(rank_ + j) % num_machines_];
    }
    // get in rank
    int in_rank = bruck_map_.in_ranks[i];
    // total bytes to receive: the next cur_block_size blocks after what we hold
    int need_recv_cnt = 0;
    for (int j = 0; j < cur_block_size; ++j) {
      need_recv_cnt += block_len[(rank_ + accumulated_block + j) % num_machines_];
    }
    // send and receive at the same time
    linkers_->SendRecv(out_rank, output, send_len, in_rank, output + write_pos, need_recv_cnt);
    write_pos += need_recv_cnt;
    accumulated_block += cur_block_size;
  }
  // Bruck leaves the data rotated so our block is first; rotate it back to
  // rank order in place via the classic three-reversal trick
  std::reverse<char*>(output, output + all_size);
  std::reverse<char*>(output, output + block_start[rank_]);
  std::reverse<char*>(output + block_start[rank_], output + all_size);
}
/*!
* \brief Reduce-scatter via recursive halving: after completion, each rank's
* output holds the fully reduced content of its own block.
* For non-power-of-two machine counts, paired "Other" nodes first fold their
* data into their group leader and receive their block's result at the end.
* \param input Local input buffer (modified in place by the reductions)
* \param input_size Size of input in bytes
* \param block_start Per-rank block offsets
* \param block_len Per-rank block lengths
* \param output Receive buffer; ends holding this rank's reduced block
* \param reducer Element-wise reduction function
*/
void Network::ReduceScatter(char* input, int input_size, int* block_start, int* block_len, char* output, const ReduceFunction& reducer) {
  bool is_powerof_2 = (num_machines_ & (num_machines_ - 1)) == 0;
  if (!is_powerof_2) {
    if (recursive_halving_map_.type == RecursiveHalvingNodeType::Other) {
      // fold this node's data into its group leader first
      linkers_->Send(recursive_halving_map_.neighbor, input, input_size);
    } else if (recursive_halving_map_.type == RecursiveHalvingNodeType::GroupLeader) {
      // receive the paired node's data and reduce it into our input
      int need_recv_cnt = input_size;
      linkers_->Recv(recursive_halving_map_.neighbor, output, need_recv_cnt);
      reducer(output, input, input_size);
    }
  }
  // start recursive halving ("Other" nodes sit out; their leader acts for them)
  if (recursive_halving_map_.type != RecursiveHalvingNodeType::Other) {
    for (int i = 0; i < recursive_halving_map_.k; ++i) {
      // partner for this step
      int target = recursive_halving_map_.ranks[i];
      int send_block_start = recursive_halving_map_.send_block_start[i];
      int recv_block_start = recursive_halving_map_.recv_block_start[i];
      // total bytes to send: the partner's half of the remaining range
      int send_size = 0;
      for (int j = 0; j < recursive_halving_map_.send_block_len[i]; ++j) {
        send_size += block_len[send_block_start + j];
      }
      // total bytes to receive: our half of the remaining range
      int need_recv_cnt = 0;
      for (int j = 0; j < recursive_halving_map_.recv_block_len[i]; ++j) {
        need_recv_cnt += block_len[recv_block_start + j];
      }
      // send and receive at the same time
      linkers_->SendRecv(target, input + block_start[send_block_start], send_size, target, output, need_recv_cnt);
      // fold the received data into our half of input
      reducer(output, input + block_start[recv_block_start], need_recv_cnt);
    }
  }
  if (!is_powerof_2) {
    if (recursive_halving_map_.type == RecursiveHalvingNodeType::GroupLeader) {
      // hand the paired node its reduced block
      linkers_->Send(recursive_halving_map_.neighbor,
        input + block_start[recursive_halving_map_.neighbor],
        block_len[recursive_halving_map_.neighbor]);
    } else if (recursive_halving_map_.type == RecursiveHalvingNodeType::Other) {
      // receive our reduced block from the group leader; output is final
      int need_recv_cnt = block_len[rank_];
      linkers_->Recv(recursive_halving_map_.neighbor, output, need_recv_cnt);
      return;
    }
  }
  // copy this rank's reduced block into output
  std::memcpy(output, input + block_start[rank_], block_len[rank_]);
}
} // namespace LightGBM
#ifndef LIGHTGBM_NETWORK_SOCKET_WRAPPER_HPP_
#define LIGHTGBM_NETWORK_SOCKET_WRAPPER_HPP_
#ifdef USE_SOCKET
#if defined(_WIN32)
#include <winsock2.h>
#include <ws2tcpip.h>
#include <iphlpapi.h>
#else
#include <fcntl.h>
#include <netdb.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <ifaddrs.h>
#include <cerrno>
#endif
#include <LightGBM/utils/log.h>
#include <cstdlib>
#include <cstring>
#include <string>
#include <unordered_set>
#ifdef _MSC_VER
#pragma comment(lib, "Ws2_32.lib")
#pragma comment(lib, "IPHLPAPI.lib")
#endif
namespace LightGBM {
#ifndef _WIN32
// provide the Winsock-style names on POSIX so the rest of the code can be
// written against a single socket API
typedef int SOCKET;
const int INVALID_SOCKET = -1;
#define SOCKET_ERROR -1
#endif
#if defined(_WIN32)
// HeapAlloc/HeapFree exist only on Windows; keep these helper macros inside
// the _WIN32 guard so they can never expand to undefined calls elsewhere
// (they are used only by the Windows GetLocalIpList below)
#define MALLOC(x) HeapAlloc(GetProcessHeap(), 0, (x))
#define FREE(x) HeapFree(GetProcessHeap(), 0, (x))
#endif  // _WIN32
/*! \brief Default configuration applied to every TcpSocket */
namespace SocketConfig {
// kernel send/receive buffer size: 10 MB
const int kSocketBufferSize = 10 * 1024 * 1024;
// upper bound for one receive call: 2 MB
const int kMaxReceiveSize = 2 * 1024 * 1024;
// disable Nagle's algorithm (TCP_NODELAY) to cut latency of small messages
const bool kNoDelay = true;
}  // namespace SocketConfig
class TcpSocket {
public:
TcpSocket() {
sockfd_ = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (sockfd_ == INVALID_SOCKET) {
Log::Stderr("socket construct error");
return;
}
ConfigSocket();
}
explicit TcpSocket(SOCKET socket) {
sockfd_ = socket;
if (sockfd_ == INVALID_SOCKET) {
Log::Stderr("passed socket error");
return;
}
ConfigSocket();
}
TcpSocket(const TcpSocket &object) {
sockfd_ = object.sockfd_;
ConfigSocket();
}
~TcpSocket() {
}
inline void SetTimeout(int timeout) {
setsockopt(sockfd_, SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast<char*>(&timeout), sizeof(timeout));
}
inline void ConfigSocket() {
if (sockfd_ == INVALID_SOCKET) {
return;
}
setsockopt(sockfd_, SOL_SOCKET, SO_RCVBUF, reinterpret_cast<const char*>(&SocketConfig::kSocketBufferSize), sizeof(SocketConfig::kSocketBufferSize));
setsockopt(sockfd_, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<const char*>(&SocketConfig::kSocketBufferSize), sizeof(SocketConfig::kSocketBufferSize));
setsockopt(sockfd_, IPPROTO_TCP, TCP_NODELAY, reinterpret_cast<const char*>(&SocketConfig::kNoDelay), sizeof(SocketConfig::kNoDelay));
}
inline static void Startup() {
#if defined(_WIN32)
WSADATA wsa_data;
if (WSAStartup(MAKEWORD(2, 2), &wsa_data) == -1) {
Log::Stderr("socket error: start up error");
}
if (LOBYTE(wsa_data.wVersion) != 2 || HIBYTE(wsa_data.wVersion) != 2) {
WSACleanup();
Log::Stderr("socket error: Winsock.dll version error");
}
#else
#endif
}
inline static void Finalize() {
#if defined(_WIN32)
WSACleanup();
#endif
}
inline static int GetLastError() {
#if defined(_WIN32)
return WSAGetLastError();
#else
return errno;
#endif
}
#if defined(_WIN32)
inline static std::unordered_set<std::string> GetLocalIpList() {
std::unordered_set<std::string> ip_list;
char buffer[512];
// get hostName
if (gethostname(buffer, sizeof(buffer)) == SOCKET_ERROR) {
Log::Stderr("Error code: %d, when getting local host name.", WSAGetLastError());
}
// push local ip
PIP_ADAPTER_INFO pAdapterInfo;
PIP_ADAPTER_INFO pAdapter = NULL;
DWORD dwRetVal = 0;
ULONG ulOutBufLen = sizeof(IP_ADAPTER_INFO);
pAdapterInfo = (IP_ADAPTER_INFO *)MALLOC(sizeof(IP_ADAPTER_INFO));
if (pAdapterInfo == NULL) {
Log::Stderr("Error allocating memory needed to call GetAdaptersinfo\n");
}
// Make an initial call to GetAdaptersInfo to get
// the necessary size into the ulOutBufLen variable
if (GetAdaptersInfo(pAdapterInfo, &ulOutBufLen) == ERROR_BUFFER_OVERFLOW) {
FREE(pAdapterInfo);
pAdapterInfo = (IP_ADAPTER_INFO *)MALLOC(ulOutBufLen);
if (pAdapterInfo == NULL) {
Log::Stderr("Error allocating memory needed to call GetAdaptersinfo\n");
}
}
if ((dwRetVal = GetAdaptersInfo(pAdapterInfo, &ulOutBufLen)) == NO_ERROR) {
pAdapter = pAdapterInfo;
while (pAdapter) {
ip_list.insert(pAdapter->IpAddressList.IpAddress.String);
pAdapter = pAdapter->Next;
}
} else {
printf("GetAdaptersInfo failed with error: %d\n", dwRetVal);
}
if (pAdapterInfo)
FREE(pAdapterInfo);
return ip_list;
}
#else
// see in http://stackoverflow.com/questions/212528/get-the-ip-address-of-the-machine
inline static std::unordered_set<std::string> GetLocalIpList() {
std::unordered_set<std::string> ip_list;
struct ifaddrs * ifAddrStruct = NULL;
struct ifaddrs * ifa = NULL;
void * tmpAddrPtr = NULL;
getifaddrs(&ifAddrStruct);
for (ifa = ifAddrStruct; ifa != NULL; ifa = ifa->ifa_next) {
if (!ifa->ifa_addr) {
continue;
}
if (ifa->ifa_addr->sa_family == AF_INET) {
tmpAddrPtr = &((struct sockaddr_in *)ifa->ifa_addr)->sin_addr;
char addressBuffer[INET_ADDRSTRLEN];
inet_ntop(AF_INET, tmpAddrPtr, addressBuffer, INET_ADDRSTRLEN);
ip_list.insert(std::string(addressBuffer));
}
}
if (ifAddrStruct != NULL) freeifaddrs(ifAddrStruct);
return ip_list;
}
#endif
inline static sockaddr_in GetAddress(const char* url, int port) {
sockaddr_in addr = sockaddr_in();
std::memset(&addr, 0, sizeof(sockaddr_in));
inet_pton(AF_INET, url, &addr.sin_addr);
addr.sin_family = AF_INET;
addr.sin_port = htons(static_cast<u_short>(port));
return addr;
}
inline bool Bind(int port) {
sockaddr_in local_addr = GetAddress("0.0.0.0", port);
if (bind(sockfd_, reinterpret_cast<const sockaddr*>(&local_addr), sizeof(sockaddr_in)) == 0) {
return true;
}
return false;
}
inline bool Connect(const char *url, int port) {
sockaddr_in server_addr = GetAddress(url, port);
if (connect(sockfd_, reinterpret_cast<const sockaddr*>(&server_addr), sizeof(sockaddr_in)) == 0) {
return true;
}
return false;
}
inline void Listen(int backlog = 128) {
listen(sockfd_, backlog);
}
inline TcpSocket Accept() {
SOCKET newfd = accept(sockfd_, NULL, NULL);
if (newfd == INVALID_SOCKET) {
Log::Stderr("socket accept error,error code: %d", GetLastError());
}
return TcpSocket(newfd);
}
inline int Send(const char *buf_, int len, int flag = 0) {
int cur_cnt = send(sockfd_, buf_, len, flag);
if (cur_cnt == SOCKET_ERROR) {
Log::Stderr("socket send error, error code: %d", GetLastError());
}
return cur_cnt;
}
inline int Recv(char *buf_, int len, int flags = 0) {
int cur_cnt = recv(sockfd_, buf_ , len , flags);
if (cur_cnt == SOCKET_ERROR) {
Log::Stderr("socket recv error, error code: %d", GetLastError());
}
return cur_cnt;
}
inline bool IsClosed() {
return sockfd_ == INVALID_SOCKET;
}
inline void Close() {
if (!IsClosed()) {
#if defined(_WIN32)
closesocket(sockfd_);
#else
close(sockfd_);
#endif
sockfd_ = INVALID_SOCKET;
}
}
private:
SOCKET sockfd_;
};
} // namespace LightGBM
#endif // USE_SOCKET
#endif  // LightGBM_NETWORK_SOCKET_WRAPPER_HPP_
#ifndef LIGHTGBM_OBJECTIVE_BINARY_OBJECTIVE_HPP_
#define LIGHTGBM_OBJECTIVE_BINARY_OBJECTIVE_HPP_
#include <LightGBM/objective_function.h>
#include <cstring>
#include <cmath>
namespace LightGBM {
/*!
* \brief Objective funtion for binary classification
*/
class BinaryLogloss: public ObjectiveFunction {
public:
explicit BinaryLogloss(const ObjectiveConfig& config) {
is_unbalance_ = config.is_unbalance;
sigmoid_ = static_cast<score_t>(config.sigmoid);
if (sigmoid_ <= 0.0) {
Log::Stderr("sigmoid param %f should greater than zero", sigmoid_);
}
}
~BinaryLogloss() {}
void Init(const Metadata& metadata, data_size_t num_data) override {
num_data_ = num_data;
label_ = metadata.label();
weights_ = metadata.weights();
data_size_t cnt_positive = 0;
data_size_t cnt_negative = 0;
// count for positive and negative samples
for (data_size_t i = 0; i < num_data_; ++i) {
if (label_[i] == 1) {
++cnt_positive;
} else {
++cnt_negative;
}
}
Log::Stdout("number of postive:%d number of negative:%d", cnt_positive, cnt_negative);
// cannot continue if all sample are same class
if (cnt_positive == 0 || cnt_negative == 0) {
Log::Stderr("input training data only contain one class");
}
// use -1 for negative class, and 1 for positive class
label_val_[0] = -1;
label_val_[1] = 1;
// weight for label
label_weights_[0] = 1.0f;
label_weights_[1] = 1.0f;
// if using unbalance, change the labels weight
if (is_unbalance_) {
label_weights_[1] = 1.0f / cnt_positive;
label_weights_[0] = 1.0f / cnt_negative;
}
}
void GetGradients(const score_t* score, score_t* gradients, score_t* hessians) const override {
if (weights_ == nullptr) {
#pragma omp parallel for schedule(static)
for (data_size_t i = 0; i < num_data_; ++i) {
// get label and label weights
const int label = label_val_[static_cast<int>(label_[i])];
const score_t label_weight = label_weights_[static_cast<int>(label_[i])];
// calculate gradients and hessians
const score_t response = -2.0f * label * sigmoid_ / (1.0f + std::exp(2.0f * label * sigmoid_ * score[i]));
const score_t abs_response = fabs(response);
gradients[i] = response * label_weight;
hessians[i] = abs_response * (2.0f * sigmoid_ - abs_response) * label_weight;
}
} else {
#pragma omp parallel for schedule(static)
for (data_size_t i = 0; i < num_data_; ++i) {
// get label and label weights
const int label = label_val_[static_cast<int>(label_[i])];
const score_t label_weight = label_weights_[static_cast<int>(label_[i])];
// calculate gradients and hessians
const score_t response = -2.0f * label * sigmoid_ / (1.0f + std::exp(2.0f * label * sigmoid_ * score[i]));
const score_t abs_response = fabs(response);
gradients[i] = response * label_weight * weights_[i];
hessians[i] = abs_response * (2.0f * sigmoid_ - abs_response) * label_weight * weights_[i];
}
}
}
double GetSigmoid() const override {
return sigmoid_;
}
private:
/*! \brief Number of data */
data_size_t num_data_;
/*! \brief Pointer of label */
const float* label_;
/*! \brief True if using unbalance training */
bool is_unbalance_;
/*! \brief Sigmoid parameter */
score_t sigmoid_;
/*! \brief Values for positive and negative labels */
int label_val_[2];
/*! \brief Weights for positive and negative labels */
score_t label_weights_[2];
/*! \brief Weights for data */
const float* weights_;
};
} // namespace LightGBM
#endif  // LightGBM_OBJECTIVE_BINARY_OBJECTIVE_HPP_
#include <LightGBM/objective_function.h>
#include "regression_objective.hpp"
#include "binary_objective.hpp"
#include "rank_objective.hpp"
namespace LightGBM {
/*!
* \brief Factory: map an objective name from config to a concrete objective.
* \param type Objective name, e.g. "regression", "binary", "lambdarank"
* \param config Configuration forwarded to the chosen objective
* \return Newly allocated objective (caller owns it), or nullptr when the
*         name is not recognized.
*/
ObjectiveFunction* ObjectiveFunction::CreateObjectiveFunction(const std::string& type, const ObjectiveConfig& config) {
  // guard-clause style: return as soon as a name matches
  if (type == "regression") {
    return new RegressionL2loss(config);
  }
  if (type == "binary") {
    return new BinaryLogloss(config);
  }
  if (type == "lambdarank") {
    return new LambdarankNDCG(config);
  }
  // unknown objective name
  return nullptr;
}
} // namespace LightGBM
#ifndef LIGHTGBM_OBJECTIVE_RANK_OBJECTIVE_HPP_
#define LIGHTGBM_OBJECTIVE_RANK_OBJECTIVE_HPP_
#include <LightGBM/objective_function.h>
#include <LightGBM/metric.h>
#include <cstdio>
#include <cstring>
#include <cmath>
#include <vector>
#include <algorithm>
#include <limits>
namespace LightGBM {
/*!
* \brief Objective function for lambdarank with NDCG.
* Gradients ("lambdas") are accumulated per query over document pairs with
* different labels, scaled by the pair's delta-NDCG.
*/
class LambdarankNDCG: public ObjectiveFunction {
public:
  /*!
  * \brief Constructor
  * \param config Configuration with sigmoid, label gains and max position
  */
  explicit LambdarankNDCG(const ObjectiveConfig& config) {
    sigmoid_ = static_cast<score_t>(config.sigmoid);
    // initialize DCG calculator
    DCGCalculator::Init(config.label_gain);
    // copy label gain to local
    std::vector<double> label_gain = config.label_gain;
    for (auto gain : label_gain) {
      label_gain_.push_back(static_cast<score_t>(gain));
    }
    // will optimize NDCG@optimize_pos_at_
    optimize_pos_at_ = config.max_position;
    // both arrays are allocated lazily (Init / ConstructSigmoidTable);
    // null-initialize BOTH so the destructor is safe even when Init() was
    // never called (fix: inverse_max_dcgs_ was previously left
    // uninitialized, making ~LambdarankNDCG delete[] a garbage pointer)
    inverse_max_dcgs_ = nullptr;
    sigmoid_table_ = nullptr;
    if (sigmoid_ <= 0.0) {
      Log::Stderr("sigmoid param %f should greater than zero", sigmoid_);
    }
  }
  ~LambdarankNDCG() {
    // delete[] on nullptr is a no-op, so this is safe pre-Init() as well
    delete[] inverse_max_dcgs_;
    delete[] sigmoid_table_;
  }
  /*!
  * \brief Bind metadata and pre-compute per-query inverse max DCG
  * \param metadata Metadata holding labels, weights and query boundaries
  * \param num_data Number of training data
  */
  void Init(const Metadata& metadata, data_size_t num_data) override {
    num_data_ = num_data;
    // get label
    label_ = metadata.label();
    // get weights
    weights_ = metadata.weights();
    // get boundries
    query_boundaries_ = metadata.query_boundaries();
    if (query_boundaries_ == nullptr) {
      Log::Stderr("For NDCG metric, should have query information");
    }
    num_queries_ = metadata.num_queries();
    // cache inverse max DCG, avoid computing it many times
    inverse_max_dcgs_ = new score_t[num_queries_];
    for (data_size_t i = 0; i < num_queries_; ++i) {
      inverse_max_dcgs_[i] = static_cast<score_t>(
        DCGCalculator::CalMaxDCGAtK(optimize_pos_at_,
        label_ + query_boundaries_[i],
        query_boundaries_[i + 1] - query_boundaries_[i]));
      // invert once here; 0 stays 0 so queries with no relevant docs
      // contribute zero delta-NDCG later
      if (inverse_max_dcgs_[i] > 0.0) {
        inverse_max_dcgs_[i] = 1.0f / inverse_max_dcgs_[i];
      }
    }
    // construct sigmoid table to speed up sigmoid transform
    ConstructSigmoidTable();
  }
  /*! \brief Compute lambdas/hessians for all queries in parallel */
  void GetGradients(const score_t* score, score_t* gradients,
    score_t* hessians) const override {
    #pragma omp parallel for schedule(guided)
    for (data_size_t i = 0; i < num_queries_; ++i) {
      GetGradientsForOneQuery(score, gradients, hessians, i);
    }
  }
  /*!
  * \brief Accumulate pairwise lambdas/hessians for one query's documents
  * \param score Raw scores (whole dataset; offset applied inside)
  * \param lambdas Output first order gradients (whole dataset)
  * \param hessians Output second order gradients (whole dataset)
  * \param query_id Index of the query to process
  */
  inline void GetGradientsForOneQuery(const score_t* score,
    score_t* lambdas, score_t* hessians, data_size_t query_id) const {
    // get doc boundary for current query
    const data_size_t start = query_boundaries_[query_id];
    const data_size_t cnt =
      query_boundaries_[query_id + 1] - query_boundaries_[query_id];
    // get max DCG on current query
    const score_t inverse_max_dcg = inverse_max_dcgs_[query_id];
    // add pointers with offset
    const float* label = label_ + start;
    score += start;
    lambdas += start;
    hessians += start;
    // initialize with zero
    for (data_size_t i = 0; i < cnt; ++i) {
      lambdas[i] = 0.0f;
      hessians[i] = 0.0f;
    }
    // get sorted indices for scores (descending)
    std::vector<data_size_t> sorted_idx;
    for (data_size_t i = 0; i < cnt; ++i) {
      sorted_idx.emplace_back(i);
    }
    std::sort(sorted_idx.begin(), sorted_idx.end(),
      [score](data_size_t a, data_size_t b) { return score[a] > score[b]; });
    // get best and worst score; skip a trailing kMinScore sentinel
    const score_t best_score = score[sorted_idx[0]];
    data_size_t worst_idx = cnt - 1;
    if (worst_idx > 0 && score[sorted_idx[worst_idx]] == kMinScore) {
      worst_idx -= 1;
    }
    const score_t wrost_score = score[sorted_idx[worst_idx]];
    // start accumulating lambdas by pairs; i walks docs in rank order
    for (data_size_t i = 0; i < cnt; ++i) {
      const data_size_t high = sorted_idx[i];
      const int high_label = static_cast<int>(label[high]);
      const score_t high_score = score[high];
      if (high_score == kMinScore) { continue; }
      const score_t high_label_gain = label_gain_[high_label];
      const score_t high_discount =
        static_cast<score_t>(DCGCalculator::GetDiscount(i));
      score_t high_sum_lambda = 0.0;
      score_t high_sum_hessian = 0.0;
      for (data_size_t j = 0; j < cnt; ++j) {
        // skip same data
        if (i == j) { continue; }
        const data_size_t low = sorted_idx[j];
        const int low_label = static_cast<int>(label[low]);
        const score_t low_score = score[low];
        // only consider pair with different label
        if (high_label <= low_label || low_score == kMinScore) { continue; }
        const score_t delta_score = high_score - low_score;
        const score_t low_label_gain = label_gain_[low_label];
        const score_t low_discount =
          static_cast<score_t>(DCGCalculator::GetDiscount(j));
        // get dcg gap
        const score_t dcg_gap = high_label_gain - low_label_gain;
        // get discount of this pair
        const score_t paired_discount = fabs(high_discount - low_discount);
        // get delta NDCG: how much NDCG changes if the pair is swapped
        score_t delta_pair_NDCG = dcg_gap * paired_discount * inverse_max_dcg;
        // regularize the delta_pair_NDCG by score distance
        if (high_label != low_label && best_score != wrost_score) {
          delta_pair_NDCG /= (0.01f + fabs(delta_score));
        }
        // calculate lambda for this pair
        score_t p_lambda = GetSigmoid(delta_score);
        score_t p_hessian = p_lambda * (2.0f - p_lambda);
        // update
        p_lambda *= -delta_pair_NDCG;
        p_hessian *= 2 * delta_pair_NDCG;
        high_sum_lambda += p_lambda;
        high_sum_hessian += p_hessian;
        lambdas[low] -= p_lambda;
        hessians[low] += p_hessian;
      }
      // update
      lambdas[high] += high_sum_lambda;
      hessians[high] += high_sum_hessian;
    }
    // if need weights
    if (weights_ != nullptr) {
      for (data_size_t i = 0; i < cnt; ++i) {
        lambdas[i] *= weights_[start + i];
        hessians[i] *= weights_[start + i];
      }
    }
  }
  /*!
  * \brief Table-based sigmoid transform; inputs outside the table range
  *        are clamped to the boundary entries.
  * NOTE(review): float rounding of (score - min) * factor near the upper
  * boundary could in principle index one past the table — confirm the
  * strict `score >= max_sigmoid_input_` guard is sufficient in practice.
  */
  inline score_t GetSigmoid(score_t score) const {
    if (score <= min_sigmoid_input_) {
      // too small, use lower bound
      return sigmoid_table_[0];
    } else if (score >= max_sigmoid_input_) {
      // too big, use upper bound
      return sigmoid_table_[_sigmoid_bins - 1];
    } else {
      return sigmoid_table_[static_cast<size_t>((score - min_sigmoid_input_) * sigmoid_table_idx_factor_)];
    }
  }
  /*! \brief Pre-compute the sigmoid lookup table over the input range */
  void ConstructSigmoidTable() {
    // get boundary (range shrinks with larger sigmoid_)
    min_sigmoid_input_ = min_sigmoid_input_ / sigmoid_ / 2;
    max_sigmoid_input_ = -min_sigmoid_input_;
    sigmoid_table_ = new score_t[_sigmoid_bins];
    // get score to bin factor
    sigmoid_table_idx_factor_ =
      _sigmoid_bins / (max_sigmoid_input_ - min_sigmoid_input_);
    // cache
    for (size_t i = 0; i < _sigmoid_bins; ++i) {
      const score_t score = i / sigmoid_table_idx_factor_ + min_sigmoid_input_;
      sigmoid_table_[i] = 2.0f / (1.0f + std::exp(2.0f * score * sigmoid_));
    }
  }
  double GetSigmoid() const override {
    // though we use sigmoid transform on objective
    // for the prediction, we actually don't need to transform by sigmoid.
    // since we only need the ranking score.
    return -1.0;
  }
private:
  /*! \brief Gains for labels */
  std::vector<score_t> label_gain_;
  /*! \brief Cache inverse max DCG, speed up calculation */
  score_t* inverse_max_dcgs_;
  /*! \brief Sigmoid param */
  score_t sigmoid_;
  /*! \brief Optimized NDCG@ */
  int optimize_pos_at_;
  /*! \brief Number of queries */
  data_size_t num_queries_;
  /*! \brief Number of data */
  data_size_t num_data_;
  /*! \brief Pointer of label */
  const float* label_;
  /*! \brief Pointer of weights */
  const float* weights_;
  /*! \brief Query boundaries */
  const data_size_t* query_boundaries_;
  /*! \brief Cache result for sigmoid transform to speed up */
  score_t* sigmoid_table_;
  /*! \brief Number of bins in sigmoid table */
  size_t _sigmoid_bins = 1024 * 1024;
  /*! \brief Minimal input of sigmoid table */
  score_t min_sigmoid_input_ = -50;
  /*! \brief Maximal input of sigmoid table */
  score_t max_sigmoid_input_ = 50;
  /*! \brief Factor that converts score to bin in sigmoid table */
  score_t sigmoid_table_idx_factor_;
};
} // namespace LightGBM
#endif  // LightGBM_OBJECTIVE_RANK_OBJECTIVE_HPP_
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment