#include "parallel_tree_learner.h" #include #include #include #include namespace LightGBM { VotingParallelTreeLearner::VotingParallelTreeLearner(const TreeConfig& tree_config) :SerialTreeLearner(tree_config) { top_k_ = tree_config.top_k; } void VotingParallelTreeLearner::Init(const Dataset* train_data) { SerialTreeLearner::Init(train_data); rank_ = Network::rank(); num_machines_ = Network::num_machines(); // limit top k if (top_k_ > num_features_) { top_k_ = num_features_; } // get max bin int max_bin = 0; for (int i = 0; i < num_features_; ++i) { if (max_bin < train_data_->FeatureAt(i)->num_bin()) { max_bin = train_data_->FeatureAt(i)->num_bin(); } } // calculate buffer size size_t buffer_size = 2 * top_k_ * std::max(max_bin * sizeof(HistogramBinEntry), sizeof(SplitInfo) * num_machines_); // left and right on same time, so need double size input_buffer_.resize(buffer_size); output_buffer_.resize(buffer_size); smaller_is_feature_aggregated_.resize(num_features_); larger_is_feature_aggregated_.resize(num_features_); block_start_.resize(num_machines_); block_len_.resize(num_machines_); smaller_buffer_read_start_pos_.resize(num_features_); larger_buffer_read_start_pos_.resize(num_features_); global_data_count_in_leaf_.resize(tree_config_.num_leaves); smaller_leaf_splits_global_.reset(new LeafSplits(train_data_->num_features(), train_data_->num_data())); larger_leaf_splits_global_.reset(new LeafSplits(train_data_->num_features(), train_data_->num_data())); local_tree_config_ = tree_config_; local_tree_config_.min_data_in_leaf /= num_machines_; local_tree_config_.min_sum_hessian_in_leaf /= num_machines_; auto histogram_create_function = [this]() { auto tmp_histogram_array = std::unique_ptr(new FeatureHistogram[train_data_->num_features()]); for (int j = 0; j < train_data_->num_features(); ++j) { tmp_histogram_array[j].Init(train_data_->FeatureAt(j), j, &local_tree_config_); } return tmp_histogram_array.release(); }; histogram_pool_.Fill(histogram_create_function); // initialize histograms for global smaller_leaf_histogram_array_global_.reset(new FeatureHistogram[num_features_]); larger_leaf_histogram_array_global_.reset(new FeatureHistogram[num_features_]); for (int j = 0; j < num_features_; ++j) { smaller_leaf_histogram_array_global_[j].Init(train_data_->FeatureAt(j), j, &tree_config_); larger_leaf_histogram_array_global_[j].Init(train_data_->FeatureAt(j), j, &tree_config_); } } void VotingParallelTreeLearner::BeforeTrain() { SerialTreeLearner::BeforeTrain(); // sync global data sumup info std::tuple data(smaller_leaf_splits_->num_data_in_leaf(), smaller_leaf_splits_->sum_gradients(), smaller_leaf_splits_->sum_hessians()); int size = sizeof(std::tuple); std::memcpy(input_buffer_.data(), &data, size); Network::Allreduce(input_buffer_.data(), size, size, output_buffer_.data(), [](const char *src, char *dst, int len) { int used_size = 0; int type_size = sizeof(std::tuple); const std::tuple *p1; std::tuple *p2; while (used_size < len) { p1 = reinterpret_cast *>(src); p2 = reinterpret_cast *>(dst); std::get<0>(*p2) = std::get<0>(*p2) + std::get<0>(*p1); std::get<1>(*p2) = std::get<1>(*p2) + std::get<1>(*p1); std::get<2>(*p2) = std::get<2>(*p2) + std::get<2>(*p1); src += type_size; dst += type_size; used_size += type_size; } }); std::memcpy(&data, output_buffer_.data(), size); // set global sumup info smaller_leaf_splits_global_->Init(std::get<1>(data), std::get<2>(data)); larger_leaf_splits_global_->Init(); // init global data count in leaf global_data_count_in_leaf_[0] = std::get<0>(data); } bool VotingParallelTreeLearner::BeforeFindBestSplit(int left_leaf, int right_leaf) { if (SerialTreeLearner::BeforeFindBestSplit(left_leaf, right_leaf)) { data_size_t num_data_in_left_child = GetGlobalDataCountInLeaf(left_leaf); data_size_t num_data_in_right_child = GetGlobalDataCountInLeaf(right_leaf); if (right_leaf < 0) { return true; } else if (num_data_in_left_child < num_data_in_right_child) { // get local sumup smaller_leaf_splits_->Init(left_leaf, data_partition_.get(), gradients_, hessians_); larger_leaf_splits_->Init(right_leaf, data_partition_.get(), gradients_, hessians_); } else { // get local sumup smaller_leaf_splits_->Init(right_leaf, data_partition_.get(), gradients_, hessians_); larger_leaf_splits_->Init(left_leaf, data_partition_.get(), gradients_, hessians_); } return true; } else { return false; } } void VotingParallelTreeLearner::GlobalVoting(int leaf_idx, const std::vector& splits, std::vector* out) { out->clear(); if (leaf_idx < 0) { return; } // get mean number on machines score_t mean_num_data = GetGlobalDataCountInLeaf(leaf_idx) / static_cast(num_machines_); std::vector feature_best_split(num_features_, SplitInfo()); for (auto & split : splits) { int fid = split.feature; if (fid < 0) { continue; } // weighted gain double gain = split.gain * (split.left_count + split.right_count) / mean_num_data; if (gain > feature_best_split[fid].gain) { feature_best_split[fid] = split; feature_best_split[fid].gain = gain; } } // get top k std::vector top_k_splits; ArrayArgs::MaxK(feature_best_split, top_k_, &top_k_splits); for (auto& split : top_k_splits) { if (split.gain == kMinScore || split.feature == -1) { continue; } out->push_back(split.feature); } } void VotingParallelTreeLearner::CopyLocalHistogram(const std::vector& smaller_top_features, const std::vector& larger_top_features) { for (int i = 0; i < num_features_; ++i) { smaller_is_feature_aggregated_[i] = false; larger_is_feature_aggregated_[i] = false; } size_t total_num_features = smaller_top_features.size() + larger_top_features.size(); size_t average_feature = (total_num_features + num_machines_ - 1) / num_machines_; size_t used_num_features = 0, smaller_idx = 0, larger_idx = 0; block_start_[0] = 0; reduce_scatter_size_ = 0; // Copy histogram to buffer, and Get local aggregate features for (int i = 0; i < num_machines_; ++i) { size_t cur_size = 0, cur_used_features = 0; size_t cur_total_feature = std::min(average_feature, total_num_features - used_num_features); // copy histograms. while (cur_used_features < cur_total_feature) { // copy smaller leaf histograms first if (smaller_idx < smaller_top_features.size()) { int fid = smaller_top_features[smaller_idx]; ++cur_used_features; // mark local aggregated feature if (i == rank_) { smaller_is_feature_aggregated_[fid] = true; smaller_buffer_read_start_pos_[fid] = static_cast(cur_size); } // copy std::memcpy(input_buffer_.data() + reduce_scatter_size_, smaller_leaf_histogram_array_[fid].HistogramData(), smaller_leaf_histogram_array_[fid].SizeOfHistgram()); cur_size += smaller_leaf_histogram_array_[fid].SizeOfHistgram(); reduce_scatter_size_ += smaller_leaf_histogram_array_[fid].SizeOfHistgram(); ++smaller_idx; } if (cur_used_features >= cur_total_feature) { break; } // then copy larger leaf histograms if (larger_idx < larger_top_features.size()) { int fid = larger_top_features[larger_idx]; ++cur_used_features; // mark local aggregated feature if (i == rank_) { larger_is_feature_aggregated_[fid] = true; larger_buffer_read_start_pos_[fid] = static_cast(cur_size); } // copy std::memcpy(input_buffer_.data() + reduce_scatter_size_, larger_leaf_histogram_array_[fid].HistogramData(), larger_leaf_histogram_array_[fid].SizeOfHistgram()); cur_size += larger_leaf_histogram_array_[fid].SizeOfHistgram(); reduce_scatter_size_ += larger_leaf_histogram_array_[fid].SizeOfHistgram(); ++larger_idx; } } used_num_features += cur_used_features; block_len_[i] = static_cast(cur_size); if (i < num_machines_ - 1) { block_start_[i + 1] = block_start_[i] + block_len_[i]; } } } void VotingParallelTreeLearner::FindBestThresholds() { // use local data to find local best splits SerialTreeLearner::FindBestThresholds(); std::vector smaller_top_k_splits, larger_top_k_splits; // local voting ArrayArgs::MaxK(smaller_leaf_splits_->BestSplitPerFeature(), top_k_, &smaller_top_k_splits); ArrayArgs::MaxK(larger_leaf_splits_->BestSplitPerFeature(), top_k_, &larger_top_k_splits); // gather int offset = 0; for (int i = 0; i < top_k_; ++i) { std::memcpy(input_buffer_.data() + offset, &smaller_top_k_splits[i], sizeof(SplitInfo)); offset += sizeof(SplitInfo); std::memcpy(input_buffer_.data() + offset, &larger_top_k_splits[i], sizeof(SplitInfo)); offset += sizeof(SplitInfo); } Network::Allgather(input_buffer_.data(), offset, output_buffer_.data()); // get all top-k from all machines std::vector smaller_top_k_splits_global; std::vector larger_top_k_splits_global; offset = 0; for (int i = 0; i < num_machines_; ++i) { for (int j = 0; j < top_k_; ++j) { smaller_top_k_splits_global.push_back(SplitInfo()); std::memcpy(&smaller_top_k_splits_global.back(), output_buffer_.data() + offset, sizeof(SplitInfo)); offset += sizeof(SplitInfo); larger_top_k_splits_global.push_back(SplitInfo()); std::memcpy(&larger_top_k_splits_global.back(), output_buffer_.data() + offset, sizeof(SplitInfo)); offset += sizeof(SplitInfo); } } // global voting std::vector smaller_top_features, larger_top_features; GlobalVoting(smaller_leaf_splits_->LeafIndex(), smaller_top_k_splits_global, &smaller_top_features); GlobalVoting(larger_leaf_splits_->LeafIndex(), larger_top_k_splits_global, &larger_top_features); // copy local histgrams to buffer CopyLocalHistogram(smaller_top_features, larger_top_features); // Reduce scatter for histogram Network::ReduceScatter(input_buffer_.data(), reduce_scatter_size_, block_start_.data(), block_len_.data(), output_buffer_.data(), &HistogramBinEntry::SumReducer); // find best split from local aggregated histograms #pragma omp parallel for schedule(guided) for (int feature_index = 0; feature_index < num_features_; ++feature_index) { if (smaller_is_feature_aggregated_[feature_index]) { smaller_leaf_histogram_array_global_[feature_index].SetSumup( GetGlobalDataCountInLeaf(smaller_leaf_splits_global_->LeafIndex()), smaller_leaf_splits_global_->sum_gradients(), smaller_leaf_splits_global_->sum_hessians()); // restore from buffer smaller_leaf_histogram_array_global_[feature_index].FromMemory( output_buffer_.data() + smaller_buffer_read_start_pos_[feature_index]); // find best threshold smaller_leaf_histogram_array_global_[feature_index].FindBestThreshold( &smaller_leaf_splits_global_->BestSplitPerFeature()[feature_index]); } if (larger_is_feature_aggregated_[feature_index]) { larger_leaf_histogram_array_global_[feature_index].SetSumup(GetGlobalDataCountInLeaf(larger_leaf_splits_global_->LeafIndex()), larger_leaf_splits_global_->sum_gradients(), larger_leaf_splits_global_->sum_hessians()); // restore from buffer larger_leaf_histogram_array_global_[feature_index].FromMemory(output_buffer_.data() + larger_buffer_read_start_pos_[feature_index]); // find best threshold larger_leaf_histogram_array_global_[feature_index].FindBestThreshold(&larger_leaf_splits_global_->BestSplitPerFeature()[feature_index]); } } } void VotingParallelTreeLearner::FindBestSplitsForLeaves() { int smaller_best_feature = -1, larger_best_feature = -1; // find local best SplitInfo smaller_best, larger_best; std::vector gains; for (size_t i = 0; i < smaller_leaf_splits_global_->BestSplitPerFeature().size(); ++i) { gains.push_back(smaller_leaf_splits_global_->BestSplitPerFeature()[i].gain); } smaller_best_feature = static_cast(ArrayArgs::ArgMax(gains)); smaller_best = smaller_leaf_splits_global_->BestSplitPerFeature()[smaller_best_feature]; if (larger_leaf_splits_global_->LeafIndex() >= 0) { gains.clear(); for (size_t i = 0; i < larger_leaf_splits_global_->BestSplitPerFeature().size(); ++i) { gains.push_back(larger_leaf_splits_global_->BestSplitPerFeature()[i].gain); } larger_best_feature = static_cast(ArrayArgs::ArgMax(gains)); larger_best = larger_leaf_splits_global_->BestSplitPerFeature()[larger_best_feature]; } // sync global best info std::memcpy(input_buffer_.data(), &smaller_best, sizeof(SplitInfo)); std::memcpy(input_buffer_.data() + sizeof(SplitInfo), &larger_best, sizeof(SplitInfo)); Network::Allreduce(input_buffer_.data(), sizeof(SplitInfo) * 2, sizeof(SplitInfo), output_buffer_.data(), &SplitInfo::MaxReducer); std::memcpy(&smaller_best, output_buffer_.data(), sizeof(SplitInfo)); std::memcpy(&larger_best, output_buffer_.data() + sizeof(SplitInfo), sizeof(SplitInfo)); // copy back best_split_per_leaf_[smaller_leaf_splits_global_->LeafIndex()] = smaller_best; if (larger_best.feature >= 0 && larger_leaf_splits_global_->LeafIndex() >= 0) { best_split_per_leaf_[larger_leaf_splits_global_->LeafIndex()] = larger_best; } } void VotingParallelTreeLearner::Split(Tree* tree, int best_Leaf, int* left_leaf, int* right_leaf) { SerialTreeLearner::Split(tree, best_Leaf, left_leaf, right_leaf); const SplitInfo& best_split_info = best_split_per_leaf_[best_Leaf]; // set the global number of data for leaves global_data_count_in_leaf_[*left_leaf] = best_split_info.left_count; global_data_count_in_leaf_[*right_leaf] = best_split_info.right_count; // init the global sumup info if (best_split_info.left_count < best_split_info.right_count) { smaller_leaf_splits_global_->Init(*left_leaf, data_partition_.get(), best_split_info.left_sum_gradient, best_split_info.left_sum_hessian); larger_leaf_splits_global_->Init(*right_leaf, data_partition_.get(), best_split_info.right_sum_gradient, best_split_info.right_sum_hessian); } else { smaller_leaf_splits_global_->Init(*right_leaf, data_partition_.get(), best_split_info.right_sum_gradient, best_split_info.right_sum_hessian); larger_leaf_splits_global_->Init(*left_leaf, data_partition_.get(), best_split_info.left_sum_gradient, best_split_info.left_sum_hessian); } } } // namespace FTLBoost