data_parallel_tree_learner.cpp 12.5 KB
Newer Older
1
2
3
4
/*!
 * Copyright (c) 2016 Microsoft Corporation. All rights reserved.
 * Licensed under the MIT License. See LICENSE file in the project root for license information.
 */
Guolin Ke's avatar
Guolin Ke committed
5
6
7
8
#include <algorithm>
#include <cstring>
#include <tuple>
#include <vector>

#include "parallel_tree_learner.h"

Guolin Ke's avatar
Guolin Ke committed
11
12
namespace LightGBM {

13
template <typename TREELEARNER_T>
Guolin Ke's avatar
Guolin Ke committed
14
15
DataParallelTreeLearner<TREELEARNER_T>::DataParallelTreeLearner(const Config* config)
  :TREELEARNER_T(config) {
Guolin Ke's avatar
Guolin Ke committed
16
17
}

18
19
/*! \brief Destructor; the body is intentionally empty (no explicit cleanup is performed here). */
template <typename TREELEARNER_T>
DataParallelTreeLearner<TREELEARNER_T>::~DataParallelTreeLearner() {}

22
23
/*!
 * \brief Initialize the base learner, query the network topology and size all
 *        per-feature / per-machine / per-leaf bookkeeping and communication buffers.
 * \param train_data Training data, forwarded to the base learner
 * \param is_constant_hessian Forwarded unchanged to the base learner
 */
template <typename TREELEARNER_T>
void DataParallelTreeLearner<TREELEARNER_T>::Init(const Dataset* train_data, bool is_constant_hessian) {
  // initialize the base tree learner (TREELEARNER_T) first
  TREELEARNER_T::Init(train_data, is_constant_hessian);
  // Get local rank and global machine size
  rank_ = Network::rank();
  num_machines_ = Network::num_machines();

  const auto max_cat_threshold = this->config_->max_cat_threshold;
  // Widen each operand to size_t *before* multiplying, so the byte counts are
  // computed in size_t whatever the operand types (declared elsewhere) are.
  // need to be able to hold smaller and larger best splits in SyncUpGlobalBestSplit
  const size_t split_info_size = static_cast<size_t>(SplitInfo::Size(max_cat_threshold)) * 2;
  const size_t histogram_size =
      static_cast<size_t>(this->share_state_->num_hist_total_bin()) * static_cast<size_t>(kHistEntrySize);

  // allocate buffer for communication: the same pair of buffers carries both
  // histograms and split infos, so it must fit whichever is larger
  const size_t buffer_size = std::max(histogram_size, split_info_size);

  input_buffer_.resize(buffer_size);
  output_buffer_.resize(buffer_size);

  // one flag / offset slot per (inner) feature
  is_feature_aggregated_.resize(this->num_features_);

  // one reduce-scatter block per machine
  block_start_.resize(num_machines_);
  block_len_.resize(num_machines_);

  buffer_write_start_pos_.resize(this->num_features_);
  buffer_read_start_pos_.resize(this->num_features_);
  // one global data count per leaf
  global_data_count_in_leaf_.resize(this->config_->num_leaves);
}

51
template <typename TREELEARNER_T>
Guolin Ke's avatar
Guolin Ke committed
52
53
54
void DataParallelTreeLearner<TREELEARNER_T>::ResetConfig(const Config* config) {
  TREELEARNER_T::ResetConfig(config);
  global_data_count_in_leaf_.resize(this->config_->num_leaves);
Guolin Ke's avatar
Guolin Ke committed
55
}
Guolin Ke's avatar
Guolin Ke committed
56

57
58
59
/*!
 * \brief Per-tree setup: assign features to machines, derive the reduce-scatter
 *        layout (block lengths / starts and per-feature buffer offsets) and
 *        all-reduce the root leaf's data count, gradient sum and hessian sum.
 */
template <typename TREELEARNER_T>
void DataParallelTreeLearner<TREELEARNER_T>::BeforeTrain() {
  TREELEARNER_T::BeforeTrain();

  // Bin count used for sizing a feature's histogram slice: FeatureNumBin(),
  // minus one when the feature's most frequent bin is bin 0.
  // NOTE(review): presumably that bin is not stored in the histogram - confirm.
  const auto stored_bins = [this](int fid) {
    auto bins = this->train_data_->FeatureNumBin(fid);
    if (this->train_data_->FeatureBinMapper(fid)->GetMostFreqBin() == 0) {
      bins -= 1;
    }
    return bins;
  };

  // generate feature partition for current tree: each sampled feature goes to
  // the machine that currently holds the fewest bins
  std::vector<std::vector<int>> features_of_machine(num_machines_);
  std::vector<int> bins_of_machine(num_machines_, 0);
  for (int raw_index = 0; raw_index < this->train_data_->num_total_features(); ++raw_index) {
    const int fid = this->train_data_->InnerFeatureIndex(raw_index);
    if (fid == -1) { continue; }
    if (this->col_sampler_.is_feature_used_bytree()[fid]) {
      const int lightest = static_cast<int>(ArrayArgs<int>::ArgMin(bins_of_machine));
      features_of_machine[lightest].push_back(fid);
      bins_of_machine[lightest] += stored_bins(fid);
    }
    // reset; features owned by this rank are switched back on just below
    is_feature_aggregated_[fid] = false;
  }
  // get local used feature
  for (const int fid : features_of_machine[rank_]) {
    is_feature_aggregated_[fid] = true;
  }

  // get block start and block len for reduce scatter
  reduce_scatter_size_ = 0;
  for (int machine = 0; machine < num_machines_; ++machine) {
    block_len_[machine] = 0;
    for (const int fid : features_of_machine[machine]) {
      block_len_[machine] += stored_bins(fid) * kHistEntrySize;
    }
    reduce_scatter_size_ += block_len_[machine];
  }

  // each block starts where the previous one ends
  block_start_[0] = 0;
  for (int machine = 1; machine < num_machines_; ++machine) {
    block_start_[machine] = block_start_[machine - 1] + block_len_[machine - 1];
  }

  // get buffer_write_start_pos_: features are laid out machine by machine
  int write_offset = 0;
  for (const auto& owned : features_of_machine) {
    for (const int fid : owned) {
      buffer_write_start_pos_[fid] = write_offset;
      write_offset += stored_bins(fid) * kHistEntrySize;
    }
  }

  // get buffer_read_start_pos_: offsets of this rank's own features only
  int read_offset = 0;
  for (const int fid : features_of_machine[rank_]) {
    buffer_read_start_pos_[fid] = read_offset;
    read_offset += stored_bins(fid) * kHistEntrySize;
  }

  // sync global data sumup info: (data count, sum of gradients, sum of hessians)
  using LeafSummary = std::tuple<data_size_t, double, double>;
  LeafSummary summary(this->smaller_leaf_splits_->num_data_in_leaf(),
                      this->smaller_leaf_splits_->sum_gradients(),
                      this->smaller_leaf_splits_->sum_hessians());
  const int summary_bytes = sizeof(summary);
  std::memcpy(input_buffer_.data(), &summary, summary_bytes);
  // global sumup reduce: element-wise addition of the three tuple fields
  Network::Allreduce(input_buffer_.data(), summary_bytes, sizeof(LeafSummary), output_buffer_.data(),
                     [](const char* src, char* dst, int type_size, comm_size_t len) {
    for (comm_size_t consumed = 0; consumed < len; consumed += type_size) {
      const auto* incoming = reinterpret_cast<const LeafSummary*>(src);
      auto* accumulated = reinterpret_cast<LeafSummary*>(dst);
      std::get<0>(*accumulated) += std::get<0>(*incoming);
      std::get<1>(*accumulated) += std::get<1>(*incoming);
      std::get<2>(*accumulated) += std::get<2>(*incoming);
      src += type_size;
      dst += type_size;
    }
  });
  // copy back
  std::memcpy(static_cast<void*>(&summary), output_buffer_.data(), summary_bytes);
  // set global sumup info
  this->smaller_leaf_splits_->Init(std::get<1>(summary), std::get<2>(summary));
  // init global data count in leaf
  global_data_count_in_leaf_[0] = std::get<0>(summary);
}

154
template <typename TREELEARNER_T>
155
void DataParallelTreeLearner<TREELEARNER_T>::FindBestSplits(const Tree* tree) {
156
157
  TREELEARNER_T::ConstructHistograms(
      this->col_sampler_.is_feature_used_bytree(), true);
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
  const int smaller_leaf_index = this->smaller_leaf_splits_->leaf_index();
  const data_size_t local_data_on_smaller_leaf = this->data_partition_->leaf_count(smaller_leaf_index);
  if (local_data_on_smaller_leaf <= 0) {
    // clear histogram buffer before synchronizing
    // otherwise histogram contents from the previous iteration will be sent
    #pragma omp parallel for schedule(static)
    for (int feature_index = 0; feature_index < this->num_features_; ++feature_index) {
      if (this->col_sampler_.is_feature_used_bytree()[feature_index] == false)
        continue;
      const BinMapper* feature_bin_mapper = this->train_data_->FeatureBinMapper(feature_index);
      const int offset = static_cast<int>(feature_bin_mapper->GetMostFreqBin() == 0);
      const int num_bin = feature_bin_mapper->num_bin();
      hist_t* hist_ptr = this->smaller_leaf_histogram_array_[feature_index].RawData();
      std::memset(reinterpret_cast<void*>(hist_ptr), 0, (num_bin - offset) * kHistEntrySize);
    }
  }
Guolin Ke's avatar
Guolin Ke committed
174
  // construct local histograms
175
  #pragma omp parallel for schedule(static)
176
  for (int feature_index = 0; feature_index < this->num_features_; ++feature_index) {
177
178
    if (this->col_sampler_.is_feature_used_bytree()[feature_index] == false)
      continue;
Guolin Ke's avatar
Guolin Ke committed
179
    // copy to buffer
Guolin Ke's avatar
Guolin Ke committed
180
    std::memcpy(input_buffer_.data() + buffer_write_start_pos_[feature_index],
181
182
                this->smaller_leaf_histogram_array_[feature_index].RawData(),
                this->smaller_leaf_histogram_array_[feature_index].SizeOfHistgram());
Guolin Ke's avatar
Guolin Ke committed
183
184
  }
  // Reduce scatter for histogram
185
186
  Network::ReduceScatter(input_buffer_.data(), reduce_scatter_size_, sizeof(hist_t), block_start_.data(),
                         block_len_.data(), output_buffer_.data(), static_cast<comm_size_t>(output_buffer_.size()), &HistogramSumReducer);
187
  this->FindBestSplitsFromHistograms(
188
      this->col_sampler_.is_feature_used_bytree(), true, tree);
Guolin Ke's avatar
Guolin Ke committed
189
190
191
}

template <typename TREELEARNER_T>
192
void DataParallelTreeLearner<TREELEARNER_T>::FindBestSplitsFromHistograms(const std::vector<int8_t>&, bool, const Tree* tree) {
193
194
  std::vector<SplitInfo> smaller_bests_per_thread(this->share_state_->num_threads);
  std::vector<SplitInfo> larger_bests_per_thread(this->share_state_->num_threads);
195
  std::vector<int8_t> smaller_node_used_features =
196
      this->col_sampler_.GetByNode(tree, this->smaller_leaf_splits_->leaf_index());
197
  std::vector<int8_t> larger_node_used_features =
198
      this->col_sampler_.GetByNode(tree, this->larger_leaf_splits_->leaf_index());
199
200
  double smaller_leaf_parent_output = this->GetParentOutput(tree, this->smaller_leaf_splits_.get());
  double larger_leaf_parent_output = this->GetParentOutput(tree, this->larger_leaf_splits_.get());
201
202
  OMP_INIT_EX();
  #pragma omp parallel for schedule(static)
203
  for (int feature_index = 0; feature_index < this->num_features_; ++feature_index) {
204
    OMP_LOOP_EX_BEGIN();
Guolin Ke's avatar
Guolin Ke committed
205
    if (!is_feature_aggregated_[feature_index]) continue;
Guolin Ke's avatar
Guolin Ke committed
206
    const int tid = omp_get_thread_num();
Guolin Ke's avatar
Guolin Ke committed
207
    const int real_feature_index = this->train_data_->RealFeatureIndex(feature_index);
Guolin Ke's avatar
Guolin Ke committed
208
    // restore global histograms from buffer
209
    this->smaller_leaf_histogram_array_[feature_index].FromMemory(
Guolin Ke's avatar
Guolin Ke committed
210
      output_buffer_.data() + buffer_read_start_pos_[feature_index]);
Guolin Ke's avatar
Guolin Ke committed
211

212
    this->train_data_->FixHistogram(feature_index,
Guolin Ke's avatar
Guolin Ke committed
213
214
                                    this->smaller_leaf_splits_->sum_gradients(), this->smaller_leaf_splits_->sum_hessians(),
                                    this->smaller_leaf_histogram_array_[feature_index].RawData());
215
216
217
218
219
220

    this->ComputeBestSplitForFeature(
        this->smaller_leaf_histogram_array_, feature_index, real_feature_index,
        smaller_node_used_features[feature_index],
        GetGlobalDataCountInLeaf(this->smaller_leaf_splits_->leaf_index()),
        this->smaller_leaf_splits_.get(),
221
222
        &smaller_bests_per_thread[tid],
        smaller_leaf_parent_output);
Guolin Ke's avatar
Guolin Ke committed
223
224

    // only root leaf
225
    if (this->larger_leaf_splits_ == nullptr || this->larger_leaf_splits_->leaf_index() < 0) continue;
Guolin Ke's avatar
Guolin Ke committed
226
227

    // construct histgroms for large leaf, we init larger leaf as the parent, so we can just subtract the smaller leaf's histograms
228
229
    this->larger_leaf_histogram_array_[feature_index].Subtract(
      this->smaller_leaf_histogram_array_[feature_index]);
230
231
232
233
234
235

    this->ComputeBestSplitForFeature(
        this->larger_leaf_histogram_array_, feature_index, real_feature_index,
        larger_node_used_features[feature_index],
        GetGlobalDataCountInLeaf(this->larger_leaf_splits_->leaf_index()),
        this->larger_leaf_splits_.get(),
236
237
        &larger_bests_per_thread[tid],
        larger_leaf_parent_output);
238
    OMP_LOOP_EX_END();
Guolin Ke's avatar
Guolin Ke committed
239
  }
240
  OMP_THROW_EX();
Guolin Ke's avatar
Guolin Ke committed
241

Guolin Ke's avatar
Guolin Ke committed
242
  auto smaller_best_idx = ArrayArgs<SplitInfo>::ArgMax(smaller_bests_per_thread);
243
  int leaf = this->smaller_leaf_splits_->leaf_index();
Guolin Ke's avatar
Guolin Ke committed
244
  this->best_split_per_leaf_[leaf] = smaller_bests_per_thread[smaller_best_idx];
Guolin Ke's avatar
Guolin Ke committed
245

246
247
  if (this->larger_leaf_splits_ != nullptr &&  this->larger_leaf_splits_->leaf_index() >= 0) {
    leaf = this->larger_leaf_splits_->leaf_index();
Guolin Ke's avatar
Guolin Ke committed
248
249
250
    auto larger_best_idx = ArrayArgs<SplitInfo>::ArgMax(larger_bests_per_thread);
    this->best_split_per_leaf_[leaf] = larger_bests_per_thread[larger_best_idx];
  }
Guolin Ke's avatar
Guolin Ke committed
251

Guolin Ke's avatar
Guolin Ke committed
252
  SplitInfo smaller_best_split, larger_best_split;
253
  smaller_best_split = this->best_split_per_leaf_[this->smaller_leaf_splits_->leaf_index()];
Guolin Ke's avatar
Guolin Ke committed
254
  // find local best split for larger leaf
255
256
  if (this->larger_leaf_splits_->leaf_index() >= 0) {
    larger_best_split = this->best_split_per_leaf_[this->larger_leaf_splits_->leaf_index()];
Guolin Ke's avatar
Guolin Ke committed
257
258
259
  }

  // sync global best info
Guolin Ke's avatar
Guolin Ke committed
260
  SyncUpGlobalBestSplit(input_buffer_.data(), input_buffer_.data(), &smaller_best_split, &larger_best_split, this->config_->max_cat_threshold);
Guolin Ke's avatar
Guolin Ke committed
261
262

  // set best split
263
264
265
  this->best_split_per_leaf_[this->smaller_leaf_splits_->leaf_index()] = smaller_best_split;
  if (this->larger_leaf_splits_->leaf_index() >= 0) {
    this->best_split_per_leaf_[this->larger_leaf_splits_->leaf_index()] = larger_best_split;
Guolin Ke's avatar
Guolin Ke committed
266
267
268
  }
}

269
270
/*!
 * \brief Split a leaf through the base learner, then record the data counts of
 *        the two children taken from the leaf's stored best split.
 * \param tree Tree being grown
 * \param best_Leaf Index of the leaf to split
 * \param left_leaf Output: index of the left child
 * \param right_leaf Output: index of the right child
 */
template <typename TREELEARNER_T>
void DataParallelTreeLearner<TREELEARNER_T>::Split(Tree* tree, int best_Leaf, int* left_leaf, int* right_leaf) {
  TREELEARNER_T::SplitInner(tree, best_Leaf, left_leaf, right_leaf, false);

  // need update global number of data in leaf
  const SplitInfo& applied_split = this->best_split_per_leaf_[best_Leaf];
  const int left_child = *left_leaf;
  const int right_child = *right_leaf;
  global_data_count_in_leaf_[left_child] = applied_split.left_count;
  global_data_count_in_leaf_[right_child] = applied_split.right_count;
}

278
279
280
// Explicitly instantiate the template for each base learner it is used with.
// The member functions are defined in this .cpp file, so without these
// instantiations the linker cannot find the code.
template class DataParallelTreeLearner<GPUTreeLearner>;
template class DataParallelTreeLearner<SerialTreeLearner>;
Guolin Ke's avatar
Guolin Ke committed
281
282

}  // namespace LightGBM