Unverified Commit 40e56ca7 authored by Guolin Ke's avatar Guolin Ke Committed by GitHub
Browse files

Reduce the Allgather buffer size when using high-dimensional data in distributed mode. (#2485)

* reduce the buffer when using high dimensional data in distributed mode.

* Update dataset_loader.cpp

* refix

* typo

* fix the accumulation of the number of bins.

* avoid overflow

* fix warning

* efficient solution.

* Update dataset.h

* fix bin count output

* fix warning

* fix a bug in the distributed number-of-features check

* fix possible edge case

* Update dataset.cpp

* possible bug fix

* fix
parent 4848776f
......@@ -294,6 +294,7 @@ class Dataset {
const std::vector<std::vector<double>>& forced_bins,
int** sample_non_zero_indices,
const int* num_per_col,
int num_sample_col,
size_t total_sample_cnt,
const Config& io_config);
......
......@@ -261,6 +261,19 @@ class Network {
return global;
}
template<class T>
static std::vector<T> GlobalArray(T local) {
std::vector<T> global(num_machines_, 0);
int type_size = sizeof(T);
std::vector<comm_size_t> block_start(num_machines_);
std::vector<comm_size_t> block_len(num_machines_, type_size);
for (int i = 1; i < num_machines_; ++i) {
block_start[i] = block_start[i - 1] + block_len[i - 1];
}
Allgather(reinterpret_cast<char*>(&local), block_start.data(), block_len.data(), reinterpret_cast<char*>(global.data()), type_size*num_machines_);
return global;
}
private:
static void AllgatherBruck(char* input, const comm_size_t* block_start, const comm_size_t* block_len, char* output, comm_size_t all_size);
......
......@@ -19,7 +19,9 @@
namespace LightGBM {
BinMapper::BinMapper() {
// Default-construct a trivial mapper: a single numerical bin with an upper
// bound of +infinity, so every value maps to bin 0. This is the placeholder
// used for features this machine did not sample (e.g. when the local column
// index is out of range in the distributed bin-finding loop).
BinMapper::BinMapper(): num_bin_(1), is_trivial_(true), bin_type_(BinType::NumericalBin) {
bin_upper_bound_.clear();
bin_upper_bound_.push_back(std::numeric_limits<double>::infinity());
}
// deep copy function for BinMapper
......
......@@ -70,6 +70,7 @@ std::vector<std::vector<int>> FindGroups(const std::vector<std::unique_ptr<BinMa
const std::vector<int>& find_order,
int** sample_indices,
const int* num_per_col,
int num_sample_col,
size_t total_sample_cnt,
data_size_t max_error_cnt,
data_size_t filter_cnt,
......@@ -85,7 +86,8 @@ std::vector<std::vector<int>> FindGroups(const std::vector<std::unique_ptr<BinMa
std::vector<int> group_num_bin;
for (auto fidx : find_order) {
const size_t cur_non_zero_cnt = num_per_col[fidx];
bool is_filtered_feature = fidx >= num_sample_col;
const size_t cur_non_zero_cnt = is_filtered_feature ? 0: num_per_col[fidx];
bool need_new_group = true;
std::vector<int> available_groups;
for (int gid = 0; gid < static_cast<int>(features_in_group.size()); ++gid) {
......@@ -107,7 +109,7 @@ std::vector<std::vector<int>> FindGroups(const std::vector<std::unique_ptr<BinMa
}
for (auto gid : search_groups) {
const int rest_max_cnt = max_error_cnt - group_conflict_cnt[gid];
int cnt = GetConfilctCount(conflict_marks[gid], sample_indices[fidx], num_per_col[fidx], rest_max_cnt);
const int cnt = is_filtered_feature ? 0 : GetConfilctCount(conflict_marks[gid], sample_indices[fidx], num_per_col[fidx], rest_max_cnt);
if (cnt >= 0 && cnt <= rest_max_cnt) {
data_size_t rest_non_zero_data = static_cast<data_size_t>(
static_cast<double>(cur_non_zero_cnt - cnt) * num_data / total_sample_cnt);
......@@ -116,7 +118,9 @@ std::vector<std::vector<int>> FindGroups(const std::vector<std::unique_ptr<BinMa
features_in_group[gid].push_back(fidx);
group_conflict_cnt[gid] += cnt;
group_non_zero_cnt[gid] += cur_non_zero_cnt - cnt;
if (!is_filtered_feature) {
MarkUsed(&conflict_marks[gid], sample_indices[fidx], num_per_col[fidx]);
}
if (is_use_gpu) {
group_num_bin[gid] += bin_mappers[fidx]->num_bin() + (bin_mappers[fidx]->GetDefaultBin() == 0 ? -1 : 0);
}
......@@ -128,7 +132,9 @@ std::vector<std::vector<int>> FindGroups(const std::vector<std::unique_ptr<BinMa
features_in_group.back().push_back(fidx);
group_conflict_cnt.push_back(0);
conflict_marks.emplace_back(total_sample_cnt, false);
if (!is_filtered_feature) {
MarkUsed(&(conflict_marks.back()), sample_indices[fidx], num_per_col[fidx]);
}
group_non_zero_cnt.emplace_back(cur_non_zero_cnt);
if (is_use_gpu) {
group_num_bin.push_back(1 + bin_mappers[fidx]->num_bin() + (bin_mappers[fidx]->GetDefaultBin() == 0 ? -1 : 0));
......@@ -141,6 +147,7 @@ std::vector<std::vector<int>> FindGroups(const std::vector<std::unique_ptr<BinMa
std::vector<std::vector<int>> FastFeatureBundling(const std::vector<std::unique_ptr<BinMapper>>& bin_mappers,
int** sample_indices,
const int* num_per_col,
int num_sample_col,
size_t total_sample_cnt,
const std::vector<int>& used_features,
double max_conflict_rate,
......@@ -156,7 +163,11 @@ std::vector<std::vector<int>> FastFeatureBundling(const std::vector<std::unique_
feature_non_zero_cnt.reserve(used_features.size());
// put dense feature first
for (auto fidx : used_features) {
if (fidx < num_sample_col) {
feature_non_zero_cnt.emplace_back(num_per_col[fidx]);
} else {
feature_non_zero_cnt.emplace_back(0);
}
}
// sort by non zero cnt
std::vector<int> sorted_idx;
......@@ -175,8 +186,8 @@ std::vector<std::vector<int>> FastFeatureBundling(const std::vector<std::unique_
for (auto sidx : sorted_idx) {
feature_order_by_cnt.push_back(used_features[sidx]);
}
auto features_in_group = FindGroups(bin_mappers, used_features, sample_indices, num_per_col, total_sample_cnt, max_error_cnt, filter_cnt, num_data, is_use_gpu);
auto group2 = FindGroups(bin_mappers, feature_order_by_cnt, sample_indices, num_per_col, total_sample_cnt, max_error_cnt, filter_cnt, num_data, is_use_gpu);
auto features_in_group = FindGroups(bin_mappers, used_features, sample_indices, num_per_col, num_sample_col, total_sample_cnt, max_error_cnt, filter_cnt, num_data, is_use_gpu);
auto group2 = FindGroups(bin_mappers, feature_order_by_cnt, sample_indices, num_per_col, num_sample_col, total_sample_cnt, max_error_cnt, filter_cnt, num_data, is_use_gpu);
if (features_in_group.size() > group2.size()) {
features_in_group = group2;
}
......@@ -219,6 +230,7 @@ void Dataset::Construct(
const std::vector<std::vector<double>>& forced_bins,
int** sample_non_zero_indices,
const int* num_per_col,
int num_sample_col,
size_t total_sample_cnt,
const Config& io_config) {
num_total_features_ = num_total_features;
......@@ -238,7 +250,7 @@ void Dataset::Construct(
if (io_config.enable_bundle && !used_features.empty()) {
features_in_group = FastFeatureBundling(*bin_mappers,
sample_non_zero_indices, num_per_col, total_sample_cnt,
sample_non_zero_indices, num_per_col, num_sample_col, total_sample_cnt,
used_features, io_config.max_conflict_rate,
num_data_, io_config.min_data_in_leaf,
sparse_threshold_, io_config.is_enable_sparse, io_config.device_type == std::string("gpu"));
......
......@@ -210,7 +210,6 @@ Dataset* DatasetLoader::LoadFromFile(const char* filename, const char* initscore
ConstructBinMappersFromTextData(rank, num_machines, sample_data, parser.get(), dataset.get());
// initialize label
dataset->metadata_.Init(dataset->num_data_, weight_idx_, group_idx_);
// extract features
ExtractFeaturesFromFile(filename, parser.get(), used_data_indices, dataset.get());
}
......@@ -574,7 +573,11 @@ Dataset* DatasetLoader::LoadFromBinFile(const char* data_filename, const char* b
Dataset* DatasetLoader::CostructFromSampleData(double** sample_values,
int** sample_indices, int num_col, const int* num_per_col,
size_t total_sample_size, data_size_t num_data) {
std::vector<std::unique_ptr<BinMapper>> bin_mappers(num_col);
int num_total_features = num_col;
if (Network::num_machines() > 1) {
num_total_features = Network::GlobalSyncUpByMax(num_total_features);
}
std::vector<std::unique_ptr<BinMapper>> bin_mappers(num_total_features);
// fill feature_names_ if not header
if (feature_names_.empty()) {
for (int i = 0; i < num_col; ++i) {
......@@ -632,21 +635,19 @@ Dataset* DatasetLoader::CostructFromSampleData(double** sample_values,
// different machines will find bin for different features
int num_machines = Network::num_machines();
int rank = Network::rank();
int total_num_feature = num_col;
total_num_feature = Network::GlobalSyncUpByMin(total_num_feature);
// start and len will store the process feature indices for different machines
// machine i will find bins for features in [ start[i], start[i] + len[i] )
std::vector<int> start(num_machines);
std::vector<int> len(num_machines);
int step = (total_num_feature + num_machines - 1) / num_machines;
int step = (num_total_features + num_machines - 1) / num_machines;
if (step < 1) { step = 1; }
start[0] = 0;
for (int i = 0; i < num_machines - 1; ++i) {
len[i] = std::min(step, total_num_feature - start[i]);
len[i] = std::min(step, num_total_features - start[i]);
start[i + 1] = start[i] + len[i];
}
len[num_machines - 1] = total_num_feature - start[num_machines - 1];
len[num_machines - 1] = num_total_features - start[num_machines - 1];
OMP_INIT_EX();
#pragma omp parallel for schedule(guided)
for (int i = 0; i < len[rank]; ++i) {
......@@ -659,6 +660,9 @@ Dataset* DatasetLoader::CostructFromSampleData(double** sample_values,
bin_type = BinType::CategoricalBin;
}
bin_mappers[i].reset(new BinMapper());
if (num_col <= start[rank] + i) {
continue;
}
if (config_.max_bin_by_feature.empty()) {
bin_mappers[i]->FindBin(sample_values[start[rank] + i], num_per_col[start[rank] + i],
total_sample_size, config_.max_bin, config_.min_data_in_bin,
......@@ -672,56 +676,47 @@ Dataset* DatasetLoader::CostructFromSampleData(double** sample_values,
}
OMP_LOOP_EX_END();
}
OMP_THROW_EX();
int max_bin = 0;
comm_size_t self_buf_size = 0;
for (int i = 0; i < len[rank]; ++i) {
if (bin_mappers[i] != nullptr) {
max_bin = std::max(max_bin, bin_mappers[i]->num_bin());
if (ignore_features_.count(start[rank] + i) > 0) {
continue;
}
self_buf_size += static_cast<comm_size_t>(bin_mappers[i]->SizesInByte());
}
max_bin = Network::GlobalSyncUpByMax(max_bin);
// get size of bin mapper with max_bin size
int type_size = BinMapper::SizeForSpecificBin(max_bin);
// since sizes of different feature may not be same, we expand all bin mapper to type_size
comm_size_t buffer_size = type_size * total_num_feature;
CHECK(buffer_size >= 0);
auto input_buffer = std::vector<char>(buffer_size);
auto output_buffer = std::vector<char>(buffer_size);
// find local feature bins and copy to buffer
#pragma omp parallel for schedule(guided)
std::vector<char> input_buffer(self_buf_size);
auto cp_ptr = input_buffer.data();
for (int i = 0; i < len[rank]; ++i) {
OMP_LOOP_EX_BEGIN();
if (ignore_features_.count(start[rank] + i) > 0) {
continue;
}
bin_mappers[i]->CopyTo(input_buffer.data() + i * type_size);
bin_mappers[i]->CopyTo(cp_ptr);
cp_ptr += bin_mappers[i]->SizesInByte();
// free
bin_mappers[i].reset(nullptr);
OMP_LOOP_EX_END();
}
OMP_THROW_EX();
std::vector<comm_size_t> size_start(num_machines);
std::vector<comm_size_t> size_len(num_machines);
// convert to binary size
for (int i = 0; i < num_machines; ++i) {
size_start[i] = start[i] * static_cast<comm_size_t>(type_size);
size_len[i] = len[i] * static_cast<comm_size_t>(type_size);
std::vector<comm_size_t> size_len = Network::GlobalArray(self_buf_size);
std::vector<comm_size_t> size_start(num_machines, 0);
for (int i = 1; i < num_machines; ++i) {
size_start[i] = size_start[i - 1] + size_len[i - 1];
}
comm_size_t total_buffer_size = size_start[num_machines - 1] + size_len[num_machines - 1];
std::vector<char> output_buffer(total_buffer_size);
// gather global feature bin mappers
Network::Allgather(input_buffer.data(), size_start.data(), size_len.data(), output_buffer.data(), buffer_size);
Network::Allgather(input_buffer.data(), size_start.data(), size_len.data(), output_buffer.data(), total_buffer_size);
cp_ptr = output_buffer.data();
// restore features bins from buffer
for (int i = 0; i < total_num_feature; ++i) {
for (int i = 0; i < num_total_features; ++i) {
if (ignore_features_.count(i) > 0) {
bin_mappers[i] = nullptr;
continue;
}
bin_mappers[i].reset(new BinMapper());
bin_mappers[i]->CopyFrom(output_buffer.data() + i * type_size);
bin_mappers[i]->CopyFrom(cp_ptr);
cp_ptr += bin_mappers[i]->SizesInByte();
}
}
auto dataset = std::unique_ptr<Dataset>(new Dataset(num_data));
dataset->Construct(&bin_mappers, num_col, forced_bin_bounds, sample_indices, num_per_col, total_sample_size, config_);
dataset->Construct(&bin_mappers, num_total_features, forced_bin_bounds, sample_indices, num_per_col, num_col, total_sample_size, config_);
dataset->set_feature_names(feature_names_);
return dataset.release();
}
......@@ -894,14 +889,12 @@ void DatasetLoader::ConstructBinMappersFromTextData(int rank, int num_machines,
}
dataset->feature_groups_.clear();
if (feature_names_.empty()) {
// -1 means doesn't use this feature
dataset->num_total_features_ = std::max(static_cast<int>(sample_values.size()), parser->NumFeatures());
dataset->used_feature_map_ = std::vector<int>(dataset->num_total_features_, -1);
} else {
dataset->used_feature_map_ = std::vector<int>(feature_names_.size(), -1);
dataset->num_total_features_ = static_cast<int>(feature_names_.size());
if (num_machines > 1) {
dataset->num_total_features_ = Network::GlobalSyncUpByMax(dataset->num_total_features_);
}
if (!feature_names_.empty()) {
CHECK(dataset->num_total_features_ == static_cast<int>(feature_names_.size()));
}
if (!config_.max_bin_by_feature.empty()) {
......@@ -932,7 +925,6 @@ void DatasetLoader::ConstructBinMappersFromTextData(int rank, int num_machines,
std::vector<std::unique_ptr<BinMapper>> bin_mappers(dataset->num_total_features_);
const data_size_t filter_cnt = static_cast<data_size_t>(
static_cast<double>(config_.min_data_in_leaf* sample_data.size()) / dataset->num_data_);
// start find bins
if (num_machines == 1) {
// if only one machine, find bin locally
......@@ -964,25 +956,19 @@ void DatasetLoader::ConstructBinMappersFromTextData(int rank, int num_machines,
}
OMP_THROW_EX();
} else {
// if have multi-machines, need to find bin distributed
// different machines will find bin for different features
int num_total_features = dataset->num_total_features_;
num_total_features = Network::GlobalSyncUpByMin(num_total_features);
dataset->num_total_features_ = num_total_features;
// start and len will store the process feature indices for different machines
// machine i will find bins for features in [ start[i], start[i] + len[i] )
std::vector<int> start(num_machines);
std::vector<int> len(num_machines);
int step = (num_total_features + num_machines - 1) / num_machines;
int step = (dataset->num_total_features_ + num_machines - 1) / num_machines;
if (step < 1) { step = 1; }
start[0] = 0;
for (int i = 0; i < num_machines - 1; ++i) {
len[i] = std::min(step, num_total_features - start[i]);
len[i] = std::min(step, dataset->num_total_features_ - start[i]);
start[i + 1] = start[i] + len[i];
}
len[num_machines - 1] = num_total_features - start[num_machines - 1];
len[num_machines - 1] = dataset->num_total_features_ - start[num_machines - 1];
OMP_INIT_EX();
#pragma omp parallel for schedule(guided)
for (int i = 0; i < len[rank]; ++i) {
......@@ -995,6 +981,9 @@ void DatasetLoader::ConstructBinMappersFromTextData(int rank, int num_machines,
bin_type = BinType::CategoricalBin;
}
bin_mappers[i].reset(new BinMapper());
if (sample_values.size() <= start[rank] + i) {
continue;
}
if (config_.max_bin_by_feature.empty()) {
bin_mappers[i]->FindBin(sample_values[start[rank] + i].data(),
static_cast<int>(sample_values[start[rank] + i].size()),
......@@ -1011,56 +1000,48 @@ void DatasetLoader::ConstructBinMappersFromTextData(int rank, int num_machines,
OMP_LOOP_EX_END();
}
OMP_THROW_EX();
int max_bin = 0;
comm_size_t self_buf_size = 0;
for (int i = 0; i < len[rank]; ++i) {
if (bin_mappers[i] != nullptr) {
max_bin = std::max(max_bin, bin_mappers[i]->num_bin());
if (ignore_features_.count(start[rank] + i) > 0) {
continue;
}
self_buf_size += static_cast<comm_size_t>(bin_mappers[i]->SizesInByte());
}
max_bin = Network::GlobalSyncUpByMax(max_bin);
// get size of bin mapper with max_bin size
int type_size = BinMapper::SizeForSpecificBin(max_bin);
// since sizes of different feature may not be same, we expand all bin mapper to type_size
comm_size_t buffer_size = type_size * num_total_features;
CHECK(buffer_size >= 0);
auto input_buffer = std::vector<char>(buffer_size);
auto output_buffer = std::vector<char>(buffer_size);
// find local feature bins and copy to buffer
#pragma omp parallel for schedule(guided)
std::vector<char> input_buffer(self_buf_size);
auto cp_ptr = input_buffer.data();
for (int i = 0; i < len[rank]; ++i) {
OMP_LOOP_EX_BEGIN();
if (ignore_features_.count(start[rank] + i) > 0) {
continue;
}
bin_mappers[i]->CopyTo(input_buffer.data() + i * type_size);
bin_mappers[i]->CopyTo(cp_ptr);
cp_ptr += bin_mappers[i]->SizesInByte();
// free
bin_mappers[i].reset(nullptr);
OMP_LOOP_EX_END();
}
OMP_THROW_EX();
std::vector<comm_size_t> size_start(num_machines);
std::vector<comm_size_t> size_len(num_machines);
// convert to binary size
for (int i = 0; i < num_machines; ++i) {
size_start[i] = start[i] * static_cast<comm_size_t>(type_size);
size_len[i] = len[i] * static_cast<comm_size_t>(type_size);
std::vector<comm_size_t> size_len = Network::GlobalArray(self_buf_size);
std::vector<comm_size_t> size_start(num_machines, 0);
for (int i = 1; i < num_machines; ++i) {
size_start[i] = size_start[i - 1] + size_len[i - 1];
}
comm_size_t total_buffer_size = size_start[num_machines - 1] + size_len[num_machines - 1];
std::vector<char> output_buffer(total_buffer_size);
// gather global feature bin mappers
Network::Allgather(input_buffer.data(), size_start.data(), size_len.data(), output_buffer.data(), buffer_size);
Network::Allgather(input_buffer.data(), size_start.data(), size_len.data(), output_buffer.data(), total_buffer_size);
cp_ptr = output_buffer.data();
// restore features bins from buffer
for (int i = 0; i < num_total_features; ++i) {
for (int i = 0; i < dataset->num_total_features_; ++i) {
if (ignore_features_.count(i) > 0) {
bin_mappers[i] = nullptr;
continue;
}
bin_mappers[i].reset(new BinMapper());
bin_mappers[i]->CopyFrom(output_buffer.data() + i * type_size);
bin_mappers[i]->CopyFrom(cp_ptr);
cp_ptr += bin_mappers[i]->SizesInByte();
}
}
sample_values.clear();
dataset->Construct(&bin_mappers, dataset->num_total_features_, forced_bin_bounds, Common::Vector2Ptr<int>(&sample_indices).data(),
Common::VectorSize<int>(sample_indices).data(), sample_data.size(), config_);
Common::VectorSize<int>(sample_indices).data(), static_cast<int>(sample_indices.size()), sample_data.size(), config_);
}
/*! \brief Extract local features from memory */
......
......@@ -700,11 +700,12 @@ class HistogramPool {
void DynamicChangeSize(const Dataset* train_data, const Config* config, int cache_size, int total_size) {
if (feature_metas_.empty()) {
uint64_t bin_cnt_over_features = 0;
int num_feature = train_data->num_features();
feature_metas_.resize(num_feature);
#pragma omp parallel for schedule(static, 512) if (num_feature >= 1024)
for (int i = 0; i < num_feature; ++i) {
feature_metas_[i].num_bin = train_data->FeatureNumBin(i);
bin_cnt_over_features += static_cast<uint64_t>(feature_metas_[i].num_bin);
feature_metas_[i].default_bin = train_data->FeatureBinMapper(i)->GetDefaultBin();
feature_metas_[i].missing_type = train_data->FeatureBinMapper(i)->missing_type();
feature_metas_[i].monotone_type = train_data->FeatureMonotone(i);
......@@ -717,9 +718,9 @@ class HistogramPool {
feature_metas_[i].config = config;
feature_metas_[i].bin_type = train_data->FeatureBinMapper(i)->bin_type();
}
Log::Info("Total Bins %d", bin_cnt_over_features);
}
uint64_t num_total_bin = train_data->NumTotalBin();
Log::Info("Total Bins %d", num_total_bin);
int old_cache_size = static_cast<int>(pool_.size());
Reset(cache_size, total_size);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment