data_parallel_tree_learner.cpp 10.9 KB
Newer Older
Guolin Ke's avatar
Guolin Ke committed
1
2
3
4
5
6
7
8
9
#include "parallel_tree_learner.h"

#include <cstring>

#include <tuple>
#include <vector>

namespace LightGBM {

10
11
12
// Constructs the data-parallel learner by forwarding the tree configuration to
// the wrapped base learner (TREELEARNER_T). No additional state is set up here;
// the communication buffers are sized in Init().
template <typename TREELEARNER_T>
DataParallelTreeLearner<TREELEARNER_T>::DataParallelTreeLearner(const TreeConfig* tree_config)
    : TREELEARNER_T(tree_config) {}

15
16
// Destructor. Nothing is released explicitly: every member touched in this
// file is a container that cleans up after itself.
template <typename TREELEARNER_T>
DataParallelTreeLearner<TREELEARNER_T>::~DataParallelTreeLearner() = default;

20
21
// Initializes the wrapped base learner, records this machine's rank and the
// number of machines, and sizes every buffer used for inter-machine exchange.
//
// train_data:          dataset forwarded to TREELEARNER_T::Init.
// is_constant_hessian: forwarded unchanged to TREELEARNER_T::Init.
template <typename TREELEARNER_T>
void DataParallelTreeLearner<TREELEARNER_T>::Init(const Dataset* train_data, bool is_constant_hessian) {
  // The base learner must be ready first: train_data_ and num_features_ are read below.
  TREELEARNER_T::Init(train_data, is_constant_hessian);

  // Local rank and global machine count.
  rank_ = Network::rank();
  num_machines_ = Network::num_machines();

  // Communication buffers: one HistogramBinEntry per bin over all features.
  // NOTE(review): input_buffer_ is also handed to SyncUpGlobalBestSplit in
  // FindBestSplitsFromHistograms; confirm this size is always large enough for
  // that use as well (e.g. datasets with very few bins).
  const size_t buffer_size = this->train_data_->NumTotalBin() * sizeof(HistogramBinEntry);
  input_buffer_.resize(buffer_size);
  output_buffer_.resize(buffer_size);

  // Per-feature bookkeeping.
  is_feature_aggregated_.resize(this->num_features_);

  // Per-machine block layout for the reduce-scatter.
  block_start_.resize(num_machines_);
  block_len_.resize(num_machines_);

  // Per-feature byte offsets into the send / receive buffers.
  buffer_write_start_pos_.resize(this->num_features_);
  buffer_read_start_pos_.resize(this->num_features_);

  // One global data count per leaf.
  global_data_count_in_leaf_.resize(this->tree_config_->num_leaves);
}

43
44
45
46
// Applies a new tree configuration: the base learner is reconfigured first,
// then the per-leaf global data-count table is resized to the leaf count of
// the configuration now held in tree_config_.
template <typename TREELEARNER_T>
void DataParallelTreeLearner<TREELEARNER_T>::ResetConfig(const TreeConfig* tree_config) {
  TREELEARNER_T::ResetConfig(tree_config);
  const auto leaf_capacity = this->tree_config_->num_leaves;
  global_data_count_in_leaf_.resize(leaf_capacity);
}
Guolin Ke's avatar
Guolin Ke committed
48

49
50
51
// Per-tree setup for data-parallel training. After the base learner's own
// BeforeTrain(), this:
//   1. assigns every used feature to the machine that currently holds the
//      fewest bins,
//   2. derives the reduce-scatter block layout and the per-feature offsets
//      into the send / receive buffers from that assignment,
//   3. all-reduces (count, sum_gradients, sum_hessians) of the smaller leaf so
//      that it holds global sums, and records the global count for leaf 0.
template <typename TREELEARNER_T>
void DataParallelTreeLearner<TREELEARNER_T>::BeforeTrain() {
  TREELEARNER_T::BeforeTrain();

  // Number of bins counted for a feature: its bin count, minus one when the
  // feature's default bin is bin 0.
  const auto bins_to_send = [this](int fid) {
    auto num_bin = this->train_data_->FeatureNumBin(fid);
    if (this->train_data_->FeatureBinMapper(fid)->GetDefaultBin() == 0) {
      num_bin -= 1;
    }
    return num_bin;
  };

  // Generate the feature partition for the current tree: each used feature
  // goes to whichever machine has accumulated the fewest bins so far.
  std::vector<std::vector<int>> feature_distribution(num_machines_, std::vector<int>());
  std::vector<int> num_bins_distributed(num_machines_, 0);
  for (int i = 0; i < this->train_data_->num_total_features(); ++i) {
    const int inner = this->train_data_->InnerFeatureIndex(i);
    if (inner == -1) { continue; }
    if (this->is_feature_used_[inner]) {
      const int target = static_cast<int>(ArrayArgs<int>::ArgMin(num_bins_distributed));
      feature_distribution[target].push_back(inner);
      num_bins_distributed[target] += bins_to_send(inner);
    }
    // Cleared for every valid feature; the locally owned ones are set just below.
    is_feature_aggregated_[inner] = false;
  }

  // Features assigned to this machine.
  for (auto fid : feature_distribution[rank_]) {
    is_feature_aggregated_[fid] = true;
  }

  // Block length (in bytes) per machine and total size for the reduce-scatter.
  reduce_scatter_size_ = 0;
  for (int machine = 0; machine < num_machines_; ++machine) {
    block_len_[machine] = 0;
    for (auto fid : feature_distribution[machine]) {
      block_len_[machine] += bins_to_send(fid) * sizeof(HistogramBinEntry);
    }
    reduce_scatter_size_ += block_len_[machine];
  }

  // Block start offsets: running sum of the preceding block lengths.
  block_start_[0] = 0;
  for (int machine = 1; machine < num_machines_; ++machine) {
    block_start_[machine] = block_start_[machine - 1] + block_len_[machine - 1];
  }

  // buffer_write_start_pos_: byte offset of each feature in the send buffer,
  // laid out machine by machine in assignment order.
  int write_offset = 0;
  for (int machine = 0; machine < num_machines_; ++machine) {
    for (auto fid : feature_distribution[machine]) {
      buffer_write_start_pos_[fid] = write_offset;
      write_offset += bins_to_send(fid) * sizeof(HistogramBinEntry);
    }
  }

  // buffer_read_start_pos_: byte offset of each locally owned feature in the
  // receive buffer.
  int read_offset = 0;
  for (auto fid : feature_distribution[rank_]) {
    buffer_read_start_pos_[fid] = read_offset;
    read_offset += bins_to_send(fid) * sizeof(HistogramBinEntry);
  }

  // Sync the global (count, sum_gradients, sum_hessians) of the smaller leaf.
  // NOTE(review): the tuple is moved through the byte buffers with memcpy;
  // confirm std::tuple is trivially copyable on every supported toolchain.
  using LeafSummary = std::tuple<data_size_t, double, double>;
  LeafSummary data(this->smaller_leaf_splits_->num_data_in_leaf(),
                   this->smaller_leaf_splits_->sum_gradients(),
                   this->smaller_leaf_splits_->sum_hessians());
  int size = sizeof(data);
  std::memcpy(input_buffer_.data(), &data, size);

  // Element-wise sum reduction over consecutive tuples in the byte buffers.
  Network::Allreduce(input_buffer_.data(), size, size, output_buffer_.data(), [](const char *src, char *dst, int len) {
    const int type_size = sizeof(std::tuple<data_size_t, double, double>);
    for (int used_size = 0; used_size < len; used_size += type_size) {
      const auto *in = reinterpret_cast<const std::tuple<data_size_t, double, double> *>(src);
      auto *out = reinterpret_cast<std::tuple<data_size_t, double, double> *>(dst);
      std::get<0>(*out) += std::get<0>(*in);
      std::get<1>(*out) += std::get<1>(*in);
      std::get<2>(*out) += std::get<2>(*in);
      src += type_size;
      dst += type_size;
    }
  });

  // Copy the reduced tuple back and publish the global sums.
  std::memcpy(&data, output_buffer_.data(), size);
  this->smaller_leaf_splits_->Init(std::get<1>(data), std::get<2>(data));
  // Leaf 0 starts with the global data count.
  global_data_count_in_leaf_[0] = std::get<0>(data);
}

147
template <typename TREELEARNER_T>
Guolin Ke's avatar
Guolin Ke committed
148
149
void DataParallelTreeLearner<TREELEARNER_T>::FindBestSplits() {
  TREELEARNER_T::ConstructHistograms(this->is_feature_used_, true);
Guolin Ke's avatar
Guolin Ke committed
150
  // construct local histograms
151
  #pragma omp parallel for schedule(static)
152
153
  for (int feature_index = 0; feature_index < this->num_features_; ++feature_index) {
    if ((!this->is_feature_used_.empty() && this->is_feature_used_[feature_index] == false)) continue;
Guolin Ke's avatar
Guolin Ke committed
154
    // copy to buffer
Guolin Ke's avatar
Guolin Ke committed
155
    std::memcpy(input_buffer_.data() + buffer_write_start_pos_[feature_index],
156
157
                this->smaller_leaf_histogram_array_[feature_index].RawData(),
                this->smaller_leaf_histogram_array_[feature_index].SizeOfHistgram());
Guolin Ke's avatar
Guolin Ke committed
158
159
  }
  // Reduce scatter for histogram
Guolin Ke's avatar
Guolin Ke committed
160
  Network::ReduceScatter(input_buffer_.data(), reduce_scatter_size_, block_start_.data(),
161
                         block_len_.data(), output_buffer_.data(), &HistogramBinEntry::SumReducer);
Guolin Ke's avatar
Guolin Ke committed
162
163
164
165
166
167
168
  this->FindBestSplitsFromHistograms(this->is_feature_used_, true);
}

// Finds the best split of the smaller leaf (and of the larger leaf, when one
// exists) using the globally reduced histograms held in output_buffer_, then
// synchronizes the winners across machines and stores them in
// best_split_per_leaf_.
//
// Both parameters are unused; the set of features to scan comes from
// is_feature_aggregated_, i.e. the features assigned to this machine.
//
// A larger leaf "exists" when larger_leaf_splits_ is non-null and its leaf
// index is non-negative. That predicate is evaluated once and used everywhere,
// so a null larger_leaf_splits_ is never dereferenced.
template <typename TREELEARNER_T>
void DataParallelTreeLearner<TREELEARNER_T>::FindBestSplitsFromHistograms(const std::vector<int8_t>&, bool) {
  std::vector<SplitInfo> smaller_bests_per_thread(this->num_threads_, SplitInfo());
  std::vector<SplitInfo> larger_bests_per_thread(this->num_threads_, SplitInfo());

  // Only the root has no larger leaf.
  const bool has_larger_leaf =
      this->larger_leaf_splits_ != nullptr && this->larger_leaf_splits_->LeafIndex() >= 0;

  OMP_INIT_EX();
  #pragma omp parallel for schedule(static)
  for (int feature_index = 0; feature_index < this->num_features_; ++feature_index) {
    OMP_LOOP_EX_BEGIN();
    if (!is_feature_aggregated_[feature_index]) continue;
    const int tid = omp_get_thread_num();
    const int real_feature_index = this->train_data_->RealFeatureIndex(feature_index);

    // Global statistics of the smaller leaf, shared by FixHistogram and FindBestThreshold.
    const auto smaller_sum_gradients = this->smaller_leaf_splits_->sum_gradients();
    const auto smaller_sum_hessians = this->smaller_leaf_splits_->sum_hessians();
    const auto smaller_global_count = GetGlobalDataCountInLeaf(this->smaller_leaf_splits_->LeafIndex());

    // Restore the global histogram from the receive buffer.
    this->smaller_leaf_histogram_array_[feature_index].FromMemory(
      output_buffer_.data() + buffer_read_start_pos_[feature_index]);

    this->train_data_->FixHistogram(feature_index,
                                    smaller_sum_gradients, smaller_sum_hessians,
                                    smaller_global_count,
                                    this->smaller_leaf_histogram_array_[feature_index].RawData());

    // Find best threshold for the smaller child.
    SplitInfo smaller_split;
    this->smaller_leaf_histogram_array_[feature_index].FindBestThreshold(
      smaller_sum_gradients,
      smaller_sum_hessians,
      smaller_global_count,
      &smaller_split);
    smaller_split.feature = real_feature_index;
    if (smaller_split > smaller_bests_per_thread[tid]) {
      smaller_bests_per_thread[tid] = smaller_split;
    }

    // Nothing more to do for this feature when there is no larger leaf.
    if (!has_larger_leaf) continue;

    // Construct histograms for the larger leaf: it was initialized as the
    // parent, so subtracting the smaller leaf's histogram yields its own.
    this->larger_leaf_histogram_array_[feature_index].Subtract(
      this->smaller_leaf_histogram_array_[feature_index]);

    // Find best threshold for the larger child.
    SplitInfo larger_split;
    this->larger_leaf_histogram_array_[feature_index].FindBestThreshold(
      this->larger_leaf_splits_->sum_gradients(),
      this->larger_leaf_splits_->sum_hessians(),
      GetGlobalDataCountInLeaf(this->larger_leaf_splits_->LeafIndex()),
      &larger_split);
    larger_split.feature = real_feature_index;
    if (larger_split > larger_bests_per_thread[tid]) {
      larger_bests_per_thread[tid] = larger_split;
    }
    OMP_LOOP_EX_END();
  }
  OMP_THROW_EX();

  // Reduce the per-thread winners to the local best split of each leaf.
  const int smaller_leaf = this->smaller_leaf_splits_->LeafIndex();
  auto smaller_best_idx = ArrayArgs<SplitInfo>::ArgMax(smaller_bests_per_thread);
  SplitInfo smaller_best_split = smaller_bests_per_thread[smaller_best_idx];
  this->best_split_per_leaf_[smaller_leaf] = smaller_best_split;

  // Stays default-constructed when there is no larger leaf.
  SplitInfo larger_best_split;
  int larger_leaf = -1;
  if (has_larger_leaf) {
    larger_leaf = this->larger_leaf_splits_->LeafIndex();
    auto larger_best_idx = ArrayArgs<SplitInfo>::ArgMax(larger_bests_per_thread);
    larger_best_split = larger_bests_per_thread[larger_best_idx];
    this->best_split_per_leaf_[larger_leaf] = larger_best_split;
  }

  // Sync global best info; input_buffer_ serves as both scratch buffers.
  SyncUpGlobalBestSplit(input_buffer_.data(), input_buffer_.data(), &smaller_best_split, &larger_best_split, this->tree_config_->max_cat_threshold);

  // Store the synchronized best splits.
  this->best_split_per_leaf_[smaller_leaf] = smaller_best_split;
  if (has_larger_leaf) {
    this->best_split_per_leaf_[larger_leaf] = larger_best_split;
  }
}

245
246
247
248
// Performs the split through the base learner, then records the data counts
// of the two resulting leaves, taken from the chosen split's left_count and
// right_count, in global_data_count_in_leaf_.
//
// tree:       tree being grown; forwarded to TREELEARNER_T::Split.
// best_Leaf:  index of the leaf being split.
// left_leaf:  out-parameter filled by TREELEARNER_T::Split with the left child index.
// right_leaf: out-parameter filled by TREELEARNER_T::Split with the right child index.
template <typename TREELEARNER_T>
void DataParallelTreeLearner<TREELEARNER_T>::Split(Tree* tree, int best_Leaf, int* left_leaf, int* right_leaf) {
  TREELEARNER_T::Split(tree, best_Leaf, left_leaf, right_leaf);

  // The child indices are only valid after the base Split has written them.
  const auto& chosen_split = this->best_split_per_leaf_[best_Leaf];
  const int left_child = *left_leaf;
  const int right_child = *right_leaf;
  global_data_count_in_leaf_[left_child] = chosen_split.left_count;
  global_data_count_in_leaf_[right_child] = chosen_split.right_count;
}

254
255
256
// Explicitly instantiate the template for each supported base learner. The
// member definitions live in this translation unit, so without these
// instantiations the linker cannot find the code.
template class DataParallelTreeLearner<GPUTreeLearner>;
template class DataParallelTreeLearner<SerialTreeLearner>;
Guolin Ke's avatar
Guolin Ke committed
257
258

}  // namespace LightGBM