/*! * Copyright (c) 2016 Microsoft Corporation. All rights reserved. * Licensed under the MIT License. See LICENSE file in the project root for license information. */ #include #include #include #include "parallel_tree_learner.h" namespace LightGBM { template DataParallelTreeLearner::DataParallelTreeLearner(const Config* config) :TREELEARNER_T(config) { } template DataParallelTreeLearner::~DataParallelTreeLearner() { } template void DataParallelTreeLearner::Init(const Dataset* train_data, bool is_constant_hessian) { // initialize SerialTreeLearner TREELEARNER_T::Init(train_data, is_constant_hessian); // Get local rank and global machine size rank_ = Network::rank(); num_machines_ = Network::num_machines(); // allocate buffer for communication size_t buffer_size = this->train_data_->NumTotalBin() * KHistEntrySize; input_buffer_.resize(buffer_size); output_buffer_.resize(buffer_size); is_feature_aggregated_.resize(this->num_features_); block_start_.resize(num_machines_); block_len_.resize(num_machines_); buffer_write_start_pos_.resize(this->num_features_); buffer_read_start_pos_.resize(this->num_features_); global_data_count_in_leaf_.resize(this->config_->num_leaves); } template void DataParallelTreeLearner::ResetConfig(const Config* config) { TREELEARNER_T::ResetConfig(config); global_data_count_in_leaf_.resize(this->config_->num_leaves); } template void DataParallelTreeLearner::BeforeTrain() { TREELEARNER_T::BeforeTrain(); // generate feature partition for current tree std::vector> feature_distribution(num_machines_, std::vector()); std::vector num_bins_distributed(num_machines_, 0); for (int i = 0; i < this->train_data_->num_total_features(); ++i) { int inner_feature_index = this->train_data_->InnerFeatureIndex(i); if (inner_feature_index == -1) { continue; } if (this->is_feature_used_[inner_feature_index]) { int cur_min_machine = static_cast(ArrayArgs::ArgMin(num_bins_distributed)); feature_distribution[cur_min_machine].push_back(inner_feature_index); auto num_bin = this->train_data_->FeatureNumBin(inner_feature_index); if (this->train_data_->FeatureBinMapper(inner_feature_index)->GetMostFreqBin() == 0) { num_bin -= 1; } num_bins_distributed[cur_min_machine] += num_bin; } is_feature_aggregated_[inner_feature_index] = false; } // get local used feature for (auto fid : feature_distribution[rank_]) { is_feature_aggregated_[fid] = true; } // get block start and block len for reduce scatter reduce_scatter_size_ = 0; for (int i = 0; i < num_machines_; ++i) { block_len_[i] = 0; for (auto fid : feature_distribution[i]) { auto num_bin = this->train_data_->FeatureNumBin(fid); if (this->train_data_->FeatureBinMapper(fid)->GetMostFreqBin() == 0) { num_bin -= 1; } block_len_[i] += num_bin * KHistEntrySize; } reduce_scatter_size_ += block_len_[i]; } block_start_[0] = 0; for (int i = 1; i < num_machines_; ++i) { block_start_[i] = block_start_[i - 1] + block_len_[i - 1]; } // get buffer_write_start_pos_ int bin_size = 0; for (int i = 0; i < num_machines_; ++i) { for (auto fid : feature_distribution[i]) { buffer_write_start_pos_[fid] = bin_size; auto num_bin = this->train_data_->FeatureNumBin(fid); if (this->train_data_->FeatureBinMapper(fid)->GetMostFreqBin() == 0) { num_bin -= 1; } bin_size += num_bin * KHistEntrySize; } } // get buffer_read_start_pos_ bin_size = 0; for (auto fid : feature_distribution[rank_]) { buffer_read_start_pos_[fid] = bin_size; auto num_bin = this->train_data_->FeatureNumBin(fid); if (this->train_data_->FeatureBinMapper(fid)->GetMostFreqBin() == 0) { num_bin -= 1; } bin_size += num_bin * KHistEntrySize; } // sync global data sumup info std::tuple data(this->smaller_leaf_splits_->num_data_in_leaf(), this->smaller_leaf_splits_->sum_gradients(), this->smaller_leaf_splits_->sum_hessians()); int size = sizeof(data); std::memcpy(input_buffer_.data(), &data, size); // global sumup reduce Network::Allreduce(input_buffer_.data(), size, sizeof(std::tuple), output_buffer_.data(), [](const char *src, char *dst, int type_size, comm_size_t len) { comm_size_t used_size = 0; const std::tuple *p1; std::tuple *p2; while (used_size < len) { p1 = reinterpret_cast *>(src); p2 = reinterpret_cast *>(dst); std::get<0>(*p2) = std::get<0>(*p2) + std::get<0>(*p1); std::get<1>(*p2) = std::get<1>(*p2) + std::get<1>(*p1); std::get<2>(*p2) = std::get<2>(*p2) + std::get<2>(*p1); src += type_size; dst += type_size; used_size += type_size; } }); // copy back std::memcpy(reinterpret_cast(&data), output_buffer_.data(), size); // set global sumup info this->smaller_leaf_splits_->Init(std::get<1>(data), std::get<2>(data)); // init global data count in leaf global_data_count_in_leaf_[0] = std::get<0>(data); } template void DataParallelTreeLearner::FindBestSplits() { TREELEARNER_T::ConstructHistograms(this->is_feature_used_, true); // construct local histograms #pragma omp parallel for schedule(static) for (int feature_index = 0; feature_index < this->num_features_; ++feature_index) { if ((!this->is_feature_used_.empty() && this->is_feature_used_[feature_index] == false)) continue; // copy to buffer std::memcpy(input_buffer_.data() + buffer_write_start_pos_[feature_index], this->smaller_leaf_histogram_array_[feature_index].RawData(), this->smaller_leaf_histogram_array_[feature_index].SizeOfHistgram()); } // Reduce scatter for histogram Network::ReduceScatter(input_buffer_.data(), reduce_scatter_size_, sizeof(hist_t), block_start_.data(), block_len_.data(), output_buffer_.data(), static_cast(output_buffer_.size()), &HistogramSumReducer); this->FindBestSplitsFromHistograms(this->is_feature_used_, true); } template void DataParallelTreeLearner::FindBestSplitsFromHistograms(const std::vector&, bool) { std::vector smaller_bests_per_thread(this->num_threads_, SplitInfo()); std::vector larger_bests_per_thread(this->num_threads_, SplitInfo()); std::vector smaller_node_used_features(this->num_features_, 1); std::vector larger_node_used_features(this->num_features_, 1); if (this->config_->feature_fraction_bynode < 1.0f) { smaller_node_used_features = this->GetUsedFeatures(false); larger_node_used_features = this->GetUsedFeatures(false); } OMP_INIT_EX(); #pragma omp parallel for schedule(static) for (int feature_index = 0; feature_index < this->num_features_; ++feature_index) { OMP_LOOP_EX_BEGIN(); if (!is_feature_aggregated_[feature_index]) continue; const int tid = omp_get_thread_num(); const int real_feature_index = this->train_data_->RealFeatureIndex(feature_index); // restore global histograms from buffer this->smaller_leaf_histogram_array_[feature_index].FromMemory( output_buffer_.data() + buffer_read_start_pos_[feature_index]); this->train_data_->FixHistogram(feature_index, this->smaller_leaf_splits_->sum_gradients(), this->smaller_leaf_splits_->sum_hessians(), this->smaller_leaf_histogram_array_[feature_index].RawData()); SplitInfo smaller_split; // find best threshold for smaller child this->smaller_leaf_histogram_array_[feature_index].FindBestThreshold( this->smaller_leaf_splits_->sum_gradients(), this->smaller_leaf_splits_->sum_hessians(), GetGlobalDataCountInLeaf(this->smaller_leaf_splits_->LeafIndex()), this->smaller_leaf_splits_->min_constraint(), this->smaller_leaf_splits_->max_constraint(), &smaller_split); smaller_split.feature = real_feature_index; if (smaller_split > smaller_bests_per_thread[tid] && smaller_node_used_features[feature_index]) { smaller_bests_per_thread[tid] = smaller_split; } // only root leaf if (this->larger_leaf_splits_ == nullptr || this->larger_leaf_splits_->LeafIndex() < 0) continue; // construct histgroms for large leaf, we init larger leaf as the parent, so we can just subtract the smaller leaf's histograms this->larger_leaf_histogram_array_[feature_index].Subtract( this->smaller_leaf_histogram_array_[feature_index]); SplitInfo larger_split; // find best threshold for larger child this->larger_leaf_histogram_array_[feature_index].FindBestThreshold( this->larger_leaf_splits_->sum_gradients(), this->larger_leaf_splits_->sum_hessians(), GetGlobalDataCountInLeaf(this->larger_leaf_splits_->LeafIndex()), this->larger_leaf_splits_->min_constraint(), this->larger_leaf_splits_->max_constraint(), &larger_split); larger_split.feature = real_feature_index; if (larger_split > larger_bests_per_thread[tid] && larger_node_used_features[feature_index]) { larger_bests_per_thread[tid] = larger_split; } OMP_LOOP_EX_END(); } OMP_THROW_EX(); auto smaller_best_idx = ArrayArgs::ArgMax(smaller_bests_per_thread); int leaf = this->smaller_leaf_splits_->LeafIndex(); this->best_split_per_leaf_[leaf] = smaller_bests_per_thread[smaller_best_idx]; if (this->larger_leaf_splits_ != nullptr && this->larger_leaf_splits_->LeafIndex() >= 0) { leaf = this->larger_leaf_splits_->LeafIndex(); auto larger_best_idx = ArrayArgs::ArgMax(larger_bests_per_thread); this->best_split_per_leaf_[leaf] = larger_bests_per_thread[larger_best_idx]; } SplitInfo smaller_best_split, larger_best_split; smaller_best_split = this->best_split_per_leaf_[this->smaller_leaf_splits_->LeafIndex()]; // find local best split for larger leaf if (this->larger_leaf_splits_->LeafIndex() >= 0) { larger_best_split = this->best_split_per_leaf_[this->larger_leaf_splits_->LeafIndex()]; } // sync global best info SyncUpGlobalBestSplit(input_buffer_.data(), input_buffer_.data(), &smaller_best_split, &larger_best_split, this->config_->max_cat_threshold); // set best split this->best_split_per_leaf_[this->smaller_leaf_splits_->LeafIndex()] = smaller_best_split; if (this->larger_leaf_splits_->LeafIndex() >= 0) { this->best_split_per_leaf_[this->larger_leaf_splits_->LeafIndex()] = larger_best_split; } } template void DataParallelTreeLearner::Split(Tree* tree, int best_Leaf, int* left_leaf, int* right_leaf) { TREELEARNER_T::Split(tree, best_Leaf, left_leaf, right_leaf); const SplitInfo& best_split_info = this->best_split_per_leaf_[best_Leaf]; // need update global number of data in leaf global_data_count_in_leaf_[*left_leaf] = best_split_info.left_count; global_data_count_in_leaf_[*right_leaf] = best_split_info.right_count; } // instantiate template classes, otherwise linker cannot find the code template class DataParallelTreeLearner; template class DataParallelTreeLearner; } // namespace LightGBM