// data_parallel_tree_learner.cpp
// (Recovered from a GitLab blame view; blame attribution on every line: Guolin Ke.)
#include "parallel_tree_learner.h"

#include <cstring>

#include <algorithm>
#include <tuple>
#include <vector>

namespace LightGBM {

Guolin Ke's avatar
Guolin Ke committed
10
DataParallelTreeLearner::DataParallelTreeLearner(const TreeConfig* tree_config)
Guolin Ke's avatar
Guolin Ke committed
11
  :SerialTreeLearner(tree_config) {
Guolin Ke's avatar
Guolin Ke committed
12
13
14
}

// Nothing is released explicitly here: the communication buffers and bookkeeping
// arrays are member containers (resized in Init()) and clean up after themselves.
DataParallelTreeLearner::~DataParallelTreeLearner() {
}

void DataParallelTreeLearner::Init(const Dataset* train_data) {
  // initialize SerialTreeLearner
  SerialTreeLearner::Init(train_data);
  // Get local rank and global machine size
  rank_ = Network::rank();
  num_machines_ = Network::num_machines();
  // allocate buffer for communication
Guolin Ke's avatar
Guolin Ke committed
25
  size_t buffer_size = train_data_->NumTotalBin() * sizeof(HistogramBinEntry);
Guolin Ke's avatar
Guolin Ke committed
26

Guolin Ke's avatar
Guolin Ke committed
27
28
  input_buffer_.resize(buffer_size);
  output_buffer_.resize(buffer_size);
Guolin Ke's avatar
Guolin Ke committed
29

Guolin Ke's avatar
Guolin Ke committed
30
  is_feature_aggregated_.resize(num_features_);
Guolin Ke's avatar
Guolin Ke committed
31

Guolin Ke's avatar
Guolin Ke committed
32
33
  block_start_.resize(num_machines_);
  block_len_.resize(num_machines_);
Guolin Ke's avatar
Guolin Ke committed
34

Guolin Ke's avatar
Guolin Ke committed
35
36
  buffer_write_start_pos_.resize(num_features_);
  buffer_read_start_pos_.resize(num_features_);
Guolin Ke's avatar
Guolin Ke committed
37
  global_data_count_in_leaf_.resize(tree_config_->num_leaves);
Guolin Ke's avatar
Guolin Ke committed
38
39
}

Guolin Ke's avatar
Guolin Ke committed
40
41
42
43
void DataParallelTreeLearner::ResetConfig(const TreeConfig* tree_config) {
  SerialTreeLearner::ResetConfig(tree_config);
  global_data_count_in_leaf_.resize(tree_config_->num_leaves);
}
Guolin Ke's avatar
Guolin Ke committed
44
45
46
47
48
49
50
51
52
53

void DataParallelTreeLearner::BeforeTrain() {
  SerialTreeLearner::BeforeTrain();
  // generate feature partition for current tree
  std::vector<std::vector<int>> feature_distribution(num_machines_, std::vector<int>());
  std::vector<int> num_bins_distributed(num_machines_, 0);
  for (int i = 0; i < train_data_->num_features(); ++i) {
    if (is_feature_used_[i]) {
      int cur_min_machine = static_cast<int>(ArrayArgs<int>::ArgMin(num_bins_distributed));
      feature_distribution[cur_min_machine].push_back(i);
Guolin Ke's avatar
Guolin Ke committed
54
55
56
57
58
      auto num_bin = train_data_->FeatureNumBin(i);
      if (train_data_->FeatureBinMapper(i)->GetDefaultBin() == 0) {
        num_bin -= 1;
      }
      num_bins_distributed[cur_min_machine] += num_bin;
Guolin Ke's avatar
Guolin Ke committed
59
60
61
62
63
64
65
66
67
68
69
70
71
    }
    is_feature_aggregated_[i] = false;
  }
  // get local used feature
  for (auto fid : feature_distribution[rank_]) {
    is_feature_aggregated_[fid] = true;
  }

  // get block start and block len for reduce scatter
  reduce_scatter_size_ = 0;
  for (int i = 0; i < num_machines_; ++i) {
    block_len_[i] = 0;
    for (auto fid : feature_distribution[i]) {
Guolin Ke's avatar
Guolin Ke committed
72
73
74
75
76
      auto num_bin = train_data_->FeatureNumBin(fid);
      if (train_data_->FeatureBinMapper(fid)->GetDefaultBin() == 0) {
        num_bin -= 1;
      }
      block_len_[i] += num_bin * sizeof(HistogramBinEntry);
Guolin Ke's avatar
Guolin Ke committed
77
78
79
80
81
82
83
84
85
86
87
88
89
90
    }
    reduce_scatter_size_ += block_len_[i];
  }

  block_start_[0] = 0;
  for (int i = 1; i < num_machines_; ++i) {
    block_start_[i] = block_start_[i - 1] + block_len_[i - 1];
  }

  // get buffer_write_start_pos_
  int bin_size = 0;
  for (int i = 0; i < num_machines_; ++i) {
    for (auto fid : feature_distribution[i]) {
      buffer_write_start_pos_[fid] = bin_size;
Guolin Ke's avatar
Guolin Ke committed
91
92
93
94
95
      auto num_bin = train_data_->FeatureNumBin(fid);
      if (train_data_->FeatureBinMapper(fid)->GetDefaultBin() == 0) {
        num_bin -= 1;
      }
      bin_size += num_bin * sizeof(HistogramBinEntry);
Guolin Ke's avatar
Guolin Ke committed
96
97
98
99
100
101
102
    }
  }

  // get buffer_read_start_pos_
  bin_size = 0;
  for (auto fid : feature_distribution[rank_]) {
    buffer_read_start_pos_[fid] = bin_size;
Guolin Ke's avatar
Guolin Ke committed
103
104
105
106
107
    auto num_bin = train_data_->FeatureNumBin(fid);
    if (train_data_->FeatureBinMapper(fid)->GetDefaultBin() == 0) {
      num_bin -= 1;
    }
    bin_size += num_bin * sizeof(HistogramBinEntry);
Guolin Ke's avatar
Guolin Ke committed
108
109
110
  }

  // sync global data sumup info
Guolin Ke's avatar
Guolin Ke committed
111
  std::tuple<data_size_t, double, double> data(smaller_leaf_splits_->num_data_in_leaf(),
Guolin Ke's avatar
Guolin Ke committed
112
    smaller_leaf_splits_->sum_gradients(), smaller_leaf_splits_->sum_hessians());
Guolin Ke's avatar
Guolin Ke committed
113
  int size = sizeof(data);
Guolin Ke's avatar
Guolin Ke committed
114
  std::memcpy(input_buffer_.data(), &data, size);
Guolin Ke's avatar
Guolin Ke committed
115
  // global sumup reduce
Guolin Ke's avatar
Guolin Ke committed
116
  Network::Allreduce(input_buffer_.data(), size, size, output_buffer_.data(), [](const char *src, char *dst, int len) {
Guolin Ke's avatar
Guolin Ke committed
117
    int used_size = 0;
Guolin Ke's avatar
Guolin Ke committed
118
119
120
    int type_size = sizeof(std::tuple<data_size_t, double, double>);
    const std::tuple<data_size_t, double, double> *p1;
    std::tuple<data_size_t, double, double> *p2;
Guolin Ke's avatar
Guolin Ke committed
121
    while (used_size < len) {
Guolin Ke's avatar
Guolin Ke committed
122
123
      p1 = reinterpret_cast<const std::tuple<data_size_t, double, double> *>(src);
      p2 = reinterpret_cast<std::tuple<data_size_t, double, double> *>(dst);
Guolin Ke's avatar
Guolin Ke committed
124
125
126
127
128
129
130
131
132
      std::get<0>(*p2) = std::get<0>(*p2) + std::get<0>(*p1);
      std::get<1>(*p2) = std::get<1>(*p2) + std::get<1>(*p1);
      std::get<2>(*p2) = std::get<2>(*p2) + std::get<2>(*p1);
      src += type_size;
      dst += type_size;
      used_size += type_size;
    }
  });
  // copy back
Guolin Ke's avatar
Guolin Ke committed
133
  std::memcpy(&data, output_buffer_.data(), size);
Guolin Ke's avatar
Guolin Ke committed
134
135
136
137
138
139
140
  // set global sumup info
  smaller_leaf_splits_->Init(std::get<1>(data), std::get<2>(data));
  // init global data count in leaf
  global_data_count_in_leaf_[0] = std::get<0>(data);
}

void DataParallelTreeLearner::FindBestThresholds() {
Guolin Ke's avatar
Guolin Ke committed
141
142
143
144
145
146
  train_data_->ConstructHistograms(is_feature_used_,
    smaller_leaf_splits_->data_indices(), smaller_leaf_splits_->num_data_in_leaf(),
    smaller_leaf_splits_->LeafIndex(),
    ordered_bins_, gradients_, hessians_,
    ordered_gradients_.data(), ordered_hessians_.data(),
    smaller_leaf_histogram_array_[0].RawData() - 1);
Guolin Ke's avatar
Guolin Ke committed
147
  // construct local histograms
Guolin Ke's avatar
Guolin Ke committed
148
#pragma omp parallel for schedule(static)
Guolin Ke's avatar
Guolin Ke committed
149
  for (int feature_index = 0; feature_index < num_features_; ++feature_index) {
Guolin Ke's avatar
Guolin Ke committed
150
    if ((!is_feature_used_.empty() && is_feature_used_[feature_index] == false)) continue;
Guolin Ke's avatar
Guolin Ke committed
151
    // copy to buffer
Guolin Ke's avatar
Guolin Ke committed
152
    std::memcpy(input_buffer_.data() + buffer_write_start_pos_[feature_index],
Guolin Ke's avatar
Guolin Ke committed
153
      smaller_leaf_histogram_array_[feature_index].RawData(),
Guolin Ke's avatar
Guolin Ke committed
154
      smaller_leaf_histogram_array_[feature_index].SizeOfHistgram());
Guolin Ke's avatar
Guolin Ke committed
155
156
  }
  // Reduce scatter for histogram
Guolin Ke's avatar
Guolin Ke committed
157
  Network::ReduceScatter(input_buffer_.data(), reduce_scatter_size_, block_start_.data(),
Guolin Ke's avatar
Guolin Ke committed
158
    block_len_.data(), output_buffer_.data(), &HistogramBinEntry::SumReducer);
Guolin Ke's avatar
Guolin Ke committed
159
160
161

  std::vector<SplitInfo> smaller_best(num_threads_, SplitInfo());
  std::vector<SplitInfo> larger_best(num_threads_, SplitInfo());
Guolin Ke's avatar
Guolin Ke committed
162
#pragma omp parallel for schedule(static)
Guolin Ke's avatar
Guolin Ke committed
163
164
  for (int feature_index = 0; feature_index < num_features_; ++feature_index) {
    if (!is_feature_aggregated_[feature_index]) continue;
Guolin Ke's avatar
Guolin Ke committed
165
    const int tid = omp_get_thread_num();
Guolin Ke's avatar
Guolin Ke committed
166
167
    // restore global histograms from buffer
    smaller_leaf_histogram_array_[feature_index].FromMemory(
Guolin Ke's avatar
Guolin Ke committed
168
      output_buffer_.data() + buffer_read_start_pos_[feature_index]);
Guolin Ke's avatar
Guolin Ke committed
169

Guolin Ke's avatar
Guolin Ke committed
170
171
172

    train_data_->FixHistogram(feature_index,
      smaller_leaf_splits_->sum_gradients(), smaller_leaf_splits_->sum_hessians(),
Guolin Ke's avatar
Guolin Ke committed
173
      GetGlobalDataCountInLeaf(smaller_leaf_splits_->LeafIndex()),
Guolin Ke's avatar
Guolin Ke committed
174
175
      smaller_leaf_histogram_array_[feature_index].RawData());
    SplitInfo smaller_split;
Guolin Ke's avatar
Guolin Ke committed
176
177
    // find best threshold for smaller child
    smaller_leaf_histogram_array_[feature_index].FindBestThreshold(
Guolin Ke's avatar
Guolin Ke committed
178
179
      smaller_leaf_splits_->sum_gradients(),
      smaller_leaf_splits_->sum_hessians(),
Guolin Ke's avatar
Guolin Ke committed
180
      GetGlobalDataCountInLeaf(smaller_leaf_splits_->LeafIndex()),
Guolin Ke's avatar
Guolin Ke committed
181
182
183
184
185
      &smaller_split);

    if (smaller_split.gain > smaller_best[tid].gain) {
      smaller_best[tid] = smaller_split;
    }
Guolin Ke's avatar
Guolin Ke committed
186
187
188
189
190
191

    // only root leaf
    if (larger_leaf_splits_ == nullptr || larger_leaf_splits_->LeafIndex() < 0) continue;

    // construct histgroms for large leaf, we init larger leaf as the parent, so we can just subtract the smaller leaf's histograms
    larger_leaf_histogram_array_[feature_index].Subtract(
Guolin Ke's avatar
Guolin Ke committed
192
      smaller_leaf_histogram_array_[feature_index]);
Guolin Ke's avatar
Guolin Ke committed
193
    SplitInfo larger_split;
Guolin Ke's avatar
Guolin Ke committed
194
195
    // find best threshold for larger child
    larger_leaf_histogram_array_[feature_index].FindBestThreshold(
Guolin Ke's avatar
Guolin Ke committed
196
197
      larger_leaf_splits_->sum_gradients(),
      larger_leaf_splits_->sum_hessians(),
Guolin Ke's avatar
Guolin Ke committed
198
      GetGlobalDataCountInLeaf(larger_leaf_splits_->LeafIndex()),
Guolin Ke's avatar
Guolin Ke committed
199
200
201
202
      &larger_split);
    if (larger_split.gain > larger_best[tid].gain) {
      larger_best[tid] = larger_split;
    }
Guolin Ke's avatar
Guolin Ke committed
203
  }
Guolin Ke's avatar
Guolin Ke committed
204
205
206
207
208
209
210
211
212
213
  auto smaller_best_idx = ArrayArgs<SplitInfo>::ArgMax(smaller_best);
  int leaf = smaller_leaf_splits_->LeafIndex();
  best_split_per_leaf_[leaf] = smaller_best[smaller_best_idx];


  if (larger_leaf_splits_ == nullptr || larger_leaf_splits_->LeafIndex() < 0) { return; }

  leaf = larger_leaf_splits_->LeafIndex();
  auto larger_best_idx = ArrayArgs<SplitInfo>::ArgMax(larger_best);
  best_split_per_leaf_[leaf] = larger_best[larger_best_idx];
Guolin Ke's avatar
Guolin Ke committed
214
215
216
217
218

}

void DataParallelTreeLearner::FindBestSplitsForLeaves() {
  SplitInfo smaller_best, larger_best;
Guolin Ke's avatar
Guolin Ke committed
219
  smaller_best = best_split_per_leaf_[smaller_leaf_splits_->LeafIndex()];
Guolin Ke's avatar
Guolin Ke committed
220
221
  // find local best split for larger leaf
  if (larger_leaf_splits_->LeafIndex() >= 0) {
Guolin Ke's avatar
Guolin Ke committed
222
    larger_best = best_split_per_leaf_[larger_leaf_splits_->LeafIndex()];
Guolin Ke's avatar
Guolin Ke committed
223
224
225
  }

  // sync global best info
Guolin Ke's avatar
Guolin Ke committed
226
227
  std::memcpy(input_buffer_.data(), &smaller_best, sizeof(SplitInfo));
  std::memcpy(input_buffer_.data() + sizeof(SplitInfo), &larger_best, sizeof(SplitInfo));
Guolin Ke's avatar
Guolin Ke committed
228

Guolin Ke's avatar
Guolin Ke committed
229
  Network::Allreduce(input_buffer_.data(), sizeof(SplitInfo) * 2, sizeof(SplitInfo),
Guolin Ke's avatar
Guolin Ke committed
230
    output_buffer_.data(), &SplitInfo::MaxReducer);
Guolin Ke's avatar
Guolin Ke committed
231

Guolin Ke's avatar
Guolin Ke committed
232
233
  std::memcpy(&smaller_best, output_buffer_.data(), sizeof(SplitInfo));
  std::memcpy(&larger_best, output_buffer_.data() + sizeof(SplitInfo), sizeof(SplitInfo));
Guolin Ke's avatar
Guolin Ke committed
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251

  // set best split
  best_split_per_leaf_[smaller_leaf_splits_->LeafIndex()] = smaller_best;
  if (larger_leaf_splits_->LeafIndex() >= 0) {
    best_split_per_leaf_[larger_leaf_splits_->LeafIndex()] = larger_best;
  }
}

// Split leaf `best_Leaf` via the serial learner, then record the global
// (all-machine) data counts of the two resulting leaves from the split that
// was applied, since the serial learner only knows local counts.
void DataParallelTreeLearner::Split(Tree* tree, int best_Leaf, int* left_leaf, int* right_leaf) {
  SerialTreeLearner::Split(tree, best_Leaf, left_leaf, right_leaf);
  // read only after the serial split has run, matching the original order
  const SplitInfo& applied = best_split_per_leaf_[best_Leaf];
  const int left_index = *left_leaf;
  const int right_index = *right_leaf;
  global_data_count_in_leaf_[left_index] = applied.left_count;
  global_data_count_in_leaf_[right_index] = applied.right_count;
}


}  // namespace LightGBM